Skip to content

feat: add partitioned_stream_status state metrics, enables % progress estimation for partitioned streams#1000

Open
Aaron ("AJ") Steers (aaronsteers) wants to merge 11 commits intomainfrom
devin/1777183332-partitioned-stream-status
Open

feat: add partitioned_stream_status state metrics, enables % progress estimation for partitioned streams#1000
Aaron ("AJ") Steers (aaronsteers) wants to merge 11 commits intomainfrom
devin/1777183332-partitioned-stream-status

Conversation

@aaronsteers
Copy link
Copy Markdown
Member

@aaronsteers Aaron ("AJ") Steers (aaronsteers) commented Apr 26, 2026

Summary

Adds a partitioned_stream_status dict to the state emitted by ConcurrentPerPartitionCursor, enabling consumers to estimate sync % completion for partitioned streams.

The new state field contains:

  • num_partitions_started — partitions that have begun processing
  • num_partitions_completed — partitions fully processed and cleaned up
  • num_partitions_expected — total partitions discovered so far (reuses existing _generated_partitions_count)
  • is_partition_discovery_complete — whether the parent stream has finished yielding all partitions

num_partitions_expected can increase during a sync until is_partition_discovery_complete becomes true. Once discovery is complete, completed / expected gives a reliable progress ratio.

All counter mutations use the existing self._lock for thread safety.

Review & Testing Checklist for Human

  • State schema change: partitioned_stream_status is now always present in emitted state. Verify that downstream consumers (platform state handling, state persistence, UI) tolerate the new key without breaking. This is the highest-risk item.
  • _set_initial_state does NOT restore counters: When resuming from persisted state, counters reset to 0. Verify this is acceptable (counters are per-sync, not cross-sync) and that the presence of partitioned_stream_status in incoming state doesn't cause issues during deserialization.
  • num_partitions_started is currently always equal to num_partitions_expected: Both are incremented in the same code path (_generate_slices_from_partition). Confirm this redundancy is intentional for future divergence, or whether one should be removed.
  • Thread safety of _cleanup_if_done increment: _num_partitions_completed += 1 is added inside _cleanup_if_done(), which is called from close_partition() under self._lock. Verify no other call path reaches _cleanup_if_done without holding the lock.
  • Nested appearance in parent_state: For multi-level substreams, parent streams also emit partitioned_stream_status inside parent_state. Verify this doesn't cause issues for state consumers.

Recommended test plan: Run a real sync of a connector with substream partitioning (e.g., Gong or Stripe) and inspect the emitted state messages to verify partitioned_stream_status values are correct and progress monotonically.

Notes

  • Existing tests are updated to strip partitioned_stream_status before comparing against expected state dicts (via recursive _strip_partitioned_stream_status helper). Three unit tests (test_given_no_partitions_processed..., test_given_unfinished_first_parent_partition..., test_given_unfinished_last_parent_partition...) explicitly assert the exact counter values.
  • The field is emitted unconditionally (not gated by a feature flag). If a phased rollout is preferred, consider making emission conditional.

Link to Devin session: https://app.devin.ai/sessions/862a59baa22d4f009897d47c61e56b9e
Requested by: Aaron ("AJ") Steers (@aaronsteers)

Summary by CodeRabbit

  • New Features

    • Enhanced partition tracking: state checkpoints now include partition discovery completion, counts for partitions started, completed, and expected, improving progress visibility during incremental runs.
  • Tests

    • Updated tests to require and validate the new partition status fields, including numeric invariants between started/completed/expected counts and the discovery-complete flag; comparisons now ignore these fields after validation.

Adds partition lifecycle tracking to ConcurrentPerPartitionCursor state:
- num_partitions_started: count of partitions that have begun processing
- num_partitions_completed: count of partitions fully processed and cleaned up
- num_partitions_expected: total partitions discovered (grows until discovery complete)
- is_partition_discovery_complete: flag indicating parent stream finished reading

These fields enable cursor-based % completion estimation for partitioned streams
by providing completed/expected ratios in emitted state messages.
@devin-ai-integration
Copy link
Copy Markdown
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777183332-partitioned-stream-status#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777183332-partitioned-stream-status

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@aaronsteers Aaron ("AJ") Steers (aaronsteers) marked this pull request as ready for review April 26, 2026 06:10
Copilot AI review requested due to automatic review settings April 26, 2026 06:10

This comment was marked as resolved.

@aaronsteers Aaron ("AJ") Steers (aaronsteers) changed the title feat: add partitioned_stream_status tracking for progress estimation feat: add partitioned_stream_status tracking for progress estimation Apr 26, 2026
@aaronsteers Aaron ("AJ") Steers (aaronsteers) changed the title feat: add partitioned_stream_status tracking for progress estimation feat: add partitioned_stream_status tracking for progress estimation on partitioned streams Apr 26, 2026
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 26, 2026

Warning

Rate limit exceeded

@devin-ai-integration[bot] has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 13 minutes and 14 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 13 minutes and 14 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 70ec47b9-d754-49a3-a320-950dd75bc804

📥 Commits

Reviewing files that changed from the base of the PR and between 134a78f and 4c3f25c.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
  • unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
  • unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
📝 Walkthrough

Walkthrough

Tracks partition discovery and processing by adding counters for partitions started/completed and a boolean for discovery completion; the cursor’s emitted state now includes a partitioned_stream_status payload. Tests updated to validate and strip this field before existing state comparisons.

Changes

Cohort / File(s) Summary
Partition Discovery Tracking
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Adds thread-safe counters for num_partitions_started and num_partitions_completed, a is_partition_discovery_complete flag set when slice generation finishes, and extends the cursor-emitted state with partitioned_stream_status including started, completed, expected, and discovery status.
Test Assertions and Helpers
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
Adds helper to recursively remove partitioned_stream_status from states. Refactors test helpers and assertions to assert presence and basic invariants of partitioned_stream_status, validate numeric/boolean invariants, then strip it before comparing remaining state.
Integration Test Updates
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py
Updates per-partition integration tests to assert partitioned_stream_status exists and matches scenario-specific started/completed/expected counts and that is_partition_discovery_complete is true.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 40.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely describes the main change: adding partitioned_stream_status metrics to enable progress estimation for partitioned streams, which aligns perfectly with the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch devin/1777183332-partitioned-stream-status

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

@aaronsteers Aaron ("AJ") Steers (aaronsteers) changed the title feat: add partitioned_stream_status tracking for progress estimation on partitioned streams feat: add partitioned_stream_status tracking for partitioned stream sync % progress estimation Apr 26, 2026
@aaronsteers Aaron ("AJ") Steers (aaronsteers) changed the title feat: add partitioned_stream_status tracking for partitioned stream sync % progress estimation feat: add partitioned_stream_status state metrics, enables % progress estimation for partitioned streams Apr 26, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 26, 2026

PyTest Results (Fast)

4 034 tests  ±0   4 023 ✅ ±0   6m 29s ⏱️ - 1m 33s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 4c3f25c. ± Comparison against base commit 60bae81.

♻️ This comment has been updated with latest results.

devin-ai-integration[bot]

This comment was marked as resolved.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 26, 2026

PyTest Results (Full)

4 037 tests  ±0   4 025 ✅ ±0   11m 12s ⏱️ +15s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 4c3f25c. ± Comparison against base commit 60bae81.

♻️ This comment has been updated with latest results.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

1137-1139: Should initial_final_state use .copy() like its siblings — wdyt? 🪞

Just below at lines 1154–1158 and 1184–1188 the pattern is __dict__.copy() before stripping, but here initial_final_state grabs the real __dict__ and mutates it in place. It's likely benign because nothing downstream re-reads that exact state message, but the inconsistency tripped me up while reading. Mind aligning it for symmetry?

♻️ Proposed tweak
-        initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__
-        _strip_partitioned_stream_status(initial_final_state)
-        final_states.append(initial_final_state)
+        initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__.copy()
+        _strip_partitioned_stream_status(initial_final_state)
+        final_states.append(initial_final_state)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py`
around lines 1137 - 1139, The assignment to initial_final_state currently
captures the live dict
(output.state_messages[-1].state.stream.stream_state.__dict__) and is then
mutated by _strip_partitioned_stream_status; change it to take a shallow copy
(use .copy()) like the other occurrences so you mutate a separate dict before
appending to final_states — update the code around initial_final_state, keeping
the call to _strip_partitioned_stream_status and the
final_states.append(initial_final_state) the same but operating on the copied
dict.

331-338: Tiny nit + a heads-up about shallow copies — wdyt? 🤔

Two small things:

  1. key is unused in the loop, so iterating over .values() reads a touch cleaner.
  2. Heads-up for callers: this helper mutates nested dicts in place. The downstream call sites at lines 1184–1188 do state_dict.copy() (shallow) before calling this — which protects the top-level partitioned_stream_status pop, but the nested parent_state dict is still shared with the original state message and gets mutated. Probably harmless in test context, but worth being aware of.
🧹 Optional tidy-up
 def _strip_partitioned_stream_status(state_dict: dict) -> dict:
     """Recursively strip partitioned_stream_status from state dicts, including nested parent_state."""
     state_dict.pop("partitioned_stream_status", None)
     if "parent_state" in state_dict and isinstance(state_dict["parent_state"], dict):
-        for key, value in state_dict["parent_state"].items():
+        for value in state_dict["parent_state"].values():
             if isinstance(value, dict):
                 _strip_partitioned_stream_status(value)
     return state_dict
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py`
around lines 331 - 338, The helper _strip_partitioned_stream_status currently
iterates parent_state items but doesn't use the key — change the loop to iterate
over values() for clarity, and avoid surprising in-place mutations by making a
defensive deep copy of nested parent_state dicts (or perform a recursive copy
inside _strip_partitioned_stream_status) before modifying them; ensure callers
like the tests that currently call state_dict.copy() are either updated to
deep-copy or rely on this function's internal deep-copy so nested parent_state
in the original message is not mutated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In
`@unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py`:
- Around line 1137-1139: The assignment to initial_final_state currently
captures the live dict
(output.state_messages[-1].state.stream.stream_state.__dict__) and is then
mutated by _strip_partitioned_stream_status; change it to take a shallow copy
(use .copy()) like the other occurrences so you mutate a separate dict before
appending to final_states — update the code around initial_final_state, keeping
the call to _strip_partitioned_stream_status and the
final_states.append(initial_final_state) the same but operating on the copied
dict.
- Around line 331-338: The helper _strip_partitioned_stream_status currently
iterates parent_state items but doesn't use the key — change the loop to iterate
over values() for clarity, and avoid surprising in-place mutations by making a
defensive deep copy of nested parent_state dicts (or perform a recursive copy
inside _strip_partitioned_stream_status) before modifying them; ensure callers
like the tests that currently call state_dict.copy() are either updated to
deep-copy or rely on this function's internal deep-copy so nested parent_state
in the original message is not mutated.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 2985002b-65cf-44e4-a484-a5f5cae363ca

📥 Commits

Reviewing files that changed from the base of the PR and between ffc22c2 and 134a78f.

📒 Files selected for processing (2)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
  • unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py

…/pending distinction

- Add _partitions_observed: set[str] to track partitions where observe() has been called
- Update state property to emit 4 fields: in_progress, completed, expected, is_partition_discovery_complete
- num_partitions_in_progress = len(observed) - completed (worker started but not finished)
- Dropped num_partitions_not_started (derivable from expected - in_progress - completed)
- _cleanup_if_done adds partition to observed set to prevent negative in_progress
- Updated all test assertions for new semantics
devin-ai-integration[bot]

This comment was marked as resolved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants