refactor: replace stage-based config with callback-based processor design#294
andreatgretel wants to merge 5 commits into main from
Conversation
Force-pushed from 35df5cf to ec54289
Force-pushed from edbaf44 to 5cc6f21
Force-pushed from f6a820b to 8310910
```python
name: str = Field(
    description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
)
build_stage: BuildStage = Field(
```
No need for build_stage anymore
Sweet, this is a neat approach!
```python
from data_designer.engine.configurable_task import ConfigurableTask, DataT, TaskConfigT


class Processor(ConfigurableTask[TaskConfigT], ABC):
```
This is the main change - three different methods
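The full class body isn't visible in this excerpt, so here is a minimal sketch of what a callback-based base class with per-stage no-op methods and an `implements()` check could look like. The four callback names come from the PR description; the signatures, the pandas types, and the override-detection logic are assumptions.

```python
# Hypothetical sketch of a callback-based processor base class.
# Method names follow the PR description; everything else is illustrative.
from abc import ABC

import pandas as pd


class Processor(ABC):
    """Processors opt into stages by overriding the corresponding callback."""

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """PRE_GENERATION: runs once on the full seed data."""
        return df

    def process_before_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        """PRE_BATCH: runs after seed columns, before dependent columns."""
        return df

    def process_after_batch(self, df: pd.DataFrame, current_batch_number: int) -> pd.DataFrame:
        """POST_BATCH: runs after each batch completes."""
        return df

    def postprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """POST_GENERATION: runs once on the final combined dataset."""
        return df

    def implements(self, method_name: str) -> bool:
        """True if this subclass overrides the named callback (i.e. it is not the base no-op)."""
        return getattr(type(self), method_name) is not getattr(Processor, method_name)
```

With defaults like these, a subclass only overrides the stages it cares about, and `implements()` lets the runner skip the no-ops.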
Greptile Overview

Greptile Summary

This PR successfully refactors the processor system from stage-based configuration to a callback-based design, significantly improving code organization and flexibility.

Key Changes
Architecture Improvements

The refactoring improves code maintainability through:
Attention Areas
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py | Introduces callback-based design with 4 stage methods and implements() check - clean architecture |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | New class encapsulates processor execution logic with proper error handling and stage checks |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py | Simplified by delegating to ProcessorRunner; cleaner orchestration with proper preview cleanup |
| packages/data-designer-config/src/data_designer/config/processors.py | Removed build_stage field and validation - config simplified as intended |
| packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py | Implements process_after_batch() with artifact-only output (returns original data by design) |
| packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py | Comprehensive tests for all 4 stages, edge cases, exception handling, and ordering |
Sequence Diagram
```mermaid
sequenceDiagram
participant Builder as ColumnWiseBuilder
participant Runner as ProcessorRunner
participant P1 as Processor 1
participant P2 as Processor 2
participant Seed as SeedReader
participant Disk as Disk Storage
Note over Builder,Disk: PRE_GENERATION Stage (once before batching)
Builder->>Runner: run_preprocess()
Runner->>Runner: has_processors_for("preprocess")?
alt has preprocess processors
Runner->>Seed: Load full seed data
Seed-->>Runner: seed DataFrame
Runner->>P1: implements("preprocess")?
alt implements preprocess
Runner->>P1: preprocess(df)
P1-->>Runner: transformed df
end
Runner->>P2: implements("preprocess")?
alt implements preprocess
Runner->>P2: preprocess(df)
P2-->>Runner: transformed df
end
Runner->>Disk: Write preprocessed_seed.parquet
Note over Runner: Updates preprocessed_seed_uri
end
Note over Builder,Disk: Batch Generation Loop
loop For each batch
Builder->>Builder: Generate seed columns
Note over Builder,Disk: PRE_BATCH Stage (after seed, before dependent columns)
Builder->>Runner: run_pre_batch(batch_manager)
Runner->>Runner: has_processors_for("process_before_batch")?
alt has pre-batch processors
Runner->>P1: implements("process_before_batch")?
alt implements
Runner->>P1: process_before_batch(df)
P1-->>Runner: transformed df
end
Runner->>P2: implements("process_before_batch")?
alt implements
Runner->>P2: process_before_batch(df)
P2-->>Runner: transformed df
end
Runner->>Builder: Update batch records
end
Builder->>Builder: Generate dependent columns
Note over Builder,Disk: POST_BATCH Stage (after each batch completes)
Builder->>Runner: run_post_batch(df, batch_number)
Runner->>P1: implements("process_after_batch")?
alt implements
Runner->>P1: process_after_batch(df, current_batch_number)
P1-->>Runner: transformed df
end
Runner->>P2: implements("process_after_batch")?
alt implements
Runner->>P2: process_after_batch(df, current_batch_number)
P2-->>Runner: transformed df
end
Runner-->>Builder: processed df
Builder->>Disk: Write batch to disk
end
Note over Builder,Disk: POST_GENERATION Stage (once after all batches)
Builder->>Runner: run_postprocess()
Runner->>Runner: has_processors_for("postprocess")?
alt has postprocess processors
Runner->>Disk: Load final dataset
Disk-->>Runner: final DataFrame
Runner->>P1: implements("postprocess")?
alt implements
Runner->>P1: postprocess(df)
P1-->>Runner: transformed df
end
Runner->>P2: implements("postprocess")?
alt implements
Runner->>P2: postprocess(df)
P2-->>Runner: transformed df
end
Runner->>Disk: Rewrite final dataset
end
```
Additional Comments (3)
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 438:444
Comment:
**Leaking DuckDB connection**
`_run_pre_generation_processors()` creates a DuckDB connection via `seed_reader.create_duckdb_connection()` and never closes it. If a build runs many times in-process (service mode/tests), this will leak connections/file handles. Close the connection (e.g., `try/finally: conn.close()`), or use a context manager if the seed reader supports it.
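One concise way to address this, sketched as a hypothetical helper around the calls shown in the comment (a context manager on the seed reader would work equally well if one exists):

```python
import pandas as pd


def load_seed_dataframe(seed_reader) -> pd.DataFrame:
    """Hypothetical helper: read the full seed dataset, always releasing the connection."""
    conn = seed_reader.create_duckdb_connection()
    try:
        return conn.execute(f"SELECT * FROM '{seed_reader.get_dataset_uri()}'").fetchdf()
    finally:
        # Close even if the query fails, so repeated builds don't leak file handles.
        conn.close()
```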
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 448:455
Comment:
**seed_row_indices not updated**
`_run_pre_generation_processors()` only sets `resource_provider.seed_row_indices` when `len(df) != original_len`. If preprocess reorders rows or filters+adds back to the same length, `seed_row_indices` stays unset and the generator will read unfiltered rows even though preprocess changed the seed selection. Track indices whenever preprocess runs (or whenever `_dd_rowid` changes), not only on length change.
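A hedged sketch of the suggested fix, using the `_dd_rowid` column and `seed_row_indices` attribute named in the comment; the surrounding method structure is assumed:

```python
# Inside the hypothetical preprocess path: capture the surviving row selection
# whenever preprocess ran, not only when the row count changed.
df = self._run_stage(df, "preprocess")
if "_dd_rowid" in df.columns:
    # Reflects filtering *and* reordering, even when len(df) is unchanged.
    self._resource_provider.seed_row_indices = df["_dd_rowid"].tolist()
```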
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 486:496
Comment:
**Postprocess changes may be lost**
`_run_post_generation_processors()` only rewrites the final dataset when row count or column *names/order* change. If `postprocess()` modifies values in-place (same rows/columns), those changes won't be persisted and will be silently dropped. This is a functional correctness issue for value-only postprocessors; consider always rewriting when processors exist, or require postprocessors to return a new df and detect `df is not original_df`/hashing.
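One resolution, sketched under the assumption that the runner has helpers for loading and rewriting the final dataset (the helper names are hypothetical), is to rewrite unconditionally whenever any processor implements the stage:

```python
# Sketch: always persist postprocess output instead of diffing shape.
if self.has_processors_for("postprocess"):
    df = self._load_final_dataset()       # hypothetical helper
    df = self._run_stage(df, "postprocess")
    self._rewrite_final_dataset(df)       # always write, so value-only edits are kept
```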
Force-pushed from 293ef7b to b6192af
Replace single process() method with stage-specific callbacks:

- preprocess(): PRE_GENERATION, on full seed data, writes to disk
- process_before_batch(): PRE_BATCH, after seed columns, before dependent columns
- process_after_batch(): POST_BATCH, after each batch completes
- postprocess(): POST_GENERATION, on final combined dataset

Key changes:

- Remove build_stage config field; stages determined by implemented callbacks
- Add implements() method to check if processor overrides a callback
- Only run processors that implement each stage
- Preprocessed seed data written to disk for memory efficiency
- Update docs and tests

- Add warning when PRE_BATCH changes row count
- Clean up preprocessed seed file after preview
- Add parametrized test verifying all 4 stages run in order
The comprehensive test_all_processor_stages_run_in_order covers both preview and build modes, making the individual tests redundant.
- Move imports to top of file
- Add seed_data_setup and builder_with_seed fixtures
- Add create_mock_processor helper function
- Add edge case tests for exceptions, no-op processors, ordering

- Move all processor stage logic to new ProcessorRunner in utils/
- ProcessorRunner takes dependencies and provides complete stage methods
- Builder now calls runner methods directly instead of wrapper methods
- Remove unused imports from column_wise_builder
Force-pushed from a1323f9 to 46880e7
|-------|--------------|-----------------|-----------|
| Pre-generation | Once, on full seed data before batching | `preprocess()` | Filter seed data, validate inputs, normalize data |
| Pre-batch | After seed columns, before dependent columns | `process_before_batch()` | Transform seed data before other columns are generated |
| Post-batch | After each batch completes | `process_after_batch()` | Drop columns, transform schema per batch |
Somehow in my head dropping columns needs to happen post generation, BUT the key point is that each batch contains the full schema of the dataset. Might be good to have a tip (or something) box that highlights this point.
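For illustration, a hypothetical processor that only opts into the post-batch stage (building on the base-class sketch earlier in this thread; the column names are made up) drops the same columns from every batch, so each written batch keeps an identical schema:

```python
import pandas as pd


class DropScratchColumns(Processor):
    """Hypothetical example: remove intermediate columns after each batch completes."""

    columns_to_drop = ["_scratch_reasoning", "_raw_llm_response"]  # illustrative names

    def process_after_batch(self, df: pd.DataFrame, current_batch_number: int) -> pd.DataFrame:
        # errors="ignore" keeps this safe if a batch never produced the column
        return df.drop(columns=self.columns_to_drop, errors="ignore")
```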
pipeline. They can modify, reshape, or augment the dataset.

The processor implementation determines which stages it handles by overriding
the appropriate callback methods (preprocess, process_after_batch, postprocess).
Suggested change:

```diff
- the appropriate callback methods (preprocess, process_after_batch, postprocess).
+ the appropriate callback methods (preprocess, process_before_batch, process_after_batch, postprocess).
```
```python
# Use preprocessed seed if available, otherwise use original
if self.resource_provider.preprocessed_seed_uri is not None:
    self._dataset_uri = self.resource_provider.preprocessed_seed_uri
else:
    self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
```
@mikeknep – would be helpful to have your eyes specifically on the seed dataset parts of this PR.
```diff
@@ -0,0 +1,121 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```
Note to future us: We need to add some instruction for LLMs to use the make update-license header commands instead of writing it.
| """Check if any processor implements the given method.""" | ||
| return any(p.implements(method_name) for p in self._processors) | ||
|
|
||
| def _run_stage(self, df: pd.DataFrame, method_name: str, **kwargs) -> pd.DataFrame: |
any reason not to use an enum for this method name parameter? can it be anything other than the different stage options?
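If the string parameter were replaced with an enum, one possible shape (assuming Python 3.11+ for `StrEnum`, so members still compare equal to the plain method-name strings) might be:

```python
from enum import StrEnum


class ProcessorStage(StrEnum):
    """Stage callbacks; values match the callback method names on Processor."""

    PREPROCESS = "preprocess"
    PROCESS_BEFORE_BATCH = "process_before_batch"
    PROCESS_AFTER_BATCH = "process_after_batch"
    POSTPROCESS = "postprocess"


# e.g. _run_stage(df, ProcessorStage.POSTPROCESS) — existing string comparisons
# and getattr(processor, stage) lookups keep working because a StrEnum is a str.
```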
```python
logger.info("⏳ Running preprocess on seed data...")
seed_reader = self._resource_provider.seed_reader
conn = seed_reader.create_duckdb_connection()
df = conn.execute(f"SELECT * FROM '{seed_reader.get_dataset_uri()}'").fetchdf()
```
This will probably be problematic for large datasets? There's a bunch of indexing logic in the seed dataset column generator, depending on the selection_strategy in the config. Oof we might need to centralize slicing and pulling from the seed dataset to make this work? If so, might make sense to save pre-processing for follow-up work (?) cc @nabinchha
```python
if len(df) != original_len:
    logger.warning(
        f"⚠️ PRE_BATCH processors changed row count from {original_len} to {len(df)}. "
        "This may cause unexpected behavior in downstream generators."
```
Is there a way for us to make this not a concern? For example, update the expected number of records and whatever else needs to change so all downstream generators have no idea this processing step happened?
In this world, we probably need to reframe num_records – it is the number of records that from-scratch generators start with, but not necessarily the number of records that will end up in your final dataset.
```python
@property
def _processors(self) -> list[Processor]:
    """Expose processors for test compatibility."""
    return self._processor_runner._processors

@_processors.setter
def _processors(self, processors: list[Processor]) -> None:
    """Allow setting processors for test compatibility."""
    self._processor_runner = ProcessorRunner(
        processors=processors,
        resource_provider=self._resource_provider,
        artifact_storage=self.artifact_storage,
    )
```
These getters and setters for a private property look weird to me (?). Could be just me, but I think I'd expect a public processors property for accessing self._processor_runner._processors and then a set_process_runner (or something similar) method to (re)set the processor runner.
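A sketch of what this comment proposes, written as methods on the builder class; it assumes `ProcessorRunner` exposes a public `processors` attribute, and the setter-method name is illustrative:

```python
@property
def processors(self) -> list[Processor]:
    """Read-only view of the processors managed by the runner."""
    return self._processor_runner.processors

def set_processor_runner(self, processors: list[Processor]) -> None:
    """Rebuild the runner with a new set of processors (e.g. from tests)."""
    self._processor_runner = ProcessorRunner(
        processors=processors,
        resource_provider=self._resource_provider,
        artifact_storage=self.artifact_storage,
    )
```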
```python
    dataframe=self.batch_manager.get_current_batch(as_dataframe=True),
    current_batch_number=batch_idx,
)
df_batch = self.batch_manager.get_current_batch(as_dataframe=True)
```
Are we missing the pre-batch processing here? I'm starting to confuse myself about what that would be and how it would differ from pre-processing. Oh, perhaps this can help with the seed indexing issue? The benefit of running seed "pre" processing as pre-batch is that we have the batch indices? Sorry, that's a lot of questions 😅
```python
# Run PRE_BATCH after seed generator, before other columns
if not ran_pre_batch:
    self._processor_runner.run_pre_batch(self.batch_manager)
    ran_pre_batch = True
```
oh interesting it is here
johnnygreco left a comment
This is awesome work @andreatgretel! Definitely getting us closer to where we need to be on processors 🙌
For me, the biggest question marks are around preprocessing. Wondering if we should punt on that for now, particularly since I think it is reasonable to ask users to pre-process seed datasets as needed before passing them to DD. Curious what others think.
Summary
Refactors the processor system from stage-based configuration to a callback-based design. Processors now define behavior by implementing callback methods instead of setting a `build_stage` field in config.

Changes
Added
- `Processor` base class with default no-op implementations:
  - `preprocess()` - PRE_GENERATION stage, on full seed data (writes to disk)
  - `process_before_batch()` - PRE_BATCH stage, after seed columns, before dependent columns
  - `process_after_batch()` - POST_BATCH stage, after each batch completes
  - `postprocess()` - POST_GENERATION stage, on final combined dataset
- `implements()` method to check if a processor overrides a callback
- `ProcessorRunner` class to encapsulate all processor execution logic
- `preprocessed_seed_uri` field added to `ResourceProvider` for disk-based preprocessed seed data

Changed
- Processor execution logic moved from `ColumnWiseBuilder` to a dedicated `ProcessorRunner` class
- Processors run only for the stages they implement (via the `implements()` check)
- Updated `docs/concepts/processors.md` to explain the callback-based design with 4 stages

Removed
- `build_stage` field from `ProcessorConfig`

Why This Change?
The callback-based design:
Attention Areas
- `processor_runner.py` - New class encapsulating all processor execution logic
- `base.py` - Callback method design with `implements()` check
- `column_wise_builder.py` - Simplified to delegate to ProcessorRunner

Test Plan
- `implements()`

Description updated with AI