architecture/dataset-builders.md (18 additions, 0 deletions)

@@ -80,6 +80,24 @@ Manages in-memory row buffers and persistence:
- Updates dataset metadata between batches
- The async path uses `RowGroupBufferManager` for per-row-group DataFrames and checkpointing

### Resume Checkpointing

`DatasetBuilder.build(..., resume=ResumeMode.*)` can continue an interrupted run from the last durable checkpoint:

- `ResumeMode.NEVER` always starts a fresh run, using a timestamped dataset directory when needed.
- `ResumeMode.ALWAYS` resumes the existing dataset directory and raises on incompatible state.
- `ResumeMode.IF_POSSIBLE` resumes when the persisted config fingerprint matches; otherwise it starts a fresh timestamped run.

Checkpoint state lives in `metadata.json`. Each metadata write includes the config fingerprint (`config_hash`, `config_hash_algo`, and `config_hash_version`) so compatibility checks do not need to deserialize `builder_config.json` for the common path. `builder_config.json` remains the human-readable record of the run configuration and the fallback for older datasets.
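The fingerprint check can be sketched as follows. This is an illustrative stand-in, not the library's implementation: the three field names come from the documented metadata, but the canonical-JSON hashing scheme and the helper functions are assumptions.

```python
import hashlib
import json


def config_fingerprint(config: dict) -> dict:
    """Hash a canonical JSON serialization so key order does not matter."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return {
        "config_hash": hashlib.sha256(canonical.encode()).hexdigest(),
        "config_hash_algo": "sha256",
        "config_hash_version": 1,
    }


def is_compatible(metadata: dict, current_config: dict) -> bool:
    """Compare fingerprints without deserializing builder_config.json."""
    stored = metadata.get("config_hash")
    if stored is None:
        # Older dataset without a fingerprint: the caller falls back to
        # comparing against builder_config.json instead.
        return False
    return stored == config_fingerprint(current_config)["config_hash"]
```

Hashing a canonical serialization keeps the common-path check cheap: equality on one string instead of a full config deserialize-and-compare.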

Both engines resume the same way: they scan `parquet-files/batch_*.parquet` and read parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a batch parquet and updating metadata: the filesystem reflects the durable state even when metadata lags by a step.

Reading actual row counts also matters for async early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. The async engine tolerates non-contiguous IDs because row groups can complete out of order; the sync engine writes batches sequentially and rejects holes (likely external mutation or a directory written by an incompatible engine).
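The filesystem scan and the sync engine's hole check can be sketched in a few lines. The `batch_*.parquet` naming comes from the source; the helper names and the exact error message are illustrative.

```python
import re
from pathlib import Path

BATCH_RE = re.compile(r"batch_(\d+)\.parquet$")


def completed_batch_ids(parquet_dir: Path) -> set[int]:
    """Recover completed row-group IDs from the filesystem, the source of
    truth for progress even when metadata.json lags by a step."""
    return {
        int(m.group(1))
        for p in parquet_dir.glob("batch_*.parquet")
        if (m := BATCH_RE.search(p.name))
    }


def check_contiguous(ids: set[int]) -> None:
    """Sync-engine style check: IDs must form 0..n-1 with no holes."""
    expected = set(range(len(ids)))
    if ids != expected:
        missing = sorted(expected - ids)
        raise RuntimeError(f"non-contiguous batch IDs; missing {missing}")
```

The async engine would skip `check_contiguous` and simply exclude the recovered IDs from the remaining work plan, since its row groups can complete out of order.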

Resume deliberately rejects `allow_resize=True` columns because resized batches mutate row boundaries and the original remaining batch plan cannot be reconstructed safely from aggregate counters. It also treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension.

Metadata writes are atomic (`tmp` file + `fsync` + `os.replace`) because `metadata.json` is the crash-recovery checkpoint. Corrupt or partially written metadata raises a clear `DatasetGenerationError` rather than falling through as a generic config mismatch.
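The atomic-write pattern named above (`tmp` file + `fsync` + `os.replace`) looks roughly like this in Python; the function name and error handling are illustrative, not the library's code.

```python
import json
import os


def write_metadata_atomic(path: str, metadata: dict) -> None:
    """Write metadata.json atomically. A crash at any point leaves either
    the old file or the complete new one, never a partial checkpoint."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f)
        f.flush()
        os.fsync(f.fileno())  # ensure bytes are on disk before the rename
    os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
```

The `fsync` before the rename is what makes this a checkpoint rather than a hint: without it, the rename can become durable before the file contents do, and a crash could expose an empty or truncated `metadata.json`.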

`DatasetCreationResults` from a resume invocation reflects the full on-disk dataset for anything that reads the artifact directory (`load_dataset`, `count_records`, `load_analysis`, `export`, `push_to_hub`), but per-run observability (`task_traces`, model-usage logs, telemetry events) is scoped to the current invocation — the original run's in-memory state is not persisted across process boundaries.

## Data Flow

### Sequential
@@ -156,6 +156,45 @@ designer.set_run_config(run_config)

---

## Resuming Interrupted Runs

Long generation jobs can be resumed from checkpoints by passing `resume` to `DataDesigner.create()` or `data-designer create --resume`.

```python
from data_designer.interface import DataDesigner, ResumeMode

designer = DataDesigner()
results = designer.create(
config_builder,
num_records=10_000,
dataset_name="training_data",
resume=ResumeMode.IF_POSSIBLE,
)
```

Resume modes:

- `ResumeMode.NEVER` (default): always start a fresh generation run. If the dataset directory already exists, Data Designer writes to a timestamped directory.
- `ResumeMode.ALWAYS`: resume the existing dataset directory. Raises if the checkpoint is incompatible or cannot be resumed safely.
- `ResumeMode.IF_POSSIBLE`: resume when the stored config fingerprint matches the current config; otherwise start a fresh timestamped run.

Data Designer stores the run configuration in `metadata.json` (`buffer_size`, `target_num_records`, config fingerprint) and `builder_config.json`. Both engines recover progress the same way: they scan completed `batch_*.parquet` row groups and read parquet metadata for the row count actually persisted. That keeps resume crash-safe even if a run was interrupted between writing a batch parquet and updating metadata, because the filesystem reflects the durable state even when metadata lags by a step.

Resume has a few important invariants:

- `buffer_size` must match the original run.
- `num_records` must be at least the original target; you may extend a run by requesting more records.
- Runs with `allow_resize=True` columns are not resumable because row boundaries can change.
- Once `process_after_generation()` has run, the dataset is considered terminal for resume. Re-running with the same target returns the existing dataset; extending requires a fresh run.
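The invariants above amount to a small compatibility check, sketched below. The first two field names (`buffer_size`, `target_num_records`) are the documented metadata fields; the helper itself and the two boolean flags are hypothetical stand-ins for however the library records those conditions.

```python
def validate_resume(stored: dict, buffer_size: int, num_records: int) -> None:
    """Illustrative checks mirroring the documented resume invariants."""
    if buffer_size != stored["buffer_size"]:
        raise ValueError("buffer_size must match the original run")
    if num_records < stored["target_num_records"]:
        raise ValueError("num_records must be at least the original target")
    # Hypothetical flags standing in for the documented conditions:
    if stored.get("has_resizable_columns"):
        raise ValueError("allow_resize=True columns are not resumable")
    if stored.get("after_generation_complete"):
        raise ValueError("dataset is terminal after process_after_generation()")
```

Under `ResumeMode.ALWAYS` a failed check surfaces as an error; under `ResumeMode.IF_POSSIBLE` it falls back to a fresh timestamped run.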

The `DatasetCreationResults` returned by a resume invocation reflects the full dataset on disk for anything that reads the artifact directory (`load_dataset`, `count_records`, `load_analysis`, `export`, `push_to_hub`). Per-run observability — `task_traces`, model-usage logs, and telemetry events emitted during the call — is scoped to the resume invocation only; the original run's in-memory traces are not persisted across process boundaries.

<Warning>
Only resume datasets from trusted artifact directories. Resume reads local `metadata.json`, `builder_config.json`, and parquet files to determine checkpoint state.
</Warning>

---

### `max_parallel_requests` (InferenceParams)

Sets the **maximum** concurrent LLM API calls **per model**. This is the ceiling that the AIMD throttle controller can ramp up to — the actual concurrency at runtime may be lower if the server signals rate limits.
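A minimal sketch of the AIMD pattern described above: additive increase on success, multiplicative decrease on a rate-limit signal, capped at `max_parallel_requests`. The class, its defaults, and the method names are illustrative, not the library's throttle controller.

```python
class AimdThrottle:
    """Additive-increase / multiplicative-decrease concurrency controller."""

    def __init__(self, max_parallel_requests: int, backoff: float = 0.5):
        self.ceiling = max_parallel_requests  # hard cap from InferenceParams
        self.backoff = backoff                # multiplicative decrease factor
        self.limit = 1.0                      # current permitted concurrency

    def on_success(self) -> None:
        # Ramp up by one slot per success, never past the ceiling.
        self.limit = min(self.ceiling, self.limit + 1.0)

    def on_rate_limited(self) -> None:
        # Back off multiplicatively when the server signals rate limits.
        self.limit = max(1.0, self.limit * self.backoff)

    @property
    def concurrency(self) -> int:
        return int(self.limit)
```

This is why actual concurrency can sit well below `max_parallel_requests`: the ceiling only bounds the ramp, while the server's rate-limit signals set the operating point.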
fern/versions/v0.5.8/pages/concepts/processors.mdx (4 additions, 0 deletions)

@@ -34,6 +34,10 @@ Processors can run at three stages, determined by which callback methods they implement:
The async engine (default) enforces row-count invariance in `process_before_batch()` and `process_after_batch()` — a processor returning a different row count raises `DatasetGenerationError`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes. The legacy sync engine (opt-out via `DATA_DESIGNER_ASYNC_ENGINE=0`) is permissive about row-count changes at all stages.
</Warning>

<Warning title="Resume after process_after_generation">
`process_after_generation()` runs once on the entire generated dataset, not once per buffer. It loads the final parquet dataset, applies the processor, deletes the previous parquet files, and writes a new chunked result. Because this can change row counts, schemas, and row-group boundaries, Data Designer treats a dataset as terminal for resume after this stage has completed. Re-running with the same target is a no-op; extending the dataset requires a fresh run.
</Warning>
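The load-apply-rechunk flow in the warning above can be sketched with plain lists standing in for the parquet dataset; the function, the processor callable, and the chunking are simplified illustrations of the mechanism, not the library's code.

```python
from typing import Callable


def apply_after_generation(
    rows: list, processor: Callable[[list], list], chunk_size: int
) -> list[list]:
    """Run the processor once on the whole dataset, then re-chunk the result.

    Row counts and chunk boundaries can both change here, which is why the
    dataset is terminal for resume after this stage completes.
    """
    new_rows = processor(rows)  # may filter, expand, or reshape rows
    return [
        new_rows[i : i + chunk_size]  # new row-group boundaries
        for i in range(0, len(new_rows), chunk_size)
    ]
```

After this step the original `batch_*.parquet` row-group identity is gone, so there is no checkpoint left to resume from.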

A processor can implement any combination of these callbacks. The built-in processors use `process_after_batch()` by default.

## Processor Types