refactor: replace stage-based config with callback-based processor design #294
base: main
Changes from all commits: b24779d, 71574b7, d8f5071, 54f6095, 46880e7
```diff
@@ -12,11 +12,8 @@
 from typing_extensions import TypeAlias

 from data_designer.config.base import ConfigBase
-from data_designer.config.dataset_builders import BuildStage
 from data_designer.config.errors import InvalidConfigError

-SUPPORTED_STAGES = [BuildStage.POST_BATCH]
-

 class ProcessorType(str, Enum):
     """Enumeration of available processor types.

@@ -33,33 +30,22 @@ class ProcessorType(str, Enum):
 class ProcessorConfig(ConfigBase, ABC):
     """Abstract base class for all processor configuration types.

-    Processors are transformations that run before or after columns are generated.
-    They can modify, reshape, or augment the dataset before it's saved.
+    Processors are transformations that run at different stages of the generation
+    pipeline. They can modify, reshape, or augment the dataset.
+
+    The processor implementation determines which stages it handles by overriding
+    the appropriate callback methods (preprocess, process_after_batch, postprocess).
```
> **Contributor** left a suggested change on this docstring.
```diff

     Attributes:
         name: Unique name of the processor, used to identify the processor in results
             and to name output artifacts on disk.
-        build_stage: The stage at which the processor runs. Currently only `POST_BATCH`
-            is supported, meaning processors run after each batch of columns is generated.
     """

     name: str = Field(
         description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
     )
-    build_stage: BuildStage = Field(
```
> **Author:** No need for …
>
> **Contributor:** Sweet, this is a neat approach!
```diff
-        default=BuildStage.POST_BATCH,
-        description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}",
-    )
     processor_type: str

-    @field_validator("build_stage")
-    def validate_build_stage(cls, v: BuildStage) -> BuildStage:
-        if v not in SUPPORTED_STAGES:
-            raise ValueError(
-                f"Invalid dataset builder stage: {v}. Only these stages are supported: {', '.join(SUPPORTED_STAGES)}"
-            )
-        return v


 def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs: Any) -> ProcessorConfig:
     """Create a processor configuration from a processor type and keyword arguments.
```
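To make the callback-based design concrete, here is a minimal sketch of how a processor might opt into stages under this refactor. It assumes pandas DataFrames and the class names shown; the diff itself only establishes the three callback names (`preprocess`, `process_after_batch`, `postprocess`), so everything else is illustrative.

```python
# Minimal sketch of the callback-based design. The base-class name and
# signatures are assumptions; only the three callback names come from the PR.
import pandas as pd


class Processor:
    """Hypothetical runtime base: every callback is a pass-through by default."""

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        # Runs once over the seed dataset before any columns are generated.
        return df

    def process_after_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        # Runs after each batch of columns is generated.
        return df

    def postprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        # Runs once over the full dataset after generation completes.
        return df


class AddRowIdProcessor(Processor):
    """A processor opts into a stage simply by overriding its callback."""

    def process_after_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()
        out["row_id"] = range(len(out))  # hypothetical per-batch augmentation
        return out
```

Under this shape, the stages a processor participates in are implied by which callbacks it overrides, which is why the `build_stage` field and its `SUPPORTED_STAGES` validator can be removed outright.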
```diff
@@ -53,7 +53,11 @@ def _initialize(self) -> None:
         self._num_records_sampled = 0
         self._batch_reader = None
         self._df_remaining = None
-        self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
+        # Use preprocessed seed if available, otherwise use original
+        if self.resource_provider.preprocessed_seed_uri is not None:
+            self._dataset_uri = self.resource_provider.preprocessed_seed_uri
+        else:
+            self._dataset_uri = self.resource_provider.seed_reader.get_dataset_uri()
```
> **Comment on lines +56 to +60 — Contributor:** @mikeknep – would be helpful to have your eyes specifically on the seed dataset parts of this PR.
```diff

         self._seed_dataset_size = self.duckdb_conn.execute(f"SELECT COUNT(*) FROM '{self._dataset_uri}'").fetchone()[0]
         self._index_range = self._resolve_index_range()
```
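For context on where `preprocessed_seed_uri` might come from, here is a hedged sketch of the producing side. The helper name, the parquet format, and the attribute assignment are all assumptions based only on the attribute read in the hunk above.

```python
# Hypothetical wiring: run preprocess callbacks once over the seed dataset,
# persist the result, and record its URI for samplers to pick up.
import pandas as pd


def preprocess_seed(resource_provider, processors) -> None:
    seed_uri = resource_provider.seed_reader.get_dataset_uri()
    df = pd.read_parquet(seed_uri)  # assumes a parquet seed, purely for illustration

    for processor in processors:
        df = processor.preprocess(df)

    # Persist the transformed seed and record its URI; samplers that find
    # preprocessed_seed_uri set to None fall back to the original seed.
    out_uri = seed_uri.replace(".parquet", ".preprocessed.parquet")
    df.to_parquet(out_uri)
    resource_provider.preprocessed_seed_uri = out_uri
```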
> **Review comment:** Somehow in my head, dropping columns needs to happen post-generation, BUT the key point is that each batch contains the full schema of the dataset. Might be good to have a tip box (or something similar) that highlights this point.
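A sketch of the kind of example such a tip box might show (hypothetical column name, reusing the illustrative `Processor` base from the earlier sketch):

```python
import pandas as pd


class DropScratchColumns(Processor):  # hypothetical; base class sketched above
    def process_after_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        # Each batch carries the full schema up to this point, so the scratch
        # column is still available to any generation step that needs it;
        # dropping it here only changes what gets persisted for the batch.
        return df.drop(columns=["scratch_notes"], errors="ignore")
```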