Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/clabe/aind_apps/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .waterlog import WaterlogApp, WaterlogSettings

__all__ = ["WaterlogApp", "WaterlogSettings"]
66 changes: 66 additions & 0 deletions src/clabe/aind_apps/waterlog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import shutil
from pathlib import Path
from typing import Any, ClassVar, Optional, Self

from pydantic import Field
from pydantic_settings import CliApp, SettingsConfigDict

from clabe.apps import StdCommand
from clabe.apps._base import CommandResult

from ..apps import ExecutableApp, LocalDetachedExecutor
from ..services import ServiceSettings


class WaterlogSettings(ServiceSettings):
"""Settings for the waterlog service."""

model_config = SettingsConfigDict(cli_kebab_case=True)
# This should be ok since the subclass initializes first the and toml sources get appended to the settings dict
__yml_section__: ClassVar[Optional[str]] = "waterlog"
Comment thread
bruno-f-cruz marked this conversation as resolved.

username: Optional[str] = Field(default=None, description="Username for the waterlog service")
mouse_id: Optional[str] = Field(default=None, description="Mouse ID for the waterlog service")
mouse_weight: Optional[float] = Field(default=None, description="Mouse weight for the waterlog service")
comment: Optional[str] = Field(default=None, description="Comment for the waterlog service")
earned_water: Optional[float] = Field(default=None, description="Water earned during behavior task (mL)")
water_supplement_ml: Optional[float] = Field(default=None, description="Water supplement amount (mL)")
water_supplement_delivered: Optional[bool] = Field(
default=None, description="Flag indicating if the water supplement has been delivered"
)


class WaterlogApp(ExecutableApp):
"""App for logging water consumption and related information."""

_EXECUTABLE: Optional[Path] = (
Path(os.getenv("PROGRAMFILES", r"C:\Program Files")) / r"AIBS_MPE\waterlog\waterlog.exe"
)

def __init__(self, settings: WaterlogSettings):
"""Initialize the WaterlogApp with the given settings."""
self._executable = str(self._EXECUTABLE)
self._settings = settings
self.validate()
_cmd = [self._executable] + CliApp.serialize(settings)
self._command = StdCommand(cmd=_cmd)

def validate(self) -> None:
"""Validates the settings and checks for the presence of the waterlog executable."""
if not Path(self._executable).exists():
if (loc := shutil.which("waterlog.exe")) is None:
raise FileNotFoundError(
f"'{self._EXECUTABLE}' command not found. Please ensure it is installed and available."
)
self._executable = loc

@property
def command(self) -> StdCommand:
"""Get the command to execute."""
return self._command

def run(self: Self, executor_kwargs: Optional[dict[str, Any]] = None) -> CommandResult:
"""Execute the command using a local executor and return the result."""
executor = LocalDetachedExecutor(**(executor_kwargs or {}))
return self.command.execute(executor)
2 changes: 2 additions & 0 deletions src/clabe/apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from ._bonsai import AindBehaviorServicesBonsaiApp, BonsaiApp
from ._curriculum import CurriculumApp, CurriculumSettings, CurriculumSuggestion
from ._executors import LocalDetachedExecutor
from ._python_script import PythonScriptApp

__all__ = [
Expand All @@ -30,4 +31,5 @@
"PythonScriptApp",
"ExecutableApp",
"StdCommand",
"LocalDetachedExecutor",
]
60 changes: 60 additions & 0 deletions src/clabe/apps/_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,66 @@ async def run_async(self, command: Command) -> CommandResult:
return command_result


class LocalDetachedExecutor(Executor):
"""
Fire-and-forget executor that spawns a subprocess and returns immediately.

Launches the command via ``subprocess.Popen`` without waiting for it to
finish. stdout and stderr are redirected to ``DEVNULL`` so that no pipe
handles are left open. The returned ``CommandResult`` is a placeholder
with ``exit_code=0`` and no captured output; callers must not rely on it
to reflect the actual process outcome.

Attributes:
cwd: Working directory for command execution
env: Environment variables for the subprocess

Example:
```python
executor = LocalDetachedExecutor()
cmd = Command(cmd=["bonsai", "workflow.bonsai"], output_parser=identity_parser)
_ = executor.run(cmd) # returns immediately; Bonsai keeps running
```
"""

def __init__(self, cwd: os.PathLike | None = None, env: dict[str, str] | None = None) -> None:
"""Initialize the detached executor.

Args:
cwd: Working directory for command execution
env: Environment variables for the subprocess
"""
self.cwd = cwd or os.getcwd()
self.env = env

def run(self, command: Command[Any]) -> CommandResult:
"""Spawn the command and return immediately without waiting.

Args:
command: The command to execute (as a list of strings)

Returns:
A placeholder ``CommandResult`` with ``exit_code=0`` and no output.
The actual process exit code is never captured.

Example:
```python
executor = LocalDetachedExecutor()
cmd = Command(cmd=["bonsai", "workflow.bonsai"], output_parser=identity_parser)
result = executor.run(cmd) # fire-and-forget
```
"""
subprocess.Popen(
command.cmd,
cwd=self.cwd,
env=self.env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=False,
)
return CommandResult(stdout=None, stderr=None, exit_code=0)


class _DefaultExecutorMixin:
"""
Mixin providing default executor implementations for ExecutableApp classes.
Expand Down
118 changes: 118 additions & 0 deletions tests/apps/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CommandError,
CommandResult,
Executor,
LocalDetachedExecutor,
PythonScriptApp,
identity_parser,
)
Expand Down Expand Up @@ -265,6 +266,123 @@ async def test_async_executor_concurrent_execution(self, async_local_executor: A
assert "cmd2" in (results[1].stdout or "")


# ============================================================================
# LocalDetachedExecutor Tests
# ============================================================================


class TestLocalDetachedExecutor:
"""Tests for LocalDetachedExecutor (fire-and-forget executor)."""

@patch("subprocess.Popen")
def test_returns_placeholder_result_immediately(self, mock_popen):
"""run() returns exit_code=0 with no stdout/stderr without waiting."""
executor = LocalDetachedExecutor()
cmd = Command[CommandResult](cmd=["sleep", "999"], output_parser=identity_parser)

result = executor.run(cmd)

assert result.exit_code == 0
assert result.stdout is None
assert result.stderr is None
assert result.ok is True

@patch("subprocess.Popen")
def test_spawns_process_without_waiting(self, mock_popen):
"""Popen is called once and communicate/wait are never called."""
executor = LocalDetachedExecutor()
cmd = Command[CommandResult](cmd=["sleep", "999"], output_parser=identity_parser)

executor.run(cmd)

mock_popen.assert_called_once()
mock_popen.return_value.wait.assert_not_called()
mock_popen.return_value.communicate.assert_not_called()

@patch("subprocess.Popen")
def test_passes_command_args_to_popen(self, mock_popen):
"""The exact command list is forwarded to Popen."""
executor = LocalDetachedExecutor()
expected_cmd = ["python", "-c", "print('hello')"]
cmd = Command[CommandResult](cmd=expected_cmd, output_parser=identity_parser)

executor.run(cmd)

call_args = mock_popen.call_args
assert call_args[0][0] == expected_cmd

@patch("subprocess.Popen")
def test_stdout_stderr_redirected_to_devnull(self, mock_popen):
"""stdout and stderr are DEVNULL to avoid unclosed pipe handles."""
import subprocess as sp

executor = LocalDetachedExecutor()
cmd = Command[CommandResult](cmd=["echo", "hi"], output_parser=identity_parser)

executor.run(cmd)

call_kwargs = mock_popen.call_args[1]
assert call_kwargs["stdout"] == sp.DEVNULL
assert call_kwargs["stderr"] == sp.DEVNULL

@patch("subprocess.Popen")
def test_shell_is_false(self, mock_popen):
"""shell=False is enforced to prevent shell injection."""
executor = LocalDetachedExecutor()
cmd = Command[CommandResult](cmd=["echo", "hi"], output_parser=identity_parser)

executor.run(cmd)

call_kwargs = mock_popen.call_args[1]
assert call_kwargs["shell"] is False

@patch("subprocess.Popen")
def test_uses_provided_cwd(self, mock_popen, tmp_path: Path):
"""Custom cwd is passed through to Popen."""
executor = LocalDetachedExecutor(cwd=tmp_path)
cmd = Command[CommandResult](cmd=["echo", "hi"], output_parser=identity_parser)

executor.run(cmd)

call_kwargs = mock_popen.call_args[1]
assert call_kwargs["cwd"] == tmp_path

@patch("subprocess.Popen")
def test_defaults_cwd_to_getcwd(self, mock_popen):
"""When cwd is omitted, cwd defaults to os.getcwd()."""
import os

executor = LocalDetachedExecutor()
assert executor.cwd == os.getcwd()

@patch("subprocess.Popen")
def test_uses_provided_env(self, mock_popen):
"""Custom env dict is passed through to Popen."""
custom_env = {"MY_VAR": "my_value"}
executor = LocalDetachedExecutor(env=custom_env)
cmd = Command[CommandResult](cmd=["echo", "hi"], output_parser=identity_parser)

executor.run(cmd)

call_kwargs = mock_popen.call_args[1]
assert call_kwargs["env"] == custom_env

@patch("subprocess.Popen")
def test_satisfies_executor_protocol(self, mock_popen):
"""LocalDetachedExecutor satisfies the Executor protocol."""
executor = LocalDetachedExecutor()
assert isinstance(executor, Executor)

@patch("subprocess.Popen")
def test_placeholder_result_does_not_raise_on_check_returncode(self, mock_popen):
"""The placeholder CommandResult does not raise when check_returncode() is called."""
executor = LocalDetachedExecutor()
cmd = Command[CommandResult](cmd=["echo", "hi"], output_parser=identity_parser)

result = executor.run(cmd)
result.check_returncode() # must not raise


# ============================================================================
# BonsaiApp Tests
# ============================================================================
Expand Down
Loading