
refactor: replace stage-based config with callback-based processor design#294

Open
andreatgretel wants to merge 5 commits into main from andreatgretel/feat/processor-plugins

Conversation

Contributor

@andreatgretel andreatgretel commented Feb 4, 2026

Summary

Refactors the processor system from stage-based configuration to a callback-based design. Processors now define behavior by implementing callback methods instead of setting a build_stage field in config.

Changes

Added

  • Four callback methods to Processor base class with default no-op implementations:
    • preprocess() - PRE_GENERATION stage, on full seed data (writes to disk)
    • process_before_batch() - PRE_BATCH stage, after seed columns, before dependent columns
    • process_after_batch() - POST_BATCH stage, after each batch completes
    • postprocess() - POST_GENERATION stage, on final combined dataset
  • implements() method to check if a processor overrides a callback (see the sketch after this list)
  • ProcessorRunner class to encapsulate all processor execution logic
  • preprocessed_seed_uri field to ResourceProvider for disk-based preprocessed seed data
  • Warning when PRE_BATCH processors change row count
  • Cleanup of preprocessed seed file after preview mode
  • Parametrized test verifying all 4 stages run in order for both preview and build modes
  • Edge case tests: exception handling, no-op processors, processor ordering, empty DataFrames
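A rough sketch of what the callback base class looks like under this design (signatures are illustrative; the real class lives in `data_designer.engine.processing.processors.base` and extends `ConfigurableTask`):

```python
from abc import ABC

import pandas as pd


class Processor(ABC):
    """Illustrative base class: every callback defaults to a no-op pass-through."""

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        # PRE_GENERATION: runs once on the full seed data before batching
        return df

    def process_before_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        # PRE_BATCH: runs after seed columns, before dependent columns
        return df

    def process_after_batch(self, df: pd.DataFrame, current_batch_number: int) -> pd.DataFrame:
        # POST_BATCH: runs after each batch completes
        return df

    def postprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        # POST_GENERATION: runs once on the final combined dataset
        return df

    def implements(self, method_name: str) -> bool:
        # A processor "implements" a stage if its class overrides the base no-op
        return getattr(type(self), method_name) is not getattr(Processor, method_name)
```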

Changed

  • Extract processor execution from ColumnWiseBuilder to dedicated ProcessorRunner class
  • Builder now delegates to runner methods directly (no wrapper methods)
  • Only runs processors that implement each stage (via the implements() check; see the runner sketch below)
  • Preprocessed seed data written to disk for memory efficiency with large datasets
  • Postprocessor always rewrites final dataset after processing
  • Refactored test suite with fixtures and helpers to reduce duplication
  • Updated docs/concepts/processors.md to explain callback-based design with 4 stages
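A minimal sketch of the runner's selective execution (the real ProcessorRunner also takes the resource provider and artifact storage; `Processor` here refers to the base-class sketch above):

```python
import pandas as pd


class ProcessorRunner:
    """Illustrative runner: only invokes processors that override a given stage."""

    def __init__(self, processors: list["Processor"]) -> None:
        self._processors = processors

    def has_processors_for(self, method_name: str) -> bool:
        """Check if any processor implements the given callback."""
        return any(p.implements(method_name) for p in self._processors)

    def _run_stage(self, df: pd.DataFrame, method_name: str, **kwargs) -> pd.DataFrame:
        for processor in self._processors:
            if not processor.implements(method_name):
                continue  # skip processors that keep the default no-op
            try:
                df = getattr(processor, method_name)(df, **kwargs)
            except Exception as exc:
                # Wrap failures with stage/processor context, as the PR describes
                raise RuntimeError(
                    f"{type(processor).__name__} failed during {method_name}"
                ) from exc
        return df
```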

Removed

  • build_stage field from ProcessorConfig
  • Stage validation logic
  • Wrapper methods from builder (replaced with direct runner calls)

Why This Change?

The callback-based design:

  • Makes processor behavior explicit in the implementation, not the config (see the example below)
  • Allows processors to handle multiple stages if needed
  • Simplifies configuration (no stage field to set/validate)
  • Enables memory-efficient preprocessing (write to disk, not hold in memory)
  • Separates concerns: builder handles orchestration, runner handles processor execution
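For example (building on the Processor sketch above, with a made-up processor and column name), a processor that only cleans up the final dataset overrides a single callback and is skipped automatically at every other stage:

```python
import pandas as pd


class DropHelperColumnsProcessor(Processor):
    """Hypothetical processor that participates only in POST_GENERATION."""

    def postprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.drop(columns=["_scratch_notes"], errors="ignore")


processor = DropHelperColumnsProcessor()
assert processor.implements("postprocess") is True   # runs at POST_GENERATION
assert processor.implements("preprocess") is False    # skipped at PRE_GENERATION
```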

Attention Areas

Reviewers: Please pay special attention to the following:

Test Plan

  • All existing processor tests pass
  • Tests for callback-based processor execution
  • Tests for selective processor execution via implements()
  • Parametrized test for all 4 stages in both preview and build modes
  • Edge case tests for exceptions, no-op processors, ordering


@andreatgretel force-pushed the andreatgretel/feat/processor-plugins branch 2 times, most recently from 35df5cf to ec54289 on February 4, 2026 at 23:06
@andreatgretel changed the title from "feat: add processor plugins with PRE_GENERATION and POST_GENERATION stages" to "refactor: replace stage-based config with callback-based processor design" on February 4, 2026
@andreatgretel force-pushed the andreatgretel/feat/processor-plugins branch 2 times, most recently from edbaf44 to 5cc6f21 on February 5, 2026 at 14:05
@andreatgretel force-pushed the andreatgretel/feat/processor-plugins branch from f6a820b to 8310910 on February 5, 2026 at 15:48
@andreatgretel marked this pull request as ready for review on February 5, 2026 at 15:49
@andreatgretel requested a review from a team as a code owner on February 5, 2026 at 15:49
name: str = Field(
description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
)
build_stage: BuildStage = Field(
Contributor Author

No need for build_stage anymore

Contributor

Sweet, this is a neat approach!

from data_designer.engine.configurable_task import ConfigurableTask, DataT, TaskConfigT


class Processor(ConfigurableTask[TaskConfigT], ABC):
Contributor Author

This is the main change - three different methods


greptile-apps bot commented Feb 5, 2026

Greptile Overview

Greptile Summary

This PR successfully refactors the processor system from stage-based configuration to a callback-based design, significantly improving code organization and flexibility.

Key Changes

  • Callback-based design: Processors now implement callback methods (preprocess(), process_before_batch(), process_after_batch(), postprocess()) instead of setting a build_stage field in config
  • ProcessorRunner class: Extracted all processor execution logic from ColumnWiseBuilder into dedicated ProcessorRunner class for better separation of concerns
  • Selective execution: Only runs processors that implement each stage via the implements() check, reducing unnecessary overhead
  • Memory-efficient preprocessing: Preprocessed seed data written to disk instead of held in memory, improving scalability for large datasets
  • Preview mode cleanup: Properly cleans up preprocessed seed file after preview to avoid side effects

Architecture Improvements

The refactoring improves code maintainability through:

  • Cleaner orchestration: ColumnWiseBuilder now delegates to ProcessorRunner methods directly
  • Explicit behavior: Processor stage participation is determined by implementation, not configuration
  • Better error handling: Processors wrap exceptions with clear context about which stage/processor failed
  • Comprehensive testing: Edge cases covered including exception handling, no-op processors, ordering, and parametrized tests for all 4 stages

Attention Areas

  • The postprocessor now always rewrites the final dataset after processing (addressing previous thread concern about non-persisted changes)
  • Preview mode properly cleans up preprocessed seed file to avoid affecting subsequent build() calls
  • SchemaTransformProcessor intentionally returns original data (transformed output saved as artifact only)

Confidence Score: 4/5

  • This PR is safe to merge with minimal risk - well-tested refactoring with no breaking changes to public API
  • Score reflects solid architecture, comprehensive test coverage, and proper handling of edge cases. One point deducted due to lack of migration validation in integration tests (only unit tests verify the callback system)
  • No files require special attention - the refactoring is well-executed across all components

Important Files Changed

| Filename | Overview |
|----------|----------|
| packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py | Introduces callback-based design with 4 stage methods and implements() check - clean architecture |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | New class encapsulates processor execution logic with proper error handling and stage checks |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py | Simplified by delegating to ProcessorRunner; cleaner orchestration with proper preview cleanup |
| packages/data-designer-config/src/data_designer/config/processors.py | Removed build_stage field and validation - config simplified as intended |
| packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py | Implements process_after_batch() with artifact-only output (returns original data by design) |
| packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py | Comprehensive tests for all 4 stages, edge cases, exception handling, and ordering |

Sequence Diagram

sequenceDiagram
    participant Builder as ColumnWiseBuilder
    participant Runner as ProcessorRunner
    participant P1 as Processor 1
    participant P2 as Processor 2
    participant Seed as SeedReader
    participant Disk as Disk Storage

    Note over Builder,Disk: PRE_GENERATION Stage (once before batching)
    Builder->>Runner: run_preprocess()
    Runner->>Runner: has_processors_for("preprocess")?
    alt has preprocess processors
        Runner->>Seed: Load full seed data
        Seed-->>Runner: seed DataFrame
        Runner->>P1: implements("preprocess")?
        alt implements preprocess
            Runner->>P1: preprocess(df)
            P1-->>Runner: transformed df
        end
        Runner->>P2: implements("preprocess")?
        alt implements preprocess
            Runner->>P2: preprocess(df)
            P2-->>Runner: transformed df
        end
        Runner->>Disk: Write preprocessed_seed.parquet
        Note over Runner: Updates preprocessed_seed_uri
    end

    Note over Builder,Disk: Batch Generation Loop
    loop For each batch
        Builder->>Builder: Generate seed columns
        
        Note over Builder,Disk: PRE_BATCH Stage (after seed, before dependent columns)
        Builder->>Runner: run_pre_batch(batch_manager)
        Runner->>Runner: has_processors_for("process_before_batch")?
        alt has pre-batch processors
            Runner->>P1: implements("process_before_batch")?
            alt implements
                Runner->>P1: process_before_batch(df)
                P1-->>Runner: transformed df
            end
            Runner->>P2: implements("process_before_batch")?
            alt implements
                Runner->>P2: process_before_batch(df)
                P2-->>Runner: transformed df
            end
            Runner->>Builder: Update batch records
        end

        Builder->>Builder: Generate dependent columns
        
        Note over Builder,Disk: POST_BATCH Stage (after each batch completes)
        Builder->>Runner: run_post_batch(df, batch_number)
        Runner->>P1: implements("process_after_batch")?
        alt implements
            Runner->>P1: process_after_batch(df, current_batch_number)
            P1-->>Runner: transformed df
        end
        Runner->>P2: implements("process_after_batch")?
        alt implements
            Runner->>P2: process_after_batch(df, current_batch_number)
            P2-->>Runner: transformed df
        end
        Runner-->>Builder: processed df
        Builder->>Disk: Write batch to disk
    end

    Note over Builder,Disk: POST_GENERATION Stage (once after all batches)
    Builder->>Runner: run_postprocess()
    Runner->>Runner: has_processors_for("postprocess")?
    alt has postprocess processors
        Runner->>Disk: Load final dataset
        Disk-->>Runner: final DataFrame
        Runner->>P1: implements("postprocess")?
        alt implements
            Runner->>P1: postprocess(df)
            P1-->>Runner: transformed df
        end
        Runner->>P2: implements("postprocess")?
        alt implements
            Runner->>P2: postprocess(df)
            P2-->>Runner: transformed df
        end
        Runner->>Disk: Rewrite final dataset
    end

greptile-apps bot left a comment

6 files reviewed, 3 comments

greptile-apps bot left a comment

7 files reviewed, 4 comments


greptile-apps bot commented Feb 5, 2026

Additional Comments (3)

packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py (lines 438-444)

**Leaking DuckDB connection**
`_run_pre_generation_processors()` creates a DuckDB connection via `seed_reader.create_duckdb_connection()` and never closes it. If a build runs many times in-process (service mode/tests), this will leak connections/file handles. Close the connection (e.g., `try/finally: conn.close()`), or use a context manager if the seed reader supports it.
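A minimal sketch of the suggested fix, assuming `create_duckdb_connection()` returns a standard DuckDB connection with a `close()` method:

```python
conn = seed_reader.create_duckdb_connection()
try:
    df = conn.execute(f"SELECT * FROM '{seed_reader.get_dataset_uri()}'").fetchdf()
finally:
    conn.close()  # always release the connection/file handle, even on failure
```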

packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py (lines 448-455)

**seed_row_indices not updated**
`_run_pre_generation_processors()` only sets `resource_provider.seed_row_indices` when `len(df) != original_len`. If preprocess reorders rows or filters and adds rows back to the same length, `seed_row_indices` stays unset and the generator will read unfiltered rows even though preprocess changed the seed selection. Track indices whenever preprocess runs (or whenever `_dd_rowid` changes), not only on length change.
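One possible direction, sketched with the names used in this comment (`_dd_rowid`, `seed_row_indices`) rather than the actual implementation:

```python
# Capture row identity before the preprocess callbacks run
original_rowids = df["_dd_rowid"].tolist()

df = run_preprocess_callbacks(df)  # hypothetical helper for the preprocess stage

# Record indices whenever the selection or order changed, not only when the length did
new_rowids = df["_dd_rowid"].tolist()
if new_rowids != original_rowids:
    resource_provider.seed_row_indices = new_rowids
```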

packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py (lines 486-496)

**Postprocess changes may be lost**
`_run_post_generation_processors()` only rewrites the final dataset when the row count or the column names/order change. If `postprocess()` modifies values in place (same rows/columns), those changes won't be persisted and will be silently dropped. This is a functional correctness issue for value-only postprocessors; consider always rewriting when processors exist, or require postprocessors to return a new df and detect `df is not original_df`/hashing.
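The simplest fix, which matches the "Postprocessor always rewrites final dataset after processing" item in the PR description, looks roughly like this (helper names are illustrative):

```python
if runner.has_processors_for("postprocess"):
    df = runner.run_postprocess(df)
    # Always persist when postprocessors exist, so value-only edits are never dropped
    write_final_dataset(df, final_dataset_uri)
```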

greptile-apps bot left a comment

6 files reviewed, 3 comments

greptile-apps bot left a comment

6 files reviewed, 1 comment

@andreatgretel marked this pull request as draft on February 5, 2026 at 17:47
@andreatgretel force-pushed the andreatgretel/feat/processor-plugins branch from 293ef7b to b6192af on February 5, 2026 at 18:57
Replace single process() method with stage-specific callbacks:
- preprocess(): PRE_GENERATION, on full seed data, writes to disk
- process_before_batch(): PRE_BATCH, after seed columns, before dependent columns
- process_after_batch(): POST_BATCH, after each batch completes
- postprocess(): POST_GENERATION, on final combined dataset

Key changes:
- Remove build_stage config field; stages determined by implemented callbacks
- Add implements() method to check if processor overrides a callback
- Only run processors that implement each stage
- Preprocessed seed data written to disk for memory efficiency
- Update docs and tests
- Add warning when PRE_BATCH changes row count
- Clean up preprocessed seed file after preview
- Add parametrized test verifying all 4 stages run in order
The comprehensive test_all_processor_stages_run_in_order covers
both preview and build modes, making the individual tests redundant.
- Move imports to top of file
- Add seed_data_setup and builder_with_seed fixtures
- Add create_mock_processor helper function
- Add edge case tests for exceptions, no-op processors, ordering
- Move all processor stage logic to new ProcessorRunner in utils/
- ProcessorRunner takes dependencies and provides complete stage methods
- Builder now calls runner methods directly instead of wrapper methods
- Remove unused imports from column_wise_builder
@andreatgretel force-pushed the andreatgretel/feat/processor-plugins branch from a1323f9 to 46880e7 on February 5, 2026 at 22:36
@andreatgretel marked this pull request as ready for review on February 5, 2026 at 22:38
| Stage | When it runs | Callback method | Example uses |
|-------|--------------|-----------------|-----------|
| Pre-generation | Once, on full seed data before batching | `preprocess()` | Filter seed data, validate inputs, normalize data |
| Pre-batch | After seed columns, before dependent columns | `process_before_batch()` | Transform seed data before other columns are generated |
| Post-batch | After each batch completes | `process_after_batch()` | Drop columns, transform schema per batch |
Contributor

Somehow in my head dropping columns needs to happen post generation, BUT the key point is that each batch contains the full schema of the dataset. Might be good to have a tip (or something) box that highlights this point.

pipeline. They can modify, reshape, or augment the dataset.

The processor implementation determines which stages it handles by overriding
the appropriate callback methods (preprocess, process_after_batch, postprocess).
Contributor

Suggested change
the appropriate callback methods (preprocess, process_after_batch, postprocess).
the appropriate callback methods (preprocess, process_before_batch, process_after_batch, postprocess).

Comment on lines +56 to +60
# Use preprocessed seed if available, otherwise use original
if self.resource_provider.preprocessed_seed_uri is not None:
self._dataset_uri = self.resource_provider.preprocessed_seed_uri
else:
self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
Contributor

@mikeknep – would be helpful to have your eyes specifically on the seed dataset parts of this PR.

@@ -0,0 +1,121 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Contributor

Note to future us: We need to add some instruction for LLMs to use the make update-license header commands instead of writing it.

"""Check if any processor implements the given method."""
return any(p.implements(method_name) for p in self._processors)

def _run_stage(self, df: pd.DataFrame, method_name: str, **kwargs) -> pd.DataFrame:
Contributor

any reason not to use an enum for this method name parameter? can it be anything other than the different stage options?
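For reference, the enum being suggested could look roughly like this (a sketch only; the code shown above keeps method_name as a plain string):

```python
from enum import Enum


class ProcessorStage(str, Enum):
    """Hypothetical enum restricting the stage parameter to the four callbacks."""

    PREPROCESS = "preprocess"
    PROCESS_BEFORE_BATCH = "process_before_batch"
    PROCESS_AFTER_BATCH = "process_after_batch"
    POSTPROCESS = "postprocess"


# Callers would pass ProcessorStage.PREPROCESS.value instead of a raw string,
# making the valid stage names explicit and letting type checkers catch typos.
```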

logger.info("⏳ Running preprocess on seed data...")
seed_reader = self._resource_provider.seed_reader
conn = seed_reader.create_duckdb_connection()
df = conn.execute(f"SELECT * FROM '{seed_reader.get_dataset_uri()}'").fetchdf()
Contributor

This will probably be problematic for large datasets? There's a bunch of indexing logic in the seed dataset column generator, depending on the selection_strategy in the config. Oof we might need to centralize slicing and pulling from the seed dataset to make this work? If so, might make sense to save pre-processing for follow-up work (?) cc @nabinchha
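If memory does become a concern, one possible mitigation (a sketch only, not something this PR implements) is to stream the seed in Arrow record batches instead of materializing the whole table with fetchdf():

```python
import duckdb

seed_dataset_uri = "seed.parquet"  # placeholder path for illustration

conn = duckdb.connect()
result = conn.execute(f"SELECT * FROM '{seed_dataset_uri}'")

# fetch_record_batch() yields Arrow record batches, keeping peak memory bounded
for batch in result.fetch_record_batch(rows_per_batch=50_000):
    chunk_df = batch.to_pandas()
    # run preprocess callbacks per chunk, append to the preprocessed parquet, etc.

conn.close()
```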

if len(df) != original_len:
logger.warning(
f"⚠️ PRE_BATCH processors changed row count from {original_len} to {len(df)}. "
"This may cause unexpected behavior in downstream generators."
Contributor

Is there a way for us to make this not a concern? For example, update the expected number of records and whatever else needs to change so all downstream generators have no idea this processing step happened?

In this world, we probably need to reframe num_records – this is the number of records that from-scratch generators start with, but not necessarily the number of records that will be in your final dataset.

Comment on lines +82 to +95
@property
def _processors(self) -> list[Processor]:
"""Expose processors for test compatibility."""
return self._processor_runner._processors

@_processors.setter
def _processors(self, processors: list[Processor]) -> None:
"""Allow setting processors for test compatibility."""
self._processor_runner = ProcessorRunner(
processors=processors,
resource_provider=self._resource_provider,
artifact_storage=self.artifact_storage,
)

Contributor

These getters and setters for a private property look weird to me (?). Could be just me, but I think I'd expect a public processors property for accessing self._processor_runner._processors and then a set_process_runner (or something similar) method to (re)set the processor runner.
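Roughly the shape being suggested (names like set_processor_runner follow the comment and are illustrative, not the merged API):

```python
from __future__ import annotations


class ColumnWiseBuilder:
    # ... existing builder code ...

    @property
    def processors(self) -> list[Processor]:
        """Public read access instead of a property on a private attribute."""
        return self._processor_runner._processors

    def set_processor_runner(self, processors: list[Processor]) -> None:
        """Explicitly (re)build the runner rather than hiding it behind a setter."""
        self._processor_runner = ProcessorRunner(
            processors=processors,
            resource_provider=self._resource_provider,
            artifact_storage=self.artifact_storage,
        )
```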

dataframe=self.batch_manager.get_current_batch(as_dataframe=True),
current_batch_number=batch_idx,
)
df_batch = self.batch_manager.get_current_batch(as_dataframe=True)
Contributor

Are we missing the pre-batch processing here? I'm starting to confuse myself about what that would be and how it would differ from pre-processing. Oh, perhaps this can help with the seed indexing issue? The benefit of running seed "pre" processing as pre-batch is that we have the batch indices? Sorry, that's a lot of questions 😅

Comment on lines +189 to +192
# Run PRE_BATCH after seed generator, before other columns
if not ran_pre_batch:
self._processor_runner.run_pre_batch(self.batch_manager)
ran_pre_batch = True
Contributor

oh interesting it is here

Contributor

@johnnygreco johnnygreco left a comment

This is awesome work @andreatgretel! Definitely getting us closer to where we need to be on processors 🙌

For me, the biggest question marks are around preprocessing. Wondering if we should punt on that for now, particularly since I think it is reasonable to ask users to pre-process seed datasets as needed before passing them to DD. Curious what others think.
