Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PROJECT_PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Explore modern C++ (C++23) and software engineering practices in embedded system
- [x] Code coverage reporting
- [x] Add static analysis through clang-tidy by default
- [x] Add UART abstraction with RxHandler
- [ ] Add I2C abstraction
- [x] Add I2C abstraction
- [ ] Add SPI abstraction
- [ ] Add PWM abstraction
- [ ] Add ADC abstraction
Expand Down
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,18 @@ The `uart_echo` application demonstrates:

**Run on host emulator**:
```bash
# Terminal 1: Start Python emulator
cd py/host-emulator
python -m src.emulator

# Terminal 2: Run uart_echo
# Terminal 1: Run uart_echo
cd build/host/bin
./uart_echo

# Terminal 3: Send data via Python
# Terminal 2: Send data via Python
python
>>> from src.emulator import DeviceEmulator
>>> emu = DeviceEmulator()
>>> emu.start()
>>> emu.Uart1().send_data([72, 101, 108, 108, 111]) # "Hello"
>>> bytes(emu.Uart1().rx_buffer) # See echoed data
>>> emu.uart1().send_data([72, 101, 108, 108, 111]) # "Hello"
>>> bytes(emu.uart1().rx_buffer) # See echoed data
>>> emu.uart1().rx_buffer.clear()
```

## Software Engineering Principles
Expand Down
3 changes: 2 additions & 1 deletion py/host-emulator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ add_test(
COMMAND ${host_emulator_venv_PYTHON} -m pytest ${CMAKE_CURRENT_SOURCE_DIR}
--blinky=${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/$<CONFIG>/blinky
--uart-echo=${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/$<CONFIG>/uart_echo
--i2c-demo=${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/$<CONFIG>/i2c_demo
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)

set_tests_properties(host_emulator_test PROPERTIES DEPENDS "blinky;uart_echo")
set_tests_properties(host_emulator_test PROPERTIES DEPENDS "blinky;uart_echo;i2c_demo")
set_tests_properties(host_emulator_test PROPERTIES DEPENDS host_emulator_venv)
set_tests_properties(host_emulator_test PROPERTIES FIXTURES_SETUP host_emulator_venv)

129 changes: 124 additions & 5 deletions py/host-emulator/src/emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,110 @@ def handle_message(self, message):
return self.handle_response(message)


class I2C:
def __init__(self, name):
self.name = name
# Store data for each I2C address (address -> bytearray)
self.device_buffers = {}
self.on_response = None
self.on_request = None

def handle_request(self, message):
response = {
"type": "Response",
"object": "I2C",
"name": self.name,
"address": message.get("address", 0),
"data": [],
"bytes_transferred": 0,
"status": Status.InvalidOperation.name,
}

address = message.get("address", 0)

if message["operation"] == "Send":
# Device is sending data to I2C peripheral
# Store the data in the buffer for this address
data = message.get("data", [])
if address not in self.device_buffers:
self.device_buffers[address] = bytearray()
self.device_buffers[address] = bytearray(data)
response.update(
{
"bytes_transferred": len(data),
"status": Status.Ok.name,
}
)
print(
f"[I2C {self.name}] Wrote {len(data)} bytes to address "
f"0x{address:02X}: {bytes(data)}"
)

elif message["operation"] == "Receive":
# Device is receiving data from I2C peripheral
# Return data from the buffer for this address
size = message.get("size", 0)
if address in self.device_buffers:
bytes_to_send = min(size, len(self.device_buffers[address]))
data = list(self.device_buffers[address][:bytes_to_send])
else:
# No data available, return empty
bytes_to_send = 0
data = []
response.update(
{
"data": data,
"bytes_transferred": bytes_to_send,
"status": Status.Ok.name,
}
)
print(
f"[I2C {self.name}] Read {bytes_to_send} bytes from address "
f"0x{address:02X}: {bytes(data)}"
)

if self.on_request:
self.on_request(message)
return json.dumps(response)

def handle_response(self, message):
print(f"[I2C {self.name}] Received response: {message}")
if self.on_response:
self.on_response(message)
return None

def set_on_request(self, on_request):
self.on_request = on_request

def set_on_response(self, on_response):
self.on_response = on_response

def handle_message(self, message):
if message["object"] != "I2C":
return None
if message["name"] != self.name:
return None
if message["type"] == "Request":
return self.handle_request(message)
if message["type"] == "Response":
return self.handle_response(message)

def write_to_device(self, address, data):
"""Write data to a simulated I2C device (for testing)"""
if address not in self.device_buffers:
self.device_buffers[address] = bytearray()
self.device_buffers[address] = bytearray(data)
print(
f"[I2C {self.name}] Device buffer at 0x{address:02X} set to: {bytes(data)}"
)

def read_from_device(self, address):
"""Read data from a simulated I2C device (for testing)"""
if address in self.device_buffers:
return bytes(self.device_buffers[address])
return b""


class DeviceEmulator:
def __init__(self):
print("Creating DeviceEmulator")
Expand All @@ -233,6 +337,9 @@ def __init__(self):
self.uart_1 = Uart("UART 1", self.to_device_socket)
self.uarts = [self.uart_1]

self.i2c_1 = I2C("I2C 1")
self.i2cs = [self.i2c_1]

def user_led1(self):
return self.led_1

Expand All @@ -245,6 +352,9 @@ def user_button1(self):
def uart1(self):
return self.uart_1

def i2c1(self):
return self.i2c_1

def run(self):
print("Starting emulator thread")
try:
Expand All @@ -255,28 +365,37 @@ def run(self):
while self.running:
print("Waiting for message...")
message = from_device_socket.recv()
print(f"[Emulator] Received request: {message}")
# print(f"[Emulator] Received request: {message}")
if message.startswith(b"{") and message.endswith(b"}"):
# JSON message
json_message = json.loads(message)
if json_message["object"] == "Pin":
for pin in self.pins:
if response := pin.handle_message(json_message):
print(f"[Emulator] Sending response: {response}")
# print(f"[Emulator] Sending response: {response}")
from_device_socket.send_string(response)
print("")
# print("")
break
else:
raise UnhandledMessageError(message, " - Pin not found")
elif json_message["object"] == "Uart":
for uart in self.uarts:
if response := uart.handle_message(json_message):
print(f"[Emulator] Sending response: {response}")
# print(f"[Emulator] Sending response: {response}")
from_device_socket.send_string(response)
print("")
# print("")
break
else:
raise UnhandledMessageError(message, " - Uart not found")
elif json_message["object"] == "I2C":
for i2c in self.i2cs:
if response := i2c.handle_message(json_message):
# print(f"[Emulator] Sending response: {response}")
from_device_socket.send_string(response)
# print("")
break
else:
raise UnhandledMessageError(message, " - I2C not found")
else:
raise UnhandledMessageError(
message, f" - unknown object type: {json_message['object']}"
Expand Down
27 changes: 27 additions & 0 deletions py/host-emulator/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ def pytest_addoption(parser):
default=None,
help="Path to the uart_echo executable",
)
parser.addoption(
"--i2c-demo",
action="store",
default=None,
help="Path to the i2c_demo executable",
)


# Emulator must be stopped manually within each test
Expand Down Expand Up @@ -73,3 +79,24 @@ def uart_echo(request):
uart_echo_process.kill()
uart_echo_process.wait(timeout=1)
print(f"[Fixture] UartEcho return code: {uart_echo_process.returncode}")


# I2CDemo must be stopped manually within each test
@fixture()
def i2c_demo(request):
i2c_demo_arg = request.config.getoption("--i2c-demo")
i2c_demo_executable = pathlib.Path(i2c_demo_arg).resolve()
assert i2c_demo_executable.exists()
i2c_demo_process = subprocess.Popen(
[str(i2c_demo_executable)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

yield i2c_demo_process

if i2c_demo_process.poll() is None:
print("[Fixture] Stopping i2c_demo")
i2c_demo_process.kill()
i2c_demo_process.wait(timeout=1)
print(f"[Fixture] I2CDemo return code: {i2c_demo_process.returncode}")
134 changes: 134 additions & 0 deletions py/host-emulator/tests/test_i2c_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Integration tests for I2C test application."""

import time


def test_i2c_demo_starts(emulator, i2c_demo):
"""Test that i2c_demo starts successfully."""
try:
# Give i2c_demo time to initialize
time.sleep(0.5)

# Check that the process is still running
assert i2c_demo.poll() is None, "i2c_demo process terminated unexpectedly"

finally:
emulator.stop()
i2c_demo.terminate()
i2c_demo.wait(timeout=1)


def test_i2c_demo_write_read_cycle(emulator, i2c_demo):
"""Test that i2c_demo writes and reads from I2C device."""
try:
device_address = 0x50
test_pattern = [0xDE, 0xAD, 0xBE, 0xEF]
write_count = 0
read_count = 0

def i2c_handler(message):
nonlocal write_count, read_count
if message.get("operation") == "Send":
# Device is writing to I2C peripheral
write_count += 1
data = message.get("data", [])
address = message.get("address", 0)

# Verify the data and address
assert address == device_address, f"Wrong address: 0x{address:02X}"
assert data == test_pattern, f"Wrong data: {data}"

elif message.get("operation") == "Receive":
# Device is reading from I2C peripheral
read_count += 1
address = message.get("address", 0)
assert address == device_address, f"Wrong address: 0x{address:02X}"

emulator.i2c1().set_on_request(i2c_handler)

# Pre-populate I2C device buffer with test pattern
emulator.i2c1().write_to_device(device_address, test_pattern)

# Give i2c_demo time to run a few cycles
time.sleep(1.5)

# Verify that writes and reads occurred
assert write_count > 0, "No I2C writes occurred"
assert read_count > 0, "No I2C reads occurred"
assert write_count == read_count, (
f"Write/read mismatch: {write_count} writes, {read_count} reads"
)

finally:
emulator.stop()
i2c_demo.terminate()
i2c_demo.wait(timeout=1)


def test_i2c_demo_toggles_leds(emulator, i2c_demo):
"""Test that i2c_demo toggles LEDs based on I2C operations."""
try:
device_address = 0x50
test_pattern = [0xDE, 0xAD, 0xBE, 0xEF]

# Pre-populate I2C device buffer with correct test pattern
emulator.i2c1().write_to_device(device_address, test_pattern)

# Give i2c_demo time to initialize
time.sleep(0.5)

# Record initial LED states
initial_led1 = emulator.get_pin_state("LED 1")
initial_led2 = emulator.get_pin_state("LED 2")

# Wait for exactly one more toggle cycle (~550ms per cycle)
time.sleep(0.3)

# Check that LEDs have toggled
final_led1 = emulator.get_pin_state("LED 1")
final_led2 = emulator.get_pin_state("LED 2")

# LED2 should have toggled (heartbeat)
assert final_led2 != initial_led2, (
f"LED2 didn't toggle: {initial_led2} -> {final_led2}"
)

# LED1 should have toggled (data verification success)
assert final_led1 != initial_led1, (
f"LED1 didn't toggle: {initial_led1} -> {final_led1}"
)

finally:
emulator.stop()
i2c_demo.terminate()
i2c_demo.wait(timeout=1)


def test_i2c_demo_data_mismatch(emulator, i2c_demo):
"""Test that i2c_demo handles data mismatch correctly."""
try:
device_address = 0x50
wrong_pattern = [0x00, 0x11, 0x22, 0x33] # Different from test pattern

# Pre-populate I2C device buffer with wrong data
emulator.i2c1().write_to_device(device_address, wrong_pattern)

# Give i2c_demo time to run a few cycles
time.sleep(1.0)

# LED1 should be off due to data mismatch
led1_state = emulator.get_pin_state("LED 1")
assert led1_state.name == "Low", f"LED1 should be off, but is {led1_state.name}"

# LED2 should still be blinking (alive indicator)
initial_led2 = emulator.get_pin_state("LED 2")
time.sleep(0.6)
final_led2 = emulator.get_pin_state("LED 2")
assert final_led2 != initial_led2, (
f"LED2 didn't toggle: {initial_led2} -> {final_led2}"
)

finally:
emulator.stop()
i2c_demo.terminate()
i2c_demo.wait(timeout=1)
Loading