feat(async-jobs): add deferred retry with cooldown for failed async jobs #1016
Daryna Ishchenko (darynaishchenko) wants to merge 4 commits into main from devin/1778159323-failed-retry-wait-time
Conversation
Add a `failed_retry_wait_time_in_seconds` parameter to `AsyncRetriever` and `AsyncJobOrchestrator`. When configured, FAILED jobs are not retried immediately but deferred until a cooldown period elapses. This is non-blocking: the orchestrator skips deferred jobs and continues processing other partitions normally.

Changes:
- AsyncJob: add `set_retry_after()`, `retry_deferred()`, `ready_to_retry()`
- AsyncJobOrchestrator: defer FAILED job retries when wait time is set
- Schema/models: add `failed_retry_wait_time_in_seconds` to `AsyncRetriever`
- Factory: wire parameter through to orchestrator
- Tests: add parametrized tests for `retry_after` and orchestrator behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
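As a rough sketch of the new `AsyncJob` surface described above (assumed semantics reconstructed from the PR description; `AsyncJobSketch` is a hypothetical stand-in, and the real class carries much more state):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class AsyncJobSketch:
    """Minimal model of the cooldown-related additions to AsyncJob."""

    def __init__(self) -> None:
        self._retry_after: Optional[datetime] = None

    def set_retry_after(self, retry_after: datetime) -> None:
        # Arm the cooldown: the job must not be retried before this instant.
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        # True once a cooldown has been armed for this job.
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        # A job with no cooldown, or whose cooldown has elapsed, may be retried.
        if self._retry_after is None:
            return True
        return datetime.now(tz=timezone.utc) >= self._retry_after


job = AsyncJobSketch()
assert not job.retry_deferred() and job.ready_to_retry()
job.set_retry_after(datetime.now(tz=timezone.utc) + timedelta(seconds=30))
assert job.retry_deferred() and not job.ready_to_retry()
```

Note the invariant this implies: a never-deferred job is always "ready", so the orchestrator must check `retry_deferred()` to distinguish "no cooldown armed yet" from "cooldown elapsed".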
👋 Greetings, Airbyte Team Member! You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778159323-failed-retry-wait-time#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778159323-failed-retry-wait-time
```
PyTest Results (Fast): 4 049 tests (+6), 4 038 ✅ (+6), 7m 35s ⏱️ (-13s). Results for commit 9ddf6ca; comparison against base commit ccc185f. This pull request removes 3 and adds 9 tests (renamed tests count towards both).

PyTest Results (Full): 4 052 tests (+6), 4 040 ✅ (+6), 11m 13s ⏱️ (+12s). Results for commit 9ddf6ca; comparison against base commit ccc185f. This pull request removes 3 and adds 9 tests (renamed tests count towards both).
📝 Walkthrough

Adds configurable cooldown-based deferred retries for FAILED async jobs: `AsyncJob` tracks a retry-after timestamp, the orchestrator gates replacements using a new `failed_retry_wait_time_in_seconds` setting, configuration is wired through the factory/schema, and tests cover the behavior.

Changes: Async Job Retry Deferral
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Orchestrator
    participant JobRepo
    participant Job
    Client->>Orchestrator: trigger job handling
    Orchestrator->>JobRepo: fetch job
    Orchestrator->>Job: check status
    Orchestrator->>Job: ready_to_retry()?
    alt Not ready
        Orchestrator-->>Job: skip/defer replacement (set retry_after)
    else Ready
        Orchestrator->>JobRepo: start replacement
        JobRepo-->>Orchestrator: new job (may be FAILED)
        alt Replacement FAILED && original was deferred
            Orchestrator->>Job: copy retry_after to new job
        end
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks: 4 passed ✅, 1 failed (warning) ❌
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)
171-193: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Could we reject negative retry cooldown values at construction time, wdyt? Right now, a negative value can produce a `retry_after` in the past, which makes the deferral semantics confusing instead of explicit.

Proposed fix

```diff
     def __init__(
         self,
         job_repository: AsyncJobRepository,
         slices: Iterable[StreamSlice],
         job_tracker: JobTracker,
         message_repository: MessageRepository,
         exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
         has_bulk_parent: bool = False,
         job_max_retry: Optional[int] = None,
         failed_retry_wait_time_in_seconds: Optional[int] = None,
     ) -> None:
+        if (
+            failed_retry_wait_time_in_seconds is not None
+            and failed_retry_wait_time_in_seconds < 0
+        ):
+            raise ValueError(
+                "failed_retry_wait_time_in_seconds must be >= 0"
+            )
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines 171-193: reject negative `failed_retry_wait_time_in_seconds` in the constructor by validating the parameter and raising a `ValueError` if it's < 0; add a check after parameters are received (near the `AsyncJobStatus`/`_KNOWN_JOB_STATUSES` validation) to assert `failed_retry_wait_time_in_seconds` is either None or >= 0 with a clear error message, then assign it to `self._failed_retry_wait_time_in_seconds` as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte_cdk/sources/declarative/async_job/job.py`:
- Around line 58-70: The set_retry_after method should guard against naive
datetimes and store a normalized UTC-aware datetime in self._retry_after: if
retry_after.tzinfo is None (naive) attach timezone.utc (e.g. retry_after =
retry_after.replace(tzinfo=timezone.utc)), then normalize any timezone-aware
value to UTC via retry_after = retry_after.astimezone(timezone.utc) before
assigning to self._retry_after; keep ready_to_retry using
datetime.now(tz=timezone.utc) and the _retry_after field for comparisons so no
TypeError occurs.
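A minimal sketch of the normalization this comment asks for, assuming the logic is folded into `set_retry_after` (the standalone `normalize_to_utc` name here is illustrative, not from the codebase):

```python
from datetime import datetime, timedelta, timezone


def normalize_to_utc(retry_after: datetime) -> datetime:
    """Attach UTC to naive datetimes, convert aware ones to UTC."""
    if retry_after.tzinfo is None:
        # Naive input: assume it already expresses UTC wall-clock time.
        return retry_after.replace(tzinfo=timezone.utc)
    # Aware input: convert whatever zone it carries to UTC.
    return retry_after.astimezone(timezone.utc)


naive = datetime(2025, 1, 1, 12, 0, 0)
aware = normalize_to_utc(naive)
assert aware.tzinfo == timezone.utc
# Comparisons against datetime.now(tz=timezone.utc) can no longer raise TypeError.
```

Storing only normalized aware values keeps `ready_to_retry()` comparisons safe regardless of what callers pass in.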
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 4105-4111: Add a minimum bound to the integer branch of the
failed_retry_wait_time_in_seconds schema so negative or zero values are rejected
by the schema; specifically, in the failed_retry_wait_time_in_seconds anyOf
entry for type: integer add minimum: 1 (mirroring the pattern used by
DynamicStreamCheckConfig.stream_count) to ensure the orchestrator's
ready_to_retry() logic isn't fed non-positive values while keeping the existing
anyOf string branch and interpolation_context unchanged.
In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Around line 3077-3080: The field failed_retry_wait_time_in_seconds currently
allows negative values; add validation to prevent negatives by adding ge=0 to
its Pydantic Field call in the declarative component model (update the
Field(...) for failed_retry_wait_time_in_seconds to include ge=0) and also add
minimum: 0 to the corresponding YAML schema definition for the same field so
generated models and schema both enforce a non-negative cooldown.
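A hedged sketch of the suggested model-side guard, using a simplified stand-in for the generated model (`AsyncRetrieverSketch` is hypothetical; this mirrors the comment's `ge=0` suggestion, though later discussion in this thread settled on a minimum of 1):

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class AsyncRetrieverSketch(BaseModel):
    # ge=0 rejects negative cooldowns at model-validation time,
    # mirroring a `minimum` bound in the YAML schema.
    failed_retry_wait_time_in_seconds: Optional[int] = Field(None, ge=0)


ok = AsyncRetrieverSketch(failed_retry_wait_time_in_seconds=30)
assert ok.failed_retry_wait_time_in_seconds == 30
try:
    AsyncRetrieverSketch(failed_retry_wait_time_in_seconds=-1)
except ValidationError:
    pass  # negative values are rejected at construction
```

Keeping the Pydantic bound and the YAML `minimum` in sync matters because connectors may be validated through either path.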
In `@unit_tests/sources/declarative/async_job/test_job.py`:
- Around line 30-55: The parametrized datetimes are computed at import time
which makes the test time-sensitive; change the test to compute retry_after
values at runtime inside test_retry_after instead of using datetime.now(...) in
the param list: keep the same parameter cases (None, future, past) but pass a
marker or enum in the parametrize tuple and in test_retry_after compute
datetime.now(timezone.utc) + timedelta(hours=1) or - timedelta(seconds=1) as
needed, then call AsyncJob.set_retry_after(retry_after) and assert
job.retry_deferred() and job.ready_to_retry() as before (refer to
test_retry_after, AsyncJob.set_retry_after, job.retry_deferred,
job.ready_to_retry).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 708f4de0-2b2c-45d9-9671-f27afea41ede
📒 Files selected for processing (7)
- airbyte_cdk/sources/declarative/async_job/job.py
- airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- unit_tests/sources/declarative/async_job/test_job.py
- unit_tests/sources/declarative/async_job/test_job_orchestrator.py
…runtime offsets Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
A couple of concerns from review:

1. Double cooldown via synthetic FAILED jobs. For the motivating SP-API case, requesting a report while the API is still in cooldown can make `_start_job` return a synthetic FAILED job, so the replacement could arm a second cooldown on top of the first. Either way, worth a regression test that simulates a synthetic FAILED replacement.

2. Three different lower bounds for `failed_retry_wait_time_in_seconds`: the constructor, the YAML schema, and the Pydantic model each enforce a different minimum.
…dation:
- When `_start_job` returns a synthetic FAILED job (API still in cooldown), carry `_retry_after` from the old job to prevent double-cooldown
- Change constructor validation from `< 0` to `<= 0` to match the YAML schema `minimum: 1` constraint
- Add regression test for the synthetic FAILED replacement scenario

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Both concerns addressed in 9ddf6ca:

1. Double cooldown via synthetic FAILED jobs. Fixed by carrying `_retry_after` over to the replacement:

```python
new_job = self._start_job(job.job_parameters(), job.api_job_id())
if (
    self._failed_retry_wait_time_in_seconds
    and new_job.status() == AsyncJobStatus.FAILED
    and job.retry_deferred()
):
    new_job.set_retry_after(job._retry_after)
partition.replace_job(job, [new_job])
```

This ensures the total wall-clock wait equals one cooldown, not two. Added a regression test for the synthetic FAILED replacement scenario.

2. Consistent lower bound. Constructor validation now rejects non-positive values:

```python
if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
    raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")
```

The three layers (constructor, YAML schema, Pydantic model) are now consistent.
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)
204-223: ⚡ Quick win

Could we flip the cooldown guard order to reduce hidden coupling, wdyt? At line 205, readiness is checked before we know whether the cooldown was ever armed. Would you consider arming on first FAILED (`not retry_deferred`) first, then checking readiness, so this path doesn't depend on implicit `ready_to_retry()` semantics?

Proposed refactor

```diff
-            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
-                if not job.ready_to_retry():
-                    lazy_log(
-                        LOGGER,
-                        logging.DEBUG,
-                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
-                    )
-                    continue
-                if not job.retry_deferred():
+            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
+                if not job.retry_deferred():
                     job.set_retry_after(
                         datetime.now(tz=timezone.utc)
                         + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
@@
                     )
                     continue
+                if not job.ready_to_retry():
+                    lazy_log(
+                        LOGGER,
+                        logging.DEBUG,
+                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
+                    )
+                    continue
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines 204-223: flip the cooldown guard order so the retry cooldown is explicitly armed on the first FAILED occurrence before relying on `ready_to_retry()`. Inside the loop handling `AsyncJobStatus.FAILED`, first check `if not job.retry_deferred()` and then call `job.set_retry_after(datetime.now(tz=timezone.utc) + timedelta(seconds=self._failed_retry_wait_time_in_seconds))`, log the deferral, and continue; otherwise (if already deferred) check `job.ready_to_retry()` and skip/continue if not ready, only calling `self._start_job(job.job_parameters(), job.api_job_id())` when readiness passes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 242307c0-269e-4c29-aba3-70f104ce272f
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
- unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Summary
Adds a new optional `failed_retry_wait_time_in_seconds` parameter to `AsyncRetriever` that enables non-blocking, cooldown-aware retry for FAILED async jobs. This is motivated by Amazon SP-API's undocumented per-report-type cooldown (e.g. 30 min for near-real-time FBA reports): when a report request FATALs due to cooldown, immediate retries are guaranteed to fail again.

How it works:

- `AsyncJob` gains a `retry_after` timestamp and helper methods (`set_retry_after`, `retry_deferred`, `ready_to_retry`).
- `AsyncJobOrchestrator._replace_failed_jobs()` uses these to implement a two-phase deferral: on the first FAILED pass it sets `retry_after = now + wait_time` and skips (does not replace); on later passes it keeps skipping while `now < retry_after`.
- `TIMED_OUT` jobs bypass the deferral entirely and are replaced immediately (existing behavior preserved).
- No blocking `time.sleep()`: the orchestrator loop continues processing other partitions/jobs normally.
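The two-phase deferral above can be sketched as follows (a hedged illustration only: plain status strings and a hypothetical `jobs_to_replace` helper stand in for the real `AsyncJobStatus` enum and orchestrator loop):

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


class Job:
    def __init__(self, status: str) -> None:
        self.status = status
        self.retry_after: Optional[datetime] = None


def jobs_to_replace(jobs: List[Job], wait_seconds: Optional[int]) -> List[Job]:
    """Return the failed jobs that are allowed to be replaced right now."""
    replace: List[Job] = []
    now = datetime.now(tz=timezone.utc)
    for job in jobs:
        # Only FAILED jobs with a configured wait time are deferred;
        # TIMED_OUT (and FAILED with the feature off) replace immediately.
        if job.status == "FAILED" and wait_seconds:
            if job.retry_after is None:
                # Phase 1, first FAILED pass: arm the cooldown and skip.
                job.retry_after = now + timedelta(seconds=wait_seconds)
                continue
            if now < job.retry_after:
                # Phase 2, cooldown still running: keep skipping, stay non-blocking.
                continue
        replace.append(job)
    return replace


jobs = [Job("FAILED"), Job("TIMED_OUT")]
first_pass = jobs_to_replace(jobs, wait_seconds=1800)
assert [j.status for j in first_pass] == ["TIMED_OUT"]  # FAILED job deferred
second_pass = jobs_to_replace(jobs, wait_seconds=1800)
assert [j.status for j in second_pass] == ["TIMED_OUT"]  # still within cooldown
```

Because the loop only skips (never sleeps), other partitions keep making progress while the FAILED job waits out its cooldown.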
AsyncRetriever.failed_retry_wait_time_in_seconds), supports interpolation from config, and is wired through the model factory.Review & Testing Checklist for Human
_replace_failed_jobs(job_orchestrator.py lines ~198-220): The two guards (ready_to_retrythenretry_deferred) must be checked in this order. On first failure both are false/false so the second branch fires. On subsequent calls before cooldown,ready_to_retry()is false so the first branch fires. After cooldown, both are true and execution falls through to replacement. Confirm this state machine is correct and handles edge cases (e.g., job fails again after deferred retry succeeds and a new job also fails).partition.replace_jobis never called while a job is deferred: Thecontinuestatements must prevent reaching thepartition.replace_job(job, [new_job])line. A missed replacement during deferral would leave the partition in an inconsistent state.failed_retry_wait_time_in_seconds. A value of 0 or negative would effectively make the deferral a no-op (theif self._failed_retry_wait_time_in_secondsguard is falsy for 0). Decide if this is acceptable or if validation should be added.pytest unit_tests/sources/declarative/async_job/ -v). For E2E validation, configure a manifest-based connector that usesAsyncRetrieverwithfailed_retry_wait_time_in_secondsset, trigger a FATAL/failed job, and observe that the retry is deferred in logs rather than immediate.Notes
job_tracker._jobs(private set) directly to pre-register job IDs before orchestrator logic runs. This mirrors how the orchestrator normally registers jobs via_start_jobbut is coupled to the internal implementation.test_given_status_is_terminal_when_update_status_then_stop_timer(which was justpass) was removed during thetest_job.pyrewrite to pytest functions.Link to Devin session: https://app.devin.ai/sessions/2cbe53a30ea14f9f8151f407f423283b
Requested by: Daryna Ishchenko (@darynaishchenko)