
feat(async-jobs): add deferred retry with cooldown for failed async jobs #1016

Open
Daryna Ishchenko (darynaishchenko) wants to merge 4 commits into main from devin/1778159323-failed-retry-wait-time

Conversation


@darynaishchenko Daryna Ishchenko (darynaishchenko) commented May 7, 2026

Summary

Adds a new optional failed_retry_wait_time_in_seconds parameter to AsyncRetriever that enables non-blocking, cooldown-aware retry for FAILED async jobs. This is motivated by Amazon SP-API's undocumented per-report-type cooldown (e.g. 30 min for near-real-time FBA reports) — when a report request FATALs due to cooldown, immediate retries are guaranteed to fail again.

How it works:

  • AsyncJob gains a retry_after timestamp and helper methods (set_retry_after, retry_deferred, ready_to_retry).
  • AsyncJobOrchestrator._replace_failed_jobs() uses this to implement a two-phase deferral:
    1. On first FAILED detection → sets retry_after = now + wait_time and skips (does not replace).
    2. On subsequent loop iterations → skips while now < retry_after.
    3. Once cooldown elapses → proceeds with normal job replacement.
  • TIMED_OUT jobs bypass the deferral entirely and are replaced immediately (existing behavior preserved).
  • No time.sleep() — the orchestrator loop continues processing other partitions/jobs normally.
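The steps above can be sketched as a small state machine. This is a simplified, self-contained sketch: the `Job` class and the `should_replace` helper are illustrative stand-ins for `AsyncJob` and the orchestrator loop, not the PR's actual code, and the guard order here arms the cooldown before checking readiness (the behavior is the same; the PR checks the guards in the opposite order).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class Job:
    """Illustrative stand-in for AsyncJob's retry-after state."""

    def __init__(self) -> None:
        self._retry_after: Optional[datetime] = None

    def set_retry_after(self, retry_after: datetime) -> None:
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        # A cooldown has been armed for this job.
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        # The armed cooldown window has elapsed.
        return (
            self._retry_after is not None
            and datetime.now(tz=timezone.utc) >= self._retry_after
        )


def should_replace(job: Job, wait_seconds: Optional[int]) -> bool:
    """Two-phase deferral for a FAILED job: arm the cooldown on first
    sighting, skip while it runs, replace once it elapses."""
    if wait_seconds:
        if not job.retry_deferred():
            # Phase 1: first FAILED detection -> arm cooldown, do not replace.
            job.set_retry_after(
                datetime.now(tz=timezone.utc) + timedelta(seconds=wait_seconds)
            )
            return False
        if not job.ready_to_retry():
            # Phase 2: still cooling down -> skip this loop iteration.
            return False
    # Phase 3, or feature disabled: proceed with normal replacement.
    return True
```

Because `should_replace` returns instead of sleeping, the orchestrator loop stays free to process other partitions between phases.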

The parameter is exposed on the declarative schema (AsyncRetriever.failed_retry_wait_time_in_seconds), supports interpolation from config, and is wired through the model factory.

Review & Testing Checklist for Human

  • Verify the deferral logic ordering in _replace_failed_jobs (job_orchestrator.py, lines ~198-220): The two guards (ready_to_retry, then retry_deferred) must be checked in this order. On the first failure both guards are false, so the second branch fires and arms the cooldown. On subsequent calls before the cooldown elapses, ready_to_retry() is false, so the first branch fires and the job is skipped. Once the cooldown elapses, both are true and execution falls through to replacement. Confirm this state machine is correct and handles edge cases (e.g., a job failing again after its deferred retry succeeds while a new job also fails).
  • Validate that partition.replace_job is never called while a job is deferred: The continue statements must prevent reaching the partition.replace_job(job, [new_job]) line. A missed replacement during deferral would leave the partition in an inconsistent state.
  • Check that there is no positive-value validation on the resolved failed_retry_wait_time_in_seconds. A value of 0 or negative would effectively make the deferral a no-op (the if self._failed_retry_wait_time_in_seconds guard is falsy for 0). Decide if this is acceptable or if validation should be added.
  • Test plan: Run the async job unit tests (pytest unit_tests/sources/declarative/async_job/ -v). For E2E validation, configure a manifest-based connector that uses AsyncRetriever with failed_retry_wait_time_in_seconds set, trigger a FATAL/failed job, and observe that the retry is deferred in logs rather than immediate.

Notes

  • Tests access job_tracker._jobs (private set) directly to pre-register job IDs before orchestrator logic runs. This mirrors how the orchestrator normally registers jobs via _start_job but is coupled to the internal implementation.
  • The existing no-op test test_given_status_is_terminal_when_update_status_then_stop_timer (which was just pass) was removed during the test_job.py rewrite to pytest functions.

Link to Devin session: https://app.devin.ai/sessions/2cbe53a30ea14f9f8151f407f423283b
Requested by: Daryna Ishchenko (@darynaishchenko)

Summary by CodeRabbit

  • New Features

    • Added an optional configurable cooldown delay for failed async jobs so retries can be deferred and resumed after a wait period; orchestrator defers replacement until the cooldown elapses.
  • Configuration

    • Exposed a new optional setting to control the failed-job retry wait time (accepts integers or interpolated strings).
  • Tests

    • Added/updated tests for deferred-retry gating, readiness-to-retry timing, timeout detection, and orchestrator retry behavior.
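For illustration, a hypothetical manifest excerpt using the new setting; the failed_retry_wait_time_in_seconds field name comes from this PR, while the surrounding retriever keys are abbreviated placeholders:

```yaml
retriever:
  type: AsyncRetriever
  # ... requester, record_selector, status mapping, etc. ...
  # Cooldown before a FAILED async job is retried. Accepts an integer
  # or an interpolated string resolved from config.
  failed_retry_wait_time_in_seconds: "{{ config['report_cooldown_seconds'] }}"
```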

Add failed_retry_wait_time_in_seconds parameter to AsyncRetriever and
AsyncJobOrchestrator. When configured, FAILED jobs are not retried
immediately but deferred until a cooldown period elapses. This is
non-blocking: the orchestrator skips deferred jobs and continues
processing other partitions normally.

Changes:
- AsyncJob: add set_retry_after(), retry_deferred(), ready_to_retry()
- AsyncJobOrchestrator: defer FAILED job retries when wait time is set
- Schema/models: add failed_retry_wait_time_in_seconds to AsyncRetriever
- Factory: wire parameter through to orchestrator
- Tests: add parametrized tests for retry_after and orchestrator behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

github-actions Bot commented May 7, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778159323-failed-retry-wait-time#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778159323-failed-retry-wait-time

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

github-actions Bot commented May 7, 2026

PyTest Results (Fast)

4 049 tests  +6   4 038 ✅ +6   7m 35s ⏱️ -13s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 9ddf6ca. ± Comparison against base commit ccc185f.

This pull request removes 3 and adds 9 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_status_is_terminal_when_update_status_then_stop_timer
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[no_retry_after_set]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_future]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_past]
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_job_fails_then_defers_retry
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_timed_out_job_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_no_failed_retry_wait_time_when_job_fails_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_synthetic_failed_replacement_when_cooldown_elapses_then_carries_retry_after

♻️ This comment has been updated with latest results.

@github-actions

github-actions Bot commented May 7, 2026

PyTest Results (Full)

4 052 tests  +6   4 040 ✅ +6   11m 13s ⏱️ +12s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 9ddf6ca. ± Comparison against base commit ccc185f.

This pull request removes 3 and adds 9 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_status_is_terminal_when_update_status_then_stop_timer
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[no_retry_after_set]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_future]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_past]
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_job_fails_then_defers_retry
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_timed_out_job_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_no_failed_retry_wait_time_when_job_fails_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_synthetic_failed_replacement_when_cooldown_elapses_then_carries_retry_after

♻️ This comment has been updated with latest results.

@darynaishchenko Daryna Ishchenko (darynaishchenko) marked this pull request as ready for review May 7, 2026 14:02
@coderabbitai

coderabbitai Bot commented May 7, 2026


📝 Walkthrough

Adds configurable cooldown-based deferred retries for FAILED async jobs: AsyncJob tracks a retry-after timestamp, orchestrator gates replacements using a new failed_retry_wait_time_in_seconds setting, configuration is wired through the factory/schema, and tests cover the behavior.

Changes

Async Job Retry Deferral

  • Schema and configuration fields (airbyte_cdk/sources/declarative/models/declarative_component_schema.py, airbyte_cdk/sources/declarative/declarative_component_schema.yaml): AsyncRetriever gains an optional failed_retry_wait_time_in_seconds (int or interpolated string) to control the cooldown before retrying failed jobs.
  • AsyncJob retry state tracking (airbyte_cdk/sources/declarative/async_job/job.py): AsyncJob adds _retry_after and the methods set_retry_after(), retry_deferred(), and ready_to_retry(), using UTC-aware comparisons.
  • Orchestrator retry deferral logic (airbyte_cdk/sources/declarative/async_job/job_orchestrator.py): AsyncJobOrchestrator accepts failed_retry_wait_time_in_seconds (validated > 0); it skips replacing FAILED jobs that are not ready to retry, defers replacement by setting retry-after when configured, and propagates retry-after to synthetic FAILED replacements; TIMED_OUT behavior is unchanged.
  • Configuration wiring (airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py): create_async_retriever interpolates the model value and passes failed_retry_wait_time_in_seconds (Optional[int]) into AsyncHttpJobRepository.
  • Tests (unit_tests/sources/declarative/async_job/test_job.py, unit_tests/sources/declarative/async_job/test_job_orchestrator.py): tests converted to pytest; added parametrized tests for AsyncJob retry-after states and orchestrator tests verifying deferred replacement, immediate replacement when unset, TIMED_OUT behavior, and propagation of the original retry-after to synthetic failures.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Orchestrator
  participant JobRepo
  participant Job

  Client->>Orchestrator: trigger job handling
  Orchestrator->>JobRepo: fetch job
  Orchestrator->>Job: check status
  Orchestrator->>Job: ready_to_retry()?
  alt Not ready
    Orchestrator-->>Job: skip/defers replacement (set retry_after)
  else Ready
    Orchestrator->>JobRepo: start replacement
    JobRepo-->>Orchestrator: new job (may be FAILED)
    alt Replacement FAILED && original was deferred
      Orchestrator->>Job: copy retry_after to new job
    end
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Would you like me to flag places to tighten timezone handling or add more unit coverage around edge cases, wdyt?

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 47.37%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Description Check ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: the title 'feat(async-jobs): add deferred retry with cooldown for failed async jobs' directly and clearly summarizes the main change.
  • Linked Issues Check ✅ Passed: check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check ✅ Passed: check skipped because no linked issues were found for this pull request.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)

171-193: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Could we reject negative retry cooldown values at construction time, wdyt?

Right now, a negative value can produce a retry_after in the past, which makes the deferral semantics confusing instead of explicit.

Proposed fix
     def __init__(
         self,
         job_repository: AsyncJobRepository,
         slices: Iterable[StreamSlice],
         job_tracker: JobTracker,
         message_repository: MessageRepository,
         exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
         has_bulk_parent: bool = False,
         job_max_retry: Optional[int] = None,
         failed_retry_wait_time_in_seconds: Optional[int] = None,
     ) -> None:
+        if (
+            failed_retry_wait_time_in_seconds is not None
+            and failed_retry_wait_time_in_seconds < 0
+        ):
+            raise ValueError(
+                "failed_retry_wait_time_in_seconds must be >= 0"
+            )
         """
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
171 - 193, Reject negative failed_retry_wait_time_in_seconds in the constructor
by validating the parameter and raising a ValueError if it's < 0; add a check
after parameters are received (near AsyncJobStatus/_KNOWN_JOB_STATUSES
validation) to assert failed_retry_wait_time_in_seconds is either None or >= 0
and include a clear error message, then assign it to
self._failed_retry_wait_time_in_seconds as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@airbyte_cdk/sources/declarative/async_job/job.py`:
- Around line 58-70: The set_retry_after method should guard against naive
datetimes and store a normalized UTC-aware datetime in self._retry_after: if
retry_after.tzinfo is None (naive) attach timezone.utc (e.g. retry_after =
retry_after.replace(tzinfo=timezone.utc)), then normalize any timezone-aware
value to UTC via retry_after = retry_after.astimezone(timezone.utc) before
assigning to self._retry_after; keep ready_to_retry using
datetime.now(tz=timezone.utc) and the _retry_after field for comparisons so no
TypeError occurs.
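The normalization this prompt describes can be sketched as a stand-alone helper; the function name is illustrative, and in the PR the logic would live inside AsyncJob.set_retry_after:

```python
from datetime import datetime, timezone


def normalize_retry_after(retry_after: datetime) -> datetime:
    """Return a UTC-aware datetime so later comparisons against
    datetime.now(tz=timezone.utc) never raise TypeError on naive inputs."""
    if retry_after.tzinfo is None:
        # Naive input: assume it already expresses UTC wall-clock time.
        retry_after = retry_after.replace(tzinfo=timezone.utc)
    # Aware input in any zone: convert to UTC before storing.
    return retry_after.astimezone(timezone.utc)
```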

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 4105-4111: Add a minimum bound to the integer branch of the
failed_retry_wait_time_in_seconds schema so negative or zero values are rejected
by the schema; specifically, in the failed_retry_wait_time_in_seconds anyOf
entry for type: integer add minimum: 1 (mirroring the pattern used by
DynamicStreamCheckConfig.stream_count) to ensure the orchestrator's
ready_to_retry() logic isn't fed non-positive values while keeping the existing
anyOf string branch and interpolation_context unchanged.

In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Around line 3077-3080: The field failed_retry_wait_time_in_seconds currently
allows negative values; add validation to prevent negatives by adding ge=0 to
its Pydantic Field call in the declarative component model (update the
Field(...) for failed_retry_wait_time_in_seconds to include ge=0) and also add
minimum: 0 to the corresponding YAML schema definition for the same field so
generated models and schema both enforce a non-negative cooldown.

In `@unit_tests/sources/declarative/async_job/test_job.py`:
- Around line 30-55: The parametrized datetimes are computed at import time
which makes the test time-sensitive; change the test to compute retry_after
values at runtime inside test_retry_after instead of using datetime.now(...) in
the param list: keep the same parameter cases (None, future, past) but pass a
marker or enum in the parametrize tuple and in test_retry_after compute
datetime.now(timezone.utc) + timedelta(hours=1) or - timedelta(seconds=1) as
needed, then call AsyncJob.set_retry_after(retry_after) and assert
job.retry_deferred() and job.ready_to_retry() as before (refer to
test_retry_after, AsyncJob.set_retry_after, job.retry_deferred,
job.ready_to_retry).
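A time-stable version of that parametrization could look like the following; StubJob is a stand-in with assumed semantics, whereas the real tests exercise AsyncJob:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

import pytest


class StubJob:
    """Minimal stand-in for AsyncJob; semantics are assumed, not copied from the PR."""

    def __init__(self) -> None:
        self._retry_after: Optional[datetime] = None

    def set_retry_after(self, retry_after: datetime) -> None:
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        return (
            self._retry_after is not None
            and datetime.now(timezone.utc) >= self._retry_after
        )


@pytest.mark.parametrize(
    "offset, deferred, ready",
    [
        pytest.param(None, False, False, id="no_retry_after_set"),
        pytest.param(timedelta(hours=1), True, False, id="retry_after_in_future"),
        pytest.param(-timedelta(seconds=1), True, True, id="retry_after_in_past"),
    ],
)
def test_retry_after(offset, deferred, ready):
    job = StubJob()
    if offset is not None:
        # The offset is applied at call time, not import time, so slow
        # test collection cannot flip a "future" case into the past.
        job.set_retry_after(datetime.now(timezone.utc) + offset)
    assert job.retry_deferred() is deferred
    assert job.ready_to_retry() is ready
```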

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 708f4de0-2b2c-45d9-9671-f27afea41ede

📥 Commits

Reviewing files that changed from the base of the PR and between ccc185f and 483701a.

📒 Files selected for processing (7)
  • airbyte_cdk/sources/declarative/async_job/job.py
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/async_job/test_job.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py

…runtime offsets

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@tolik0

A couple of concerns from review:

1. Double cooldown via synthetic FAILED jobs

_start_job doesn't always return a freshly-running job. When the API rejects the start, _keep_api_budget_with_failed_job synthesizes a FAILED AsyncJob with _retry_after = None and returns it (job_orchestrator.py:260-281). Tracing the SP-API cooldown path:

  1. Job fails at T=0 → cooldown armed, retry_after = T + 1800
  2. At T=1800, cooldown elapses → _replace_failed_jobs calls _start_job
  3. API still in cooldown → _start_job returns a synthetic FAILED job (fresh, no _retry_after)
  4. Next tick → state machine sees a fresh FAILED job → arms cooldown again at T ≈ 1800
  5. Effective total wait is roughly 2 × cooldown per attempt, not 1 × cooldown

For the motivating SP-API case, hitting _keep_api_budget_with_failed_job is the expected outcome of retrying during cooldown, so this isn't an edge case. Two reasonable fixes:

  • Carry _retry_after from the old job onto the synthetic replacement, or
  • Don't replace at all when _start_job would synthesize a failure during cooldown — keep the original job and re-arm.

Either way, worth a regression test that simulates _start_job returning a synthetic FAILED job and asserts the total wall-clock wait equals one cooldown, not two.

2. Three different lower bounds for failed_retry_wait_time_in_seconds

  • YAML schema: minimum: 1
  • Constructor: if ... < 0: raise ValueError(...) — allows 0
  • Runtime: if self._failed_retry_wait_time_in_seconds: — treats 0 as disabled

A value of 0 is rejected by the schema, silently accepted by the constructor, and silently disables the feature at runtime. Pick one: is not None and > 0 in the runtime check, and > 0 (or >= 1) in the constructor to match the YAML.

…dation

- When _start_job returns a synthetic FAILED job (API still in cooldown),
  carry _retry_after from the old job to prevent double-cooldown
- Change constructor validation from < 0 to <= 0 to match YAML schema
  minimum: 1 constraint
- Add regression test for synthetic FAILED replacement scenario

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration

Both concerns addressed in 9ddf6ca:

1. Double cooldown via synthetic FAILED jobs

Fixed by carrying _retry_after from the old job onto the synthetic replacement. After _start_job returns, if the new job is already FAILED (synthetic) and the original job had a deferred retry, the replacement inherits _retry_after:

new_job = self._start_job(job.job_parameters(), job.api_job_id())
if (
    self._failed_retry_wait_time_in_seconds
    and new_job.status() == AsyncJobStatus.FAILED
    and job.retry_deferred()
):
    new_job.set_retry_after(job._retry_after)
partition.replace_job(job, [new_job])

This ensures the total wall-clock wait equals one cooldown, not two. Added a regression test (test_given_synthetic_failed_replacement_when_cooldown_elapses_then_carries_retry_after) that simulates _start_job returning a synthetic FAILED job and asserts the replacement inherits _retry_after from the original.

2. Consistent lower bound

Constructor validation now rejects <= 0 (matching YAML minimum: 1):

if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
    raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

The three layers are now consistent:

  • YAML schema: minimum: 1
  • Constructor: rejects <= 0
  • Runtime: if self._failed_retry_wait_time_in_seconds: — 0 can never reach here since constructor rejects it

Devin session


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)

204-223: ⚡ Quick win

Could we flip the cooldown guard order to reduce hidden coupling, wdyt?

At Line 205, readiness is checked before we know whether cooldown was ever armed. Would you consider arming on first FAILED (not retry_deferred) first, then checking readiness, so this path doesn’t depend on implicit ready_to_retry() semantics?

Proposed refactor
-            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
-                if not job.ready_to_retry():
-                    lazy_log(
-                        LOGGER,
-                        logging.DEBUG,
-                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
-                    )
-                    continue
-                if not job.retry_deferred():
+            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
+                if not job.retry_deferred():
                     job.set_retry_after(
                         datetime.now(tz=timezone.utc)
                         + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
@@
                     )
                     continue
+                if not job.ready_to_retry():
+                    lazy_log(
+                        LOGGER,
+                        logging.DEBUG,
+                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
+                    )
+                    continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
204 - 223, Flip the cooldown guard order so the retry cooldown is explicitly
armed on the first FAILED occurrence before relying on ready_to_retry(); inside
the loop handling AsyncJobStatus.FAILED (use symbols
_failed_retry_wait_time_in_seconds, job.retry_deferred(),
job.set_retry_after(...), job.ready_to_retry(), job.api_job_id(), _start_job),
first check if not job.retry_deferred() and then call
job.set_retry_after(datetime.now(tz=timezone.utc) +
timedelta(seconds=self._failed_retry_wait_time_in_seconds)) and log the deferral
and continue; otherwise (if retry_deferred) then check job.ready_to_retry() and
skip/continue if not ready, only proceed to call
self._start_job(job.job_parameters(), job.api_job_id()) when readiness passes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 242307c0-269e-4c29-aba3-70f104ce272f

📥 Commits

Reviewing files that changed from the base of the PR and between d0385e7 and 9ddf6ca.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py
