Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def __post_init__(self, configuration: ApplicationConfig | None):

path_provider = StartDocumentPathProvider()
set_path_provider(path_provider)
self.run_engine.subscribe(path_provider.update_run, "start")
self.run_engine.subscribe(path_provider.run_start, "start")
self.run_engine.subscribe(path_provider.run_stop, "stop")

def _update_scan_num(md: dict[str, Any]) -> int:
scan = numtracker.create_scan(
Expand Down
33 changes: 21 additions & 12 deletions src/blueapi/utils/path_provider.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pathlib import Path

from event_model import RunStart
from event_model.basemodels import RunStart, RunStop
from ophyd_async.core import PathInfo, PathProvider

DEFAULT_TEMPLATE = "{device_name}-{instrument}-{scan_id}"
Expand All @@ -17,15 +17,17 @@ class StartDocumentPathProvider(PathProvider):
"""

def __init__(self) -> None:
self._doc = {}
self._doc: RunStart | None = None

def update_run(self, name: str, start_doc: RunStart) -> None:
"""Cache a start document.

This can be plugged into the run engine's subscribe method.
"""
def run_start(self, name: str, start_document: RunStart) -> None:
if name == "start":
self._doc = start_doc
if self._doc is None:
self._doc = start_document

def run_stop(self, name: str, stop_document: RunStop) -> None:
if name == "stop":
if self._doc is not None and stop_document.run_start == self._doc.uid:
self._doc = None

def __call__(self, device_name: str | None = None) -> PathInfo:
"""Returns the directory path and filename for a given data_session.
Expand All @@ -36,7 +38,14 @@ def __call__(self, device_name: str | None = None) -> PathInfo:

If you do not provide a data_session_directory it will default to "/tmp".
"""
template = self._doc.get("data_file_path_template", DEFAULT_TEMPLATE)
sub_path = template.format_map(self._doc | {"device_name": device_name})
data_session_directory = Path(self._doc.get("data_session_directory", "/tmp"))
return PathInfo(directory_path=data_session_directory, filename=sub_path)
if self._doc is None:
raise AttributeError(
"Start document not found. This call must be made inside a run."
)
else:
doc = self._doc.model_dump()
template = doc.get("data_file_path_template", DEFAULT_TEMPLATE)
data_session_directory = Path(doc.get("data_session_directory", "/tmp"))
filename = template.format_map(doc | {"device_name": device_name})

return PathInfo(directory_path=data_session_directory, filename=filename)
Loading
Loading