Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/murfey/client/contexts/spa_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ def post_transfer(
},
)

elif transferred_file.suffix == ".dm" and environment:
elif (
transferred_file.suffix == ".dm"
and transferred_file.name.startswith("GridSquare")
and environment
):
gs_name = transferred_file.stem.split("_")[1]
fh_positions = _foil_hole_positions(transferred_file, int(gs_name))
source = _get_source(transferred_file, environment=environment)
Expand Down
3 changes: 3 additions & 0 deletions src/murfey/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2948,7 +2948,10 @@ def feedback_callback(header: dict, message: dict) -> None:
else:
# Send it directly to DLQ without trying to rerun it
_transport_object.transport.nack(header, requeue=False)
if not result:
logger.error(f"Workflow {message['register']} returned {result}")
Comment thread Fixed
return None
logger.error(f"No workflow found for {message['register']}")
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
if _transport_object:
_transport_object.transport.nack(header, requeue=False)
return None
Expand Down
2 changes: 1 addition & 1 deletion src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def flush_spa_processing(
visit_name: str, session_id: MurfeySessionID, tag: Tag, db=murfey_db
):
zocalo_message = {
"register": "flush_spa_preprocess",
"register": "spa.flush_spa_preprocess",
"session_id": session_id,
"tag": tag.tag,
}
Expand Down
92 changes: 51 additions & 41 deletions src/murfey/workflows/spa/flush_spa_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,19 @@

from murfey.server import _murfey_id, _transport_object, sanitise
from murfey.server.api.auth import MurfeySessionID
from murfey.server.murfey_db import murfey_db
from murfey.util.config import get_machine_config, get_microscope
from murfey.util.db import DataCollectionGroup, FoilHole, GridSquare
from murfey.util.db import (
AutoProcProgram,
DataCollection,
DataCollectionGroup,
FoilHole,
GridSquare,
Movie,
PreprocessStash,
ProcessingJob,
)
from murfey.util.db import Session as MurfeySession
from murfey.util.db import SPAFeedbackParameters, SPARelionParameters
from murfey.util.models import FoilHoleParameters, GridSquareParameters
from murfey.util.processing_params import default_spa_parameters
from murfey.util.spa_metadata import (
Expand All @@ -30,10 +40,10 @@ def register_grid_square(
session_id: MurfeySessionID,
gsid: int,
grid_square_params: GridSquareParameters,
db=murfey_db,
murfey_db: Session,
):
try:
grid_square = db.exec(
grid_square = murfey_db.exec(
select(GridSquare)
.where(GridSquare.name == gsid)
.where(GridSquare.tag == grid_square_params.tag)
Expand All @@ -51,7 +61,7 @@ def register_grid_square(
_transport_object.do_update_grid_square(grid_square.id, grid_square_params)
except Exception:
if _transport_object:
dcg = db.exec(
dcg = murfey_db.exec(
select(DataCollectionGroup)
.where(DataCollectionGroup.session_id == session_id)
.where(DataCollectionGroup.tag == grid_square_params.tag)
Expand Down Expand Up @@ -90,19 +100,19 @@ def register_grid_square(
pixel_size=grid_square_params.pixel_size,
image=secured_grid_square_image_path,
)
db.add(grid_square)
db.commit()
db.close()
murfey_db.add(grid_square)
murfey_db.commit()
murfey_db.close()


def register_foil_hole(
session_id: MurfeySessionID,
gs_name: int,
foil_hole_params: FoilHoleParameters,
db=murfey_db,
murfey_db: Session,
):
try:
gs = db.exec(
gs = murfey_db.exec(
select(GridSquare)
.where(GridSquare.tag == foil_hole_params.tag)
.where(GridSquare.session_id == session_id)
Expand All @@ -120,7 +130,7 @@ def register_foil_hole(
else:
jpeg_size = (0, 0)
try:
foil_hole = db.exec(
foil_hole = murfey_db.exec(
select(FoilHole)
.where(FoilHole.name == foil_hole_params.name)
.where(FoilHole.grid_square_id == gsid)
Expand Down Expand Up @@ -180,9 +190,9 @@ def register_foil_hole(
pixel_size=foil_hole_params.pixel_size,
image=secured_foil_hole_image_path,
)
db.add(foil_hole)
db.commit()
db.close()
murfey_db.add(foil_hole)
murfey_db.commit()
murfey_db.close()


def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]:
Expand All @@ -198,11 +208,11 @@ def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]:


def _flush_position_analysis(
movie_path: Path, dcg_id: int, session_id: int, db: Session
movie_path: Path, dcg_id: int, session_id: int, murfey_db: Session
) -> Optional[int]:
"""Register a grid square and foil hole in the database"""
data_collection_group = murfey_db.exec(
select(db.DataCollectionGroup).where(db.DataCollectionGroup.id == dcg_id)
select(DataCollectionGroup).where(DataCollectionGroup.id == dcg_id)
).one()

# Work out the grid square and associated metadata file
Expand Down Expand Up @@ -281,17 +291,19 @@ def _flush_position_analysis(
return foil_hole


def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False) -> bool:
session_id = message["session_id"]
stashed_files = murfey_db.exec(
select(db.PreprocessStash)
.where(db.PreprocessStash.session_id == session_id)
.where(db.PreprocessStash.tag == message["tag"])
select(PreprocessStash)
.where(PreprocessStash.session_id == session_id)
.where(PreprocessStash.tag == message["tag"])
).all()
if not stashed_files:
return None
return True
instrument_name = (
murfey_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
murfey_db.exec(
select(MurfeySession).where(MurfeySession.id == message["session_id"])
)
.one()
.instrument_name
)
Expand All @@ -301,32 +313,30 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
recipe_name = machine_config.recipes.get("em-spa-preprocess", "em-spa-preprocess")
collected_ids = murfey_db.exec(
select(
db.DataCollectionGroup,
db.DataCollection,
db.ProcessingJob,
db.AutoProcProgram,
DataCollectionGroup,
DataCollection,
ProcessingJob,
AutoProcProgram,
)
.where(db.DataCollectionGroup.session_id == session_id)
.where(db.DataCollectionGroup.tag == message["tag"])
.where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
.where(db.ProcessingJob.dc_id == db.DataCollection.id)
.where(db.AutoProcProgram.pj_id == db.ProcessingJob.id)
.where(db.ProcessingJob.recipe == recipe_name)
.where(DataCollectionGroup.session_id == session_id)
.where(DataCollectionGroup.tag == message["tag"])
.where(DataCollection.dcg_id == DataCollectionGroup.id)
.where(ProcessingJob.dc_id == DataCollection.id)
.where(AutoProcProgram.pj_id == ProcessingJob.id)
.where(ProcessingJob.recipe == recipe_name)
).one()
params = murfey_db.exec(
select(db.SPARelionParameters, db.SPAFeedbackParameters)
.where(db.SPARelionParameters.pj_id == collected_ids[2].id)
.where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id)
select(SPARelionParameters, SPAFeedbackParameters)
.where(SPARelionParameters.pj_id == collected_ids[2].id)
.where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id)
).one()
proc_params = params[0]
feedback_params = params[1]
if not proc_params:
logger.warning(
f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
)
raise ValueError(
"No processing parameters were found in the database when flushing SPA preprocessing"
)
return False

murfey_ids = _murfey_id(
collected_ids[3].id,
Expand All @@ -348,7 +358,7 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
movie_path=f.file_path,
dcg_id=collected_ids[0].id,
session_id=session_id,
db=db,
murfey_db=murfey_db,
)
except Exception as e:
logger.error(
Expand All @@ -361,7 +371,7 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
ppath = Path(f.file_path)
if not mrcp.parent.exists():
mrcp.parent.mkdir(parents=True)
movie = db.Movie(
movie = Movie(
murfey_id=murfey_ids[2 * i],
path=f.file_path,
image_number=f.image_number,
Expand Down Expand Up @@ -407,4 +417,4 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
)
murfey_db.commit()
murfey_db.close()
return None
return True