Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/source/concurrency.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
Concurrency in LabThings-FastAPI
==================================

.. note::

This page attempts to describe several aspects of concurrency in LabThings. If you just want an answer to the question "how do I make sure only one thing happens at a time", skip to :ref:`global_locking`\ .

One of the major challenges when controlling hardware, particularly from web frameworks, is concurrency. Most web frameworks assume resources (database connections, object storage, etc.) may be instantiated multiple times, and often initialise or destroy objects as required. In contrast, hardware can usually only be controlled from one process, and usually is initialised and shut down only once.

LabThings-FastAPI instantiates each :class:`~lt.Thing` only once, and runs all code in a thread. More specifically, each time an action is invoked via HTTP, a new thread is created to run the action. Similarly, each time a property is read or written, a new thread is created to run the property method. This means that :class:`~lt.Thing` code should protect important variables or resources using locks from the `threading` module, and need not worry about writing asynchronous code.
Expand Down Expand Up @@ -30,3 +34,14 @@ Each time an action is run ("invoked" in :ref:`wot_cc`), we create a new thread
Usually, the best solution to this problem is to generate a new invocation ID for the thread. This means only the original action thread will receive cancellation events, and only the original action thread will log to the invocation logger. If the action is cancelled, you must cancel the background thread. This is the behaviour of `~lt.ThreadWithInvocationID`\ .

It is also possible to copy the current invocation ID to a new thread. This is often a bad idea, as it's ill-defined whether the exception will arise in the original thread or the new one if the invocation is cancelled. Logs from the two threads will also be interleaved. If it's desirable to log from the background thread, the invocation logger may safely be passed as an argument, rather than accessed via ``lt.get_invocation_logger``\ .

.. _global_locking:

Global locking
--------------

It is possible to add a global lock object to the `~lt.ThingServer` by specifying `enable_global_lock=True` either as an argument or in the configuration file. When this is enabled, only one action may run at a given time. Setting properties also requires the lock, so you may assume that property values will not change while your action is running (unless you set them from the action).

The `GlobalLock` is a work-a-like wrapper for `threading.RLock`\ . This means it can be acquired multiple times by the same thread - so actions can call other actions and set properties without worrying about locking, and everything is protected such that only one thread may make changes at a time.

It is possible for individual actions or properties to opt out of the global lock, by specifying `use_global_lock=False` either as an argument to `~lt.property` or `~lt.action` or by setting the `use_global_lock` attribute on a functional property (see :ref:`properties`). Note that actions or setters that are exempted from the lock may not call other actions or properties that are locked: this will usually time out with a `GlobalLockBusyError`\ .
24 changes: 19 additions & 5 deletions docs/source/public_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ This page summarises the parts of the LabThings API that should be most frequent
:no-index:

.. py:function:: property(getter: Callable[[Owner], Value]) -> FunctionalProperty[Owner, Value]
property(*, default: Value, readonly: bool = False, **constraints: Any) -> Value
property(*, default_factory: Callable[[], Value], readonly: bool = False, **constraints: Any) -> Value
property(*, default: Value, readonly: bool = False, use_global_lock: bool | None = None, **constraints: Any) -> Value
property(*, default_factory: Callable[[], Value], readonly: bool = False, use_global_lock: bool | None = None, **constraints: Any) -> Value

This function may be used to define :ref:`properties` either by decorating a function, or marking an attribute. Full documentation is available at `labthings_fastapi.properties.property` and a more in-depth discussion is available at :ref:`properties`\ . This page focuses on the most frequently used examples.

Expand Down Expand Up @@ -143,14 +143,14 @@ This page summarises the parts of the LabThings API that should be most frequent


.. py:function:: setting(getter: Callable[[Owner], Value]) -> FunctionalSetting[Owner, Value]
setting(*, default: Value, readonly: bool = False, **constraints: Any) -> Value
setting(*, default_factory: Callable[[], Value], readonly: bool = False, **constraints: Any) -> Value
setting(*, default: Value, readonly: bool = False, use_global_lock: bool | None = None, **constraints: Any) -> Value
setting(*, default_factory: Callable[[], Value], readonly: bool = False, use_global_lock: bool | None = None, **constraints: Any) -> Value

A setting is a property that is saved to disk. It is defined in the same way as `property` but will be synchronised with the `Thing`\ 's settings file. Full documentation is available at `labthings_fastapi.properties.setting`


.. py:decorator:: action
action(**kwargs: Any)
action(use_global_lock: bool | None = None, **kwargs: Any)

Mark a method of a `~lt.Thing` as a LabThings Action.

Expand Down Expand Up @@ -302,6 +302,17 @@ This page summarises the parts of the LabThings API that should be most frequent
.. automethod:: labthings_fastapi.thing_server_interface.ThingServerInterface.get_thing_states
:no-index:

.. py:property:: global_lock
:type GlobalLock | None:

A global lock object that is used to restrict concurrent execution of actions and setting of properties.

.. py:method:: hold_global_lock(*, error_if_unavailable: bool = True)

A context manager that holds the global lock. By default, an exception is raised if the global lock
is not enabled. ``error_if_unavailable`` may be used to suppress that error, in which case the
context manager silently does nothing if there is no global lock to acquire.


.. py:class:: ThingClassSettings

Expand Down Expand Up @@ -354,6 +365,9 @@ This page summarises the parts of the LabThings API that should be most frequent
.. autoattribute:: labthings_fastapi.server.config_model.ThingServerConfig.settings_folder
:no-index:

.. autoattribute:: labthings_fastapi.server.config_model.ThingServerConfig.enable_global_lock
:no-index:

.. autoattribute:: labthings_fastapi.server.config_model.ThingServerConfig.application_config
:no-index:

Expand Down
116 changes: 97 additions & 19 deletions src/labthings_fastapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
"""

from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
import datetime
import logging
from collections import deque
from functools import partial
from functools import partial, wraps
import inspect
from threading import Thread, Lock
import uuid
Expand All @@ -29,6 +31,7 @@
Callable,
Concatenate,
Generic,
Literal,
Optional,
ParamSpec,
TypeVar,
Expand All @@ -39,6 +42,7 @@
from fastapi import APIRouter, FastAPI, HTTPException, Request, Body, BackgroundTasks
from pydantic import BaseModel, create_model


from .middleware.url_for import URLFor
from .base_descriptor import (
BaseDescriptor,
Expand All @@ -49,6 +53,7 @@
from .utilities import model_to_dict, wrap_plain_types_in_rootmodel
from .invocations import InvocationModel, InvocationStatus
from .exceptions import (
GlobalLockBusyError,
InvocationCancelledError,
InvocationError,
NotConnectedToServerError,
Expand Down Expand Up @@ -287,19 +292,22 @@
# occur.
raise RuntimeError("Cannot start an invocation without a Thing.")

with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)
# The action's `context_for_func` context manager will acquire the
# global lock if needed.
with action.context_for_func(thing):
with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)

bound_method = action.__get__(thing)
# Actually run the action
ret = bound_method(**kwargs, **self.dependencies)
# Actually run the action
ret = action.func(thing, **kwargs, **self.dependencies)

with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)

with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)
except InvocationCancelledError:
logger.info(f"Invocation {self.id} was cancelled.")
with self._status_lock:
Expand All @@ -308,9 +316,17 @@
except Exception as e: # skipcq: PYL-W0703
# First log
if isinstance(e, InvocationError):
# Log without traceback
# Log without traceback for anticipated errors
logger.error(e)
elif (
isinstance(e, GlobalLockBusyError)
and self._status == InvocationStatus.PENDING
):
# The global lock timed out before the function started.
# In this case, don't print a traceback.
logger.warning(f"Global lock was busy: didn't run {action.name}.")
else:
# Other exceptions show up in the log with a traceback
logger.exception(e)
# Then set status
with self._status_lock:
Expand Down Expand Up @@ -505,8 +521,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 525 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

524-525 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand All @@ -519,7 +535,7 @@
invocation.output.response
):
# TODO: honour "accept" header
return invocation.output.response()

Check warning on line 538 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

538 line is not covered with tests
return invocation.output

@router.delete(
Expand All @@ -544,8 +560,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 564 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

563-564 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand Down Expand Up @@ -665,8 +681,9 @@
func: Callable[Concatenate[OwnerT, ActionParams], ActionReturn],
response_timeout: float = 1,
retention_time: float = 300,
use_global_lock: Literal[False] | None = None,
) -> None:
"""Create a new action descriptor.
r"""Create a new action descriptor.

The action descriptor wraps a method of a `~lt.Thing`. It may still be
called from Python in the same way, but it will also be added to the
Expand All @@ -683,6 +700,16 @@
of the action.
:param retention_time: how long, in seconds, the action should be kept
for after it has completed.
:param use_global_lock: If the global lock is enabled,
this parameter may be used to opt out. See :ref:`global_locking`
for details of how the global lock is implemented.

If this parameter is `False` then the lock will not be acquired, even
if global locking is enabled. That is appropriate if the action does
not have side effects that would cause problems for other actions, or
if more nuanced locking behaviour is required meaning the lock is
acquired directly in the action code, for example using
`~lt.ThingServerInterface.hold_global_lock`\ .
"""
super().__init__()
self.func = func
Expand All @@ -692,6 +719,7 @@
name = func.__name__ # this is checked in __set_name__
self.response_timeout = response_timeout
self.retention_time = retention_time
self.use_global_lock = use_global_lock
self.dependency_params = fastapi_dependency_params(func)
self.input_model = input_model_from_signature(
func,
Expand Down Expand Up @@ -720,24 +748,74 @@
"""
super().__set_name__(owner, name)
if self.name != self.func.__name__:
raise ValueError(

Check warning on line 751 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

751 line is not covered with tests
f"Action name '{self.name}' does not match function name "
f"'{self.func.__name__}'",
)

@contextmanager
def context_for_func(self, obj: OwnerT) -> Iterator[None]:
"""Create the context in which ``func`` runs.

Currently, if global locking is enabled and this action hasn't opted out,
this context manager will hold the global lock for the duration of the
action.

This method is intended to create a hook for pre-run set-up and post-run
clean-up code that may be customised by `Thing` implementations in the future,
such as acquiring locks or other resources.

When an action is run from Python code as ``thing.action()`` this context
manager is entered before executing `func` bound to the `Thing` instance.

When an action is run from HTTP, this context manager is entered while the
action's status is ``pending`` and the status changes to ``running`` just
before `func` (the function decorated by `~lt.action`) runs. This allows
some slightly nicer error handling, for example not cluttering the log with
stack traces if an action can't start because the global lock is in use.

:param obj: The object on which the method is being called.
:return: the function, wrapped if necessary.
Comment thread
rwb27 marked this conversation as resolved.
"""
with obj._thing_server_interface._optionally_hold_global_lock(
self.use_global_lock
):
yield

def instance_get(self, obj: OwnerT) -> Callable[ActionParams, ActionReturn]:
"""Return the function, bound to an object as for a normal method.
"""Return the function, bound to an object and wrapped in a context manager.

Accessing a regular Python method returns the method bound to the instance,
i.e. the `self` argument is supplied.

This currently doesn't validate the arguments, though it may do so
in future. In its present form, this is equivalent to a regular
Python method, i.e. all we do is supply the first argument, `self`.
LabThings Actions work the same way, but they also wrap the function in a
context manager. Currently, this context manager will handle acquiring the
global lock if required.

If locking is disabled, the context manager does nothing.
If locking is enabled, we return a wrapped function that holds the
global lock while the action runs.

.. note::

The returned function will hold a reference to both `obj` and `self`
(this descriptor). Given that accessing ``instance.method`` returns
a function that's already bound to the instance, this shouldn't cause
any problems.

:param obj: the `~lt.Thing` to which we are attached. This will be
the first argument supplied to the function wrapped by this
descriptor.
:return: the action function, bound to ``obj``.
"""
return partial(self.func, obj)

@wraps(self.func)
def wrapped(*args: Any, **kwargs: Any) -> Any: # noqa: DOC
"""Acquire the lock then run `func` with supplied arguments."""
with self.context_for_func(obj):
return self.func(*args, **kwargs)

return partial(wrapped, obj)

def _observers_set(self, obj: Thing) -> WeakSet:
"""Return a set used to notify changes.
Expand Down Expand Up @@ -862,14 +940,14 @@
try:
responses[200]["model"] = self.output_model
pass
except AttributeError:
print(f"Failed to generate response model for action {self.name}")

Check warning on line 944 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

943-944 lines are not covered with tests
# Add an additional media type if we may return a file
if hasattr(self.output_model, "media_type"):
responses[200]["content"][self.output_model.media_type] = {}

Check warning on line 947 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

947 line is not covered with tests
# Now we can add the endpoint to the app.
if thing.path is None:
raise NotConnectedToServerError(

Check warning on line 950 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

950 line is not covered with tests
"Can't add the endpoint without thing.path!"
)
app.post(
Expand Down Expand Up @@ -917,7 +995,7 @@
"""
path = path or thing.path
if path is None:
raise NotConnectedToServerError("Can't generate forms without a path!")

Check warning on line 998 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

998 line is not covered with tests
forms = [
Form[ActionOp](href=path + self.name, op=[ActionOp.invokeaction]),
]
Expand Down
2 changes: 1 addition & 1 deletion src/labthings_fastapi/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@
:raise KeyError: if there is no link with the specified ``rel`` value.
"""
if "links" not in obj:
raise ObjectHasNoLinksError(f"Can't find any links on {obj}.")

Check warning on line 62 in src/labthings_fastapi/client/__init__.py

View workflow job for this annotation

GitHub Actions / coverage

62 line is not covered with tests
try:
return next(link for link in obj["links"] if link["rel"] == rel)
except StopIteration as e:
raise KeyError(f"No link was found with rel='{rel}' on {obj}.") from e

Check warning on line 66 in src/labthings_fastapi/client/__init__.py

View workflow job for this annotation

GitHub Actions / coverage

65-66 lines are not covered with tests


def invocation_href(invocation: dict) -> str:
Expand Down Expand Up @@ -171,7 +171,7 @@
"""
response = self.client.put(urljoin(self.path, path), json=value)
if response.is_error:
detail = response.json().get("detail")
detail = response.json().get("detail", None)
err_msg = "Unknown error"
if isinstance(detail, str):
err_msg = detail
Expand Down
18 changes: 18 additions & 0 deletions src/labthings_fastapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@ class InvalidClassSettingsError(ValueError):
"""


class FeatureNotEnabledError(RuntimeError):
"""A feature is being used that is currently disabled.

Some new or optional features must be enabled in the server settings or in
`~lt.Thing._class_settings` before they can be used.
This error is raised if a feature is used when it is not enabled.
"""
Comment thread
rwb27 marked this conversation as resolved.


class PropertyRedefinitionError(AttributeError):
"""A property is being incorrectly redefined.

Expand All @@ -363,3 +372,12 @@ class DefaultWillChangeWarning(DeprecationWarning):
A default value will change in the future. This warning can usually be eliminated
by setting the value explicitly.
"""


class GlobalLockBusyError(TimeoutError):
Comment thread
rwb27 marked this conversation as resolved.
"""The global lock is already in use.

This exception is raised when code needs the global lock but cannot acquire
it. It indicates that the LabThings server is busy running another action or
property setter.
"""
Loading
Loading