[HWORKS-2045] Use spark job config in write_options to configure materialisation job #747
Conversation
Pull request overview
This PR adds functionality to configure materialization jobs using spark configurations from write_options["spark"], and enhances debugging capabilities by adding comprehensive logging throughout the Kafka engine. The changes enable users to customize Spark job configurations when writing to streaming feature groups, improving flexibility for performance tuning.
Key Changes
- Added logic to update materialization job configurations with user-provided spark configurations from offline_write_options["spark"] before job execution
- Enhanced debugging capabilities by adding logger statements throughout kafka_engine.py to trace Kafka producer/consumer initialization, offset retrieval, and configuration setup
- Improved code formatting by using keyword argument syntax in the get_kafka_connector() call
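For context, a minimal sketch of the intended usage follows; the feature group fg, dataframe df, and the specific spark property names are illustrative assumptions, not taken from this PR. Only the "spark" key itself is what the new code reads.

```python
# Sketch: configure the materialization job of a streaming feature group through
# write_options["spark"] when inserting data. The property keys below are assumed
# executor-sizing options for illustration.
fg.insert(
    df,
    write_options={
        "spark": {
            "spark.executor.memory": "8g",   # assumed executor-sizing property
            "spark.executor.cores": 4,       # assumed executor-sizing property
        }
    },
)
```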
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| python/hsfs/engine/python.py | Added duplicated code blocks (lines 1854-1867 and 1902-1915) to extract spark config from write_options and update materialization job config before running the job. Uses assertions for validation. |
| python/hsfs/core/kafka_engine.py | Added extensive debug logging throughout Kafka engine functions to trace initialization, configuration, and operations. Fixed minor spelling issue and improved parameter formatting. |
```python
for key, value in spark_compute_config.items():
    feature_group.materialization_job.config[key] = value
feature_group.materialization_job.save()
```
Copilot AI (Dec 2, 2025)
The new functionality to configure the materialization job using offline_write_options["spark"] lacks documentation. Consider adding:
- A docstring or comment explaining which configuration keys are supported
- Usage examples showing how to pass spark configurations
- Warnings about potential conflicts with existing job configurations

For example:

```python
# Update materialization job config with user-provided spark configurations
# Supported keys: memory, cores, executor configurations, etc.
# These configurations will override the existing job config.
```

This is especially important since the public API documentation at lines 3146-3174 in feature_group.py mentions the spark key but doesn't specify it can be used to configure the materialization job in streaming feature groups.
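One possible shape for such documentation, sketched as a fragment that could be folded into the existing write_options docstring in feature_group.py; the wording and placement here are assumptions, not the project's actual documentation.

```python
# Illustrative docstring fragment only; not the actual hsfs documentation.
WRITE_OPTIONS_SPARK_DOC = """
* key `spark` and value a dict of Spark job configuration properties. For streaming
  feature groups these properties are applied to the materialization job configuration
  before the job is started, overriding any existing values (e.g. executor memory,
  cores, number of executors).
"""
```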
```python
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
    _logger.debug(
        "Updating materialization job with user provided spark configurations."
    )
    for key, value in spark_compute_config.items():
        feature_group.materialization_job.config[key] = value
    feature_group.materialization_job.save()
```
Copilot AI (Dec 2, 2025)
This code block (lines 1854-1866) is duplicated at lines 1902-1914. Consider extracting this logic into a helper method to avoid code duplication. For example:

```python
def _update_materialization_job_config(
    self,
    feature_group: FeatureGroup,
    offline_write_options: Dict[str, Any]
) -> None:
    """Update materialization job config with user-provided spark configurations."""
    assert (
        feature_group.materialization_job is not None
    ), "Materialization job is not defined. Cannot start materialization job."
    if isinstance(offline_write_options, dict) and (
        (spark_compute_config := offline_write_options.get("spark", None))
        is not None
    ):
        _logger.debug(
            "Updating materialization job with user provided spark configurations."
        )
        for key, value in spark_compute_config.items():
            feature_group.materialization_job.config[key] = value
        feature_group.materialization_job.save()
```

Then call this method before each feature_group.materialization_job.run() invocation.
```python
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
```
Copilot AI (Dec 2, 2025)
Using assertions for runtime validation is not recommended in production code. Assertions can be disabled with the -O flag in Python and are meant for debugging, not runtime error handling. Consider raising an appropriate exception instead:

```python
if feature_group.materialization_job is None:
    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
```

Suggested change:

```diff
-assert (
-    feature_group.materialization_job is not None
-), "Materialization job is not defined. Cannot start materialization job."
+if feature_group.materialization_job is None:
+    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
```
```python
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
```
Copilot AI (Dec 2, 2025)
The isinstance(offline_write_options, dict) check is redundant. Based on the function signature at line 1779, offline_write_options is already typed as Dict[str, Any], so it will always be a dictionary. This check can be simplified to:

```python
if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
```

Suggested change:

```diff
-if isinstance(offline_write_options, dict) and (
-    (spark_compute_config := offline_write_options.get("spark", None))
-    is not None
-):
+if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
```
```python
    for feature in feature_group.get_complex_features()
}
# setup row writer function
_logger.debug("Setting up writer function to serialise row based on Avro schema.")
```
Copilot AI (Dec 2, 2025)
Spelling error: "serialise" should be "serialize" (American English spelling is more commonly used in code).

Suggested change:

```diff
-_logger.debug("Setting up writer function to serialise row based on Avro schema.")
+_logger.debug("Setting up writer function to serialize row based on Avro schema.")
```
```python
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
    _logger.debug(
        "Updating materialization job with user provided spark configurations."
    )
    for key, value in spark_compute_config.items():
        feature_group.materialization_job.config[key] = value
    feature_group.materialization_job.save()
```
Copilot AI (Dec 2, 2025)
The new functionality to update the materialization job config with user-provided spark configurations lacks test coverage. Existing tests like test_materialization_kafka and test_materialization_kafka_first_job_execution in python/tests/engine/test_python.py should be extended to verify that (a rough sketch follows after this list):
- Spark configurations from offline_write_options["spark"] are correctly applied to the materialization job config
- The save() method is called on the materialization job after config updates
- The config values are properly overwritten
- The behavior is correct when offline_write_options["spark"] is not provided or is None
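A rough, self-contained sketch of the kind of checks such tests could make. The helper below is a stand-in that mirrors the block under review rather than calling the real engine, and all names and fixtures are illustrative rather than taken from python/tests/engine/test_python.py.

```python
from unittest import mock


def _apply_spark_options(feature_group, offline_write_options):
    # Stand-in for the block under review (or for the proposed
    # _update_materialization_job_config helper); illustrative only.
    spark_compute_config = offline_write_options.get("spark", None)
    if spark_compute_config is not None:
        for key, value in spark_compute_config.items():
            feature_group.materialization_job.config[key] = value
        feature_group.materialization_job.save()


def test_spark_options_applied_and_saved():
    job = mock.Mock()
    job.config = {"spark.executor.memory": "2g"}
    feature_group = mock.Mock(materialization_job=job)

    _apply_spark_options(
        feature_group,
        {"spark": {"spark.executor.memory": "8g", "spark.executor.cores": 4}},
    )

    # Existing values are overwritten, new ones added, and the job is persisted once.
    assert job.config["spark.executor.memory"] == "8g"
    assert job.config["spark.executor.cores"] == 4
    job.save.assert_called_once()


def test_no_spark_key_leaves_job_untouched():
    job = mock.Mock()
    job.config = {}
    feature_group = mock.Mock(materialization_job=job)

    _apply_spark_options(feature_group, {})

    # Without a "spark" key the config stays untouched and save() is never called.
    assert job.config == {}
    job.save.assert_not_called()
```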
```python
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
```
Copilot AI (Dec 2, 2025)
Using assertions for runtime validation is not recommended in production code. Assertions can be disabled with the -O flag in Python and are meant for debugging, not runtime error handling. Consider raising an appropriate exception instead:

```python
if feature_group.materialization_job is None:
    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
```

Suggested change:

```diff
-assert (
-    feature_group.materialization_job is not None
-), "Materialization job is not defined. Cannot start materialization job."
+if feature_group.materialization_job is None:
+    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
```
```python
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
```
Copilot AI (Dec 2, 2025)
The isinstance(offline_write_options, dict) check is redundant. Based on the function signature at line 1779, offline_write_options is already typed as Dict[str, Any], so it will always be a dictionary. This check can be simplified to:

```python
if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
```

Suggested change:

```diff
-if isinstance(offline_write_options, dict) and (
-    (spark_compute_config := offline_write_options.get("spark", None))
-    is not None
-):
+if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
```
```python
_logger.debug(
    "Updating materialization job with user provided spark configurations."
)
for key, value in spark_compute_config.items():
```
Copilot AI (Dec 2, 2025)
The code directly updates the materialization job config with user-provided spark configurations without any validation. This could potentially lead to invalid configurations or conflicts with existing config values. Consider:
- Validating that the keys in spark_compute_config are valid Spark configuration keys
- Logging a warning if overwriting existing config values
- Documenting which config keys are safe to override

Example:

```python
for key, value in spark_compute_config.items():
    if key in feature_group.materialization_job.config:
        _logger.warning(
            f"Overwriting existing materialization job config key '{key}': "
            f"{feature_group.materialization_job.config[key]} -> {value}"
        )
    feature_group.materialization_job.config[key] = value
```

Suggested change:

```diff
-for key, value in spark_compute_config.items():
+for key, value in spark_compute_config.items():
+    # Validate Spark config key: must start with "spark."
+    if not isinstance(key, str) or not re.match(r"^spark\.", key):
+        _logger.warning(
+            f"Skipping invalid Spark config key '{key}'. Only keys starting with 'spark.' are allowed."
+        )
+        continue
+    if key in feature_group.materialization_job.config:
+        _logger.warning(
+            f"Overwriting existing materialization job config key '{key}': "
+            f"{feature_group.materialization_job.config[key]} -> {value}"
+        )
```
| "Updating materialization job with user provided spark configurations." | ||
| ) | ||
| for key, value in spark_compute_config.items(): | ||
| feature_group.materialization_job.config[key] = value |
This change will allow you to overwrite all the properties we have here https://github.com/logicalclocks/hopsworks-ee/blob/master/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jobs/configuration/spark/SparkJobConfiguration.java that start with spark.*
But you will not be able to pass arbitrary spark properties, since those are set via this field https://github.com/logicalclocks/hopsworks-ee/blob/cdd7ced6446dc48ef5a8af08897b368ad22f7405/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jobs/configuration/spark/SparkJobConfiguration.java#L72 - is that intended?
I guess in most cases what users may want to change is memory/CPU/number of executors, so it should be ok.
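To make that concrete, a hedged sketch of the kind of options this covers; which exact keys map onto SparkJobConfiguration fields versus the separate free-form properties field is an assumption based on the linked backend code, not verified here.

```python
# Illustrative only: driver/executor sizing keys of the kind discussed above.
write_options = {
    "spark": {
        "spark.executor.memory": "8g",    # assumed to map to a SparkJobConfiguration field
        "spark.executor.cores": 4,        # assumed to map to a SparkJobConfiguration field
        "spark.executor.instances": 2,    # assumed to map to a SparkJobConfiguration field
        # An arbitrary Spark property such as "spark.sql.shuffle.partitions" would normally
        # go through the separate properties field and may not take effect via this path.
    }
}
```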
This PR adds/fixes/changes...
JIRA Issue: -
Priority for Review: -
Related PRs: -
How Has This Been Tested?
Checklist For The Assigned Reviewer: