fix(workflow): close steps before print_status so buffered records are counted #27964
…e counted

PR #27213 reordered execute() teardown to call print_status() before stop() so the streamable logging handler stays alive while the final summary is emitted. The unintended side effect was that step.close(), which is invoked from stop(), is what triggers the metadata REST sink to flush its batched buffer and call status.scanned_all(...). With the new ordering, the printed status no longer included those buffered records (databases, schemas, queries, lineage, etc.), only the records that had already been committed individually.

This broke vanilla CLI E2E ingestion tests for every connector hitting the shared assert_for_vanilla_ingestion path, which uses a strict assertGreater(records + updated_records, expected_tables()) as a regression guard that the sink emits more than just the tables themselves. Postgres saw "AssertionError: 2 not greater than 2" starting Apr 24.

Extract step closing into a dedicated close_steps() method invoked before print_status(), and remove the duplicate loop from stop(). This preserves the original intent (streamable logger stays alive for the final summary, stop() always runs even when print_status() raises) while restoring accurate counts.

Test updates lock the new ordering (close_steps -> print_status -> stop) and add coverage for the case where close_steps() itself raises, ensuring stop() still runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
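To make the counting problem concrete, here is a minimal sketch of a sink that only reports its batched records on close(). Every name in it (`ToyStatus`, `ToyBufferedSink`, `write_immediately`, `write_buffered`) is hypothetical and chosen for illustration; it is not the metadata REST sink, only a toy with the same flush-on-close shape described above.

```python
class ToyStatus:
    """Hypothetical stand-in for a step status object."""

    def __init__(self):
        self.records = []

    def scanned_all(self, names):
        # Register a whole batch of records at once.
        self.records.extend(names)


class ToyBufferedSink:
    """Hypothetical sink: some records are counted right away,
    others sit in a buffer until close() flushes them."""

    def __init__(self):
        self.status = ToyStatus()
        self._buffer = []

    def write_immediately(self, name):
        self.status.records.append(name)

    def write_buffered(self, name):
        self._buffer.append(name)

    def close(self):
        # The buffered records only reach the status here.
        self.status.scanned_all(self._buffer)
        self._buffer = []


def count_before_and_after_close():
    sink = ToyBufferedSink()
    for table in ("table_a", "table_b"):
        sink.write_immediately(table)
    for entity in ("database", "schema"):
        sink.write_buffered(entity)
    before = len(sink.status.records)  # what a summary printed before close() sees
    sink.close()
    after = len(sink.status.records)   # what a summary printed after close() sees
    return before, after
```

With two tables expected, a summary taken before close() sees exactly two records, which is the situation in which a strict greater-than check against the table count cannot pass.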
Pull request overview
This PR fixes an ingestion workflow teardown ordering regression so that step buffers (e.g., batched entities in the metadata REST sink) are flushed before the final status summary is printed, restoring accurate record counts while keeping streamable logging active during print_status().
Changes:
- Introduces `BaseWorkflow.close_steps()` to close steps (flush buffers) independently from `stop()`.
- Reorders `BaseWorkflow.execute()` teardown to `close_steps() -> print_status() -> stop()`, and removes the step-closing loop from `stop()`.
- Updates unit tests to lock the new ordering and ensure `stop()` still runs when `close_steps()` or `print_status()` raises.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| ingestion/src/metadata/workflow/base.py | Adds close_steps() and reorders execute teardown so buffered sink records are counted before printing final status. |
| ingestion/tests/unit/workflow/test_base_workflow.py | Updates/extends teardown-order tests to validate close_steps -> print_status -> stop and failure behavior. |
Address Copilot review feedback: build_ingestion_status() reads from the same step status objects that print_status() does (via Summary.from_step), so calling close_steps() only before print_status() leaves the persisted pipeline status, which is sent to the server via set_ingestion_pipeline_status(), missing the records that sinks only commit on close().

Move close_steps() to run before build_ingestion_status() so both the persisted run stats and the printed summary include the flushed records. The stop()-always-runs guarantee is preserved by keeping the inner block inside try/finally.

Lock the new ordering in tests: close_steps -> build_ingestion_status -> set_ingestion_pipeline_status -> print_status -> stop

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
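The ordering in this commit can be sketched with a toy workflow that records each teardown call. The class `OrderedTeardownWorkflow`, its `calls` list, and the exact try/finally nesting are assumptions made for illustration; only the method names and their order come from the commit message.

```python
class OrderedTeardownWorkflow:
    """Hypothetical workflow that records the order of teardown calls."""

    def __init__(self):
        self.calls = []

    def close_steps(self):
        self.calls.append("close_steps")

    def build_ingestion_status(self):
        self.calls.append("build_ingestion_status")
        return {"records": 4}  # made-up payload for the sketch

    def set_ingestion_pipeline_status(self, status):
        self.calls.append("set_ingestion_pipeline_status")

    def print_status(self):
        self.calls.append("print_status")

    def stop(self):
        self.calls.append("stop")

    def execute(self):
        try:
            self.calls.append("run")
        finally:
            try:
                # Flush first so both the persisted status and the
                # printed summary see the buffered records.
                self.close_steps()
                status = self.build_ingestion_status()
                self.set_ingestion_pipeline_status(status)
                self.print_status()
            finally:
                # stop() runs even if anything above raises.
                self.stop()
```

In this sketch an exception from `print_status()` still propagates to the caller, but only after `stop()` has run.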
Address gitar-bot review feedback: previously the pipeline status was persisted unconditionally at the start of the finally block. After moving close_steps() ahead of build_ingestion_status / set_ingestion_pipeline_status to fix count correctness, an unexpected failure inside close_steps() (e.g., workflow_steps() returning a bad iterator) would skip persistence entirely.

Wrap close_steps() in its own try/except so a catastrophic flush failure is logged but does not prevent the pipeline status from reaching the server. The flushed counts may be missing in that edge case, but a status record is better than none.

Replace test_stop_still_runs_when_close_steps_raises with test_pipeline_status_is_persisted_when_close_steps_raises, which now asserts set_ingestion_pipeline_status, print_status, and stop all run when close_steps() raises.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address Copilot review feedback: stop() is part of the public cleanup
contract — three callers in cli/common.py, cli/app.py, and cli/ingest_dbt.py
invoke it explicitly, and any future caller using stop() standalone (or in
addition to execute()) was silently leaking step resources after the
previous commits removed the step.close() loop from stop().
Restore step cleanup in stop() with an idempotency flag (_steps_closed)
so execute() still flushes before print_status() without double-closing
during the subsequent stop(). Place the close_steps() call in stop()
before metadata.close() so sinks can use the OM client during their
final flush, and wrap it defensively so a flush failure doesn't leave
the timer stopped but the OM client still open.
Add two tests:
- test_stop_closes_steps_when_called_standalone: stop() invoked outside
execute() still closes steps.
- test_close_steps_is_idempotent_across_execute_and_stop: when execute()
closes steps before print_status, the subsequent close_steps() call
inside stop() does not re-run step.close().
Update the ordering test to expect the trailing close_steps recording
(no-op via the idempotency flag) and the catastrophic-failure test to
allow close_steps to be invoked twice (once from execute, once from
inside stop, both raising and being swallowed).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
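The idempotency flag can be sketched as follows. `ToyStep`, `IdempotentWorkflow`, and `client_closed` are hypothetical names; `_steps_closed`, `close_steps()`, and `stop()` are taken from the commit message. Setting the flag only after the loop completes is an assumption of this sketch, not something the commit message states.

```python
class ToyStep:
    """Hypothetical step that counts how often close() is called."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        self.close_calls += 1


class IdempotentWorkflow:
    """Hypothetical workflow whose close_steps() is safe to call twice."""

    def __init__(self, steps):
        self.steps = list(steps)
        self._steps_closed = False
        self.client_closed = False  # stands in for closing the OM client

    def close_steps(self):
        if self._steps_closed:
            return  # already flushed, nothing to do
        for step in self.steps:
            step.close()
        # Assumption: mark as closed only once every step has closed.
        self._steps_closed = True

    def stop(self):
        try:
            # Close steps before the client so a final flush could still use it.
            self.close_steps()
        except Exception:
            pass  # a real implementation would log here
        self.client_closed = True
```

Calling `stop()` on its own closes the steps once; calling `close_steps()` and then `stop()` also closes them once, which is the behavior the two new tests describe.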
Code Review ✅ Approved (1 resolved / 1 findings)

Extracts step closing into a dedicated method invoked before status printing to ensure sink buffers are flushed and accurately counted. The pipeline status remains persisted even if cleanup routines raise exceptions.

✅ 1 resolved
✅ Edge Case: Pipeline status not persisted if close_steps() propagates
🟡 Playwright Results: all passed (13 flaky)
✅ 4015 passed · ❌ 0 failed · 🟡 13 flaky (passed on retry) · ⏭️ 86 skipped

How to debug locally: download the `playwright-test-results-<shard>` artifact, unzip it, and run `npx playwright show-trace path/to/trace.zip` to view the trace.



Summary
PR #27213 reordered the `execute()` teardown in `ingestion/src/metadata/workflow/base.py` to call `print_status()` before `stop()`, so the streamable logging handler (cleaned up inside `stop()`) stays alive while the final summary is emitted. The unintended side effect: `step.close()` is invoked from `stop()`, and the metadata REST sink only flushes its batched buffer and calls `status.scanned_all(...)` inside `close()` (`ingestion/src/metadata/ingestion/sink/metadata_rest.py:1047`). With the new ordering, buffered records (databases, schemas, queries, lineage, ...) were never reflected in the printed status; only the records that had already been committed individually were counted.

This broke vanilla CLI E2E ingestion for every connector that goes through `assert_for_vanilla_ingestion`, which uses a strict `assertGreater(records + updated_records, expected_tables())` as a regression guard that the sink emits more than just the tables themselves. Postgres started failing on April 24 with `AssertionError: 2 not greater than 2` (workflow run 24865479570); other connectors hit the same assertion.

Fix
- Extract step closing into a dedicated `close_steps()` method on the workflow base.
- Call `close_steps()` before `print_status()` in the `execute()` teardown so buffered records are flushed and counted prior to the printed summary.
- Remove the duplicate `step.close()` loop from `stop()`.

This preserves the original intent of #27213 (streamable logging stays alive while `print_status()` runs, and `stop()` is still guaranteed to run even when `print_status()` raises) while restoring accurate sink counts.

Test plan
- `tests/unit/workflow/test_base_workflow.py::TestWorkflowExecuteTeardown::test_close_steps_runs_before_print_status_and_stop` locks the new ordering (`close_steps -> print_status -> stop`).
- `tests/unit/workflow/test_base_workflow.py::TestWorkflowExecuteTeardown::test_stop_still_runs_when_close_steps_raises` extends the `finally` guarantee around `stop()` to failures inside the new flush step.
- `test_stop_still_runs_when_print_status_raises` still passes.
- `pytest ingestion/tests/unit/workflow/test_base_workflow.py`: 8 passed.

🤖 Generated with Claude Code