Skip to content
Merged
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ developer = [
"pytest", # Test code functionality
]
instrument-server = [
"aiohttp",
"fastapi[standard]",
"python-jose[cryptography]",
"uvicorn[standard]",
Expand Down
5 changes: 5 additions & 0 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@
)
self.post_transfer(transferred_file)
self.queue.task_done()
self.notify(final=True)

def _xml_file(self, data_file: Path) -> Path:
if not self._environment:
Expand Down Expand Up @@ -432,6 +433,10 @@
logger.info(f"Analyser thread starting for {self}")
self.thread.start()

def request_stop(self):
self._stopping = True
self._halt_thread = True

Check warning on line 438 in src/murfey/client/analyser.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/analyser.py#L437-L438

Added lines #L437 - L438 were not covered by tests

def stop(self):
logger.debug("Analyser thread stop requested")
self._stopping = True
Expand Down
42 changes: 41 additions & 1 deletion src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Dict, List, Optional
from urllib.parse import urlparse

import aiohttp

Check warning on line 12 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L12

Added line #L12 was not covered by tests
import requests

import murfey.client.websocket
Expand All @@ -34,6 +35,8 @@
rsync_url: str = ""
rsync_module: str = "data"
demo: bool = False
dormant: bool = False
multigrid_watcher_active: bool = True

Check warning on line 39 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L38-L39

Added lines #L38 - L39 were not covered by tests
processing_enabled: bool = True
do_transfer: bool = True
dummy_dc: bool = False
Expand Down Expand Up @@ -95,6 +98,37 @@
register_client=False,
)

def _multigrid_watcher_finalised(self):
self.multigrid_watcher_active = False
self.dormancy_check()

Check warning on line 103 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L101-L103

Added lines #L101 - L103 were not covered by tests

async def dormancy_check(self):

Check warning on line 105 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L105

Added line #L105 was not covered by tests
if not self.multigrid_watcher_active:
if (
all(r._finalised for r in self.rsync_processes.values())
and not any(a.thread.is_alive() for a in self.analysers.values())
and not any(
w.thread.is_alive() for w in self._environment.watchers.values()
)
):
async with aiohttp.ClientSession() as clientsession:
async with clientsession.delete(

Check warning on line 115 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L114-L115

Added lines #L114 - L115 were not covered by tests
f"{self._environment.url.geturl()}/sessions/{self.session_id}",
json={"access_token": self.token, "token_type": "bearer"},
) as response:
success = response.status == 200

Check warning on line 119 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L119

Added line #L119 was not covered by tests
if not success:
log.warning(f"Could not delete database data for {self.session_id}")
self.dormant = True

Check warning on line 122 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L121-L122

Added lines #L121 - L122 were not covered by tests

def finalise(self):

Check warning on line 124 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L124

Added line #L124 was not covered by tests
for a in self.analysers.values():
a.request_stop()

Check warning on line 126 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L126

Added line #L126 was not covered by tests
for w in self._environment.watchers.values():
w.request_stop()

Check warning on line 128 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L128

Added line #L128 was not covered by tests
for p in self.rsync_processes.keys():
self._finalise_rsyncer(p)

Check warning on line 130 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L130

Added line #L130 was not covered by tests

def _start_rsyncer_multigrid(
self,
source: Path,
Expand Down Expand Up @@ -165,7 +199,9 @@
def _finalise_rsyncer(self, source: Path):
finalise_thread = threading.Thread(
name=f"Controller finaliser thread ({source})",
target=self.rsync_processes[source].finalise,
target=partial(
self.rsync_processes[source].finalise, callback=self.dormancy_check
),
kwargs={"thread": False},
daemon=True,
)
Expand Down Expand Up @@ -297,6 +333,7 @@
)
else:
self.analysers[source].subscribe(self._data_collection_form)
self.analysers[source].subscribe(self.dormancy_check, final=True)

Check warning on line 336 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L336

Added line #L336 was not covered by tests
self.analysers[source].start()
if transfer:
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
Expand Down Expand Up @@ -336,6 +373,9 @@
),
secondary=True,
)
self._environment.watchers[source].subscribe(

Check warning on line 376 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L376

Added line #L376 was not covered by tests
self.dormancy_check, final=True
)
self._environment.watchers[source].start()

def _data_collection_form(self, response: dict):
Expand Down
12 changes: 10 additions & 2 deletions src/murfey/client/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time
from enum import Enum
from pathlib import Path
from typing import Callable, List, NamedTuple
from typing import Awaitable, Callable, List, NamedTuple
from urllib.parse import ParseResult

from murfey.client.tui.status_bar import StatusBar
Expand Down Expand Up @@ -75,6 +75,7 @@
self._local = local
self._server_url = server_url
self._notify = notify
self._finalised = False

Check warning on line 78 in src/murfey/client/rsync.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/rsync.py#L78

Added line #L78 was not covered by tests

# Set rsync destination
if local:
Expand Down Expand Up @@ -181,7 +182,11 @@
self.thread.join()
logger.debug("RSync thread stop completed")

def finalise(self, thread: bool = True):
def finalise(
self,
thread: bool = True,
callback: Callable[..., Awaitable[None] | None] | None = None,
):
self.stop()
self._remove_files = True
self._notify = False
Expand All @@ -196,6 +201,9 @@
self.stop()
else:
self._transfer(list(self._basepath.glob("**/*")))
self._finalised = True

Check warning on line 204 in src/murfey/client/rsync.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/rsync.py#L204

Added line #L204 was not covered by tests
if callback:
callback()

Check warning on line 206 in src/murfey/client/rsync.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/rsync.py#L206

Added line #L206 was not covered by tests

def enqueue(self, file_path: Path):
if not self._stopping:
Expand Down
5 changes: 5 additions & 0 deletions src/murfey/client/watchdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
log.info(f"DirWatcher thread starting for {self}")
self.thread.start()

def request_stop(self):
self._stopping = True
self._halt_thread = True

Check warning on line 68 in src/murfey/client/watchdir.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir.py#L67-L68

Added lines #L67 - L68 were not covered by tests

def stop(self):
log.debug("DirWatcher thread stop requested")
self._stopping = True
Expand Down Expand Up @@ -90,6 +94,7 @@
modification_time=modification_time, transfer_all=self._transfer_all
)
time.sleep(15)
self.notify(final=True)

Check warning on line 97 in src/murfey/client/watchdir.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir.py#L97

Added line #L97 was not covered by tests

def scan(self, modification_time: float | None = None, transfer_all: bool = False):
"""
Expand Down
2 changes: 2 additions & 0 deletions src/murfey/client/watchdir_multigrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,5 @@
if first_loop:
first_loop = False
time.sleep(15)

self.notify(final=True)

Check warning on line 118 in src/murfey/client/watchdir_multigrid.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir_multigrid.py#L118

Added line #L118 was not covered by tests
13 changes: 13 additions & 0 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@
if controllers.get(session_id) is not None:
return {"success": True}
label = watcher_spec.label
for sid, controller in controllers.items():
if controller.dormant:
del controllers[sid]

Check warning on line 149 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L149

Added line #L149 was not covered by tests
controllers[session_id] = MultigridController(
[],
watcher_spec.visit,
Expand Down Expand Up @@ -176,6 +179,9 @@
destination_overrides=watcher_spec.destination_overrides,
)
)
watchers[session_id].subscribe(

Check warning on line 182 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L182

Added line #L182 was not covered by tests
controllers[session_id]._multigrid_watcher_finalised, final=True
)
watchers[session_id].start()
return {"success": True}

Expand Down Expand Up @@ -213,6 +219,13 @@
return {"success": True}


@router.post("/sessions/{session_id}/finalise_session")
def finalise_session(session_id: MurfeySessionID):
watchers[session_id].request_stop()
controllers[session_id].finalise()
return {"success": True}

Check warning on line 226 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L222-L226

Added lines #L222 - L226 were not covered by tests


@router.post("/sessions/{session_id}/restart_rsyncer")
def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
controllers[session_id]._restart_rsyncer(rsyncer_source.source)
Expand Down
21 changes: 21 additions & 0 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,27 @@
return data


@router.post("/sessions/{session_id}/finalise_session")
async def finalise_session(session_id: MurfeySessionID, db=murfey_db):
data = {}
instrument_name = (

Check warning on line 359 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L358-L359

Added lines #L358 - L359 were not covered by tests
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[

Check warning on line 362 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L362

Added line #L362 was not covered by tests
instrument_name
]
if machine_config.instrument_server_url:
async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(

Check warning on line 367 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L366-L367

Added lines #L366 - L367 were not covered by tests
f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as resp:
data = await resp.json()
return data

Check warning on line 374 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L373-L374

Added lines #L373 - L374 were not covered by tests


@router.post("/sessions/{session_id}/remove_rsyncer")
async def remove_rsyncer(
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db
Expand Down
30 changes: 24 additions & 6 deletions src/murfey/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,31 @@
def __init__(self):
self._listeners: list[Callable[..., Awaitable[None] | None]] = []
self._secondary_listeners: list[Callable[..., Awaitable[None] | None]] = []
self._final_listeners: list[Callable[..., Awaitable[None] | None]] = []
super().__init__()

def subscribe(
self, fn: Callable[..., Awaitable[None] | None], secondary: bool = False
self,
fn: Callable[..., Awaitable[None] | None],
secondary: bool = False,
final: bool = False,
):
if secondary:
if final:
self._final_listeners.append(fn)

Check warning on line 241 in src/murfey/util/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/__init__.py#L241

Added line #L241 was not covered by tests
elif secondary:
self._secondary_listeners.append(fn)
else:
self._listeners.append(fn)

async def anotify(self, *args, secondary: bool = False, **kwargs) -> None:
async def anotify(
self, *args, secondary: bool = False, final: bool = False, **kwargs
) -> None:
awaitables: list[Awaitable] = []
listeners = self._secondary_listeners if secondary else self._listeners
listeners = (
self._secondary_listeners
if secondary
else self._final_listeners if final else self._listeners
)
for notify_function in listeners:
result = notify_function(*args, **kwargs)
if result is not None and inspect.isawaitable(result):
Expand All @@ -253,9 +265,15 @@
for awaitable in asyncio.as_completed(awaitables):
await awaitable

def notify(self, *args, secondary: bool = False, **kwargs) -> None:
def notify(
self, *args, secondary: bool = False, final: bool = False, **kwargs
) -> None:
awaitables: list[Awaitable] = []
listeners = self._secondary_listeners if secondary else self._listeners
listeners = (
self._secondary_listeners
if secondary
else self._final_listeners if final else self._listeners
)
for notify_function in listeners:
result = notify_function(*args, **kwargs)
if result is not None and inspect.isawaitable(result):
Expand Down
3 changes: 3 additions & 0 deletions src/murfey/util/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@
self,
fn: Callable[[str, T | None], Awaitable[None] | None],
secondary: bool = False,
final: bool = False,
):
if secondary:
self._secondary_listeners.append(fn)
elif final:
self._final_listeners.append(fn)

Check warning on line 77 in src/murfey/util/state.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/state.py#L77

Added line #L77 was not covered by tests
else:
self._listeners.append(fn)

Expand Down