Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
},
"[toml]": {
"editor.defaultFormatter": "tamasfe.even-better-toml",
}
},
"python-envs.pythonProjects": []
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dev = [
"mock",
"jwcrypto",
"deepdiff",
"dls-dodal",
]

[project.scripts]
Expand Down
38 changes: 33 additions & 5 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from concurrent.futures import Future
from pathlib import Path

from bluesky_stomp.messaging import MessageContext, StompClient
from bluesky_stomp.models import Broker
Expand All @@ -8,7 +9,11 @@
start_as_current_span,
)

from blueapi.config import ApplicationConfig, MissingStompConfigurationError
from blueapi.config import (
ApplicationConfig,
ConfigLoader,
MissingStompConfigurationError,
)
from blueapi.core.bluesky_types import DataEvent
from blueapi.service.authentication import SessionManager
from blueapi.service.model import (
Expand Down Expand Up @@ -48,8 +53,10 @@ def __init__(
self._rest = rest
self._events = events

@classmethod
def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
@staticmethod
def config_to_rest_and_events(
config: ApplicationConfig,
) -> tuple[BlueapiRestClient, EventBusClient | None]:
session_manager: SessionManager | None = None
try:
session_manager = SessionManager.from_cache(config.auth_token_path)
Expand All @@ -67,9 +74,30 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
)
)
events = EventBusClient(client)
return cls(rest, events)
return rest, events
else:
return cls(rest)
return rest, None

@classmethod
def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
rest, events = BlueapiClient.config_to_rest_and_events(config)
return cls(rest, events)

@staticmethod
def load_config_from_yaml(blueapi_config_path: str | Path) -> ApplicationConfig:
if not isinstance(blueapi_config_path, Path):
blueapi_config_path = Path(blueapi_config_path)
config_loader = ConfigLoader(ApplicationConfig)
config_loader.use_values_from_yaml(blueapi_config_path)
loaded_config = config_loader.load()

return loaded_config

@classmethod
def from_yaml(cls, blueapi_config_path: str | Path) -> "BlueapiClient":
loaded_config = BlueapiClient.load_config_from_yaml(blueapi_config_path)
rest, events = BlueapiClient.config_to_rest_and_events(loaded_config)
return cls(rest, events)

@start_as_current_span(TRACER)
def get_plans(self) -> PlanResponse:
Expand Down
213 changes: 213 additions & 0 deletions src/blueapi/client/user_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
import time
import warnings
from collections.abc import Callable
from pathlib import Path

from bluesky.callbacks.best_effort import BestEffortCallback

from blueapi.cli.updates import CliEventRenderer
from blueapi.client.client import BlueapiClient
from blueapi.client.event_bus import AnyEvent
from blueapi.client.rest import BlueskyRemoteControlError
from blueapi.core import DataEvent
from blueapi.service.model import TaskRequest
from blueapi.worker import ProgressEvent

warnings.filterwarnings("ignore") # callback complains about not running in main thread

# Currently matplotlib uses tkinter as default, tkinter must be in the main thread
# WebAgg does need ot be, so can allow LivePlots
# import matplotlib
# matplotlib.use("WebAgg")


class UserClient(BlueapiClient):
"""A client that can be easily used by the user, beamline scientist
in a scripts, for running bluesky plans.

Example usage:

blueapi_config_path = "/path/to/ixx_blueapi_config.yaml"

blueapi = UserClient(blueapi_config_path, "cm12345-1")
blueapi.run("count", detectors=["det1", "det2"])
blueapi.change_session("cm12345-2")

from dodal.plan_stubs.wrapped import move

blueapi.run(move, moves={"base.x": 0}) # move base.x to 0

or you can just use args:

blueapi.run(move, {"base.x": 0})

or you can also call the client as if it were a bluesky run-engine:

blueapi(move, {"base.x": 0})

"""

def __init__(
self,
blueapi_config_path: str | Path,
instrument_session: str,
callback: bool = True,
timeout: int | float | None = None,
non_callback_delay: int | float = 1,
):
self.instrument_session = instrument_session
self.callback = callback
self.retries = 5
self.timeout = timeout
self.non_callback_delay = non_callback_delay

loaded_config = BlueapiClient.load_config_from_yaml(blueapi_config_path)
rest, events = BlueapiClient.config_to_rest_and_events(loaded_config)
super().__init__(rest, events)

def __call__(self, plan: Callable, *args, **kwargs) -> None:
if not isinstance(plan, Callable): # incase user passes wrong argument
raise ValueError("Must be a bluesky plan function")

return self._run(plan, args, **kwargs)

def _convert_args_to_kwargs(self, plan: Callable, args: tuple) -> dict:
"""Converts args to kwargs
If the user does not give kwargs, but gives args the bluesky plan is passed
this function can infer the kwargs, build the kwargs and create the params
for TaskRequest"""
arg_names = plan.__code__.co_varnames
inferred_kwargs = {}

for key, val in zip(arg_names, args): # noqa intentionally not strict
inferred_kwargs[key] = val
params = inferred_kwargs
return params

def _args_and_kwargs_to_params(
self, plan: Callable, args: tuple, kwargs: dict
) -> dict:
"""
Creates the params needed for TaskRequest
"""
if not args and not kwargs:
params = {}
return params
elif kwargs and (not args):
params = kwargs
return params
elif args and (not kwargs):
params = self._convert_args_to_kwargs(plan, args)
return params
elif args and kwargs:
params = self._convert_args_to_kwargs(plan, args)
params.update(kwargs)
return params
else:
raise ValueError("Could not infer parameters from args and kwargs")

def _run(self, plan: Callable | str, *args, **kwargs) -> None:
"""Run a bluesky plan via BlueAPI.
plan can be a string, or the bluesky plan name

When used as a hidden method: a str can be passed.
This is to allow devs to tests plans that may not be on main branch"""

if isinstance(plan, Callable):
plan_name = plan.__name__
params = self._args_and_kwargs_to_params(plan, args=args, kwargs=kwargs)
elif isinstance(plan, str) and not args:
params = kwargs or {}
plan_name = plan
elif isinstance(plan, str) and args:
raise ValueError("If passing a str you must only pass kwargs")
else:
raise ValueError("Must be a bluesky plan or name of plan")

task = TaskRequest(
name=plan_name,
params=params,
instrument_session=self.instrument_session,
)
if self.callback:
self.send_with_callback(plan_name, task)
else:
self.send_without_callback(plan_name, task)

@property
def devices(self) -> list[str]:
"""Return a list of StandardReadable for the current beamline."""
devices = self.get_devices().devices
return [d.name for d in devices]

@property
def plans(self) -> list[str]:
"""Return a list of StandardReadable for the current beamline."""
plans = self.get_plans().plans
return [p.name for p in plans]

def change_session(self, new_session: str) -> None:
"""Change the instrument session for the client."""
print(f"New instrument session: {new_session}")
self.instrument_session = new_session

def show_plans(self) -> None:
"""Shows the bluesky plan names in a nice, human readable way"""
plans = self.plans
for plan in plans:
print(plan)
print(f"Total plans: {len(plans)} \n")

def show_devices(self) -> None:
"""Shows the devices in a nice, human readable way"""
devices = self.devices
for dev in devices:
print(dev)
print(f"Total devices: {len(devices)} \n")

def send_with_callback(self, plan_name: str, task: TaskRequest):
"""Sends a bluesky Task to blueapi with callback.
Callback allows LiveTable and LivePlot to be generated
"""
try:
progress_bar = CliEventRenderer()
callback = BestEffortCallback()

def on_event(event: AnyEvent) -> None:
if isinstance(event, ProgressEvent):
progress_bar.on_progress_event(event)
elif isinstance(event, DataEvent):
callback(event.name, event.doc)

resp = self.run_task(task, on_event=on_event, timeout=self.timeout)

if (
(resp.task_status is not None)
and (resp.task_status.task_complete)
and (not resp.task_status.task_failed)
):
print(f"{plan_name} succeeded")

return

except Exception as e:
raise Exception(f"Task could not run: {e}") from e

def send_without_callback(self, plan_name: str, task: TaskRequest):
"""Send the TaskRequest as a put request.
Because it does not have callback
It does not know if blueapi is busy.
So it will try multiple times with a delay"""
success = False

for _ in range(self.retries):
try:
server_task = self.create_and_start_task(task)
print(f"{plan_name} task sent as {server_task.task_id}")
success = True
return
except BlueskyRemoteControlError:
time.sleep(self.non_callback_delay)

if not success:
raise Exception("Task could not be executed")
Loading
Loading