Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/concepts/processors.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@ Each processor:
- Applies its transformation
- Passes the result to the next processor (or to output)

Currently, processors run only at the `POST_BATCH` stage, i.e., after column generation completes for each batch.
Processors can run at four stages, determined by which callback methods they implement:

| Stage | When it runs | Callback method | Use cases |
|-------|--------------|-----------------|-----------|
| Pre-generation | Once, on full seed data before batching | `preprocess()` | Filter seed data, validate inputs, normalize data |
| Pre-batch | After seed columns, before dependent columns | `process_before_batch()` | Transform seed data before other columns are generated |
| Post-batch | After each batch completes | `process_after_batch()` | Drop columns, transform schema per batch |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow in by head dropping columns needs to happen post generation, BUT the key point is that each batch contains the full schema of the dataset. Might be good to have a tip (or something) box that highlights this point.

| Post-generation | Once, on final dataset after all batches | `postprocess()` | Deduplicate, aggregate statistics, final cleanup |

A processor can implement any combination of these callbacks. The built-in processors use `process_after_batch()` by default.

## Processor Types

Expand Down Expand Up @@ -134,7 +143,6 @@ Processors execute in the order they're added. Plan accordingly when one process
| Parameter | Type | Description |
|-----------|------|-------------|
| `name` | str | Identifier for the processor, used in output directory names |
| `build_stage` | BuildStage | When to run (default: `POST_BATCH`) |

### DropColumnsProcessorConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
from typing_extensions import TypeAlias

from data_designer.config.base import ConfigBase
from data_designer.config.dataset_builders import BuildStage
from data_designer.config.errors import InvalidConfigError

SUPPORTED_STAGES = [BuildStage.POST_BATCH]


class ProcessorType(str, Enum):
"""Enumeration of available processor types.
Expand All @@ -33,33 +30,22 @@ class ProcessorType(str, Enum):
class ProcessorConfig(ConfigBase, ABC):
"""Abstract base class for all processor configuration types.

Processors are transformations that run before or after columns are generated.
They can modify, reshape, or augment the dataset before it's saved.
Processors are transformations that run at different stages of the generation
pipeline. They can modify, reshape, or augment the dataset.

The processor implementation determines which stages it handles by overriding
the appropriate callback methods (preprocess, process_after_batch, postprocess).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
the appropriate callback methods (preprocess, process_after_batch, postprocess).
the appropriate callback methods (preprocess, process_before_batch, process_after_batch, postprocess).


Attributes:
name: Unique name of the processor, used to identify the processor in results
and to name output artifacts on disk.
build_stage: The stage at which the processor runs. Currently only `POST_BATCH`
is supported, meaning processors run after each batch of columns is generated.
"""

name: str = Field(
description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
)
build_stage: BuildStage = Field(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for build_stage anymore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet, this is a neat approach!

default=BuildStage.POST_BATCH,
description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}",
)
processor_type: str

@field_validator("build_stage")
def validate_build_stage(cls, v: BuildStage) -> BuildStage:
if v not in SUPPORTED_STAGES:
raise ValueError(
f"Invalid dataset builder stage: {v}. Only these stages are supported: {', '.join(SUPPORTED_STAGES)}"
)
return v


def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs: Any) -> ProcessorConfig:
"""Create a processor configuration from a processor type and keyword arguments.
Expand Down
43 changes: 6 additions & 37 deletions packages/data-designer-config/tests/config/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pytest
from pydantic import ValidationError

from data_designer.config.dataset_builders import BuildStage
from data_designer.config.errors import InvalidConfigError
from data_designer.config.processors import (
DropColumnsProcessorConfig,
Expand All @@ -16,92 +15,64 @@


def test_drop_columns_processor_config_creation():
config = DropColumnsProcessorConfig(
name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"]
)
config = DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["col1", "col2"])

assert config.build_stage == BuildStage.POST_BATCH
assert config.column_names == ["col1", "col2"]
assert config.processor_type == ProcessorType.DROP_COLUMNS
assert isinstance(config, ProcessorConfig)


def test_drop_columns_processor_config_validation():
# Test unsupported stage raises error
with pytest.raises(ValidationError, match="Invalid dataset builder stage"):
DropColumnsProcessorConfig(
name="drop_columns_processor", build_stage=BuildStage.PRE_BATCH, column_names=["col1"]
)

# Test missing required field raises error
with pytest.raises(ValidationError, match="Field required"):
DropColumnsProcessorConfig(name="drop_columns_processor", build_stage=BuildStage.POST_BATCH)
DropColumnsProcessorConfig(name="drop_columns_processor")


def test_drop_columns_processor_config_serialization():
config = DropColumnsProcessorConfig(
name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"]
)
config = DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["col1", "col2"])

# Serialize to dict
config_dict = config.model_dump()
assert config_dict["build_stage"] == "post_batch"
assert config_dict["column_names"] == ["col1", "col2"]

# Deserialize from dict
config_restored = DropColumnsProcessorConfig.model_validate(config_dict)
assert config_restored.build_stage == config.build_stage
assert config_restored.column_names == config.column_names


def test_schema_transform_processor_config_creation():
config = SchemaTransformProcessorConfig(
name="output_format_processor",
build_stage=BuildStage.POST_BATCH,
template={"text": "{{ col1 }}"},
)

assert config.build_stage == BuildStage.POST_BATCH
assert config.template == {"text": "{{ col1 }}"}
assert config.processor_type == ProcessorType.SCHEMA_TRANSFORM
assert isinstance(config, ProcessorConfig)


def test_schema_transform_processor_config_validation():
# Test unsupported stage raises error
with pytest.raises(ValidationError, match="Invalid dataset builder stage"):
SchemaTransformProcessorConfig(
name="schema_transform_processor",
build_stage=BuildStage.PRE_BATCH,
template={"text": "{{ col1 }}"},
)

# Test missing required field raises error
with pytest.raises(ValidationError, match="Field required"):
SchemaTransformProcessorConfig(name="schema_transform_processor", build_stage=BuildStage.POST_BATCH)
SchemaTransformProcessorConfig(name="schema_transform_processor")

# Test invalid template raises error
with pytest.raises(InvalidConfigError, match="Template must be JSON serializable"):
SchemaTransformProcessorConfig(
name="schema_transform_processor", build_stage=BuildStage.POST_BATCH, template={"text": {1, 2, 3}}
)
SchemaTransformProcessorConfig(name="schema_transform_processor", template={"text": {1, 2, 3}})


def test_schema_transform_processor_config_serialization():
config = SchemaTransformProcessorConfig(
name="schema_transform_processor",
build_stage=BuildStage.POST_BATCH,
template={"text": "{{ col1 }}"},
)

# Serialize to dict
config_dict = config.model_dump()
assert config_dict["build_stage"] == "post_batch"
assert config_dict["template"] == {"text": "{{ col1 }}"}

# Deserialize from dict
config_restored = SchemaTransformProcessorConfig.model_validate(config_dict)
assert config_restored.build_stage == config.build_stage
assert config_restored.template == config.template


Expand All @@ -110,7 +81,6 @@ def test_get_processor_config_from_kwargs():
config_drop_columns = get_processor_config_from_kwargs(
ProcessorType.DROP_COLUMNS,
name="drop_columns_processor",
build_stage=BuildStage.POST_BATCH,
column_names=["col1"],
)
assert isinstance(config_drop_columns, DropColumnsProcessorConfig)
Expand All @@ -120,7 +90,6 @@ def test_get_processor_config_from_kwargs():
config_schema_transform = get_processor_config_from_kwargs(
ProcessorType.SCHEMA_TRANSFORM,
name="output_format_processor",
build_stage=BuildStage.POST_BATCH,
template={"text": "{{ col1 }}"},
)
assert isinstance(config_schema_transform, SchemaTransformProcessorConfig)
Expand All @@ -134,6 +103,6 @@ class UnknownProcessorType(str, Enum):
UNKNOWN = "unknown"

result = get_processor_config_from_kwargs(
UnknownProcessorType.UNKNOWN, name="unknown_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1"]
UnknownProcessorType.UNKNOWN, name="unknown_processor", column_names=["col1"]
)
assert result is None
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ def _initialize(self) -> None:
self._num_records_sampled = 0
self._batch_reader = None
self._df_remaining = None
self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
# Use preprocessed seed if available, otherwise use original
if self.resource_provider.preprocessed_seed_uri is not None:
self._dataset_uri = self.resource_provider.preprocessed_seed_uri
else:
self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
Comment on lines +56 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikeknep – would be helpful to have your eyes specifically on the seed dataset parts of this PR.

self._seed_dataset_size = self.duckdb_conn.execute(f"SELECT COUNT(*) FROM '{self._dataset_uri}'").fetchone()[0]
self._index_range = self._resolve_index_range()

Expand Down
Loading