Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 56 additions & 98 deletions py/host-emulator/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,101 +59,59 @@ def _wait_for_process_ready(process, timeout=1.0):
time.sleep(0.1)


@pytest.fixture(scope="module")
def blinky(request, emulator):
"""Start blinky application after emulator is ready."""
blinky_arg = request.config.getoption("--blinky")
if not blinky_arg:
pytest.skip("--blinky not provided")

blinky_executable = pathlib.Path(blinky_arg).resolve()
assert blinky_executable.exists(), (
f"Blinky executable not found: {blinky_executable}"
)

# Emulator is already started and ready (fixture dependency)
blinky_process = subprocess.Popen(
[str(blinky_executable)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

try:
# Wait for blinky to be ready
_wait_for_process_ready(blinky_process)

yield blinky_process

finally:
# Automatic cleanup
if blinky_process.poll() is None:
logger.debug("[Fixture] Stopping blinky")
blinky_process.terminate()
try:
blinky_process.wait(timeout=2)
except subprocess.TimeoutExpired:
blinky_process.kill()
blinky_process.wait()
logger.debug(f"[Fixture] Blinky exit code: {blinky_process.returncode}")


@pytest.fixture(scope="module")
def uart_echo(request, emulator):
"""Start uart_echo application after emulator is ready."""
uart_echo_arg = request.config.getoption("--uart-echo")
if not uart_echo_arg:
pytest.skip("--uart-echo not provided")

uart_echo_executable = pathlib.Path(uart_echo_arg).resolve()
assert uart_echo_executable.exists()

uart_echo_process = subprocess.Popen(
[str(uart_echo_executable)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

try:
_wait_for_process_ready(uart_echo_process)
yield uart_echo_process
finally:
if uart_echo_process.poll() is None:
logger.debug("[Fixture] Stopping uart_echo")
uart_echo_process.terminate()
try:
uart_echo_process.wait(timeout=2)
except subprocess.TimeoutExpired:
uart_echo_process.kill()
uart_echo_process.wait()
logger.debug(f"[Fixture] UartEcho exit code: {uart_echo_process.returncode}")


@pytest.fixture(scope="module")
def i2c_demo(request, emulator):
"""Start i2c_demo application after emulator is ready."""
i2c_demo_arg = request.config.getoption("--i2c-demo")
if not i2c_demo_arg:
pytest.skip("--i2c-demo not provided")

i2c_demo_executable = pathlib.Path(i2c_demo_arg).resolve()
assert i2c_demo_executable.exists()

i2c_demo_process = subprocess.Popen(
[str(i2c_demo_executable)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

try:
_wait_for_process_ready(i2c_demo_process)
yield i2c_demo_process
finally:
if i2c_demo_process.poll() is None:
logger.debug("[Fixture] Stopping i2c_demo")
i2c_demo_process.terminate()
try:
i2c_demo_process.wait(timeout=2)
except subprocess.TimeoutExpired:
i2c_demo_process.kill()
i2c_demo_process.wait()
logger.debug(f"[Fixture] I2CDemo exit code: {i2c_demo_process.returncode}")
def _application_fixture_factory(option_name, display_name):
"""Factory function to create application fixtures with common lifecycle management.

Args:
option_name: CLI option name (e.g., "--blinky")
display_name: Display name for logging (e.g., "Blinky")

Returns:
A pytest fixture function
"""

@pytest.fixture(scope="module")
def application_fixture(request, emulator):
"""Start application after emulator is ready."""
app_arg = request.config.getoption(option_name)
if not app_arg:
pytest.skip(f"{option_name} not provided")

app_executable = pathlib.Path(app_arg).resolve()
assert app_executable.exists(), (
f"{display_name} executable not found: {app_executable}"
)

# Emulator is already started and ready (fixture dependency)
app_process = subprocess.Popen(
[str(app_executable)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

try:
# Wait for application to be ready
_wait_for_process_ready(app_process)
yield app_process

finally:
# Automatic cleanup
if app_process.poll() is None:
logger.debug(f"[Fixture] Stopping {display_name}")
app_process.terminate()
try:
app_process.wait(timeout=2)
except subprocess.TimeoutExpired:
app_process.kill()
app_process.wait()
logger.debug(
f"[Fixture] {display_name} exit code: {app_process.returncode}"
)

return application_fixture


# Create application fixtures using the factory
blinky = _application_fixture_factory("--blinky", "Blinky")
uart_echo = _application_fixture_factory("--uart-echo", "UartEcho")
i2c_demo = _application_fixture_factory("--i2c-demo", "I2CDemo")