refactor: introduce AsyncRunController and explicit sinks for async build/preview #447

@andreatgretel

Description

Priority Level

Medium

Task Summary

AsyncTaskScheduler still mixes two responsibilities:

  1. dependency scheduling for the execution graph
  2. row-group lifecycle orchestration (PRE_BATCH, POST_BATCH, checkpointing, preview retention, shutdown handling)

This mixing of responsibilities is the root cause of the remaining async follow-up issues:

  1. Pre-batch resize is unsafe - the scheduler assumes fixed row-group sizes
  2. Processor failures are swallowed - callback exceptions become dropped data
  3. Early shutdown is not terminal - run() can return before submitted work is drained
  4. Mode handling leaks into scheduling - preview/build differences are encoded as callbacks and buffer behavior instead of explicit finalization

This ticket should extract lifecycle ownership into a controller and keep AsyncTaskScheduler focused on scheduling tasks for already-prepared row groups.

Scope

In scope:

  • Introduce AsyncRunController
  • Move PRE_BATCH / POST_BATCH / finalization / abort handling out of AsyncTaskScheduler
  • Add explicit sinks for build vs preview
  • Make processor failures and early shutdown fail the run by default
  • Fail fast on PRE_BATCH row-count changes

Out of scope:

  • Full async support for PRE_BATCH resize
  • Tracker reset and dynamic row-group resizing after processor mutation

Both items belong in a separate follow-up ticket once the controller boundary is in place.

Decisions

These should be treated as resolved for this ticket:

  1. PRE_BATCH row-count changes fail fast in this iteration.
  2. Processor failures fail the entire run by default.
  3. The controller lives in a new module, not inside dataset_builder.py.

Technical Details & Implementation Plan

1. Introduce AsyncRunController (dataset_builders/async_run_controller.py)

The controller owns the row-group lifecycle:

  1. partition row groups
  2. materialize seed output
  3. run PRE_BATCH
  4. validate finalized row-group size
  5. hand the prepared row group to AsyncTaskScheduler
  6. run POST_BATCH
  7. finalize to a sink

Key point: PRE_BATCH happens before the row group enters the main scheduler. That keeps AsyncTaskScheduler fixed-size and avoids mid-run tracker resets.

DatasetBuilder._prepare_async_run() should build the controller once for both build and preview paths.
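
The lifecycle above could be sketched roughly as follows. This is a minimal illustration, not the intended implementation: `AsyncRunControllerSketch`, `CollectingSink`, the callable fields, and the local `DatasetGenerationError` stand-in are all hypothetical names, the scheduler is reduced to a single awaitable, and row groups are processed one at a time for simplicity.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

Rows = list[dict]


class DatasetGenerationError(Exception):
    """Local stand-in for the project's error type (assumption)."""


@dataclass
class CollectingSink:
    """Hypothetical sink that keeps finalized row groups in a dict."""

    row_groups: dict[int, Rows] = field(default_factory=dict)

    def write_row_group(self, row_group_id: int, rows: Rows) -> None:
        self.row_groups[row_group_id] = rows


@dataclass
class AsyncRunControllerSketch:
    materialize_seed: Callable[[int], Rows]       # step 2
    pre_batch: Callable[[Rows], Rows]             # step 3
    schedule: Callable[[Rows], Awaitable[Rows]]   # step 5, stands in for AsyncTaskScheduler
    post_batch: Callable[[Rows], Rows]            # step 6
    sink: CollectingSink                          # step 7

    async def run(self, num_records: int, row_group_size: int) -> None:
        # Step 1: partition the requested records into row groups.
        starts = range(0, num_records, row_group_size)
        sizes = [min(row_group_size, num_records - start) for start in starts]
        for row_group_id, size in enumerate(sizes):
            rows = self.pre_batch(self.materialize_seed(size))
            # Step 4: the scheduler only ever sees fixed-size row groups.
            if len(rows) != size:
                raise DatasetGenerationError(
                    f"PRE_BATCH changed row group {row_group_id} from {size} to {len(rows)} rows"
                )
            rows = await self.schedule(rows)
            self.sink.write_row_group(row_group_id, self.post_batch(rows))


def demo() -> CollectingSink:
    async def fake_schedule(rows: Rows) -> Rows:
        return [{**row, "generated": True} for row in rows]

    sink = CollectingSink()
    controller = AsyncRunControllerSketch(
        materialize_seed=lambda n: [{"seed": i} for i in range(n)],
        pre_batch=lambda rows: rows,
        schedule=fake_schedule,
        post_batch=lambda rows: rows,
        sink=sink,
    )
    asyncio.run(controller.run(num_records=5, row_group_size=2))
    return sink
```

Because the size check runs before `schedule` is awaited, the scheduler stand-in never has to deal with a row group whose size changed under it.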

2. Add explicit sinks

Replace callback-driven finalization with explicit sink objects:

  • ParquetRowGroupSink for build()
  • InMemoryRowGroupSink for build_preview()

The controller hands completed row groups to a sink. The scheduler should not infer mode from optional callbacks.
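
One possible shape for the sink boundary is sketched below. The `RowGroupSink` protocol, the `write_row_group` method name, the injected `write_parquet` callable, and the file naming pattern are assumptions made for illustration; the ticket only fixes the two class names.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Protocol

Rows = list[dict]


class RowGroupSink(Protocol):
    """What the controller needs from any sink (hypothetical interface)."""

    def write_row_group(self, row_group_id: int, rows: Rows) -> None: ...


@dataclass
class InMemoryRowGroupSink:
    """Preview path: keep finalized row groups in memory."""

    row_groups: dict[int, Rows] = field(default_factory=dict)

    def write_row_group(self, row_group_id: int, rows: Rows) -> None:
        self.row_groups[row_group_id] = list(rows)

    def records(self) -> Rows:
        # Return rows in row-group order regardless of completion order.
        return [row for gid in sorted(self.row_groups) for row in self.row_groups[gid]]


@dataclass
class ParquetRowGroupSink:
    """Build path: persist each finalized row group as one file."""

    output_dir: Path
    # The actual parquet writer is injected so this sketch stays library-agnostic.
    write_parquet: Callable[[Path, Rows], None]
    written: list[Path] = field(default_factory=list)

    def write_row_group(self, row_group_id: int, rows: Rows) -> None:
        # File naming here is illustrative only.
        path = self.output_dir / f"row_group_{row_group_id:05d}.parquet"
        self.write_parquet(path, rows)
        self.written.append(path)
```

With this shape the controller depends only on `RowGroupSink`, so build versus preview is decided once, when the sink is constructed, rather than inferred from which callbacks happen to be set.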

3. Simplify AsyncTaskScheduler

AsyncTaskScheduler should only do dependency scheduling over prepared row groups. Remove lifecycle hooks from its interface:

  • remove on_seeds_complete
  • remove on_before_checkpoint
  • remove preview/build-specific finalization behavior

The scheduler should consume generators, the execution graph, the tracker, prepared row groups, and the buffer manager, and return an explicit outcome to the controller.

4. Normalize fatal error semantics

The controller should translate fatal async-engine outcomes into raised errors.

Required behavior:

  • PRE_BATCH exceptions raise
  • POST_BATCH exceptions raise
  • early shutdown drains or cancels submitted work before returning
  • aborted runs do not continue to metadata writing or preview cleanup as if they succeeded
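
The "drains or cancels" requirement could be met with a helper along these lines. This is a sketch under assumptions: `drain_or_cancel` and `grace_seconds` are hypothetical names, and it presumes the submitted work is tracked as a set of `asyncio.Task` objects.

```python
import asyncio


async def drain_or_cancel(tasks: set[asyncio.Task], grace_seconds: float) -> tuple[int, int]:
    """Wait up to grace_seconds for submitted tasks, then cancel whatever is left.

    Returns (finished, cancelled). When this returns, every task is done.
    """
    if not tasks:
        return 0, 0  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=grace_seconds)
    for task in pending:
        task.cancel()
    # Await everything so cancellations are fully processed and no
    # exception is left unretrieved on a finished task.
    await asyncio.gather(*tasks, return_exceptions=True)
    return len(done), len(pending)


async def demo() -> tuple[tuple[int, int], bool]:
    async def work(delay: float) -> None:
        await asyncio.sleep(delay)

    tasks = {asyncio.create_task(work(0.01)), asyncio.create_task(work(10))}
    counts = await drain_or_cancel(tasks, grace_seconds=0.2)
    return counts, all(task.done() for task in tasks)
```

The point of the final `gather` is that the caller can return only after no submitted task is still running, which is what makes the shutdown terminal.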

Recommended explicit outcomes: completed, dropped, failed, aborted
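
As an illustration, the outcomes and their translation into raised errors might look like the sketch below. `RowGroupOutcome`, `raise_if_fatal`, and the local `DatasetGenerationError` class are hypothetical, and treating `failed` and `aborted` as fatal while `completed` and `dropped` are not is an assumption rather than something this ticket has decided.

```python
from enum import Enum
from typing import Optional


class DatasetGenerationError(Exception):
    """Local stand-in for the project's error type (assumption)."""


class RowGroupOutcome(Enum):
    COMPLETED = "completed"
    DROPPED = "dropped"
    FAILED = "failed"
    ABORTED = "aborted"


# Assumption: only these two outcomes should fail the run.
FATAL_OUTCOMES = frozenset({RowGroupOutcome.FAILED, RowGroupOutcome.ABORTED})


def raise_if_fatal(
    row_group_id: int,
    outcome: RowGroupOutcome,
    cause: Optional[BaseException] = None,
) -> None:
    """Turn a fatal scheduler outcome into a raised error, chaining the cause if known."""
    if outcome in FATAL_OUTCOMES:
        raise DatasetGenerationError(
            f"row group {row_group_id} {outcome.value}"
        ) from cause
```

The controller would call `raise_if_fatal` before POST_BATCH and finalization, so an aborted run cannot reach metadata writing.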

5. Add a fail-fast guard for PRE_BATCH resize

Until full resize support exists, PRE_BATCH must not silently truncate rows.

Required behavior for this ticket:

  • if PRE_BATCH changes row count, raise DatasetGenerationError
  • do not mutate tracker or buffer sizing to fit the new shape
  • add coverage for both build and preview
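
A minimal sketch of the guard, assuming PRE_BATCH can be modeled as a callable from rows to rows. `run_pre_batch_checked` is a hypothetical helper and `DatasetGenerationError` is defined locally as a stand-in for the project's error type.

```python
from typing import Callable

Rows = list[dict]


class DatasetGenerationError(Exception):
    """Local stand-in for the project's error type (assumption)."""


def run_pre_batch_checked(
    rows: Rows,
    pre_batch: Callable[[Rows], Rows],
    row_group_id: int,
) -> Rows:
    """Run PRE_BATCH and fail fast if it changed the row count."""
    expected = len(rows)
    result = pre_batch(rows)
    if len(result) != expected:
        # Raise instead of resizing the tracker or buffer to fit the new shape.
        raise DatasetGenerationError(
            f"PRE_BATCH changed row group {row_group_id} from "
            f"{expected} to {len(result)} rows; resize is not supported yet"
        )
    return result
```

A processor that only edits values passes through unchanged, while one that filters or duplicates rows raises before the row group is admitted to the scheduler.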

Files Affected

  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_run_controller.py (new)
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py
  • tests for controller behavior, failure propagation, and shutdown drain

Likely not needed: changes to completion_tracker.py for dynamic resize support (leave it untouched unless the implementation proves otherwise).

Acceptance Criteria

  1. AsyncTaskScheduler no longer owns processor hooks or mode-specific finalization.
  2. Build and preview both go through AsyncRunController.
  3. Build uses a parquet sink; preview uses an in-memory sink.
  4. PRE_BATCH row-count changes raise instead of truncating data.
  5. Processor hook failures raise instead of becoming dropped data.
  6. Early shutdown does not return while submitted tasks are still running.
  7. Metadata/finalization do not run after an aborted async build.

Tests

Backfill focused coverage for:

  • build/preview parity for processor stages
  • fail-fast behavior on PRE_BATCH resize
  • processor failure propagation
  • early shutdown drain/cancel behavior
  • preview buffer lifetime through InMemoryRowGroupSink

Follow-up Ticket

Create a separate ticket for full async PRE_BATCH resize support:

  • allow the controller to finalize a new row-group size after PRE_BATCH
  • rebuild tracker state for the resized row group
  • update buffer sizing accordingly
  • then admit that resized row group to AsyncTaskScheduler

Dependencies
