chore: async engine follow-up - rename, preview, lifecycle, progress #456
andreatgretel wants to merge 10 commits into main from
Conversation
…-group lifecycle

- Rename ColumnWiseDatasetBuilder to DatasetBuilder and column_wise_builder.py to dataset_builder.py, update all references
- Extract _prepare_async_run() factory shared by build and preview paths
- Add _build_async_preview() for async preview with no disk checkpoints
- Replace on_row_group_complete/on_checkpoint_complete with a single on_finalize_row_group callback; the caller handles checkpointing
- Add free_row_group() on RowGroupBufferManager for discard-without-write
- Free fully-dropped row groups instead of finalizing them
- Add consolidated AsyncProgressReporter for async generation logging

Closes #437, closes #442, closes #444
- Add AsyncProgressReporter: groups per-column progress into a single log block emitted at configurable intervals (default 5s)
- Add quiet mode to ProgressTracker to suppress per-tracker logging when used with the consolidated reporter
- Add ContextVar-based row group tagging (RG1, RG2, ...) for log messages emitted inside async tasks (samplers, expressions, seeds)
- Add progress_interval to RunConfig for user-configurable reporting
- Remove log_start_async from ProgressTracker (superseded by reporter)

Closes #443
Force-pushed 436a890 to 709f53d
Greptile Summary

This follow-up PR closes four open tickets from the async-engine series (#437, #442, #443, #444) and adds two new improvements: sticky ANSI progress bars and a latent row-group semaphore deadlock fix. The scope is wide — rename, new factory method, callback unification, new progress infrastructure, and a concurrency bug fix — but the changes are well-structured.

Key changes:
One new issue found:
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Core scheduler changes: adds _salvage_stalled_row_groups (deadlock fix), replaces dual callbacks with on_finalize_row_group, integrates AsyncProgressReporter and StickyProgressBar. The deadlock fix is sound but the exclude_columns scope in _salvage_stalled_row_groups is too broad for multi-row cell failures, causing permanent under-counting of skipped tasks in the async progress reporter. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Renamed from column_wise_builder.py; adds _build_async_preview and _prepare_async_run factory, wires progress_bar and progress_interval from RunConfig. The previously flagged KeyError on all-dropped preview rows is correctly guarded with has_row_group(0). |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py | New consolidated progress reporter; reads all progress values via tracker.get_snapshot() (lock-protected snapshot) instead of reaching into private tracker state, addressing the previously flagged inconsistency. Logic for _emit, _maybe_report, and log_final looks correct. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/sticky_progress_bar.py | New ANSI sticky progress bar; correctly uses with context in run() (not __enter__ before try), handler wrapping/unwrapping is thread-safe via self._lock. Minor: eta field can exceed 4 chars on long runs, causing bar_width to be computed narrower than the terminal. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Added has_row_group() and free_row_group() methods. Both are simple and correct; free_row_group pops all three dicts safely using .pop(key, None). No issues found. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py | Added get_snapshot() public method and record_skipped(). The snapshot method acquires the lock and returns a consistent tuple, removing the need for external code to access private state. Clean and correct. |
| packages/data-designer-engine/src/data_designer/engine/context.py | New file introducing a ContextVar-based current_row_group and format_row_group_tag() helper for per-task row-group prefix in log messages. Simple, safe, and correctly resets the context token in _execute_task_inner. |
| packages/data-designer-config/src/data_designer/config/run_config.py | Added progress_bar: bool and progress_interval: float fields with appropriate Pydantic validation (gt=0.0). Documentation and defaults are clear. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Comprehensive test additions including test_scheduler_rg_semaphore_deadlock_with_transient_failures for the deadlock regression, plus finalize-callback and dropped-row-group tests. Coverage of the new paths looks solid. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant R as run()
    participant ML as _main_dispatch_loop
    participant SS as _salvage_stalled_RGs
    participant SR as _salvage_rounds
    participant CP as _checkpoint_completed_RGs
    participant Sem as rg_semaphore
    participant Rep as AsyncProgressReporter
    participant Bar as StickyProgressBar
    R->>Bar: __enter__ (with statement)
    R->>Rep: log_start()
    R->>ML: await _main_dispatch_loop()
    loop each dispatch iteration
        ML->>CP: _checkpoint_completed_row_groups()
        CP->>Sem: release() (frees slot)
        ML->>SS: await _salvage_stalled_row_groups() [if deferred]
        SS->>SS: identify stalled RGs (in_flight==0)
        SS->>SR: await _salvage_rounds(stalled tasks)
        SR-->>SS: still-failed tasks remain
        SS->>Rep: record_failure(column) per failed task
        SS->>ML: drop rows + checkpoint + restore other_deferred
    end
    ML-->>R: loop exits
    R->>Rep: log_final()
    R->>Bar: __exit__ (clears bars, restores handlers)
```
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 444-450
Comment:
**`exclude_columns` scoped too broadly — cross-row failures suppress skip counts**
`exclude` is built by filtering `failed_cells` on `rg == task.row_group` only, ignoring `ri`. This means if column C fails for `(rg=0, ri=1)` and column D fails for `(rg=0, ri=0)`, when dropping `ri=0` the exclude set contains `{C, D}`. Inside `_record_skipped_tasks_for_row(rg=0, ri=0, exclude={C, D})`, column C is excluded even though it only failed for `ri=1` — so the skipped-task credit for C on row 0 is silently lost. The same mirroring happens for D on row 1.
The practical effect: for any stalled row group where cells from *different* rows fail permanently, the `completed` counter across columns will never reach `total_records`, leaving the async progress log stuck below 100%.
The fix is to restrict the cell-task exclude to the exact `(rg, ri)` pair:
```python
for task in self._deferred:
if task.row_index is not None:
exclude = {col for rg, ri, col in failed_cells if rg == task.row_group and ri == task.row_index}
self._drop_row(task.row_group, task.row_index, exclude_columns=exclude)
else:
exclude = {col for rg, ri, col in failed_cells if rg == task.row_group}
rg_size = self._get_rg_size(task.row_group)
self._drop_row_group(task.row_group, rg_size, exclude_columns=exclude)
```
From-scratch tasks (row_index is None) correctly use the row-group-wide exclude.
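The scoping difference can be shown with a minimal, self-contained sketch; the `failed_cells` triples and helper names here are illustrative, not the scheduler's internals:

```python
# (row_group, row_index, column) triples for permanently-failed cells,
# matching the example in the comment: C failed on row 1, D on row 0.
failed_cells = {(0, 1, "C"), (0, 0, "D")}

def exclude_for_row(rg: int, ri: int) -> set[str]:
    # Correct: restrict to the exact (rg, ri) pair being dropped.
    return {col for g, r, col in failed_cells if g == rg and r == ri}

def exclude_for_group(rg: int) -> set[str]:
    # Too broad for per-row drops: mixes in failures from other rows.
    return {col for g, r, col in failed_cells if g == rg}

print(exclude_for_row(0, 0))   # {'D'} — C still gets skip credit on row 0
print(exclude_for_group(0))    # {'C', 'D'} — C wrongly excluded for row 0
```

With the group-wide set, the skip credit for C on row 0 (and for D on row 1) is lost, which is exactly the under-counting described above.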
---
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/sticky_progress_bar.py
Line: 174
Comment:
**`eta` field can overflow `stats_width`, misaligning the progress bar**
`_compute_stats_width` uses `"999s"` as the sample ETA (4 characters), but the format string `{eta:>4s}` doesn't truncate — it right-pads to *at least* 4 chars. For slow or large runs (e.g., 10 000 records at 1 rec/s, ETA ≈ 10 000s) the ETA string becomes 6 chars, making `stats` wider than `stats_width`. This causes the computed `bar_width` to go negative (clamped to 10 by `max(10, …)`), so the filled/empty ratio no longer spans the terminal and the bar renders too short.
Consider using a time-aware formatter to cap the display (e.g. convert seconds ≥ 3600 to `h:mm` format) or widen the sample string in `_compute_stats_width`:
```python
sample = f" 100% | {'9' * total_w}/{total} | 9999.9 rec/s | eta 9999s | {'9' * total_w} failed"
```
(changing `999s` → `9999s` accommodates runs under ~3 hours).
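One possible shape for the time-aware formatter, as an illustration only (the helper name and thresholds are assumptions, not the PR's code):

```python
def format_eta(seconds: float) -> str:
    """Render an ETA in a bounded width: seconds below an hour, h:mm above."""
    s = int(seconds)
    if s < 3600:
        return f"{s}s"                     # e.g. "999s" (at most 5 chars)
    hours, rem = divmod(s, 3600)
    return f"{hours}:{rem // 60:02d}"      # e.g. "2:46" for 10000 seconds

print(format_eta(999))    # → 999s
print(format_eta(10000))  # → 2:46
```

Capping the rendered width this way keeps `stats` within the pre-computed `stats_width`, so `bar_width` stays stable for long runs.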
Reviews (8): Last reviewed commit: "fix: stable progress bar width and accur..."
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
...a-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py
```python
max_conversation_restarts: int = Field(default=5, ge=0)
max_conversation_correction_steps: int = Field(default=0, ge=0)
async_trace: bool = False
progress_interval: float = Field(default=5.0, gt=0.0)
```
perhaps gt >= 1? Anything less could be too noisy?
kept this at > 0 for now so shorter intervals are still available when debugging short runs; default is still 5s
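For reference, the field under discussion can be reconstructed as a minimal Pydantic v2 model (only these two fields, not the full RunConfig):

```python
from pydantic import BaseModel, Field, ValidationError

class RunConfig(BaseModel):
    progress_bar: bool = False
    progress_interval: float = Field(default=5.0, gt=0.0)

print(RunConfig().progress_interval)  # → 5.0
print(RunConfig(progress_interval=0.5).progress_interval)  # short debug interval

try:
    RunConfig(progress_interval=0.0)
except ValidationError:
    print("rejected")  # gt=0.0 rejects non-positive intervals
```

The `gt=0.0` bound keeps sub-second intervals available for debugging short runs while still rejecting nonsensical values, which matches the resolution above.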
```python
pct = (tracker.completed / tracker.total_records * 100) if tracker.total_records else 100.0
rate = tracker.completed / elapsed if elapsed > 0 else 0.0
emoji = tracker._random_emoji.progress(pct)
```
suggestions:
- tracker can expose a prop for pct,
- expose a public accessor for random emoji instead of tapping into _random_emoji...
done, the reporter now reads a public tracker snapshot instead of tapping into private tracker state
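The lock-protected snapshot pattern this reply refers to can be sketched as follows; the real ProgressTracker carries more state, this only shows the shape:

```python
import threading
from typing import NamedTuple

class Snapshot(NamedTuple):
    completed: int
    failed: int
    total_records: int

class ProgressTracker:
    def __init__(self, total_records: int) -> None:
        self._lock = threading.Lock()
        self._completed = 0
        self._failed = 0
        self._total = total_records

    def record_completed(self) -> None:
        with self._lock:
            self._completed += 1

    def record_failure(self) -> None:
        with self._lock:
            self._failed += 1

    def get_snapshot(self) -> Snapshot:
        # A single lock acquisition returns mutually consistent values, so a
        # reporter never pairs a fresh `completed` with a stale `failed`.
        with self._lock:
            return Snapshot(self._completed, self._failed, self._total)

t = ProgressTracker(total_records=10)
t.record_completed()
snap = t.get_snapshot()
print(snap.completed, snap.total_records)  # → 1 10
```

Exposing one snapshot method also removes the temptation for callers to reach into `_`-prefixed attributes, as the earlier `tracker._random_emoji` access did.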
```python
if num_records > 0 and buffer_size > 0:
    task_counts = graph.compute_task_count(num_records, buffer_size)
    trackers: dict[str, ProgressTracker] = {}
    for col in graph.columns:
        if graph.get_strategy(col) != GenerationStrategy.CELL_BY_CELL:
            continue
        trackers[col] = ProgressTracker(
            total_records=task_counts[col],
            label=f"column '{col}'",
            quiet=True,
        )
    if trackers:
        interval = progress_interval if progress_interval is not None else DEFAULT_REPORT_INTERVAL
        self._reporter = AsyncProgressReporter(trackers, report_interval=interval)
```
nit: may be this can go into a private method named _setup_async_progress_reporter. This constructor is looking quite long...
done, pulled the reporter setup into _setup_async_progress_reporter
Add RunConfig.progress_bar setting that replaces periodic log-line progress with sticky terminal bars that stay at the bottom while logs scroll above. Pure ANSI escape codes, no new dependencies. Disabled by default - existing log-based output unchanged.
Skip the time gate when the progress bar is active so the bar redraws on every record instead of every progress_interval seconds.
```python
def on_seeds_complete(rg_id: int, rg_size: int) -> None:
    if not self._processor_runner.has_processors_for(ProcessorStage.PRE_BATCH):
        return
    df = buffer_manager.get_dataframe(rg_id)
    df = self._processor_runner.run_pre_batch_on_df(df)
    buffer_manager.replace_dataframe(rg_id, df)
    # Sync newly-dropped rows to the tracker so downstream cell tasks are skipped
    for ri in range(rg_size):
        if buffer_manager.is_dropped(rg_id, ri) and not tracker.is_dropped(rg_id, ri):
            tracker.drop_row(rg_id, ri)
```
**Progress reporter misses skipped tasks when pre-batch filters rows**
When on_seeds_complete succeeds but the pre-batch processor filters out rows (via buffer_manager.replace_dataframe marking trailing rows dropped), the sync loop calls tracker.drop_row(rg_id, ri) directly. This bypasses the scheduler's _drop_row helper, which is the only code path that calls _record_skipped_tasks_for_row on the AsyncProgressReporter.
As a result, any downstream CELL_BY_CELL tasks for those filtered rows are silently skipped without being counted in the reporter's trackers. Since the tracker total was pre-computed assuming all rows would be processed, the progress percentage for affected columns will never reach 100%, and the final log_final summary will undercount skipped tasks.
The exception path (where on_seeds_complete raises) is already handled correctly by _drop_row_group — the missing piece is the success-with-filtered-rows path. Consider feeding dropped rows back through the scheduler's _record_skipped_tasks_for_row after this sync, or making the reporter aware of the pre-computed totals vs. actual tasks executed.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
Line: 357-363

When all tasks for admitted row groups fail with transient errors, the row-group semaphore never releases, blocking admission of new row groups. Fix by salvaging stalled row groups inline - retrying deferred tasks immediately so row groups can checkpoint and free their semaphore slots. Also updates the row-group log format to (x/X) with leading zeros.
```python
if self._progress_bar is not None:
    self._progress_bar.__enter__()
```
**`__enter__` called before `try` — `__exit__` won't run on exception**
self._progress_bar.__enter__() is called at line 236, one line before the try: block at line 237. If __enter__ raises (e.g., _wrap_handlers patches handler.emit on the last handler and then self._write("\033[?25l") fails on a degraded TTY), the finally clause at line 271 never executes. The root logger's handler.emit methods will remain monkey-patched — every subsequent log statement in the process would then try to _clear_bars / _redraw using a StickyProgressBar that's already in a half-initialised state, likely causing logging to break silently or error loudly.
The idiomatic fix — already used in _fan_out_with_async and _fan_out_with_threads — moves the condition into the with statement:
```suggestion
with self._progress_bar or contextlib.nullcontext():
```
That way Python guarantees __exit__ is called on every exit path, including exceptions from within __enter__ itself.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 235-236

Run inline salvage after every checkpoint pass instead of only when globally stalled. Row groups with 0 in-flight and only deferred tasks are salvaged immediately, freeing their semaphore slot for new work.
- Use `with` statement for progress bar context (safe __exit__ on error)
- Check bar.is_active instead of bar is not None (non-TTY fallback)
- Record failures (not skips) for tasks that exhaust salvage retries
- Record skipped tasks when pre-batch filtering drops rows
```python
for task in self._deferred:
    if self._reporter:
        self._reporter.record_failure(task.column)
    if task.row_index is not None:
        self._drop_row(task.row_group, task.row_index)
    else:
        rg_size = self._get_rg_size(task.row_group)
```
**Missing `exclude_columns` causes double-counting of permanently-failed tasks**
After all salvage rounds are exhausted, record_failure(task.column) is called for the task — then _drop_row is called without exclude_columns. Inside _drop_row → _record_skipped_tasks_for_row, task.column passes every guard (not excluded, is CELL_BY_CELL, not in-flight, not complete) and record_skipped(task.column) fires a second time. The result is that completed for that column is incremented twice for a single task, which can push completed above total_records and display progress > 100%.
This is a direct inconsistency with _execute_task_inner_impl, which correctly passes exclude_columns={task.column} in the analogous path:
```python
# _execute_task_inner_impl — correct
self._drop_row(task.row_group, task.row_index, exclude_columns={task.column})
```

Apply the same pattern here:
```suggestion
for task in self._deferred:
    if self._reporter:
        self._reporter.record_failure(task.column)
    if task.row_index is not None:
        self._drop_row(task.row_group, task.row_index, exclude_columns={task.column})
    else:
        rg_size = self._get_rg_size(task.row_group)
        self._drop_row_group(task.row_group, rg_size, exclude_columns={task.column})
```
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 439-445

- Pre-compute fixed stats width at bar creation to prevent bar resizing when the failed count appears
- Cap displayed completed at total to avoid >100% on retries
- Exclude already-failed columns from skip recording to prevent double-counting in the progress reporter
Summary
Follow-up to #449 (async engine PR 6) addressing four open tickets, plus progress bar support and a deadlock fix.
#437 - Rename

- Rename `ColumnWiseDatasetBuilder` to `DatasetBuilder` (`column_wise_builder.py` -> `dataset_builder.py`)

#442 - Async preview path

- `_prepare_async_run()` factory method shared by build and preview
- Add `_build_async_preview()` to `DatasetBuilder`
- Add `free_row_group()` to release memory without checkpointing

#444 - Unify row-group lifecycle callback

- Single `on_finalize_row_group` callback

#443 - Consolidated async progress reporting

- `AsyncProgressReporter` replaces per-column logging in the async path
- `RunConfig.progress_interval` parameter (default 5s)
- ContextVar-based row group prefix `(x/X)` on repetitive log messages

Sticky ANSI progress bars (new)

- `RunConfig.progress_bar` setting for sticky terminal progress bars
- See `sticky_progress_bar.py`, `run_config.py`

Row-group semaphore deadlock fix (new)

- Regression test: `test_scheduler_rg_semaphore_deadlock_with_transient_failures`

Attention Areas

- `async_scheduler.py` - Deadlock fix: `_salvage_stalled_row_groups` and updated main dispatch loop

Test plan

- `test_scheduler_on_finalize_row_group_callback_fires` - verifies finalize fires for completed row groups
- `test_scheduler_on_finalize_skips_empty_row_group` - verifies dropped row groups skip finalize
- `test_free_row_group_releases_memory` / `test_free_row_group_idempotent` - new buffer manager tests
- `test_scheduler_rg_semaphore_deadlock_with_transient_failures` - regression test for deadlock fix
- Existing tests pass (`test_build_async_end_to_end`, `test_checkpoint_produces_correct_parquet_calls`)
- `test_dataset_builder_*` tests pass

Closes #437, closes #442, closes #443, closes #444