Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
return {"success": True}


class RSyncerInfo(BaseModel):
class ObserverInfo(BaseModel):

Check warning on line 242 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L242

Added line #L242 was not covered by tests
source: str
num_files_transferred: int
num_files_in_queue: int
Expand All @@ -248,11 +248,11 @@


@router.get("/sessions/{session_id}/rsyncer_info")
def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]:
def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]:

Check warning on line 251 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L251

Added line #L251 was not covered by tests
info = []
for k, v in controllers[session_id].rsync_processes.items():
info.append(
RSyncerInfo(
ObserverInfo(
source=str(k),
num_files_transferred=v._files_transferred,
num_files_in_queue=v.queue.qsize(),
Expand All @@ -263,6 +263,22 @@
return info


@router.get("/sessions/{session_id}/analyser_info")
def get_analyser_info(session_id: MurfeySessionID) -> list[ObserverInfo]:
info = []

Check warning on line 268 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L266-L268

Added lines #L266 - L268 were not covered by tests
for k, v in controllers[session_id].analysers.items():
info.append(

Check warning on line 270 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L270

Added line #L270 was not covered by tests
ObserverInfo(
source=str(k),
num_files_transferred=0,
num_files_in_queue=v.queue.qsize(),
alive=v.thread.is_alive(),
stopping=v._stopping,
)
)
return info

Check warning on line 279 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L279

Added line #L279 was not covered by tests


class ProcessingParameters(BaseModel):
gain_ref: str
dose_per_frame: Optional[float] = None
Expand Down
50 changes: 40 additions & 10 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,11 @@
source: str
num_files_transferred: int
num_files_in_queue: int
num_files_to_analyse: int
alive: bool
stopping: bool
analyser_alive: bool
analyser_stopping: bool
destination: str
tag: str
files_transferred: int
Expand All @@ -469,7 +472,8 @@
async def get_rsyncer_info(
instrument_name: str, session_id: MurfeySessionID, db=murfey_db
) -> List[RSyncerInfo]:
data = []
rsyncer_list = []
analyser_list = []

Check warning on line 476 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L475-L476

Added lines #L475 - L476 were not covered by tests
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
Expand All @@ -486,27 +490,53 @@
headers={"Authorization": f"Bearer {token}"},
) as resp:
if resp.status == 200:
data = await resp.json()
rsyncer_list = await resp.json()

Check warning on line 493 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L493

Added line #L493 was not covered by tests
else:
data = []
rsyncer_list = []

Check warning on line 495 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L495

Added line #L495 was not covered by tests
except KeyError:
data = []
rsyncer_list = []

Check warning on line 497 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L497

Added line #L497 was not covered by tests
except Exception:
log.warning(
"Exception encountered gathering rsyncer info from the instrument server",
exc_info=True,
)

try:
async with lock:
token = instrument_server_tokens[session_id]["access_token"]
async with aiohttp.ClientSession() as clientsession:
async with clientsession.get(

Check warning on line 508 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L504-L508

Added lines #L504 - L508 were not covered by tests
f"{machine_config.instrument_server_url}/sessions/{session_id}/analyser_info",
headers={"Authorization": f"Bearer {token}"},
) as resp:
if resp.status == 200:
analyser_list = await resp.json()

Check warning on line 513 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L513

Added line #L513 was not covered by tests
else:
analyser_list = []
except KeyError:
analyser_list = []
except Exception:
log.warning(

Check warning on line 519 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L515-L519

Added lines #L515 - L519 were not covered by tests
"Exception encountered gathering analyser info from the instrument server",
exc_info=True,
)

combined_data = []
data_source_lookup = {d["source"]: d for d in data}
rsyncer_source_lookup = {d["source"]: d for d in rsyncer_list}
analyser_source_lookup = {d["source"]: d for d in analyser_list}

Check warning on line 526 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L525-L526

Added lines #L525 - L526 were not covered by tests
for ri in rsync_instances:
d = data_source_lookup.get(ri.source, {})
rsync_data = rsyncer_source_lookup.get(ri.source, {})
analyser_data = analyser_source_lookup.get(ri.source, {})

Check warning on line 529 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L528-L529

Added lines #L528 - L529 were not covered by tests
combined_data.append(
RSyncerInfo(
source=ri.source,
num_files_transferred=d.get("num_files_transferred", 0),
num_files_in_queue=d.get("num_files_in_queue", 0),
alive=d.get("alive", False),
stopping=d.get("stopping", True),
num_files_transferred=rsync_data.get("num_files_transferred", 0),
num_files_in_queue=rsync_data.get("num_files_in_queue", 0),
num_files_to_analyse=analyser_data.get("num_files_in_queue", 0),
alive=rsync_data.get("alive", False),
stopping=rsync_data.get("stopping", True),
analyser_alive=analyser_data.get("alive", False),
analyser_stopping=analyser_data.get("stopping", True),
destination=ri.destination,
tag=ri.tag,
files_transferred=ri.files_transferred,
Expand Down