Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import Optional

from airbyte_cdk.sources.declarative.async_job.timer import Timer
Expand All @@ -24,6 +24,7 @@ def __init__(
self._api_job_id = api_job_id
self._job_parameters = job_parameters
self._status = AsyncJobStatus.RUNNING
self._retry_after: Optional[datetime] = None

timeout = timeout if timeout else timedelta(minutes=60)
self._timer = Timer(timeout)
Expand Down Expand Up @@ -54,5 +55,19 @@ def update_status(self, status: AsyncJobStatus) -> None:

self._status = status

def set_retry_after(self, retry_after: datetime) -> None:
    """Record the earliest UTC instant at which this job may be retried."""
    self._retry_after = retry_after

def retry_deferred(self) -> bool:
    """Whether a deferred-retry timestamp has been recorded for this job."""
    return self._retry_after is not None

def ready_to_retry(self) -> bool:
    """True when no deferred retry is pending, or when the wait window has elapsed."""
    deadline = self._retry_after
    if deadline is None:
        return True
    return datetime.now(tz=timezone.utc) >= deadline
Comment thread
darynaishchenko marked this conversation as resolved.

def __repr__(self) -> str:
    """Debug representation including the job id, its parameters and current status."""
    details = f"api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()}"
    return f"AsyncJob({details})"
32 changes: 31 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
import traceback
import uuid
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import (
Any,
Generator,
Expand Down Expand Up @@ -168,6 +168,7 @@ def __init__(
exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
has_bulk_parent: bool = False,
job_max_retry: Optional[int] = None,
failed_retry_wait_time_in_seconds: Optional[int] = None,
) -> None:
"""
If the stream slices provided as parameters rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
Expand All @@ -181,6 +182,9 @@ def __init__(
"An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
)

if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

self._job_repository: AsyncJobRepository = job_repository
self._slice_iterator = LookaheadIterator(slices)
self._running_partitions: List[AsyncPartition] = []
Expand All @@ -189,14 +193,40 @@ def __init__(
self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
self._has_bulk_parent = has_bulk_parent
self._job_max_retry = job_max_retry
self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds

self._non_breaking_exceptions: List[Exception] = []

def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
    """Restart the partition's FAILED and TIMED_OUT jobs.

    When a failed-retry cooldown is configured, FAILED jobs are not restarted right
    away: the first pass arms a retry-after timestamp, later passes skip the job
    until that instant has passed, and only then is a replacement started.
    TIMED_OUT jobs are always restarted immediately.
    """
    cooldown_seconds = self._failed_retry_wait_time_in_seconds
    terminal_failures = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
    for failed_job in [candidate for candidate in partition.jobs if candidate.status() in terminal_failures]:
        if cooldown_seconds and failed_job.status() == AsyncJobStatus.FAILED:
            if not failed_job.ready_to_retry():
                # Cooldown armed but not yet elapsed: leave the job untouched for now.
                lazy_log(
                    LOGGER,
                    logging.DEBUG,
                    lambda: f"Job {failed_job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
                )
                continue
            if not failed_job.retry_deferred():
                # First time this failure is seen: arm the cooldown instead of retrying.
                failed_job.set_retry_after(
                    datetime.now(tz=timezone.utc) + timedelta(seconds=cooldown_seconds)
                )
                lazy_log(
                    LOGGER,
                    logging.INFO,
                    lambda: f"Job {failed_job.api_job_id()} failed. Deferring retry for {cooldown_seconds} seconds.",
                )
                continue
        replacement = self._start_job(failed_job.job_parameters(), failed_job.api_job_id())
        if (
            cooldown_seconds
            and replacement.status() == AsyncJobStatus.FAILED
            and failed_job.retry_deferred()
        ):
            # The replacement failed synthetically while a deferral was active: carry
            # the original deadline over so the cooldown is not doubled.
            replacement.set_retry_after(failed_job._retry_after)  # type: ignore[arg-type]
        partition.replace_job(failed_job, [replacement])

def _start_jobs(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4102,6 +4102,14 @@ definitions:
- type: string
interpolation_context:
- config
failed_retry_wait_time_in_seconds:
description: Time in seconds to wait before retrying a failed async job. This is useful for APIs with cooldown periods between report generation attempts. When set, the orchestrator defers retry of failed jobs until the wait time has elapsed, without blocking other jobs.
anyOf:
- type: integer
minimum: 1
- type: string
interpolation_context:
- config
Comment thread
coderabbitai[bot] marked this conversation as resolved.
download_target_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
anyOf:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,10 @@ class AsyncRetriever(BaseModel):
None,
description="The time in minutes after which the single Async Job should be considered as Timed Out.",
)
failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field(
None,
description="Time in seconds to wait before retrying a failed async job. This is useful for APIs with cooldown periods between report generation attempts. When set, the orchestrator defers retry of failed jobs until the wait time has elapsed, without blocking other jobs.",
)
Comment thread
darynaishchenko marked this conversation as resolved.
download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3955,6 +3955,17 @@ def _get_job_timeout() -> datetime.timedelta:
job_timeout=_get_job_timeout(),
)

failed_retry_wait_time_in_seconds: Optional[int] = (
int(
InterpolatedString.create(
str(model.failed_retry_wait_time_in_seconds),
parameters={},
).eval(config)
)
if model.failed_retry_wait_time_in_seconds
else None
)

async_job_partition_router = AsyncJobPartitionRouter(
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
Expand All @@ -3966,6 +3977,7 @@ def _get_job_timeout() -> datetime.timedelta:
# set the `job_max_retry` to 1 for the `Connector Builder`` use-case.
# `None` == default retry is set to 3 attempts, under the hood.
job_max_retry=1 if self._emit_connector_builder_messages else None,
failed_retry_wait_time_in_seconds=failed_retry_wait_time_in_seconds,
),
stream_slicer=stream_slicer,
config=config,
Expand Down
45 changes: 29 additions & 16 deletions unit_tests/sources/declarative/async_job/test_job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import time
from datetime import timedelta
from unittest import TestCase
from datetime import datetime, timedelta, timezone
from typing import Optional

import pytest

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
Expand All @@ -14,19 +16,30 @@
_IMMEDIATELY_TIMED_OUT = timedelta(microseconds=1)


class AsyncJobTest(TestCase):
def test_given_timer_is_not_out_when_status_then_return_actual_status(self) -> None:
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
assert job.status() == AsyncJobStatus.RUNNING
def test_given_timer_is_not_out_when_status_then_return_actual_status() -> None:
    """A freshly created job with a generous timeout reports its real (RUNNING) status."""
    fresh_job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
    assert fresh_job.status() == AsyncJobStatus.RUNNING


def test_given_timer_is_out_when_status_then_return_timed_out() -> None:
    """Once the (microsecond) timeout has elapsed, status() reports TIMED_OUT."""
    expired_job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
    time.sleep(0.001)  # let the 1-microsecond timeout elapse before querying status
    assert expired_job.status() == AsyncJobStatus.TIMED_OUT

def test_given_timer_is_out_when_status_then_return_timed_out(self) -> None:
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
time.sleep(0.001)
assert job.status() == AsyncJobStatus.TIMED_OUT

def test_given_status_is_terminal_when_update_status_then_stop_timer(self) -> None:
"""
This test will become important once we will print stats associated with jobs. As for now, we stop the timer but do not return any
metrics regarding the timer so it is not useful.
"""
pass
@pytest.mark.parametrize(
    "offset_from_now,expected_deferred,expected_ready",
    [
        pytest.param(None, False, True, id="no_retry_after_set"),
        pytest.param(timedelta(hours=1), True, False, id="retry_after_in_future"),
        pytest.param(-timedelta(seconds=1), True, True, id="retry_after_in_past"),
    ],
)
def test_retry_after(
    offset_from_now: Optional[timedelta], expected_deferred: bool, expected_ready: bool
) -> None:
    """retry_deferred()/ready_to_retry() reflect whether a retry-after instant is set and elapsed."""
    job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
    if offset_from_now is not None:
        job.set_retry_after(datetime.now(tz=timezone.utc) + offset_from_now)
    assert job.retry_deferred() == expected_deferred
    assert job.ready_to_retry() == expected_ready
Comment thread
coderabbitai[bot] marked this conversation as resolved.
133 changes: 133 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Mapping, Optional, Set, Tuple
from unittest import TestCase, mock
from unittest.mock import MagicMock, Mock, call
Expand Down Expand Up @@ -416,6 +417,138 @@ def test_given_skipped_does_not_retry(self, mock_sleep: MagicMock) -> None:
# start is called only once — SKIPPED does not trigger a retry
assert self._job_repository.start.call_count == 1

@mock.patch(sleep_mock_target)
def test_given_failed_retry_wait_time_when_job_fails_then_defers_retry(
    self, mock_sleep: MagicMock
) -> None:
    """When failed_retry_wait_time_in_seconds is set and a job fails, the retry should
    be deferred: first call sets retry_after timestamp and skips, subsequent calls skip
    until cooldown elapses, then the job is replaced."""
    job_tracker = JobTracker(_NO_JOB_LIMIT)
    job = self._an_async_job("deferred-job", _A_STREAM_SLICE)
    # NOTE(review): reaches into the tracker's private _jobs set to pre-register the
    # job id — presumably so replacement does not hit the concurrency budget; confirm.
    job_tracker._jobs.add("deferred-job")
    partition = AsyncPartition([job], _A_STREAM_SLICE)
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [],
        job_tracker,
        self._message_repository,
        failed_retry_wait_time_in_seconds=1800,
    )

    job.update_status(AsyncJobStatus.FAILED)

    # First call: should set retry_after and NOT replace
    orchestrator._replace_failed_jobs(partition)
    assert job.retry_deferred()
    assert not job.ready_to_retry()
    self._job_repository.start.assert_not_called()

    # Second call while cooldown hasn't elapsed: should still skip
    orchestrator._replace_failed_jobs(partition)
    self._job_repository.start.assert_not_called()

    # Simulate cooldown elapsed by setting retry_after to the past
    # (avoids sleeping 1800s in the test).
    job.set_retry_after(datetime.now(tz=timezone.utc) - timedelta(seconds=1))
    replacement_job = self._an_async_job("replacement-job", _A_STREAM_SLICE)
    self._job_repository.start.return_value = replacement_job

    # Third call after cooldown: should replace the job
    orchestrator._replace_failed_jobs(partition)
    self._job_repository.start.assert_called_once()

@mock.patch(sleep_mock_target)
def test_given_no_failed_retry_wait_time_when_job_fails_then_replaces_immediately(
    self, mock_sleep: MagicMock
) -> None:
    """Without a configured cooldown, a FAILED job is replaced on the very first
    pass — the pre-existing behavior must be unchanged."""
    tracker = JobTracker(_NO_JOB_LIMIT)
    failed_job = self._an_async_job("immediate-job", _A_STREAM_SLICE)
    tracker._jobs.add("immediate-job")
    partition = AsyncPartition([failed_job], _A_STREAM_SLICE)
    self._job_repository.start.return_value = self._an_async_job(
        "replacement-job", _A_STREAM_SLICE
    )
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [],
        tracker,
        self._message_repository,
    )

    failed_job.update_status(AsyncJobStatus.FAILED)
    orchestrator._replace_failed_jobs(partition)

    self._job_repository.start.assert_called_once()

@mock.patch(sleep_mock_target)
def test_given_failed_retry_wait_time_when_timed_out_job_then_replaces_immediately(
    self, mock_sleep: MagicMock
) -> None:
    """Only FAILED jobs are deferred: a TIMED_OUT job is restarted immediately even
    when a cooldown is configured."""
    tracker = JobTracker(_NO_JOB_LIMIT)
    timed_out_job = self._an_async_job("timed-out-job", _A_STREAM_SLICE)
    tracker._jobs.add("timed-out-job")
    partition = AsyncPartition([timed_out_job], _A_STREAM_SLICE)
    self._job_repository.start.return_value = self._an_async_job(
        "replacement-job", _A_STREAM_SLICE
    )
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [],
        tracker,
        self._message_repository,
        failed_retry_wait_time_in_seconds=1800,
    )

    timed_out_job.update_status(AsyncJobStatus.TIMED_OUT)
    orchestrator._replace_failed_jobs(partition)

    self._job_repository.start.assert_called_once()

@mock.patch(sleep_mock_target)
def test_given_synthetic_failed_replacement_when_cooldown_elapses_then_carries_retry_after(
    self, mock_sleep: MagicMock
) -> None:
    """When a deferred job's cooldown elapses and _start_job returns a synthetic FAILED
    job (API still in cooldown), the replacement should inherit _retry_after from the
    original job to avoid doubling the cooldown wait."""
    job_tracker = JobTracker(_NO_JOB_LIMIT)
    # NOTE(review): built directly (not via self._an_async_job) — presumably to get a
    # real AsyncJob whose _retry_after can be inspected; confirm helper differences.
    job = AsyncJob("original-job", _A_STREAM_SLICE)
    job_tracker._jobs.add("original-job")
    partition = AsyncPartition([job], _A_STREAM_SLICE)
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [],
        job_tracker,
        self._message_repository,
        failed_retry_wait_time_in_seconds=1800,
    )

    job.update_status(AsyncJobStatus.FAILED)

    # First call: arms the cooldown
    orchestrator._replace_failed_jobs(partition)
    assert job.retry_deferred()

    # Simulate cooldown elapsed (avoids sleeping 1800s in the test)
    job.set_retry_after(datetime.now(tz=timezone.utc) - timedelta(seconds=1))
    original_retry_after = job._retry_after

    # _start_job returns a synthetic FAILED job (API still rejects)
    synthetic_failed_job = AsyncJob("synthetic-failed", _A_STREAM_SLICE)
    synthetic_failed_job.update_status(AsyncJobStatus.FAILED)
    self._job_repository.start.return_value = synthetic_failed_job
    job_tracker._jobs.add("synthetic-failed")

    # Replace should carry _retry_after to the synthetic job
    orchestrator._replace_failed_jobs(partition)
    self._job_repository.start.assert_called_once()

    # The synthetic replacement should have inherited _retry_after
    replaced_job = list(partition.jobs)[0]
    assert replaced_job._retry_after == original_retry_after

def _mock_repository(self) -> None:
    # Fresh Mock constrained to the AsyncJobRepository interface so that calls to
    # methods the real repository does not define fail fast in tests.
    self._job_repository = Mock(spec=AsyncJobRepository)

Expand Down
Loading