Refactor bundle refresh persistence into overridable get/update methods #63835
ephraimbuddy wants to merge 2 commits into apache:main
Replace inline DagBundleModel mutations in _refresh_dag_bundles() with get_bundle_state() and update_bundle_state() instance methods on DagFileProcessorManager, isolating all DB access for bundle state behind a clean override seam.
Pull request overview
This PR refactors DAG bundle refresh persistence in DagFileProcessorManager._refresh_dag_bundles() by moving direct DagBundleModel mutations into two new instance methods (get_bundle_state() and update_bundle_state()), creating a clean override seam for DB access and adding unit tests for the new behavior.
Changes:
- Introduce BundleState and DagFileProcessorManager.get_bundle_state() / update_bundle_state() to encapsulate persisted bundle refresh state.
- Update _refresh_dag_bundles() to use the new methods (including error-handling paths) instead of inline session usage.
- Add unit tests covering state read/write behavior and refresh loop interactions via mocked get/update methods.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/dag_processing/manager.py | Adds BundleState + get/update persistence methods; refactors _refresh_dag_bundles() to call them. |
| airflow-core/tests/unit/dag_processing/test_manager.py | Adds unit tests for the new persistence methods and refresh loop behavior. |
Pull request overview
This PR refactors how DagFileProcessorManager._refresh_dag_bundles() reads/writes persisted DAG bundle refresh state by introducing overridable get_bundle_state() / update_bundle_state() methods, providing a cleaner seam for isolating DB access.
Changes:
- Add a BundleState type plus get_bundle_state() / update_bundle_state() methods on DagFileProcessorManager.
- Refactor _refresh_dag_bundles() to use those methods (and to log/continue on state read/write failures).
- Add unit tests covering the new methods and key _refresh_dag_bundles() state/persistence behaviors.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| airflow-core/src/airflow/dag_processing/manager.py | Introduces BundleState and new DB access methods; updates bundle refresh loop to use them. |
| airflow-core/tests/unit/dag_processing/test_manager.py | Adds unit tests for bundle state get/update and refresh persistence paths. |
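The testing approach mentioned in both reviews (exercising the refresh logic through mocked get/update methods) could look roughly like the sketch below. It does not reproduce the PR's tests: `Manager`, its toy `refresh()` method, the dict-shaped state, and `run_refresh_with_mocks()` are hypothetical stand-ins, and only `unittest.mock.patch.object` is a real library call.

```python
from unittest import mock


class Manager:
    """Hypothetical stand-in for DagFileProcessorManager."""

    def get_bundle_state(self, bundle_name):
        raise RuntimeError("would hit the database")

    def update_bundle_state(self, bundle_name, *, last_refreshed, version):
        raise RuntimeError("would hit the database")

    def refresh(self, bundle_name, now, new_version):
        """Toy refresh step: read state, persist only if the version changed."""
        state = self.get_bundle_state(bundle_name)
        if state["version"] == new_version:
            return False
        self.update_bundle_state(bundle_name, last_refreshed=now, version=new_version)
        return True


def run_refresh_with_mocks():
    """Run refresh() with both persistence methods replaced by mocks."""
    manager = Manager()
    with mock.patch.object(
        manager, "get_bundle_state", return_value={"version": "v1"}
    ) as get_state:
        with mock.patch.object(manager, "update_bundle_state") as update_state:
            changed = manager.refresh("example", now="t0", new_version="v2")
    # The mocks keep their call records after the patches are undone.
    return changed, get_state, update_state
```

Patching the two methods on the instance keeps the test free of any database session, which is the practical benefit of routing all persistence through the seam.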
self._force_refresh_bundles.discard(bundle.name)

if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
else:
version_after_refresh = None
try:
self.update_bundle_state(bundle.name, last_refreshed=now, version=None)
except Exception:
self.log.exception("Error persisting state for bundle %s", bundle.name)
continue

    self._bundle_versions[bundle.name] = version_after_refresh
    self.log.info("Version changed for %s, new version: %s", bundle.name, version_after_refresh)
else:
    version_after_refresh = None

try:
    self.update_bundle_state(bundle.name, last_refreshed=now, version=version_after_refresh)
except Exception:
    self.log.exception("Error persisting state for bundle %s", bundle.name)
else:
    self._bundle_versions[bundle.name] = version_after_refresh
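The last hunk updates the in-memory version cache only in the else branch, that is, only when the persisted write did not raise. A standalone sketch of that try/except/else pattern follows; the function `persist_then_cache`, its parameters, and the logger name are hypothetical, and the writer callable merely mimics the keyword arguments of update_bundle_state() as called in this PR.

```python
import logging

log = logging.getLogger("bundle_refresh_sketch")


def persist_then_cache(bundle_name, version, update_bundle_state, cache, now):
    """Write the state first; touch the in-memory cache only on success."""
    try:
        update_bundle_state(bundle_name, last_refreshed=now, version=version)
    except Exception:
        # Log and report failure; the cache keeps its previous value.
        log.exception("Error persisting state for bundle %s", bundle_name)
        return False
    else:
        cache[bundle_name] = version
        return True
```

Ordering the steps this way keeps the cache from running ahead of the persisted state, so a later refresh would still see the version as changed and retry the write.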
Was generative AI tooling used to co-author this PR?
Generated-by: Sonnet 4.6