Skip to content

fix(workflow): close steps before print_status so buffered records are counted#27964

Open
ulixius9 wants to merge 4 commits intomainfrom
fix/workflow-flush-before-print-status
Open

fix(workflow): close steps before print_status so buffered records are counted#27964
ulixius9 wants to merge 4 commits intomainfrom
fix/workflow-flush-before-print-status

Conversation

@ulixius9
Copy link
Copy Markdown
Member

@ulixius9 ulixius9 commented May 7, 2026

Summary

PR #27213 reordered the execute() teardown in ingestion/src/metadata/workflow/base.py to call print_status() before stop(), so the streamable logging handler (cleaned up inside stop()) stays alive while the final summary is emitted. The unintended side effect: step.close() is invoked from stop(), and the metadata REST sink only flushes its batched buffer and calls status.scanned_all(...) inside close() (ingestion/src/metadata/ingestion/sink/metadata_rest.py:1047). With the new ordering, buffered records (databases, schemas, queries, lineage, ...) were never reflected in the printed status — only the records that had already been committed individually.

This broke vanilla CLI E2E ingestion for every connector that goes through assert_for_vanilla_ingestion, which uses a strict

self.assertGreater(
    len(sink_status.records) + len(sink_status.updated_records),
    self.expected_tables(),
)

as a regression guard that the sink emits more than just the tables themselves. Postgres started failing on April 24 with AssertionError: 2 not greater than 2 (workflow run 24865479570); other connectors hit the same assertion.

Fix

  • Extract step closing into a dedicated close_steps() method on the workflow base.
  • Call close_steps() before print_status() in the execute() teardown so buffered records are flushed and counted prior to the printed summary.
  • Drop the duplicate step.close() loop from stop().

This preserves the original intent of #27213 — streamable logging stays alive while print_status() runs, and stop() is still guaranteed to run even when print_status() raises — while restoring accurate sink counts.

Test plan

  • Added tests/unit/workflow/test_base_workflow.py::TestWorkflowExecuteTeardown::test_close_steps_runs_before_print_status_and_stop to lock the new ordering (close_steps -> print_status -> stop).
  • Added tests/unit/workflow/test_base_workflow.py::TestWorkflowExecuteTeardown::test_stop_still_runs_when_close_steps_raises so the finally guarantee around stop() extends to failures inside the new flush step.
  • Existing test_stop_still_runs_when_print_status_raises still passes.
  • Local: pytest ingestion/tests/unit/workflow/test_base_workflow.py — 8 passed.
  • CI: vanilla CLI E2E (postgres + the other connectors that started failing on Apr 24) should go green again.

🤖 Generated with Claude Code

…e counted

PR #27213 reordered execute() teardown to call print_status() before stop()
so the streamable logging handler stays alive while the final summary is
emitted. The unintended side effect was that step.close() — invoked from
stop() — is what triggers the metadata REST sink to flush its batched
buffer and call status.scanned_all(...). With the new ordering, the printed
status no longer included those buffered records (databases, schemas,
queries, lineage, etc.), only the records that had already been committed
individually.

This broke vanilla CLI E2E ingestion tests for every connector hitting the
shared assert_for_vanilla_ingestion path, which uses a strict
assertGreater(records + updated_records, expected_tables()) as a regression
guard that the sink emits more than just the tables themselves. Postgres
saw "AssertionError: 2 not greater than 2" starting Apr 24.

Extract step closing into a dedicated close_steps() method invoked before
print_status(), and remove the duplicate loop from stop(). This preserves
the original intent (streamable logger stays alive for the final summary,
stop() always runs even when print_status() raises) while restoring
accurate counts.

Test updates lock the new ordering (close_steps -> print_status -> stop)
and add coverage for the case where close_steps() itself raises, ensuring
stop() still runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 11:52
@ulixius9 ulixius9 requested a review from a team as a code owner May 7, 2026 11:52
@github-actions github-actions Bot added Ingestion safe to test Add this label to run secure Github workflows on PRs labels May 7, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes an ingestion workflow teardown ordering regression so that step buffers (e.g., batched entities in the metadata REST sink) are flushed before the final status summary is printed, restoring accurate record counts while keeping streamable logging active during print_status().

Changes:

  • Introduces BaseWorkflow.close_steps() to close steps (flush buffers) independently from stop().
  • Reorders BaseWorkflow.execute() teardown to close_steps() -> print_status() -> stop(), and removes the step-closing loop from stop().
  • Updates unit tests to lock the new ordering and ensure stop() still runs when close_steps() or print_status() raises.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
ingestion/src/metadata/workflow/base.py Adds close_steps() and reorders execute teardown so buffered sink records are counted before printing final status.
ingestion/tests/unit/workflow/test_base_workflow.py Updates/extends teardown-order tests to validate close_steps -> print_status -> stop and failure behavior.

Comment thread ingestion/src/metadata/workflow/base.py Outdated
Comment thread ingestion/src/metadata/workflow/base.py
Address Copilot review feedback: build_ingestion_status() reads from the
same step status objects that print_status() does (via Summary.from_step),
so calling close_steps() only before print_status() leaves the persisted
pipeline status — sent to the server via set_ingestion_pipeline_status()
— missing the records that sinks only commit on close().

Move close_steps() to run before build_ingestion_status() so both the
persisted run stats and the printed summary include the flushed records.
The stop()-always-runs guarantee is preserved by keeping the inner block
inside try/finally.

Lock the new ordering in tests:
  close_steps -> build_ingestion_status -> set_ingestion_pipeline_status
  -> print_status -> stop

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread ingestion/src/metadata/workflow/base.py
Address gitar-bot review feedback: previously the pipeline status was
persisted unconditionally at the start of the finally block. After moving
close_steps() ahead of build_ingestion_status / set_ingestion_pipeline_status
to fix count correctness, an unexpected failure inside close_steps() (e.g.,
workflow_steps() returning a bad iterator) would skip persistence entirely.

Wrap close_steps() in its own try/except so a catastrophic flush failure
is logged but does not prevent the pipeline status from reaching the
server. The flushed counts may be missing in that edge case, but a status
record is better than none.

Replace test_stop_still_runs_when_close_steps_raises with
test_pipeline_status_is_persisted_when_close_steps_raises, which now
asserts set_ingestion_pipeline_status, print_status, and stop all run when
close_steps() raises.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 14:02
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comment thread ingestion/src/metadata/workflow/base.py
Address Copilot review feedback: stop() is part of the public cleanup
contract — three callers in cli/common.py, cli/app.py, and cli/ingest_dbt.py
invoke it explicitly, and any future caller using stop() standalone (or in
addition to execute()) was silently leaking step resources after the
previous commits removed the step.close() loop from stop().

Restore step cleanup in stop() with an idempotency flag (_steps_closed)
so execute() still flushes before print_status() without double-closing
during the subsequent stop(). Place the close_steps() call in stop()
before metadata.close() so sinks can use the OM client during their
final flush, and wrap it defensively so a flush failure doesn't leave
the timer stopped but the OM client still open.

Add two tests:
  - test_stop_closes_steps_when_called_standalone: stop() invoked outside
    execute() still closes steps.
  - test_close_steps_is_idempotent_across_execute_and_stop: when execute()
    closes steps before print_status, the subsequent close_steps() call
    inside stop() does not re-run step.close().

Update the ordering test to expect the trailing close_steps recording
(no-op via the idempotency flag) and the catastrophic-failure test to
allow close_steps to be invoked twice (once from execute, once from
inside stop, both raising and being swallowed).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@gitar-bot
Copy link
Copy Markdown

gitar-bot Bot commented May 7, 2026

Code Review ✅ Approved 1 resolved / 1 findings

Extracts step closing into a dedicated method invoked before status printing to ensure sink buffers are flushed and accurately counted. The pipeline status remains persisted even if cleanup routines raise exceptions.

✅ 1 resolved
Edge Case: Pipeline status not persisted if close_steps() propagates

📄 ingestion/src/metadata/workflow/base.py:274-285
By moving build_ingestion_status() and set_ingestion_pipeline_status() inside the same try block after close_steps() (line 280-282), if close_steps() ever propagates an exception the pipeline status will never be persisted to the server. Previously it was persisted unconditionally.

In practice this is unlikely because close_steps() already wraps each step in its own try/except, but an unexpected failure in self.workflow_steps() (e.g., returning a bad iterator) or a SystemExit/KeyboardInterrupt could trigger this path.

The test test_stop_still_runs_when_close_steps_raises confirms stop() is called but does not assert that the pipeline status is persisted.

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented May 7, 2026

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 7, 2026

🟡 Playwright Results — all passed (13 flaky)

✅ 4015 passed · ❌ 0 failed · 🟡 13 flaky · ⏭️ 86 skipped

Shard Passed Failed Flaky Skipped
🟡 Shard 1 298 0 1 4
🟡 Shard 2 753 0 2 8
🟡 Shard 3 753 0 6 7
🟡 Shard 4 788 0 2 18
🟡 Shard 5 686 0 1 41
🟡 Shard 6 737 0 1 8
🟡 13 flaky test(s) (passed on retry)
  • Pages/AuditLogs.spec.ts › should apply both User and EntityType filters simultaneously (shard 1, 1 retry)
  • Features/ActivityAPI.spec.ts › Activity event shows the actor who made the change (shard 2, 1 retry)
  • Features/BulkEditEntity.spec.ts › Glossary (shard 2, 1 retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 1 retry)
  • Features/Table.spec.ts › Table pagination with sorting should works (shard 3, 1 retry)
  • Features/UserProfileOnlineStatus.spec.ts › Should show online status badge on user profile for active users (shard 3, 1 retry)
  • Flow/AddRoleAndAssignToUser.spec.ts › Verify assigned role to new user (shard 3, 1 retry)
  • Flow/PersonaFlow.spec.ts › Set default persona for team should work properly (shard 3, 1 retry)
  • Flow/ServiceForm.spec.ts › Verify form selects are working properly (shard 3, 1 retry)
  • Pages/CustomProperties.spec.ts › Should clear search and show all properties for apiCollection in right panel (shard 4, 1 retry)
  • Pages/Domains.spec.ts › Rename domain with assets (tables, topics, dashboards) preserves associations (shard 4, 1 retry)
  • Pages/ExplorePageRightPanel.spec.ts › Should verify deleted user not visible in owner selection for table (shard 5, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Ingestion safe to test Add this label to run secure Github workflows on PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants