Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions py/host-emulator/src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Host emulator for embedded C++ applications."""

from .common import Status, UnhandledMessageError
from .emulator import DeviceEmulator
from .i2c import I2C
from .pin import Pin
from .uart import Uart

__all__ = [
"DeviceEmulator",
"Pin",
"Uart",
"I2C",
"Status",
"UnhandledMessageError",
]
19 changes: 19 additions & 0 deletions py/host-emulator/src/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Common types and exceptions for the host emulator."""

from enum import Enum


class UnhandledMessageError(Exception):
"""Exception raised when a message cannot be handled."""

pass


class Status(Enum):
"""Status codes for emulator responses."""

Ok = "Ok"
Unknown = "Unknown"
InvalidArgument = "InvalidArgument"
InvalidState = "InvalidState"
InvalidOperation = "InvalidOperation"
311 changes: 4 additions & 307 deletions py/host-emulator/src/emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,317 +2,14 @@

import json
import sys
from enum import Enum
from threading import Thread

import zmq


class UnhandledMessageError(Exception):
pass


class Status(Enum):
Ok = "Ok"
Unknown = "Unknown"
InvalidArgument = "InvalidArgument"
InvalidState = "InvalidState"
InvalidOperation = "InvalidOperation"


class Pin:
direction = Enum("direction", ["IN", "OUT"])
state = Enum("state", ["Low", "High", "Hi_Z"])

def __init__(self, name, direction, state, to_device_socket):
self.name = name
self.direction = direction
self.state = state
self.to_device_socket = to_device_socket
self.on_response = None
self.on_request = None

def handle_request(self, message):
response = {
"type": "Response",
"object": "Pin",
"name": self.name,
"state": self.state.name,
"status": Status.InvalidOperation.name,
}
if message["operation"] == "Get":
response.update(
{
"status": Status.Ok.name,
}
)
elif message["operation"] == "Set":
self.state = Pin.state[message["state"]]
response.update(
{
"state": self.state.name,
"status": Status.Ok.name,
}
)
else:
pass
# default response status is InvalidOperation
if self.on_request:
self.on_request(message)
return json.dumps(response)

def set_state(self, state):
self.state = state
request = {
"type": "Request",
"object": "Pin",
"name": self.name,
"operation": "Set",
"state": self.state.name,
}
print(f"[Pin Set] Sending request: {request}")
self.to_device_socket.send_string(json.dumps(request))
reply = self.to_device_socket.recv()
print(f"[Pin Set] Received response: {reply}")
self.handle_response(json.loads(reply))
return json.loads(reply)

def get_state(self):
request = {
"type": "Request",
"object": "Pin",
"name": self.name,
"operation": "Get",
"state": self.state.Hi_Z.name,
}
print(f"[Pin Get] Sending request: {request}")
self.to_device_socket.send_string(json.dumps(request))
reply = self.to_device_socket.recv()
print(f"[Pin Get] Received response: {reply}")
self.handle_response(json.loads(reply))
return json.loads(reply)

def handle_response(self, message):
print(f"[Pin Handler] Received response: {message}")
if self.on_response:
self.on_response(message)
return None

def set_on_request(self, on_request):
print(f"[Pin Handler] Setting on_request for {self.name}: {on_request}")
self.on_request = on_request

def set_on_response(self, on_response):
print(f"[Pin Handler] Setting on_response for {self.name}: {on_response}")
self.on_response = on_response

def handle_message(self, message):
if message["object"] != "Pin":
return None
if message["name"] != self.name:
return None
if message["type"] == "Request":
return self.handle_request(message)
if message["type"] == "Response":
return self.handle_response(message)


class Uart:
def __init__(self, name, to_device_socket):
self.name = name
self.to_device_socket = to_device_socket
self.rx_buffer = bytearray() # Data waiting to be read
self.on_response = None
self.on_request = None

def handle_request(self, message):
response = {
"type": "Response",
"object": "Uart",
"name": self.name,
"data": [],
"bytes_transferred": 0,
"status": Status.InvalidOperation.name,
}

if message["operation"] == "Init":
# Initialize UART with given configuration
print(f"[UART {self.name}] Initialized")
response.update({"status": Status.Ok.name})

elif message["operation"] == "Send":
# Receive data from the device and store in RX buffer
data = message.get("data", [])
self.rx_buffer.extend(data)
response.update(
{
"bytes_transferred": len(data),
"status": Status.Ok.name,
}
)
print(f"[UART {self.name}] Received {len(data)} bytes: {bytes(data)}")

elif message["operation"] == "Receive":
# Send buffered data back to the device
size = message.get("size", 0)
bytes_to_send = min(size, len(self.rx_buffer))
data = list(self.rx_buffer[:bytes_to_send])
self.rx_buffer = self.rx_buffer[bytes_to_send:]
response.update(
{
"data": data,
"bytes_transferred": bytes_to_send,
"status": Status.Ok.name,
}
)
print(f"[UART {self.name}] Sent {bytes_to_send} bytes: {bytes(data)}")

if self.on_request:
self.on_request(message)
return json.dumps(response)

def send_data(self, data):
"""Send data to the device (emulator -> device)"""
request = {
"type": "Request",
"object": "Uart",
"name": self.name,
"operation": "Receive",
"data": list(data),
"size": len(data),
"timeout_ms": 0,
}
print(f"[UART {self.name}] Sending data to device: {data}")
self.to_device_socket.send_string(json.dumps(request))
reply = self.to_device_socket.recv()
print(f"[UART {self.name}] Received response: {reply}")
return json.loads(reply)

def handle_response(self, message):
print(f"[UART {self.name}] Received response: {message}")
if self.on_response:
self.on_response(message)
return None

def set_on_request(self, on_request):
self.on_request = on_request

def set_on_response(self, on_response):
self.on_response = on_response

def handle_message(self, message):
if message["object"] != "Uart":
return None
if message["name"] != self.name:
return None
if message["type"] == "Request":
return self.handle_request(message)
if message["type"] == "Response":
return self.handle_response(message)


class I2C:
def __init__(self, name):
self.name = name
# Store data for each I2C address (address -> bytearray)
self.device_buffers = {}
self.on_response = None
self.on_request = None

def handle_request(self, message):
response = {
"type": "Response",
"object": "I2C",
"name": self.name,
"address": message.get("address", 0),
"data": [],
"bytes_transferred": 0,
"status": Status.InvalidOperation.name,
}

address = message.get("address", 0)

if message["operation"] == "Send":
# Device is sending data to I2C peripheral
# Store the data in the buffer for this address
data = message.get("data", [])
if address not in self.device_buffers:
self.device_buffers[address] = bytearray()
self.device_buffers[address] = bytearray(data)
response.update(
{
"bytes_transferred": len(data),
"status": Status.Ok.name,
}
)
print(
f"[I2C {self.name}] Wrote {len(data)} bytes to address "
f"0x{address:02X}: {bytes(data)}"
)

elif message["operation"] == "Receive":
# Device is receiving data from I2C peripheral
# Return data from the buffer for this address
size = message.get("size", 0)
if address in self.device_buffers:
bytes_to_send = min(size, len(self.device_buffers[address]))
data = list(self.device_buffers[address][:bytes_to_send])
else:
# No data available, return empty
bytes_to_send = 0
data = []
response.update(
{
"data": data,
"bytes_transferred": bytes_to_send,
"status": Status.Ok.name,
}
)
print(
f"[I2C {self.name}] Read {bytes_to_send} bytes from address "
f"0x{address:02X}: {bytes(data)}"
)

if self.on_request:
self.on_request(message)
return json.dumps(response)

def handle_response(self, message):
print(f"[I2C {self.name}] Received response: {message}")
if self.on_response:
self.on_response(message)
return None

def set_on_request(self, on_request):
self.on_request = on_request

def set_on_response(self, on_response):
self.on_response = on_response

def handle_message(self, message):
if message["object"] != "I2C":
return None
if message["name"] != self.name:
return None
if message["type"] == "Request":
return self.handle_request(message)
if message["type"] == "Response":
return self.handle_response(message)

def write_to_device(self, address, data):
"""Write data to a simulated I2C device (for testing)"""
if address not in self.device_buffers:
self.device_buffers[address] = bytearray()
self.device_buffers[address] = bytearray(data)
print(
f"[I2C {self.name}] Device buffer at 0x{address:02X} set to: {bytes(data)}"
)

def read_from_device(self, address):
"""Read data from a simulated I2C device (for testing)"""
if address in self.device_buffers:
return bytes(self.device_buffers[address])
return b""
from .common import UnhandledMessageError
from .i2c import I2C
from .pin import Pin
from .uart import Uart


class DeviceEmulator:
Expand Down
Loading