
Conversation

@vatj (Contributor) commented Dec 2, 2025

…te config detected

This PR adds the ability to configure the materialization job using spark configurations passed via write_options["spark"], and adds debug logging throughout the Kafka engine.

  • python/hsfs/engine/python.py: the materialization job config is updated with user-provided spark configurations before the job is run
  • python/hsfs/core/kafka_engine.py: debug logging added for Kafka producer/consumer initialization, offset retrieval, and configuration setup

JIRA Issue: -

Priority for Review: -

Related PRs: -

How Has This Been Tested?

  • Unit Tests
  • Integration Tests
  • Manual Tests on VM

Checklist For The Assigned Reviewer:

- [ ] Checked if merge conflicts with master exist
- [ ] Checked if stylechecks for Java and Python pass
- [ ] Checked if all docstrings were added and/or updated appropriately
- [ ] Ran spellcheck on docstring
- [ ] Checked if guides & concepts need to be updated
- [ ] Checked if naming conventions for parameters and variables were followed
- [ ] Checked if private methods are properly declared and used
- [ ] Checked if hard-to-understand areas of code are commented
- [ ] Checked if tests are effective
- [ ] Built and deployed changes on dev VM and tested manually
- [x] Checked if all type annotations were added and/or updated appropriately

Copilot AI left a comment

Pull request overview

This PR adds functionality to configure materialization jobs using spark configurations from write_options["spark"], and enhances debugging capabilities by adding comprehensive logging throughout the Kafka engine. The changes enable users to customize Spark job configurations when writing to streaming feature groups, improving flexibility for performance tuning.

Key Changes

  • Added logic to update materialization job configurations with user-provided spark configurations from offline_write_options["spark"] before job execution (a usage sketch follows this list)
  • Enhanced debugging capabilities by adding logger statements throughout kafka_engine.py to trace Kafka producer/consumer initialization, offset retrieval, and configuration setup
  • Improved code formatting by using keyword-argument syntax in the get_kafka_connector() call
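
To make the user-facing change concrete, here is a hedged usage sketch. The names `fg` and `df` and the specific keys under "spark" are illustrative assumptions; the set of supported keys depends on the Hopsworks job configuration schema, which this PR does not document.

```python
# Hypothetical usage (names and keys are assumptions, not taken from this PR):
# pass spark overrides for the materialization job when inserting into a
# streaming feature group.
fg.insert(
    df,
    write_options={
        "spark": {
            "spark.executor.memory": "4g",  # assumed key: memory per executor
            "spark.executor.cores": 2,      # assumed key: cores per executor
        }
    },
)
```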

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 9 comments.

  • python/hsfs/engine/python.py: Added duplicated code blocks (lines 1854-1867 and 1902-1915) to extract the spark config from write_options and update the materialization job config before running the job. Uses assertions for validation.
  • python/hsfs/core/kafka_engine.py: Added extensive debug logging throughout the Kafka engine functions to trace initialization, configuration, and operations. Fixed a minor spelling issue and improved parameter formatting.


Comment on lines +1864 to +1866
for key, value in spark_compute_config.items():
    feature_group.materialization_job.config[key] = value
feature_group.materialization_job.save()
Copilot AI commented Dec 2, 2025

The new functionality to configure the materialization job using offline_write_options["spark"] lacks documentation. Consider adding:

  1. A docstring or comment explaining which configuration keys are supported
  2. Usage examples showing how to pass spark configurations
  3. Warnings about potential conflicts with existing job configurations

For example:

# Update materialization job config with user-provided spark configurations
# Supported keys: memory, cores, executor configurations, etc.
# These configurations will override the existing job config.

This is especially important since the public API documentation at lines 3146-3174 in feature_group.py mentions the spark key but doesn't specify it can be used to configure the materialization job in streaming feature groups.
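
A hedged sketch of the kind of inline note the comment is asking for; the key names are illustrative assumptions rather than a verified list of supported configurations:

```python
# Update the materialization job config with user-provided spark
# configurations from offline_write_options["spark"].
# Example (assumed) keys: spark.executor.memory, spark.executor.cores.
# Warning: these values override any matching keys already present in the
# existing job config.
```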

Comment on lines +1854 to +1866
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
    _logger.debug(
        "Updating materialization job with user provided spark configurations."
    )
    for key, value in spark_compute_config.items():
        feature_group.materialization_job.config[key] = value
    feature_group.materialization_job.save()
Copilot AI commented Dec 2, 2025
This code block (lines 1854-1866) is duplicated at lines 1902-1914. Consider extracting this logic into a helper method to avoid code duplication. For example:

def _update_materialization_job_config(
    self, 
    feature_group: FeatureGroup, 
    offline_write_options: Dict[str, Any]
) -> None:
    """Update materialization job config with user-provided spark configurations."""
    assert (
        feature_group.materialization_job is not None
    ), "Materialization job is not defined. Cannot start materialization job."
    
    if isinstance(offline_write_options, dict) and (
        (spark_compute_config := offline_write_options.get("spark", None))
        is not None
    ):
        _logger.debug(
            "Updating materialization job with user provided spark configurations."
        )
        for key, value in spark_compute_config.items():
            feature_group.materialization_job.config[key] = value
        feature_group.materialization_job.save()

Then call this method before each feature_group.materialization_job.run() invocation.
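
For illustration, a runnable toy sketch of the refactor shape; all class and method names below are stand-ins, not hsfs APIs:

```python
# Stand-in classes to show the single-call-site pattern after extraction.
class Job:
    def __init__(self):
        self.config = {}

    def save(self):
        print("job saved:", self.config)

    def run(self):
        print("job started")

class FeatureGroup:
    def __init__(self):
        self.materialization_job = Job()

class Engine:
    def _update_materialization_job_config(self, feature_group, offline_write_options):
        # Same logic as the suggested helper, condensed for the toy example.
        if (spark := offline_write_options.get("spark")) is not None:
            feature_group.materialization_job.config.update(spark)
            feature_group.materialization_job.save()

    def insert_stream(self, feature_group, offline_write_options):
        # One call site replaces each duplicated block before run().
        self._update_materialization_job_config(feature_group, offline_write_options)
        feature_group.materialization_job.run()

Engine().insert_stream(FeatureGroup(), {"spark": {"spark.executor.memory": "4g"}})
```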

Comment on lines +1902 to +1904
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
Copilot AI commented Dec 2, 2025

Using assertions for runtime validation is not recommended in production code. Assertions can be disabled with the -O flag in Python and are meant for debugging, not runtime error handling. Consider raising an appropriate exception instead:

if feature_group.materialization_job is None:
    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
Suggested change:
- assert (
-     feature_group.materialization_job is not None
- ), "Materialization job is not defined. Cannot start materialization job."
+ if feature_group.materialization_job is None:
+     raise ValueError("Materialization job is not defined. Cannot start materialization job.")
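
A self-contained demonstration of the pitfall, runnable outside the project; `FakeFeatureGroup` is a stand-in, not an hsfs class:

```python
# Run with `python demo.py`: the assert fails with the message below.
# Run with `python -O demo.py`: the assert is stripped and execution continues
# silently; in the real code the missing job would only fail later, with a
# less helpful error than the explicit ValueError.
class FakeFeatureGroup:
    materialization_job = None

fg = FakeFeatureGroup()
assert (
    fg.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
```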

Comment on lines +1905 to +1908
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
Copilot AI commented Dec 2, 2025

The isinstance(offline_write_options, dict) check is redundant. Based on the function signature at line 1779, offline_write_options is already annotated as Dict[str, Any], so callers are expected to pass a dictionary. This check can be simplified to:

if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
Suggested change:
- if isinstance(offline_write_options, dict) and (
-     (spark_compute_config := offline_write_options.get("spark", None))
-     is not None
- ):
+ if (spark_compute_config := offline_write_options.get("spark", None)) is not None:

    for feature in feature_group.get_complex_features()
}
# setup row writer function
_logger.debug("Setting up writer function to serialise row based on Avro schema.")
Copilot AI commented Dec 2, 2025

Spelling error: "serialise" should be "serialize" (American English spelling is more commonly used in code).

Suggested change:
- _logger.debug("Setting up writer function to serialise row based on Avro schema.")
+ _logger.debug("Setting up writer function to serialize row based on Avro schema.")

Comment on lines +1857 to +1866
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
    _logger.debug(
        "Updating materialization job with user provided spark configurations."
    )
    for key, value in spark_compute_config.items():
        feature_group.materialization_job.config[key] = value
    feature_group.materialization_job.save()
Copilot AI commented Dec 2, 2025

The new functionality to update the materialization job config with user-provided spark configurations lacks test coverage. Existing tests like test_materialization_kafka and test_materialization_kafka_first_job_execution in python/tests/engine/test_python.py should be extended to verify that (a hedged test sketch follows this list):

  1. Spark configurations from offline_write_options["spark"] are correctly applied to the materialization job config
  2. The save() method is called on the materialization job after config updates
  3. The config values are properly overwritten
  4. The behavior is correct when offline_write_options["spark"] is not provided or is None
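
A hedged sketch of one such test, assuming the `_update_materialization_job_config` helper suggested earlier is extracted; the mocks stand in for real hsfs objects and the import path mirrors this PR's changed files:

```python
from unittest import mock

def test_spark_config_applied_and_saved():
    # Stand-ins for the engine and the feature group; not real hsfs objects.
    engine = mock.MagicMock()
    fg = mock.MagicMock()
    fg.materialization_job.config = {"spark.executor.memory": "2g"}
    opts = {"spark": {"spark.executor.memory": "4g"}}

    from hsfs.engine.python import Engine  # module changed in this PR
    Engine._update_materialization_job_config(engine, fg, opts)

    # The user-provided value overwrites the existing key and the job is saved.
    assert fg.materialization_job.config["spark.executor.memory"] == "4g"
    fg.materialization_job.save.assert_called_once()
```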

Comment on lines +1854 to +1856
assert (
    feature_group.materialization_job is not None
), "Materialization job is not defined. Cannot start materialization job."
Copilot AI commented Dec 2, 2025

Using assertions for runtime validation is not recommended in production code. Assertions can be disabled with the -O flag in Python and are meant for debugging, not runtime error handling. Consider raising an appropriate exception instead:

if feature_group.materialization_job is None:
    raise ValueError("Materialization job is not defined. Cannot start materialization job.")
Suggested change:
- assert (
-     feature_group.materialization_job is not None
- ), "Materialization job is not defined. Cannot start materialization job."
+ if feature_group.materialization_job is None:
+     raise ValueError("Materialization job is not defined. Cannot start materialization job.")

Comment on lines +1857 to +1860
if isinstance(offline_write_options, dict) and (
    (spark_compute_config := offline_write_options.get("spark", None))
    is not None
):
Copilot AI commented Dec 2, 2025

The isinstance(offline_write_options, dict) check is redundant. Based on the function signature at line 1779, offline_write_options is already annotated as Dict[str, Any], so callers are expected to pass a dictionary. This check can be simplified to:

if (spark_compute_config := offline_write_options.get("spark", None)) is not None:
Suggested change:
- if isinstance(offline_write_options, dict) and (
-     (spark_compute_config := offline_write_options.get("spark", None))
-     is not None
- ):
+ if (spark_compute_config := offline_write_options.get("spark", None)) is not None:

_logger.debug(
    "Updating materialization job with user provided spark configurations."
)
for key, value in spark_compute_config.items():
Copilot AI commented Dec 2, 2025

The code directly updates the materialization job config with user-provided spark configurations without any validation. This could potentially lead to invalid configurations or conflicts with existing config values. Consider:

  1. Validating that the keys in spark_compute_config are valid Spark configuration keys
  2. Logging a warning if overwriting existing config values
  3. Documenting which config keys are safe to override

Example:

for key, value in spark_compute_config.items():
    if key in feature_group.materialization_job.config:
        _logger.warning(
            f"Overwriting existing materialization job config key '{key}': "
            f"{feature_group.materialization_job.config[key]} -> {value}"
        )
    feature_group.materialization_job.config[key] = value
Suggested change:
- for key, value in spark_compute_config.items():
+ for key, value in spark_compute_config.items():
+     # Validate Spark config key: must start with "spark."
+     if not isinstance(key, str) or not key.startswith("spark."):
+         _logger.warning(
+             f"Skipping invalid Spark config key '{key}'. Only keys starting with 'spark.' are allowed."
+         )
+         continue
+     if key in feature_group.materialization_job.config:
+         _logger.warning(
+             f"Overwriting existing materialization job config key '{key}': "
+             f"{feature_group.materialization_job.config[key]} -> {value}"
+         )
+     feature_group.materialization_job.config[key] = value
"Updating materialization job with user provided spark configurations."
)
for key, value in spark_compute_config.items():
feature_group.materialization_job.config[key] = value
A collaborator commented:
I guess in most cases what users may want to change is memory/cpu/num execs so it should be ok
