feat: add partitioned_stream_status state metrics, enables % progress estimation for partitioned streams #1000
Adds partition lifecycle tracking to ConcurrentPerPartitionCursor state:
- num_partitions_started: count of partitions that have begun processing
- num_partitions_completed: count of partitions fully processed and cleaned up
- num_partitions_expected: total partitions discovered (grows until discovery complete)
- is_partition_discovery_complete: flag indicating parent stream finished reading

These fields enable cursor-based % completion estimation for partitioned streams by providing completed/expected ratios in emitted state messages.
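A consumer-side sketch of how the completed/expected ratio described above might be turned into a percentage. The function name `estimate_progress` is hypothetical and not part of the CDK; the field names are the ones listed in this description. It assumes a consumer would rather report "unknown" than a misleading ratio while discovery is still running.

```python
from typing import Any, Mapping, Optional


def estimate_progress(status: Mapping[str, Any]) -> Optional[float]:
    """Return completed/expected as a fraction in [0, 1], or None if unreliable.

    `status` is assumed to be the `partitioned_stream_status` dict from an
    emitted state message (hypothetical consumer-side helper).
    """
    # Until discovery finishes, `expected` can still grow, so the ratio
    # would overstate progress; report "unknown" instead.
    if not status.get("is_partition_discovery_complete", False):
        return None

    expected = status.get("num_partitions_expected", 0)
    if expected <= 0:
        # No partitions discovered: there is no meaningful ratio to report.
        return None

    completed = status.get("num_partitions_completed", 0)
    # Clamp defensively in case the counters are ever inconsistent.
    return min(completed / expected, 1.0)


status = {
    "num_partitions_started": 12,
    "num_partitions_completed": 3,
    "num_partitions_expected": 12,
    "is_partition_discovery_complete": True,
}
progress = estimate_progress(status)
```

Returning `None` rather than `0.0` lets a UI distinguish "not yet known" from "nothing done yet"; whether that distinction matters is a consumer decision.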
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777183332-partitioned-stream-status#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777183332-partitioned-stream-status
📝 Walkthrough
Tracks partition discovery and processing by adding counters for partitions started/completed and a boolean for discovery completion; the cursor's emitted state now carries this information.
🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
1137-1139: Should `initial_final_state` use `.copy()` like its siblings, wdyt? 🪞
Just below at lines 1154–1158 and 1184–1188 the pattern is `__dict__.copy()` before stripping, but here `initial_final_state` grabs the real `__dict__` and mutates it in place. It's likely benign because nothing downstream re-reads that exact state message, but the inconsistency tripped me up while reading. Mind aligning it for symmetry?
♻️ Proposed tweak
- initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__
- _strip_partitioned_stream_status(initial_final_state)
- final_states.append(initial_final_state)
+ initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__.copy()
+ _strip_partitioned_stream_status(initial_final_state)
+ final_states.append(initial_final_state)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py` around lines 1137 - 1139, the assignment to initial_final_state currently captures the live dict (output.state_messages[-1].state.stream.stream_state.__dict__) and is then mutated by _strip_partitioned_stream_status; change it to take a shallow copy (use .copy()) like the other occurrences so you mutate a separate dict before appending to final_states. Update the code around initial_final_state, keeping the call to _strip_partitioned_stream_status and the final_states.append(initial_final_state) the same but operating on the copied dict.
331-338: Tiny nit + a heads-up about shallow copies, wdyt? 🤔
Two small things:
- `key` is unused in the loop, so iterating over `.values()` reads a touch cleaner.
- Heads-up for callers: this helper mutates nested dicts in place. The downstream call sites at lines 1184–1188 do `state_dict.copy()` (shallow) before calling this, which protects the top-level `partitioned_stream_status` pop, but the nested `parent_state` dict is still shared with the original state message and gets mutated. Probably harmless in test context, but worth being aware of.
🧹 Optional tidy-up
 def _strip_partitioned_stream_status(state_dict: dict) -> dict:
     """Recursively strip partitioned_stream_status from state dicts, including nested parent_state."""
     state_dict.pop("partitioned_stream_status", None)
     if "parent_state" in state_dict and isinstance(state_dict["parent_state"], dict):
-        for key, value in state_dict["parent_state"].items():
+        for value in state_dict["parent_state"].values():
             if isinstance(value, dict):
                 _strip_partitioned_stream_status(value)
     return state_dict
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py` around lines 331 - 338, the helper _strip_partitioned_stream_status currently iterates parent_state items but doesn't use the key. Change the loop to iterate over values() for clarity, and avoid surprising in-place mutations by making a defensive deep copy of nested parent_state dicts (or perform a recursive copy inside _strip_partitioned_stream_status) before modifying them; ensure callers like the tests that currently call state_dict.copy() are either updated to deep-copy or rely on this function's internal deep-copy so nested parent_state in the original message is not mutated.
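One way the non-mutating variant suggested in this comment could look, as a sketch rather than the change that landed in the PR. The names `strip_partitioned_stream_status_copy` and `_strip_in_place` are hypothetical; the traversal mirrors the helper shown in the tidy-up above, with a `copy.deepcopy` taken once at the entry point.

```python
import copy
from typing import Any, Dict


def _strip_in_place(state_dict: Dict[str, Any]) -> None:
    """Remove partitioned_stream_status from a dict and its nested parent_state."""
    state_dict.pop("partitioned_stream_status", None)
    parent_state = state_dict.get("parent_state")
    if isinstance(parent_state, dict):
        # Only the values are needed; the stream-name keys are irrelevant here.
        for value in parent_state.values():
            if isinstance(value, dict):
                _strip_in_place(value)


def strip_partitioned_stream_status_copy(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Return a stripped deep copy, leaving the caller's dict untouched."""
    cleaned = copy.deepcopy(state_dict)  # deep copy so nested parent_state is not shared
    _strip_in_place(cleaned)
    return cleaned


original = {
    "partitioned_stream_status": {"num_partitions_completed": 1},
    "parent_state": {
        "parent": {
            "partitioned_stream_status": {"num_partitions_completed": 2},
            "updated_at": "2024-01-01",
        }
    },
}
cleaned = strip_partitioned_stream_status_copy(original)
```

With this shape, callers would no longer need their own `.copy()` before stripping, at the cost of one deep copy per call, which is usually negligible in tests.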
📒 Files selected for processing (2)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
- unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py
…/pending distinction
- Add _partitions_observed: set[str] to track partitions where observe() has been called
- Update state property to emit 4 fields: in_progress, completed, expected, is_partition_discovery_complete
- num_partitions_in_progress = len(observed) - completed (worker started but not finished)
- Dropped num_partitions_not_started (derivable from expected - in_progress - completed)
- _cleanup_if_done adds partition to observed set to prevent negative in_progress
- Updated all test assertions for new semantics
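The bookkeeping in this commit note can be illustrated with a small standalone sketch. `PartitionStatusTracker` and its method names are hypothetical and are not the cursor's real API; only the arithmetic (`in_progress = len(observed) - completed`, with completion also adding the key to the observed set) follows the note. The sketch assumes each partition key completes at most once.

```python
import threading
from typing import Any, Dict, Set


class PartitionStatusTracker:
    """Hypothetical stand-in for the counters kept on the cursor."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._observed: Set[str] = set()  # keys for which observe() was called
        self._completed = 0
        self._expected = 0
        self._discovery_complete = False

    def partition_generated(self) -> None:
        with self._lock:
            self._expected += 1

    def discovery_finished(self) -> None:
        with self._lock:
            self._discovery_complete = True

    def observe(self, partition_key: str) -> None:
        with self._lock:
            self._observed.add(partition_key)

    def complete(self, partition_key: str) -> None:
        with self._lock:
            # A partition can finish without ever being observed (for example,
            # it produced no records); adding it here keeps in_progress >= 0.
            self._observed.add(partition_key)
            self._completed += 1

    def status(self) -> Dict[str, Any]:
        with self._lock:
            return {
                "num_partitions_in_progress": len(self._observed) - self._completed,
                "num_partitions_completed": self._completed,
                "num_partitions_expected": self._expected,
                "is_partition_discovery_complete": self._discovery_complete,
            }


tracker = PartitionStatusTracker()
tracker.partition_generated()
tracker.partition_generated()
tracker.observe("a")
mid_sync = tracker.status()
tracker.complete("a")
tracker.complete("b")  # "b" was never observed
tracker.discovery_finished()
end_of_sync = tracker.status()
```

The dropped `num_partitions_not_started` value can still be derived by a consumer as `expected - in_progress - completed`.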
…d for duplicate partition keys)
Summary
Adds a `partitioned_stream_status` dict to the state emitted by `ConcurrentPerPartitionCursor`, enabling consumers to estimate sync % completion for partitioned streams.

The new state field contains:
- `num_partitions_started`: partitions that have begun processing
- `num_partitions_completed`: partitions fully processed and cleaned up
- `num_partitions_expected`: total partitions discovered so far (reuses existing `_generated_partitions_count`)
- `is_partition_discovery_complete`: whether the parent stream has finished yielding all partitions

`num_partitions_expected` can increase during a sync until `is_partition_discovery_complete` becomes `true`. Once discovery is complete, `completed / expected` gives a reliable progress ratio.

All counter mutations use the existing `self._lock` for thread safety.

Review & Testing Checklist for Human
- `partitioned_stream_status` is now always present in emitted state. Verify that downstream consumers (platform state handling, state persistence, UI) tolerate the new key without breaking. This is the highest-risk item.
- `_set_initial_state` does NOT restore counters: When resuming from persisted state, counters reset to 0. Verify this is acceptable (counters are per-sync, not cross-sync) and that the presence of `partitioned_stream_status` in incoming state doesn't cause issues during deserialization.
- `num_partitions_started` is currently always equal to `num_partitions_expected`: Both are incremented in the same code path (`_generate_slices_from_partition`). Confirm this redundancy is intentional for future divergence, or whether one should be removed.
- `_cleanup_if_done` increment: `_num_partitions_completed += 1` is added inside `_cleanup_if_done()`, which is called from `close_partition()` under `self._lock`. Verify no other call path reaches `_cleanup_if_done` without holding the lock.
- `parent_state`: For multi-level substreams, parent streams also emit `partitioned_stream_status` inside `parent_state`. Verify this doesn't cause issues for state consumers.

Recommended test plan: Run a real sync of a connector with substream partitioning (e.g., Gong or Stripe) and inspect the emitted state messages to verify `partitioned_stream_status` values are correct and progress monotonically.

Notes
- Tests strip `partitioned_stream_status` before comparing against expected state dicts (via the recursive `_strip_partitioned_stream_status` helper). Three unit tests (`test_given_no_partitions_processed...`, `test_given_unfinished_first_parent_partition...`, `test_given_unfinished_last_parent_partition...`) explicitly assert the exact counter values.

Link to Devin session: https://app.devin.ai/sessions/862a59baa22d4f009897d47c61e56b9e
Requested by: Aaron ("AJ") Steers (@aaronsteers)
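For the recommended test plan, a small script along these lines could check that the counters in a captured sequence of state messages only move forward. `find_regressions` is a hypothetical helper, and the input is assumed to be the `partitioned_stream_status` dicts already extracted from the emitted state messages, in emission order.

```python
from typing import Any, Iterable, List, Mapping, Optional


def find_regressions(statuses: Iterable[Mapping[str, Any]]) -> List[str]:
    """Return human-readable problems found in a sequence of status dicts."""
    problems: List[str] = []
    previous: Optional[Mapping[str, Any]] = None
    for index, current in enumerate(statuses):
        if previous is not None:
            # Counters should never decrease within a single sync.
            for key in ("num_partitions_completed", "num_partitions_expected"):
                if current[key] < previous[key]:
                    problems.append(f"{key} decreased at message {index}")
            # Once discovery is complete it should stay complete.
            if (
                previous["is_partition_discovery_complete"]
                and not current["is_partition_discovery_complete"]
            ):
                problems.append(f"discovery flag reverted at message {index}")
        # Completed partitions can never exceed discovered partitions.
        if current["num_partitions_completed"] > current["num_partitions_expected"]:
            problems.append(f"completed exceeds expected at message {index}")
        previous = current
    return problems


captured = [
    {"num_partitions_completed": 0, "num_partitions_expected": 2, "is_partition_discovery_complete": False},
    {"num_partitions_completed": 1, "num_partitions_expected": 3, "is_partition_discovery_complete": True},
    {"num_partitions_completed": 3, "num_partitions_expected": 3, "is_partition_discovery_complete": True},
]
issues = find_regressions(captured)
```

An empty result means the captured sequence is consistent with monotonic progress; it does not by itself prove the absolute values are correct.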