feat: add intermediate state checkpointing during pagination #915
devin-ai-integration[bot] wants to merge 4 commits into main
When records are sorted in ascending order by cursor field, the CDK will now emit state checkpoints every N pages (default: 5) during pagination within a partition. This prevents loss of all progress when a sync fails mid-pagination due to rate limits or errors.

Changes:
- Add emit_intermediate_state() to ConcurrentCursor
- Extend PaginationTracker with page counting and checkpoint triggering
- Call on_page_complete() in SimpleRetriever._read_pages()
- Wire up checkpoint cursor in model_to_component_factory

Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
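The every-N-pages behavior described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the CDK implementation: `PageCheckpointCounter` and `checkpoint_pages` are hypothetical names, the callback stands in for the cursor's state emission, and the ascending-order check is left out.

```python
from typing import Callable, List, Optional


class PageCheckpointCounter:
    """Hypothetical stand-in for the page-counting part of PaginationTracker."""

    def __init__(
        self,
        emit_checkpoint: Callable[[int], None],
        pages_per_checkpoint_interval: Optional[int] = None,
    ) -> None:
        self._emit_checkpoint = emit_checkpoint
        self._interval = pages_per_checkpoint_interval
        self._page_count = 0

    def on_page_complete(self) -> None:
        # No interval configured: intermediate checkpointing is a no-op.
        if not self._interval:
            return
        self._page_count += 1
        # Trigger a checkpoint each time a full interval of pages has been read.
        if self._page_count % self._interval == 0:
            self._emit_checkpoint(self._page_count)


def checkpoint_pages(total_pages: int, interval: Optional[int]) -> List[int]:
    """Return the page numbers at which a checkpoint would be triggered."""
    emitted: List[int] = []
    counter = PageCheckpointCounter(emitted.append, interval)
    for _ in range(total_pages):
        counter.on_page_complete()
    return emitted


print(checkpoint_pages(20, 5))     # [5, 10, 15, 20]
print(checkpoint_pages(20, None))  # []
```

In this sketch the counter is never reset, so intervals are counted across the whole lifetime of the object.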
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1771602439-intermediate-state-checkpoint#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1771602439-intermediate-state-checkpoint
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
…remental sync cursors Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
PyTest Results (Full): 3 884 tests, 3 872 ✅, 10m 49s ⏱️. Results for commit e0ef3eb.
else:
    self._concurrent_state["slices"].append(
        {
            self._connector_state_converter.START_KEY: self.start,
            self._connector_state_converter.END_KEY: most_recent_cursor_value,
            self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
        }
    )
🟡 Missing "slices" key check in else branch of emit_intermediate_state can cause KeyError
In emit_intermediate_state, the if self._slice_boundary_fields: branch (line 301) correctly checks if "slices" not in self._concurrent_state: return. However, the else branch (line 313) directly accesses self._concurrent_state["slices"].append(...) without this guard. While _concurrent_state should almost always have "slices", this is inconsistent with the defensive check in the sibling branch and with _add_slice_to_state (cursor.py:331) which raises a RuntimeError when "slices" is missing. If _concurrent_state somehow lacks "slices", this will raise a KeyError instead of gracefully returning or raising a descriptive error.
Suggested change:
- else:
-     self._concurrent_state["slices"].append(
-         {
-             self._connector_state_converter.START_KEY: self.start,
-             self._connector_state_converter.END_KEY: most_recent_cursor_value,
-             self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
-         }
-     )
+ else:
+     if "slices" not in self._concurrent_state:
+         return
+     self._concurrent_state["slices"].append(
+         {
+             self._connector_state_converter.START_KEY: self.start,
+             self._connector_state_converter.END_KEY: most_recent_cursor_value,
+             self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
+         }
+     )
Good catch — added the missing "slices" key guard in the else branch to be consistent with the if branch. Fixed in 967e09a.
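The difference the guard makes can be shown with a plain dictionary. This is only a sketch of the pattern discussed in this thread: `append_intermediate_slice` and the string keys are hypothetical, whereas the real method works on `self._concurrent_state` and takes its keys from the state converter.

```python
from typing import Any, Dict


def append_intermediate_slice(
    state: Dict[str, Any], start: Any, most_recent_cursor_value: Any
) -> bool:
    """Append a partial slice to state; return False if state has no "slices" key.

    Without the membership check, state["slices"] would raise KeyError
    when the key is missing.
    """
    if "slices" not in state:
        return False
    state["slices"].append(
        {
            # Hypothetical key names standing in for START_KEY, END_KEY
            # and MOST_RECENT_RECORD_KEY.
            "start": start,
            "end": most_recent_cursor_value,
            "most_recent_cursor_value": most_recent_cursor_value,
        }
    )
    return True


state_without_slices: Dict[str, Any] = {}
state_with_slices: Dict[str, Any] = {"slices": []}

print(append_intermediate_slice(state_without_slices, 0, 40))  # False
print(append_intermediate_slice(state_with_slices, 0, 40))     # True
print(len(state_with_slices["slices"]))                        # 1
```

Returning early matches the sibling branch; raising a descriptive error, as `_add_slice_to_state` is said to do, would be the other consistent option.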
…_state Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
feat: add intermediate state checkpointing during pagination
Summary
When a stream paginates through many pages within a single slice/partition, state is currently only emitted when the partition closes. If the sync fails mid-pagination (e.g., rate limits, 504 errors), all progress is lost.
This PR adds intermediate state checkpointing to the CDK: when ConcurrentCursor detects that records are arriving in ascending cursor order, it will emit a state checkpoint every N pages. On the next sync, the stream resumes from the last checkpoint rather than restarting from the beginning of the slice.

Motivation: airbytehq/oncall#11335. The source-zendesk-support ticket_comments stream loses ~25k records of progress on each failure because no state is emitted during pagination.

Changes:
- declarative_component_schema.yaml: Added pages_per_checkpoint_interval (optional integer) to both DatetimeBasedCursor and IncrementingCountCursor definitions. Defaults to disabled (no intermediate checkpointing unless explicitly configured).
- declarative_component_schema.py: Updated DatetimeBasedCursor and IncrementingCountCursor model classes with the new pages_per_checkpoint_interval field.
- ConcurrentCursor.emit_intermediate_state(stream_slice): New method that adds a partial [start, cursor_value] slice to state and emits a state message, but only when _is_ascending_order is True. Handles both streams with and without slice_boundary_fields.
- PaginationTracker: Extended with checkpoint_cursor and pages_per_checkpoint_interval params. New on_page_complete() method increments a page counter and triggers an intermediate checkpoint when the interval is reached.
- SimpleRetriever._read_pages(): Calls pagination_tracker.on_page_complete(stream_slice) after each successful page.
- model_to_component_factory._create_pagination_tracker_factory(): Now reads pages_per_checkpoint_interval from the incremental sync model (if present) and passes it through to PaginationTracker. The feature is only active when a ConcurrentCursor is present AND the schema value is set.

Safety: The feature is a no-op when records are not in ascending order (the cursor tracks this via _is_ascending_order). The merge_intervals call ensures intermediate slices are correctly merged with the final partition close. When not configured in the schema, behavior is unchanged from before this PR.

Review & Testing Checklist for Human
- … close_partition for intermediate slices: When emit_intermediate_state adds a partial slice, close_partition then also adds a slice for the same range. The merge should combine them correctly, but this interaction is only unit-tested in isolation. Verify end-to-end that state doesn't get corrupted or duplicated after a full partition lifecycle with intermediate checkpoints.
- … _is_ascending_order check: emit_intermediate_state() reads self._is_ascending_order outside the lock, but observe() writes it without a lock. Likely benign (flag only transitions True→False), but worth verifying no race exists.
- _page_count never resets: Unlike _record_count, the _page_count in PaginationTracker is never reset (even in _reset()). This means checkpoint intervals span pagination resets. Is this intended?
- … pages_per_checkpoint_interval only works when records are in ascending cursor order. The schema description mentions this, but consider whether additional documentation is needed.
- … declarative_component_schema.py were manually edited rather than regenerated via bin/generate_component_manifest_files.py. Verify the manual changes match what the code generator would produce.

Recommended test plan:
- … ConcurrentCursor.emit_intermediate_state(): covering ascending/non-ascending order, with/without boundary fields
- … PaginationTracker.on_page_complete(): verifying page counting and checkpoint triggering
- … pages_per_checkpoint_interval: 5, verify state is emitted at pages 5, 10, 15, 20
- … source-zendesk-support or similar connector: configure pages_per_checkpoint_interval in the manifest, verify state advances during pagination, and that a mid-pagination failure resumes from the last checkpoint

Notes
- … pages_per_checkpoint_interval on their incremental sync cursor to enable intermediate checkpointing. When not set, behavior is unchanged.
- … close_partition still emits the full slice state.
- _create_pagination_tracker_factory captures the actual cursor (not a copy), so all PaginationTracker instances share the same cursor reference. This is intended: the lock in emit_intermediate_state handles concurrent access.

Requested by: gl_anatolii.yatsuk@airbyte.io
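The expected interplay between an intermediate slice and the slice added at partition close can be illustrated with a simplified interval merge. This is a sketch under stated assumptions, not the CDK's merge_intervals: the function `merge_slices` is hypothetical, cursor values are plain integers, and details such as cursor granularity and most-recent-record tracking are ignored.

```python
from typing import Dict, List


def merge_slices(slices: List[Dict[str, int]]) -> List[Dict[str, int]]:
    """Merge overlapping or touching [start, end] slices into the fewest intervals."""
    merged: List[Dict[str, int]] = []
    for current in sorted(slices, key=lambda s: s["start"]):
        if merged and current["start"] <= merged[-1]["end"]:
            # Overlaps the previous interval: extend it instead of adding a new one.
            merged[-1]["end"] = max(merged[-1]["end"], current["end"])
        else:
            # Copy so that the caller's dictionaries are not mutated.
            merged.append(dict(current))
    return merged


# An intermediate checkpoint covered [0, 40]; partition close then adds [0, 100].
state_slices = [{"start": 0, "end": 40}, {"start": 0, "end": 100}]
print(merge_slices(state_slices))  # [{'start': 0, 'end': 100}]
```

If the merge behaves like this, the partial slice is absorbed by the full one and the final state holds a single interval, which is the outcome the checklist asks reviewers to verify end-to-end.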