Skip to content

Refactor bundle refresh persistence into overridable get/update methods#63835

Open
ephraimbuddy wants to merge 2 commits intoapache:mainfrom
astronomer:refactor-bundle-refresh-persistence
Open

Refactor bundle refresh persistence into overridable get/update methods#63835
ephraimbuddy wants to merge 2 commits intoapache:mainfrom
astronomer:refactor-bundle-refresh-persistence

Conversation

@ephraimbuddy
Copy link
Contributor

Replace inline DagBundleModel mutations in _refresh_dag_bundles() with get_bundle_state() and update_bundle_state() instance methods on DagFileProcessorManager, isolating all DB access for bundle state behind a clean override seam.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Sonnet 4.6

Replace inline DagBundleModel mutations in _refresh_dag_bundles() with
get_bundle_state() and update_bundle_state() instance methods on
DagFileProcessorManager, isolating all DB access for bundle state
behind a clean override seam.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors DAG bundle refresh persistence in DagFileProcessorManager._refresh_dag_bundles() by moving direct DagBundleModel mutations into two new instance methods (get_bundle_state() and update_bundle_state()), creating a clean override seam for DB access and adding unit tests for the new behavior.

Changes:

  • Introduce BundleState and DagFileProcessorManager.get_bundle_state() / update_bundle_state() to encapsulate persisted bundle refresh state.
  • Update _refresh_dag_bundles() to use the new methods (including error-handling paths) instead of inline session usage.
  • Add unit tests covering state read/write behavior and refresh loop interactions via mocked get/update methods.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
airflow-core/src/airflow/dag_processing/manager.py Adds BundleState + get/update persistence methods; refactors _refresh_dag_bundles() to call them.
airflow-core/tests/unit/dag_processing/test_manager.py Adds unit tests for the new persistence methods and refresh loop behavior.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors how DagFileProcessorManager._refresh_dag_bundles() reads/writes persisted DAG bundle refresh state by introducing overridable get_bundle_state() / update_bundle_state() methods, providing a cleaner seam for isolating DB access.

Changes:

  • Add a BundleState type plus get_bundle_state() / update_bundle_state() methods on DagFileProcessorManager.
  • Refactor _refresh_dag_bundles() to use those methods (and to log/continue on state read/write failures).
  • Add unit tests covering the new methods and key _refresh_dag_bundles() state/persistence behaviors.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
airflow-core/src/airflow/dag_processing/manager.py Introduces BundleState and new DB access methods; updates bundle refresh loop to use them.
airflow-core/tests/unit/dag_processing/test_manager.py Adds unit tests for bundle state get/update and refresh persistence paths.

Comment on lines +706 to +734
self._force_refresh_bundles.discard(bundle.name)

if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
else:
version_after_refresh = None
try:
self.update_bundle_state(bundle.name, last_refreshed=now, version=None)
except Exception:
self.log.exception("Error persisting state for bundle %s", bundle.name)
continue

self._bundle_versions[bundle.name] = version_after_refresh
self.log.info("Version changed for %s, new version: %s", bundle.name, version_after_refresh)
else:
version_after_refresh = None

try:
self.update_bundle_state(bundle.name, last_refreshed=now, version=version_after_refresh)
except Exception:
self.log.exception("Error persisting state for bundle %s", bundle.name)
else:
self._bundle_versions[bundle.name] = version_after_refresh
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants