Description
Priority Level
Medium
Task Summary
AsyncTaskScheduler still mixes two responsibilities:
- dependency scheduling for the execution graph
- row-group lifecycle orchestration (PRE_BATCH, POST_BATCH, checkpointing, preview retention, shutdown handling)
This missing boundary is the root cause of the remaining async follow-up issues:
- Pre-batch resize is unsafe: the scheduler assumes fixed row-group sizes.
- Processor failures are swallowed: callback exceptions become dropped data.
- Early shutdown is not terminal: run() can return before submitted work is drained.
- Mode handling leaks into scheduling: preview/build differences are encoded as callbacks and buffer behavior instead of explicit finalization.
This ticket should extract lifecycle ownership into a controller and keep AsyncTaskScheduler focused on scheduling tasks for already-prepared row groups.
Scope
In scope:
- Introduce AsyncRunController
- Move PRE_BATCH/POST_BATCH/finalization/abort handling out of AsyncTaskScheduler
- Add explicit sinks for build vs. preview
- Make processor failures and early shutdown fail the run by default
- Fail fast on PRE_BATCH row-count changes
Out of scope:
- Full async support for PRE_BATCH resize
- Tracker reset and dynamic row-group resizing after processor mutation
Both belong in a separate follow-up ticket once the controller boundary is in place.
Decisions
These should be treated as resolved for this ticket:
- PRE_BATCH row-count changes fail fast in this iteration.
- Processor failures fail the entire run by default.
- The controller lives in a new module, not inside dataset_builder.py.
Technical Details & Implementation Plan
1. Introduce AsyncRunController (dataset_builders/async_run_controller.py)
The controller owns the row-group lifecycle:
- partition row groups
- materialize seed output
- run PRE_BATCH
- validate finalized row-group size
- hand the prepared row group to AsyncTaskScheduler
- run POST_BATCH
- finalize to a sink
Key point: PRE_BATCH runs before the row group enters the main scheduler. That keeps every row group seen by AsyncTaskScheduler at a fixed size and avoids mid-run tracker resets.
DatasetBuilder._prepare_async_run() should build the controller once for both build and preview paths.
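The lifecycle above could look roughly like the following minimal sketch. It assumes row groups are plain lists of dicts and that seeding, PRE_BATCH, scheduling, and POST_BATCH are injected callables; the field names, the RowGroupSink protocol, and the stand-in DatasetGenerationError are all hypothetical, not the engine's actual API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Protocol

RowGroup = list[dict]


class DatasetGenerationError(RuntimeError):
    """Stand-in for the engine's DatasetGenerationError (assumption)."""


class RowGroupSink(Protocol):
    def write(self, index: int, rows: RowGroup) -> None: ...


@dataclass
class AsyncRunController:
    """Hypothetical sketch: the controller owns every lifecycle step."""

    num_records: int
    row_group_size: int
    seed: Callable[[int, int], RowGroup]  # (start, size) -> seed rows
    pre_batch: Callable[[RowGroup], RowGroup]
    schedule: Callable[[RowGroup], Awaitable[RowGroup]]  # stands in for AsyncTaskScheduler
    post_batch: Callable[[RowGroup], RowGroup]
    sink: RowGroupSink

    def partition(self) -> list[tuple[int, int]]:
        # Split the run into (start, size) row groups; the last one may be smaller.
        return [
            (start, min(self.row_group_size, self.num_records - start))
            for start in range(0, self.num_records, self.row_group_size)
        ]

    async def run(self) -> None:
        for index, (start, size) in enumerate(self.partition()):
            rows = self.seed(start, size)  # materialize seed output
            rows = self.pre_batch(rows)  # PRE_BATCH runs before scheduling
            if len(rows) != size:  # validate the finalized row-group size
                raise DatasetGenerationError(
                    f"PRE_BATCH changed row group {index} from {size} to {len(rows)} rows"
                )
            rows = await self.schedule(rows)  # the scheduler only sees fixed-size groups
            rows = self.post_batch(rows)  # POST_BATCH
            self.sink.write(index, rows)  # finalize to a sink


if __name__ == "__main__":
    class PrintSink:
        def write(self, index: int, rows: RowGroup) -> None:
            print(index, rows)

    async def add_double(rows: RowGroup) -> RowGroup:
        return [{**row, "y": row["x"] * 2} for row in rows]

    controller = AsyncRunController(
        num_records=5,
        row_group_size=2,
        seed=lambda start, size: [{"x": i} for i in range(start, start + size)],
        pre_batch=lambda rows: rows,
        schedule=add_double,
        post_batch=lambda rows: rows,
        sink=PrintSink(),
    )
    asyncio.run(controller.run())
```

Because the size check happens before the scheduler is awaited, the scheduler never has to handle a row group whose size changed mid-run.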
2. Add explicit sinks
Replace callback-driven finalization with explicit sink objects:
- ParquetRowGroupSink for build()
- InMemoryRowGroupSink for build_preview()
The controller hands completed row groups to a sink. The scheduler should not infer mode from optional callbacks.
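A minimal sketch of the two sinks, assuming a one-method write(index, rows) interface and row groups held as lists of dicts. The class bodies, the per-row-group file naming, and the rows() accessor are illustrative assumptions; only the two class names come from this ticket. The parquet sink relies on pandas DataFrame.to_parquet, which needs pyarrow or fastparquet installed.

```python
from pathlib import Path
from typing import Protocol

RowGroup = list[dict]


class RowGroupSink(Protocol):
    """Destination for finalized row groups (hypothetical interface)."""

    def write(self, index: int, rows: RowGroup) -> None: ...


class ParquetRowGroupSink:
    """build(): persist each finalized row group as its own parquet file."""

    def __init__(self, directory: Path) -> None:
        self.directory = Path(directory)

    def write(self, index: int, rows: RowGroup) -> None:
        import pandas as pd  # lazy import; to_parquet needs pyarrow or fastparquet

        path = self.directory / f"row_group_{index:05d}.parquet"  # naming is an assumption
        pd.DataFrame(rows).to_parquet(path, index=False)


class InMemoryRowGroupSink:
    """build_preview(): keep finalized row groups in memory after the run ends."""

    def __init__(self) -> None:
        self._groups: dict[int, RowGroup] = {}

    def write(self, index: int, rows: RowGroup) -> None:
        self._groups[index] = list(rows)

    def rows(self) -> RowGroup:
        # Concatenate in row-group order, whatever order the groups finished in.
        return [row for index in sorted(self._groups) for row in self._groups[index]]
```

With this shape, the only mode-specific decision is which sink build() or build_preview() hands to the controller; the scheduler never sees it.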
3. Simplify AsyncTaskScheduler
AsyncTaskScheduler should only do dependency scheduling over prepared row groups. Remove lifecycle hooks from its interface:
- remove on_seeds_complete
- remove on_before_checkpoint
- remove preview/build-specific finalization behavior
The scheduler should consume generators, the execution graph, the tracker, prepared row groups, and the buffer manager, and it should return an explicit outcome to the controller.
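One way to picture the narrowed scheduler is a single coroutine that walks the execution graph for one prepared row group and returns an outcome instead of calling hooks. This sketch uses the standard library's graphlib.TopologicalSorter for dependency ordering; the per-column generator signature and the two-value ScheduleOutcome are assumptions for illustration, not the real AsyncTaskScheduler interface, and the tracker and buffer manager are omitted.

```python
import asyncio
from enum import Enum
from graphlib import TopologicalSorter
from typing import Awaitable, Callable

RowGroup = list[dict]
ColumnTask = Callable[[RowGroup], Awaitable[None]]  # hypothetical: fills one column in place


class ScheduleOutcome(Enum):
    COMPLETED = "completed"
    FAILED = "failed"


async def schedule_row_group(
    graph: dict[str, set[str]],  # column -> columns it depends on
    generators: dict[str, ColumnTask],
    rows: RowGroup,
) -> ScheduleOutcome:
    """Run column generators in dependency order over one prepared row group.

    No lifecycle hooks: the caller (the controller) decides what an outcome means.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on a cyclic graph
    running: dict[asyncio.Task, str] = {}
    try:
        while sorter.is_active():
            # Submit every column whose dependencies have all finished.
            for column in sorter.get_ready():
                running[asyncio.create_task(generators[column](rows))] = column
            done, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                column = running.pop(task)
                task.result()  # re-raises the generator's exception, if any
                sorter.done(column)
    except Exception:
        # Cancel and await in-flight work so nothing outlives this call.
        for task in running:
            task.cancel()
        await asyncio.gather(*running, return_exceptions=True)
        return ScheduleOutcome.FAILED
    return ScheduleOutcome.COMPLETED
```

On failure it cancels and awaits in-flight tasks before returning, so the controller only receives an outcome once no work for that row group is still running.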
4. Normalize fatal error semantics
The controller should translate fatal async-engine outcomes into raised errors.
Required behavior:
- PRE_BATCH exceptions raise
- POST_BATCH exceptions raise
- early shutdown drains or cancels submitted work before returning
- aborted runs do not continue to metadata writing or preview cleanup as if they succeeded
Recommended explicit outcomes: completed, dropped, failed, aborted
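A minimal sketch of how the controller might translate outcomes into errors, using the four recommended outcome names. The finalize_run helper, its metadata-writing callback, and the stand-in DatasetGenerationError are hypothetical; treating dropped row groups as non-fatal is also an assumption this ticket does not settle.

```python
from enum import Enum
from typing import Callable, Iterable


class RunOutcome(Enum):
    COMPLETED = "completed"
    DROPPED = "dropped"
    FAILED = "failed"
    ABORTED = "aborted"


class DatasetGenerationError(RuntimeError):
    """Stand-in for the engine's DatasetGenerationError (assumption)."""


FATAL_OUTCOMES = {RunOutcome.FAILED, RunOutcome.ABORTED}


def finalize_run(
    outcomes: Iterable[tuple[int, RunOutcome]],
    write_metadata: Callable[[], None],
) -> list[int]:
    """Turn fatal per-row-group outcomes into errors before any finalization.

    Returns the indices of dropped row groups (assumed non-fatal in this sketch).
    """
    dropped = []
    for index, outcome in outcomes:
        if outcome in FATAL_OUTCOMES:
            # Raise before metadata is written, so an aborted run never looks successful.
            raise DatasetGenerationError(f"row group {index} ended as {outcome.value}")
        if outcome is RunOutcome.DROPPED:
            dropped.append(index)
    write_metadata()  # only reached when no row group failed or aborted
    return dropped
```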
5. Add a fail-fast guard for PRE_BATCH resize
Until full resize support exists, PRE_BATCH must not silently truncate rows.
Required behavior for this ticket:
- if PRE_BATCH changes the row count, raise DatasetGenerationError
- do not mutate tracker or buffer sizing to fit the new shape
- add coverage for both build and preview
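The fail-fast guard can be a thin wrapper around the PRE_BATCH processor. In this sketch the processor is assumed to be a callable from a list of row dicts to a list of row dicts, and both the helper name and the stand-in DatasetGenerationError are hypothetical. The expected size is captured before the call because a processor could also mutate the row group in place.

```python
from typing import Callable

RowGroup = list[dict]


class DatasetGenerationError(RuntimeError):
    """Stand-in for the engine's DatasetGenerationError (assumption)."""


def run_pre_batch_fixed_size(
    processor: Callable[[RowGroup], RowGroup],
    rows: RowGroup,
    row_group_index: int,
) -> RowGroup:
    """Run a PRE_BATCH processor and refuse any change in row count.

    Tracker and buffer sizing are never touched here: on a mismatch the run
    fails instead of adapting to the new shape.
    """
    expected = len(rows)  # read before the call, in case the processor mutates rows
    try:
        result = processor(rows)
    except Exception as exc:
        # Processor failures fail the run instead of becoming dropped data.
        raise DatasetGenerationError(
            f"PRE_BATCH processor failed on row group {row_group_index}"
        ) from exc
    if len(result) != expected:
        raise DatasetGenerationError(
            f"PRE_BATCH changed row group {row_group_index} from {expected} to "
            f"{len(result)} rows; resizing is not supported yet"
        )
    return result
```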
Files Affected
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_run_controller.py (new)
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py
- tests for controller behavior, failure propagation, and shutdown drain
Likely not needed: changes to completion_tracker.py for dynamic resize support (leave it untouched unless the implementation proves otherwise).
Acceptance Criteria
- AsyncTaskScheduler no longer owns processor hooks or mode-specific finalization.
- Build and preview both go through AsyncRunController.
- Build uses a parquet sink; preview uses an in-memory sink.
- PRE_BATCH row-count changes raise instead of truncating data.
- Processor hook failures raise instead of becoming dropped data.
- Early shutdown does not return while submitted tasks are still running.
- Metadata/finalization do not run after an aborted async build.
Tests
Backfill focused coverage for:
- build/preview parity for processor stages
- fail-fast behavior on PRE_BATCH resize
- processor failure propagation
- early shutdown drain/cancel behavior
- preview buffer lifetime through InMemoryRowGroupSink
Follow-up Ticket
Create a separate ticket for full async PRE_BATCH resize support:
- allow the controller to finalize a new row-group size after PRE_BATCH
- rebuild tracker state for the resized row group
- update buffer sizing accordingly
- then admit that resized row group to AsyncTaskScheduler
Dependencies
- Follows PR chore: async engine follow-up #445
- Design context: plans/429-followup/merged-async-pipeline-refactor.md, tasks 3-5