-
Notifications
You must be signed in to change notification settings - Fork 1
Add BuildCompiler.plating: generate PUDU/Opentrons plating protocol from transformation results #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import subprocess | ||
| import tempfile | ||
| import zipfile | ||
| import re | ||
| from pathlib import Path | ||
|
|
||
| def assembly_plan_RDF_to_JSON(file, output_path: str | Path | None = None): | ||
|
|
@@ -251,3 +252,132 @@ def run_manual_script_with_json_to_zip( | |
| z.write(p, arcname=p.relative_to(tmpdir)) | ||
|
|
||
| return out_zip | ||
|
|
||
|
|
||
| def load_json_or_dict(value): | ||
| """Load JSON input from a dictionary, JSON string, or JSON file path.""" | ||
| if isinstance(value, dict): | ||
| return value | ||
|
|
||
| if isinstance(value, Path): | ||
| candidate = value | ||
| elif isinstance(value, str): | ||
| candidate = Path(value) | ||
| else: | ||
| raise ValueError("Expected a dict, JSON string, or path to a JSON file.") | ||
|
|
||
| if candidate.exists(): | ||
| with candidate.open("r", encoding="utf-8") as infile: | ||
| return json.load(infile) | ||
|
|
||
| try: | ||
| return json.loads(str(value)) | ||
| except json.JSONDecodeError as exc: | ||
| raise ValueError( | ||
| "Could not parse input as JSON string or JSON file path." | ||
| ) from exc | ||
|
|
||
|
|
||
| def normalize_plating_data(transformation_results): | ||
| """Normalize transformation results to {'bacterium_locations': {...}}.""" | ||
| data = load_json_or_dict(transformation_results) | ||
| if not isinstance(data, dict): | ||
| raise ValueError("Transformation results must be a JSON object.") | ||
|
|
||
| key_aliases = ( | ||
| "bacterium_locations", | ||
| "strain_locations", | ||
| "thermocycler_wells", | ||
| ) | ||
| for key in key_aliases: | ||
| if key in data: | ||
| well_mapping = data[key] | ||
| if not isinstance(well_mapping, dict) or not well_mapping: | ||
| raise ValueError(f"'{key}' must be a non-empty object.") | ||
| return {"bacterium_locations": well_mapping} | ||
|
|
||
| well_pattern = re.compile(r"^[A-H](?:[1-9]|1[0-2])$") | ||
| if data and all(well_pattern.match(str(k)) for k in data.keys()): | ||
| return {"bacterium_locations": data} | ||
|
|
||
| raise ValueError( | ||
| "Unsupported transformation results format. Expected one of: " | ||
| "{'bacterium_locations': {...}}, {'strain_locations': {...}}, " | ||
| "{'thermocycler_wells': {...}}, or a raw well mapping like {'A1': 'strain_1'}." | ||
| ) | ||
|
|
||
|
|
||
| def write_plating_protocol_script(output_path, plating_data, advanced_params): | ||
| """Write a self-contained PUDU plating runner script.""" | ||
| script_path = Path(output_path) | ||
| script_text = ( | ||
| "from pudu.plating import Plating\n\n" | ||
| f"PLATING_DATA = {json.dumps(plating_data, indent=4)}\n\n" | ||
| f"ADVANCED_PARAMS = {json.dumps(advanced_params, indent=4)}\n\n" | ||
|
Comment on lines
+315
to
+316
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| "if __name__ == '__main__':\n" | ||
| " protocol = Plating(plating_data=PLATING_DATA, json_params=ADVANCED_PARAMS)\n" | ||
| " protocol.run()\n" | ||
| ) | ||
| script_path.write_text(script_text, encoding="utf-8") | ||
| return script_path | ||
|
|
||
|
|
||
| def run_opentrons_script_to_zip( | ||
| opentrons_script_path: str | Path, | ||
| plating_json_path: str | Path, | ||
| zip_name: str | None = None, | ||
| overwrite: bool = False, | ||
| ) -> Path: | ||
| """Run opentrons_simulate and zip protocol artifacts and logs.""" | ||
| script_path = Path(opentrons_script_path).resolve() | ||
| json_path = Path(plating_json_path).resolve() | ||
|
|
||
| if not script_path.exists(): | ||
| raise FileNotFoundError(f"Opentrons script not found: {script_path}") | ||
| if not json_path.exists(): | ||
| raise FileNotFoundError(f"JSON file not found: {json_path}") | ||
|
|
||
| out_dir = script_path.parent | ||
| base_name = zip_name or f"{script_path.stem}_opentrons_simulation.zip" | ||
| out_zip = out_dir / base_name | ||
|
|
||
| if out_zip.exists() and not overwrite: | ||
| stem = out_zip.stem | ||
| suffix = out_zip.suffix | ||
| i = 1 | ||
| while True: | ||
| candidate = out_dir / f"{stem}_{i}{suffix}" | ||
| if not candidate.exists(): | ||
| out_zip = candidate | ||
| break | ||
| i += 1 | ||
|
|
||
| with tempfile.TemporaryDirectory() as tmpdirname: | ||
| tmpdir = Path(tmpdirname) | ||
| tmp_script = tmpdir / script_path.name | ||
| tmp_json = tmpdir / json_path.name | ||
| shutil.copy2(script_path, tmp_script) | ||
| shutil.copy2(json_path, tmp_json) | ||
|
|
||
| proc = subprocess.run( | ||
| ["opentrons_simulate", str(tmp_script)], | ||
| capture_output=True, | ||
| cwd=tmpdir, | ||
| ) | ||
|
|
||
| (tmpdir / "simulate_stdout.txt").write_text( | ||
| (proc.stdout or b"").decode("utf-8", errors="replace"), encoding="utf-8" | ||
| ) | ||
| (tmpdir / "simulate_stderr.txt").write_text( | ||
| (proc.stderr or b"").decode("utf-8", errors="replace"), encoding="utf-8" | ||
| ) | ||
| (tmpdir / "simulate_returncode.txt").write_text( | ||
| str(proc.returncode), encoding="utf-8" | ||
| ) | ||
|
|
||
| with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf: | ||
| for p in tmpdir.rglob("*"): | ||
| if p.is_file(): | ||
| zf.write(p, arcname=p.relative_to(tmpdir)) | ||
|
|
||
| return out_zip | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| import json | ||
| import tempfile | ||
| import unittest | ||
| import zipfile | ||
| import os | ||
| import sys | ||
| from pathlib import Path | ||
| from unittest.mock import patch | ||
|
|
||
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) | ||
|
|
||
| from buildcompiler.buildcompiler import BuildCompiler | ||
| from buildcompiler.robotutils import ( | ||
| normalize_plating_data, | ||
| run_opentrons_script_to_zip, | ||
| ) | ||
|
|
||
|
|
||
| class TestPlatingNormalization(unittest.TestCase): | ||
| def test_accepts_bacterium_locations_shape(self): | ||
| data = {"bacterium_locations": {"A1": "strain_1"}} | ||
| normalized = normalize_plating_data(data) | ||
| self.assertEqual(normalized, data) | ||
|
|
||
| def test_accepts_strain_locations_shape(self): | ||
| normalized = normalize_plating_data({"strain_locations": {"A1": "strain_1"}}) | ||
| self.assertEqual(normalized, {"bacterium_locations": {"A1": "strain_1"}}) | ||
|
|
||
| def test_accepts_thermocycler_wells_shape(self): | ||
| normalized = normalize_plating_data({"thermocycler_wells": {"B2": "strain_2"}}) | ||
| self.assertEqual(normalized, {"bacterium_locations": {"B2": "strain_2"}}) | ||
|
|
||
| def test_accepts_raw_well_map_shape(self): | ||
| normalized = normalize_plating_data({"C3": "strain_3"}) | ||
| self.assertEqual(normalized, {"bacterium_locations": {"C3": "strain_3"}}) | ||
|
|
||
| def test_invalid_shape_raises_value_error(self): | ||
| with self.assertRaises(ValueError): | ||
| normalize_plating_data({"unexpected": {"A1": "strain_1"}}) | ||
|
|
||
|
|
||
| class TestBuildCompilerPlating(unittest.TestCase): | ||
| def test_plating_writes_json_and_script(self): | ||
| compiler = BuildCompiler.__new__(BuildCompiler) | ||
|
|
||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| results_dir = Path(tmpdir) / "plating_results" | ||
|
|
||
| with patch( | ||
| "buildcompiler.buildcompiler.run_opentrons_script_to_zip", | ||
| return_value=results_dir / "plating.zip", | ||
| ): | ||
| output = compiler.plating( | ||
| transformation_results={"A1": "strain_1"}, | ||
| results_dir=results_dir, | ||
| advanced_params={"target_colonies": 12}, | ||
| ) | ||
|
|
||
| self.assertTrue(Path(output["plating_json"]).exists()) | ||
| self.assertTrue(Path(output["protocol_script"]).exists()) | ||
| self.assertTrue(output["simulation_zip"].endswith("plating.zip")) | ||
|
|
||
| script_text = Path(output["protocol_script"]).read_text(encoding="utf-8") | ||
| self.assertIn("from pudu.plating import Plating", script_text) | ||
| self.assertIn("json_params=ADVANCED_PARAMS", script_text) | ||
| self.assertNotIn("advanced_params=ADVANCED_PARAMS", script_text) | ||
|
|
||
| payload = json.loads(Path(output["plating_json"]).read_text(encoding="utf-8")) | ||
| self.assertIn("plating_data", payload) | ||
| self.assertIn("advanced_params", payload) | ||
| self.assertEqual( | ||
| payload["plating_data"], {"bacterium_locations": {"A1": "strain_1"}} | ||
| ) | ||
|
|
||
|
|
||
| class TestPlatingSimulationZip(unittest.TestCase): | ||
| def test_run_opentrons_script_to_zip_with_monkeypatched_subprocess(self): | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| tmpdir_path = Path(tmpdir) | ||
| script_path = tmpdir_path / "run_plating.py" | ||
| script_path.write_text("print('hello')\n", encoding="utf-8") | ||
|
|
||
| json_path = tmpdir_path / "plating_input.json" | ||
| json_path.write_text(json.dumps({"plating_data": {}}), encoding="utf-8") | ||
|
|
||
| class ProcResult: | ||
| stdout = b"simulated stdout" | ||
| stderr = b"simulated stderr" | ||
| returncode = 0 | ||
|
|
||
| with patch("buildcompiler.robotutils.subprocess.run", return_value=ProcResult()): | ||
| zip_path = run_opentrons_script_to_zip(script_path, json_path, zip_name="sim.zip") | ||
|
|
||
| self.assertTrue(zip_path.exists()) | ||
|
|
||
| with zipfile.ZipFile(zip_path, "r") as zf: | ||
| names = set(zf.namelist()) | ||
| self.assertIn("run_plating.py", names) | ||
| self.assertIn("plating_input.json", names) | ||
| self.assertIn("simulate_stdout.txt", names) | ||
| self.assertIn("simulate_stderr.txt", names) | ||
| self.assertIn("simulate_returncode.txt", names) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
load_json_or_dicttreats every string as a path first and callsPath.exists()beforejson.loads; for large but valid JSON strings (e.g., many well mappings), this can raiseOSError: [Errno 36] File name too longand abort parsing. The function should attempt JSON parsing (or catchOSErrorand fall back) so valid JSON string input does not fail based on filename limits.Useful? React with 👍 / 👎.