Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
tests/integration/test_tiny_stage_5_artifacts.py
tests/integration/test_tiny_pipeline_e2e.py
tests/integration/test_tiny_pipeline_h5_e2e.py
tests/integration/local_h5/
tests/integration/build_outputs/
tests/integration/test_modal_pipeline_seams.py
tests/integration/test_tiny_h5_pipeline.py
tests/integration/test_modal_pipeline_e2e.py
Expand Down
1 change: 1 addition & 0 deletions changelog.d/927.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added run-scoped local H5 worker bootstrap artifacts for deterministic worker setup metadata.
61 changes: 60 additions & 1 deletion modal_app/local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

from modal_app.images import cpu_image as image # noqa: E402
from modal_app.resilience import reconcile_run_dir_fingerprint # noqa: E402
from policyengine_us_data.build_outputs.bootstrap import ( # noqa: E402
WorkerBootstrapBuilder,
)
from policyengine_us_data.build_outputs.fingerprinting import ( # noqa: E402
FingerprintingService,
PublishingInputBundle,
Expand Down Expand Up @@ -444,6 +447,48 @@ def _resolve_scope_fingerprint(
return computed_fingerprint


@pipeline_node(
    PipelineNode(
        id="build_worker_bootstrap",
        label="Build Worker Bootstrap",
        node_type="library",
        description="Persist deterministic local H5 worker setup artifacts for one scope.",
        source_file="modal_app/local_area.py",
        status="current",
        stability="moving",
        pathways=["local_h5"],
        api_refs=[
            "policyengine_us_data.build_outputs.bootstrap.WorkerBootstrapBuilder"
        ],
        artifacts_out=[
            "bootstrap/{scope}/worker_bootstrap.json",
            "bootstrap/{scope}/entity_graph.npz",
        ],
        validation_commands=["uv run pytest tests/unit/test_modal_local_area.py"],
    )
)
def _build_worker_bootstrap(
    *,
    inputs: PublishingInputBundle,
    scope: str,
    artifacts_dir: Path,
    scope_fingerprint: str | None = None,
):
    """Build the worker bootstrap bundle for one local H5 scope and report it.

    All arguments are keyword-only and are forwarded unchanged to
    ``WorkerBootstrapBuilder().build``.

    Args:
        inputs: Publishing input bundle handed to the builder.
        scope: Scope name passed to the builder and echoed in the log line.
        artifacts_dir: Directory passed to the builder; the manifest path is
            printed relative to it.
        scope_fingerprint: Optional fingerprint passed through to the builder.

    Returns:
        The bundle object returned by ``WorkerBootstrapBuilder().build``.
    """
    builder = WorkerBootstrapBuilder()
    bootstrap_bundle = builder.build(
        inputs=inputs,
        scope=scope,
        artifacts_dir=artifacts_dir,
        scope_fingerprint=scope_fingerprint,
    )

    # Log the manifest location relative to the artifacts directory.
    manifest_relpath = bootstrap_bundle.manifest_path.relative_to(artifacts_dir)
    print(f"Worker bootstrap ready for {scope}: {manifest_relpath}")

    return bootstrap_bundle


@pipeline_node(
PipelineNode(
id="coordinate_work_partition",
Expand Down Expand Up @@ -633,7 +678,7 @@ def run_phase(
artifacts_out=["one or more H5 files"],
validation_commands=[
"uv run pytest tests/unit/test_modal_local_area.py",
"uv run pytest tests/integration/local_h5/test_worker_script_tiny_fixture.py",
"uv run pytest tests/integration/build_outputs/h5_worker_runtime/test_worker_script_tiny_fixture.py",
],
)
)
Expand Down Expand Up @@ -1094,6 +1139,13 @@ def coordinate_publish(
print(f"Inputs unchanged ({fingerprint}), resuming...")
else:
print(f"Prepared staging directory for fingerprint {fingerprint}")
_build_worker_bootstrap(
inputs=fingerprint_inputs,
scope="regional",
artifacts_dir=artifacts,
scope_fingerprint=fingerprint,
)
pipeline_volume.commit()
staging_volume.commit()
if work_items_override is None:
result = subprocess.run(
Expand Down Expand Up @@ -1378,6 +1430,13 @@ def coordinate_national_publish(
)
run_dir = staging_dir / run_id
run_dir.mkdir(parents=True, exist_ok=True)
_build_worker_bootstrap(
inputs=fingerprint_inputs,
scope="national",
artifacts_dir=artifacts,
scope_fingerprint=fingerprint,
)
pipeline_volume.commit()
national_h5 = run_dir / "national" / "US.h5"

work_items = [{"type": "national", "id": "US"}]
Expand Down
4 changes: 2 additions & 2 deletions policyengine_us_data/build_outputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
Modules in this package should land only when they become active runtime
seams rather than speculative placeholders. The current early slices support
H5 output request construction, exact calibration geography loading,
fingerprinting, clone-weight shape contracts, worker partitioning, and source
dataset snapshot contracts.
fingerprinting, clone-weight shape contracts, worker partitioning, source
dataset snapshot contracts, and worker-bootstrap artifacts.
"""
Loading