fix: harden resume checkpoint handling #624

Open
nabinchha wants to merge 7 commits into main from fix/resume-hardening-623
Conversation

@nabinchha nabinchha commented May 8, 2026

Summary

  • Harden resume checkpoints by persisting config fingerprints in metadata, writing metadata atomically, and rejecting unsafe resume states such as allow_resize=True and already post-processed datasets.
  • Make async resume recover actual persisted row counts from parquet metadata for early-shutdown salvage scenarios.
  • Unify sync + async resume: both engines now derive num_completed_batches and actual_num_records from parquet-files/batch_*.parquet via _recover_progress_from_disk. metadata.json keeps describing the run configuration (buffer_size, target_num_records, original_target_num_records, config fingerprint); the filesystem is the source of truth for progress. The sync engine additionally rejects non-contiguous batch IDs. A sketch of this recovery idea follows this list.
  • Document the unified resume strategy in architecture and Fern docs.
  • Document DatasetCreationResults observability scope on resume: methods that read the artifact directory (load_dataset, count_records, load_analysis, export, push_to_hub) reflect the full on-disk dataset, while task_traces, model-usage logs, and telemetry events are scoped to the current invocation only.
  • Address review feedback: collapse the unreachable IF_POSSIBLE guard in _post_generation_processed_resume_result, split the post-processed raise so num_records < prior_target and the extension case get accurate messages, and drop the dead _find_completed_row_group_ids helper.
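
A minimal sketch of the unified recovery idea, assuming pyarrow for the parquet footer reads and batch IDs starting at 0; the real _recover_progress_from_disk in dataset_builder.py may differ in details:

import re
from pathlib import Path

import pyarrow.parquet as pq

def recover_progress_from_disk(dataset_dir: Path, allow_holes: bool = False) -> tuple[int, int]:
    """Derive (num_completed_batches, actual_num_records) from batch files on disk."""
    batch_ids: list[int] = []
    total_rows = 0
    for path in (dataset_dir / "parquet-files").glob("batch_*.parquet"):
        match = re.fullmatch(r"batch_(\d+)\.parquet", path.name)
        if match is None:
            continue
        batch_ids.append(int(match.group(1)))
        # Row counts come from the parquet footer, not from metadata.json.
        total_rows += pq.ParquetFile(path).metadata.num_rows
    batch_ids.sort()
    if not allow_holes and batch_ids != list(range(len(batch_ids))):
        # Sync-engine contract: a hole can only mean external mutation.
        raise ValueError(f"non-contiguous batch IDs on disk: {batch_ids}")
    return len(batch_ids), total_rows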

Fixes #623

Test plan

  • uv run pytest packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py -k "resume or allow_resize or post_generation or non_contiguous or recovers_progress"
  • uv run pytest packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py -k "metadata"
  • uv run pytest packages/data-designer/tests/interface/test_data_designer.py packages/data-designer/tests/interface/test_results.py
  • uv run pytest packages/data-designer-engine/tests/ (full engine suite)
  • uv run ruff format --check <touched files>
  • uv run ruff check <touched files>
  • fern check (not run: Fern CLI is not installed in this environment)

Persist config identity in metadata, make checkpoints atomic, and reject unsafe resume states so interrupted runs do not mix incompatible or post-processed data.
@nabinchha nabinchha requested a review from a team as a code owner May 8, 2026 22:10

github-actions Bot commented May 8, 2026

PR #624 Review — fix: harden resume checkpoint handling

Summary

This PR hardens the resume checkpoint pipeline to address several crash-recovery and correctness issues:

  • Atomic metadata writes — metadata.json is now written via tmp file + fsync + os.replace, so a crash mid-write cannot leave a partially written checkpoint (a minimal sketch of this pattern follows this list).
  • Config-fingerprint in metadata — every metadata write is stamped with config_hash, config_hash_algo, and config_hash_version. Resume compatibility checks read metadata.json first and fall back to builder_config.json only for older artifacts.
  • Single-writer lock — a new ArtifactStorage.dataset_lock() context manager creates an exclusive .data_designer.lock file (O_CREAT | O_EXCL) for the life of a run so two processes cannot write to the same dataset directory.
  • Unsafe-resume rejection — resume now fails fast for allow_resize=True columns (row boundaries are non-deterministic) and for datasets that already completed process_after_generation() (schema/row-count may have changed).
  • Real parquet row counts on resume — the async engine scans parquet-files/batch_*.parquet and reads parquet metadata to recover the actual persisted row count per row-group, which matters for partial-salvage scenarios.
  • Better error on corrupt metadata — a json.JSONDecodeError while loading metadata.json now raises a dedicated DatasetGenerationError instead of being silently treated as "assume compatible."
  • Docs updated — architecture/dataset-builders.md and the Fern docs (architecture-and-performance.mdx, processors.mdx).
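
A minimal sketch of the atomic-write pattern described above; the function and path names are illustrative, not the actual ArtifactStorage API:

import json
import os
from pathlib import Path

def write_metadata_atomically(metadata_path: Path, metadata: dict) -> None:
    tmp_path = metadata_path.with_name(metadata_path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f)
            f.flush()
            os.fsync(f.fileno())  # force bytes to disk before the rename
        # os.replace is atomic on POSIX: readers see either the old file
        # or the new one, never a partially written checkpoint.
        os.replace(tmp_path, metadata_path)
    finally:
        tmp_path.unlink(missing_ok=True)  # clean up if we crashed before the rename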

Findings

🟠 Bug — outer lock caches resolved_dataset_name prematurely

In data_designer.py:268+, create() now acquires the lock before calling builder.build():

with builder.artifact_storage.dataset_lock():
    builder.build(num_records=num_records, resume=resume)

dataset_lock() calls mkdir_if_needed(self.base_dataset_path) (artifact_storage.py:252), which accesses base_dataset_path → the cached_property resolved_dataset_name. This is the exact premature-caching the existing _check_resume_config_compatibility comment warns against ("must NOT access base_dataset_path … which would cache resolved_dataset_name prematurely").

Inside build(), when resume == IF_POSSIBLE and the stored config fingerprint is incompatible, the code pops the cache and re-resolves to a new timestamped directory:

self.artifact_storage.resume = ResumeMode.NEVER
self.artifact_storage.__dict__.pop("resolved_dataset_name", None)

After this switch, base_dataset_path now points at foo_MM-DD-YYYY_HHMMSS/ — but the outer lock file was already created in the original foo/ directory. The new timestamped directory is unlocked, and the lock file in foo/ protects a directory the run never writes to.

In practice two concurrent runs will race to unique timestamps and not collide, so this is unlikely to cause data corruption today. But the lock semantics are broken in the IF_POSSIBLE + incompatible path, which is the single most interesting case for a lock to protect (both runs are targeting the same dataset_name). Worth fixing.

Suggested fix: either (a) move the outer lock inside build() after the compat-check / mode switch and re-acquire a narrower lock around profiling, or (b) expose a helper that resolves the final directory without creating it, lock that, and pass the path through. Option (a) is probably simpler — the inner dataset_lock() already exists.
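
An illustration of the cached_property trap this finding describes, using a toy class rather than the real ArtifactStorage:

from functools import cached_property

class Storage:
    def __init__(self, name: str) -> None:
        self.name = name
        self.resume = "ALWAYS"

    @cached_property
    def resolved_dataset_name(self) -> str:
        # Resolution depends on the resume mode at first access.
        return self.name if self.resume == "ALWAYS" else f"{self.name}_MM-DD-YYYY_HHMMSS"

storage = Storage("foo")
locked_dir = storage.resolved_dataset_name            # first access caches "foo"
storage.resume = "NEVER"                              # the IF_POSSIBLE -> NEVER switch
storage.__dict__.pop("resolved_dataset_name", None)   # drop the stale cache
write_dir = storage.resolved_dataset_name             # re-resolves to the timestamped dir
assert locked_dir != write_dir  # a lock taken at first access now guards the wrong dir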

🟡 Redundant nested lock when called from DataDesigner.create()

With the outer lock in data_designer.py plus the inner dataset_lock() inside DatasetBuilder.build(), the inner lock is effectively a no-op re-entry through _lock_depth in the common path. It only matters when build() is called directly (tests, future callers). That's fine, but the two-layer locking is easy to misread. A comment at either site pointing out the re-entry contract would save the next reader some time.

🟡 _lock_depth counter is not thread-safe

_lock_depth is a bare int; increments/decrements are not atomic. Today DataDesigner runs single-threaded control flow, so this doesn't matter — but if the same ArtifactStorage is ever shared across threads (e.g. a future profiler that reads metadata from a worker thread), re-entrant locks could desynchronize. Worth a one-line docstring note that dataset_lock() is re-entrant per-process but not thread-safe.

🟡 Stale lock requires manual cleanup

dataset_lock() relies on unlink(missing_ok=True) in the finally block. If the process is SIGKILLed or the host crashes, the lock file remains and the next run fails with the "remove the stale lock if no process is active" message. The PID is written to the file, so a future enhancement could auto-detect staleness (e.g. psutil.pid_exists), but the current message is clear and actionable — flagging only as follow-up, not a blocker.
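
A sketch of the staleness auto-detection suggested above, assuming the lock file stores the owning PID as text; psutil.pid_exists is a real call, while the helper name and lock format are hypothetical:

from pathlib import Path

import psutil

def remove_lock_if_stale(lock_path: Path) -> bool:
    """Return True if a stale lock was removed."""
    try:
        pid = int(lock_path.read_text().strip())
    except (FileNotFoundError, ValueError):
        return False  # no lock, or unreadable contents: leave it to the user
    if psutil.pid_exists(pid):
        return False  # owner is still alive; respect the lock
    lock_path.unlink(missing_ok=True)
    return True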

🟡 post_generation_processed flag is only set if there are AFTER_GENERATION processors

if self._processor_runner.has_processors_for(ProcessorStage.AFTER_GENERATION):
    self.artifact_storage.update_metadata({"post_generation_processed": True})

If a config has no AFTER_GENERATION processors, the flag is never set, so _post_generation_processed_resume_result never fires. That's correct (nothing was applied). But if a user re-runs the same dataset with a newly added AFTER_GENERATION processor, that processor will run on the completed dataset — which is what the PR description says is intended ("can change row counts, schemas, row-group boundaries"). Please confirm this is the desired behavior; a docstring or test pinning this contract would prevent future regressions.

🟢 _post_generation_processed_resume_result early-return bypasses task_traces initialization

When this method returns early, build() returns the final dataset path without entering the inner with block. data_designer.py:create() then accesses builder.task_traces, builder.actual_num_records, builder.first_non_retryable_error, and builder.early_shutdown on the builder. These need to be initialized to safe defaults (empty list, -1, None, False) before build() is called for the early-return path to work end-to-end. Worth adding an integration-style test that drives the whole DataDesigner.create() flow against a completed+post-processed dataset (the current test only patches _initialize_generators_and_graph on the builder).
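
A sketch of the safe-default initialization this implies; the attribute names come from the finding, but where the real builder sets them is an assumption:

class DatasetBuilder:
    def __init__(self) -> None:
        # Safe defaults so create() can read these even when build() returns
        # early via _post_generation_processed_resume_result.
        self.task_traces: list = []
        self.actual_num_records: int = -1
        self.first_non_retryable_error: Exception | None = None
        self.early_shutdown: bool = False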

🟢 read_metadata() used in two places, different exception-handling contracts

  • _load_resume_state (dataset_builder.py:402) now catches json.JSONDecodeError and raises DatasetGenerationError with "metadata.json is corrupt."
  • _post_generation_processed_resume_result catches (FileNotFoundError, json.JSONDecodeError) and silently returns None.

Catching JSONDecodeError silently in the post-generation check means a corrupt metadata slips past this gate and is only caught later in _load_resume_state. That's probably fine (the real resume path hits it), but it's asymmetric with the compat-check path which raises on JSONDecodeError. Consider routing all corrupt-metadata reads through a single helper so the behavior is consistent.
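
A sketch of the single-helper idea; the helper name is hypothetical, and the exception class is a stand-in for the engine's real type. Both call sites would route through it so corrupt metadata always surfaces the same way:

import json

class DatasetGenerationError(RuntimeError):
    """Stand-in for the engine's real exception type."""

def read_metadata_or_raise(storage) -> dict:
    """Single choke point: corrupt metadata.json always raises, never slips past."""
    try:
        return storage.read_metadata()
    except json.JSONDecodeError as exc:
        raise DatasetGenerationError("metadata.json is corrupt") from exc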

🟢 Style nit — typing.Iterator

artifact_storage.py imports Iterator from typing. The codebase already uses modern typing elsewhere; collections.abc.Iterator is preferred for runtime-usable generic aliases on 3.9+. Ruff's UP035 should flag this if it's in the rule set — not worth blocking on, but easy to fix.
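
The one-line swap the nit describes (the function is illustrative):

from collections.abc import Iterator  # instead of: from typing import Iterator

def batch_ids() -> Iterator[int]:
    yield from range(3)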

🟢 Tests are solid

The new tests cover the important invariants:

  • test_resume_rejects_allow_resize_columns
  • test_build_resume_raises_on_corrupt_metadata
  • test_build_resume_post_generation_processed_same_target_returns_existing_path
  • test_build_resume_post_generation_processed_extension_raises
  • test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group
  • test_artifact_storage_write_metadata_includes_defaults
  • test_artifact_storage_dataset_lock_rejects_second_writer

The parquet fixtures were upgraded from empty text files to real parquet with row counts, which is necessary for the new read_metadata(p).num_rows path. Good hygiene.

Missing coverage:

  • End-to-end DataDesigner.create() flow against a completed+post-processed dataset (sanity-check for the early-return path through builder attributes).
  • Lock behavior across the IF_POSSIBLE + incompatible-config mode switch (this is exactly the bug in the first finding).
  • Thread-reentrancy behavior of _lock_depth, if the contract is "single-threaded" that should be documented/tested.

Docs

  • architecture/dataset-builders.md — new "Resume Checkpointing" section is accurate and matches the code.
  • fern/.../architecture-and-performance.mdx — clean summary of resume modes and invariants.
  • fern/.../processors.mdx — the new warning about process_after_generation() being terminal is clear.

Verdict

Request changes (minor) before merge. The PR is a clear improvement in crash-recovery hardening, and most of the design is well-considered. Please address the outer-lock premature-caching issue (the IF_POSSIBLE + incompatible path leaves the run unlocked) before merging — that's the only finding I'd treat as blocking. The rest are follow-ups or style nits.

greptile-apps Bot commented May 8, 2026

Greptile Summary

This PR hardens resume checkpoint handling by making metadata writes atomic, unifying sync and async progress recovery from parquet files on disk, and adding new guards that reject unsafe resume states (allow_resize=True columns, partially post-processed datasets).

  • Atomic metadata + unified progress recovery: write_metadata now uses a tmp-file + fsync + os.replace pattern to survive mid-write crashes. Both engines derive num_completed_batches and actual_num_records from parquet-files/batch_*.parquet via _recover_progress_from_disk, so a crash between writing a parquet file and updating metadata no longer causes double-counted records.
  • New safety guards: _post_generation_processed_resume_result short-circuits or raises for terminal post-processed datasets; the allow_resize=True check raises before generation begins; the sync engine additionally rejects non-contiguous batch IDs from disk; config fingerprints are now persisted in metadata.json so compatibility checks no longer need to parse builder_config.json on the hot path (a fingerprint sketch follows this list).
  • Documentation and tests: Architecture docs, Fern docs, and processors docs are updated to describe the new invariants; new tests cover all added code paths including the crash-window metadata-lags-disk scenario and partial row-group salvage.
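
A hypothetical sketch of the fingerprint-first compatibility check. The key names (config_hash, config_hash_algo, config_hash_version) come from the review above; the hashing scheme and function names here are assumptions:

import hashlib
import json

def config_fingerprint(config: dict) -> dict:
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return {
        "config_hash": hashlib.sha256(canonical.encode()).hexdigest(),
        "config_hash_algo": "sha256",
        "config_hash_version": 1,
    }

def is_resume_compatible(metadata: dict, current_config: dict) -> bool:
    stored = metadata.get("config_hash")
    if stored is None:
        # Older artifact: caller falls back to parsing builder_config.json.
        return False
    return stored == config_fingerprint(current_config)["config_hash"]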

Confidence Score: 5/5

Safe to merge — all new guards are correctly sequenced after IF_POSSIBLE normalisation, atomic writes cover the crash-mid-write hazard, and the unified disk-scan progress recovery is thoroughly tested.

The control flow is carefully ordered: IF_POSSIBLE is always normalised to ALWAYS or NEVER before the new post-processing guard, allow_resize check, and _use_async determination run. Progress recovery reads actual parquet row counts rather than recomputing from buffer arithmetic. The atomic write pattern is implemented correctly. Test coverage is comprehensive across all new branches.

No files require special attention.

Important Files Changed

  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py — Core resume hardening: adds _post_generation_processed_resume_result guard, allow_resize rejection, unified _recover_progress_from_disk for sync+async, _find_completed_row_groups returning real parquet row counts, and config-fingerprint-first compat check.
  • packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py — Adds atomic metadata write (tmp + fsync + os.replace with finally-guarded cleanup), set_metadata_defaults for fingerprint injection, and _metadata_defaults merged with caller dict on every write_metadata call.
  • packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py — Comprehensive new test coverage: allow_resize guards, post-processed terminal-state paths, corrupt metadata handling, non-contiguous batch ID rejection, metadata-lags-disk crash-window scenario, and partial row-group salvage.
  • packages/data-designer/src/data_designer/interface/data_designer.py — Splits the single try/except into two blocks so DeprecationWarning from builder construction propagates correctly under strict warning filters.
  • packages/data-designer/src/data_designer/interface/results.py — Documentation-only update clarifying resume scope for DatasetCreationResults.
  • packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py — Adds test verifying that set_metadata_defaults fields are merged into every write_metadata call.
  • architecture/dataset-builders.md — New Resume Checkpointing section documenting the unified metadata/filesystem split and new invariants.

Reviews (6): Last reviewed commit: "docs: explain DeprecationWarning re-rais..."

nabinchha added 4 commits May 8, 2026 16:35
Let IF_POSSIBLE start fresh for resize configs and mark after-generation processing before mutation so interrupted processors cannot be resumed unsafely.

Single-user CLI/notebook flows don't race on the artifact directory, and
the timestamped-directory fallback already handles the "ran it twice"
case. The lock added complexity (re-entrancy, stale cleanup, the
cached-property trap where IF_POSSIBLE→NEVER moves writes to a
timestamped directory while the lock stays pinned to the original) for
no real protection. Atomic metadata writes still cover the actual hazard
(crash mid-write).

Also fix a pre-existing test bug in
test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group
where the mocked scheduler hit the partial-completion path with
unconfigured Mock attributes.
* Drop the unreachable ResumeMode.IF_POSSIBLE branch in
  _post_generation_processed_resume_result. By the time this helper
  runs, build() has normalised IF_POSSIBLE to ALWAYS or NEVER, so the
  guard now matches reality. Tighten the docstring to document the
  three outcomes (no-op return / fall through / raise).

* Split the post-processed extension/raise into two cases. When
  num_records < prior_target the user just asked for fewer records than
  already exist; the previous "would mix pre- and post-processor
  records" message only describes the extension case. Mirror the
  wording used by _load_resume_state and add a regression test.

* Remove the dead _find_completed_row_group_ids wrapper now that
  _build_async uses _find_completed_row_groups directly. Rename the
  related test to match.

Both engines now derive `num_completed_batches` and `actual_num_records`
from `parquet-files/batch_*.parquet` via `_recover_progress_from_disk`.
`metadata.json` keeps describing the run *configuration* (`buffer_size`,
`target_num_records`, `original_target_num_records`, config fingerprint),
while the filesystem is the source of truth for *progress*. This closes
the sync engine's race window between `move_partial_result_to_final_file_path`
and the metadata write that follows it, matching the crash-recovery the
async engine already had.

The sync engine additionally rejects non-contiguous batch IDs (a hole can
only mean external mutation or a directory written by an incompatible
engine); the async engine continues to tolerate gaps from out-of-order
completion via `allow_holes=True`.

Existing sync resume tests now seed parquet files alongside metadata,
and two new tests cover the unified behaviour: filesystem progress wins
when metadata lags, and sync rejects non-contiguous IDs.
nabinchha added 2 commits May 8, 2026 17:38
`load_dataset`, `count_records`, `load_analysis`, `export`, and `push_to_hub`
all read from the artifact directory, so they reflect the cumulative dataset
(original + resume rows). `task_traces`, model-usage logs, and telemetry
events are scoped to the current invocation only because the original run's
in-memory state is not persisted. Document this in the class docstring,
the architecture note, and the Fern resume guide.

Future readers were puzzled by the ``except DeprecationWarning: raise``
short-circuits before the generic generation-error wrappers. Add a
comment in ``create()`` (with a back-reference from ``preview()``) to
record that strict warning filters (``pytest.warns``,
``-W error::DeprecationWarning``) turn the engine's
``warnings.warn(..., DeprecationWarning)`` calls — most notably the
``allow_resize=True`` deprecation in ``_resolve_async_compatibility`` —
into raised exceptions, and we want them to surface untouched instead of
being swallowed by ``DataDesignerGenerationError``.
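
A minimal sketch of the re-raise pattern this commit documents. Under a strict filter, warnings.warn(..., DeprecationWarning) raises, and the bare ``except DeprecationWarning: raise`` lets it surface before the generic wrapper catches it; names other than DeprecationWarning are illustrative stand-ins:

import warnings

class DataDesignerGenerationError(RuntimeError):
    """Stand-in for the interface's generic wrapper exception."""

def create() -> None:
    try:
        warnings.warn("allow_resize=True is deprecated", DeprecationWarning)
    except DeprecationWarning:
        raise  # surface untouched instead of being wrapped below
    except Exception as exc:
        raise DataDesignerGenerationError("generation failed") from exc

warnings.simplefilter("error", DeprecationWarning)  # e.g. -W error::DeprecationWarning
try:
    create()
except DeprecationWarning as caught:
    print(f"surfaced as-is: {caught}")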