Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/async_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from airbyte_cdk.sources.declarative.async_job.timer import Timer
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

from .status import AsyncJobStatus

Expand All @@ -24,11 +25,39 @@ def __init__(
self._api_job_id = api_job_id
self._job_parameters = job_parameters
self._status = AsyncJobStatus.RUNNING
self._failure_exception: Optional[AirbyteTracedException] = None

timeout = timeout if timeout else timedelta(minutes=60)
self._timer = Timer(timeout)
self._timer.start()

def failure_exception(self) -> Optional[AirbyteTracedException]:
    """
    Expose the exception recorded when this job failed, or ``None`` when no
    failure has been captured.

    The orchestrator (or repository) attaches the originating error when a
    job reaches a terminal failure state, which lets downstream error
    aggregation preserve the original ``failure_type`` instead of collapsing
    every async job failure to ``system_error``.
    """
    return self._failure_exception

def set_failure_exception(self, exception: Optional[Exception]) -> None:
    """
    Record the exception that caused this job to fail.

    Any value that is not already an ``AirbyteTracedException`` is wrapped
    through ``AirbyteTracedException.from_exception`` so the stored exception
    always carries a ``failure_type``. Passing ``None`` clears the recorded
    failure.
    """
    if exception is None:
        self._failure_exception = None
    elif isinstance(exception, AirbyteTracedException):
        self._failure_exception = exception
    else:
        self._failure_exception = AirbyteTracedException.from_exception(exception)

def api_job_id(self) -> str:
return self._api_job_id

Expand Down
34 changes: 30 additions & 4 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,16 @@ def _keep_api_budget_with_failed_job(
# Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
# we would keep the exceptions in-memory until we know that we have reached the max attempt.
self._message_repository.emit_message(traced_exception.as_airbyte_message())
job = self._create_failed_job(_slice)
job = self._create_failed_job(_slice, traced_exception)
self._job_tracker.add_job(intent, job.api_job_id())
return job

def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
def _create_failed_job(
    self, stream_slice: StreamSlice, exception: Optional[Exception] = None
) -> AsyncJob:
    """
    Build a placeholder job in the FAILED state for a slice that could not start.

    :param stream_slice: the slice the job would have covered.
    :param exception: the error that prevented the job from starting; stored on
        the job (when provided) so its ``failure_type`` survives aggregation.
    :return: a new ``AsyncJob`` already marked FAILED.
    """
    failed_job = AsyncJob(
        f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT
    )
    failed_job.update_status(AsyncJobStatus.FAILED)
    failed_job.set_failure_exception(exception)
    return failed_job

def _get_running_jobs(self) -> Set[AsyncJob]:
Expand Down Expand Up @@ -420,6 +423,28 @@ def _reallocate_partition(
"""
current_running_partitions.insert(0, partition)

@staticmethod
def _resolve_failure_type(exceptions: Iterable[Optional[Exception]]) -> FailureType:
    """
    Aggregate a single ``FailureType`` from a collection of exceptions.

    Priority is ``config_error`` > ``system_error`` > ``transient_error``.
    Exceptions that are not ``AirbyteTracedException`` (including ``None``
    entries) contribute no type; when nothing typed is available — e.g. a job
    that flipped to FAILED on the API side with no locally captured exception
    — fall back to ``system_error`` to match the historical default.
    """
    observed = {
        exception.failure_type
        for exception in exceptions
        if isinstance(exception, AirbyteTracedException)
    }
    # Walk the priority order and return the first type actually observed.
    for candidate in (
        FailureType.config_error,
        FailureType.system_error,
        FailureType.transient_error,
    ):
        if candidate in observed:
            return candidate
    return FailureType.system_error

def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
"""
Process a partition with status errors (FAILED and TIMEOUT).
Expand All @@ -432,11 +457,12 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
AirbyteTracedException: If at least one job could not be completed.
"""
status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
failure_type = self._resolve_failure_type(job.failure_exception() for job in partition.jobs)
self._non_breaking_exceptions.append(
AirbyteTracedException(
message="Async job failed after exhausting all retry attempts.",
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
failure_type=FailureType.system_error,
failure_type=failure_type,
)
)

Expand Down Expand Up @@ -489,7 +515,7 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
for exception in self._non_breaking_exceptions
]
),
failure_type=FailureType.system_error,
failure_type=self._resolve_failure_type(self._non_breaking_exceptions),
)

def _handle_non_breaking_error(self, exception: Exception) -> None:
Expand Down
95 changes: 95 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,101 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_

assert job_tracker.try_to_get_intent()

@mock.patch(sleep_mock_target)
def test_given_transient_errors_on_start_when_max_attempts_reached_then_raise_transient_error(
    self, mock_sleep: MagicMock
) -> None:
    """
    Repeated `transient_error` failures while starting a job should surface
    as a `transient_error` from the outer wrapper instead of being relabeled
    as `system_error`. This is the oncall #12043 case (rate-limit 429 on
    Amazon SP-API `createReport`).
    """
    # Every start attempt raises a fresh transient (rate-limit style) error.
    self._job_repository.start.side_effect = [
        AirbyteTracedException("Rate limited", failure_type=FailureType.transient_error)
        for _ in range(_MAX_NUMBER_OF_ATTEMPTS)
    ]

    orchestrator = self._orchestrator([_A_STREAM_SLICE])

    completed_partitions, raised = self._accumulate_create_and_get_completed_partitions(
        orchestrator
    )

    assert not completed_partitions
    assert raised is not None
    assert raised.failure_type == FailureType.transient_error  # type: ignore[attr-defined]

@mock.patch(sleep_mock_target)
def test_given_mixed_transient_and_system_errors_across_partitions_when_max_attempts_reached_then_raise_system_error(
    self, mock_sleep: MagicMock
) -> None:
    """
    When the underlying failures span both `transient_error` and
    `system_error` across partitions, the outer wrapper should escalate
    to `system_error` (priority `system_error` > `transient_error`).
    """
    # One partition fails transiently, the other with a system error.
    self._job_repository.start.side_effect = [
        AirbyteTracedException("Rate limited", failure_type=FailureType.transient_error),
        AirbyteTracedException("Unexpected", failure_type=FailureType.system_error),
    ]

    # job_max_retry=1 so each slice fails immediately without retries.
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE],
        JobTracker(_NO_JOB_LIMIT),
        self._message_repository,
        job_max_retry=1,
    )

    _, raised = self._accumulate_create_and_get_completed_partitions(orchestrator)

    assert raised is not None
    assert raised.failure_type == FailureType.system_error  # type: ignore[attr-defined]

def test_resolve_failure_type_priority_matrix(self) -> None:
    """
    Priority is `config_error` > `system_error` > `transient_error` with
    `system_error` as the safe fallback when no typed exceptions are
    available.
    """
    resolve = AsyncJobOrchestrator._resolve_failure_type

    transient = AirbyteTracedException(failure_type=FailureType.transient_error)
    system = AirbyteTracedException(failure_type=FailureType.system_error)
    config = AirbyteTracedException(failure_type=FailureType.config_error)

    # (input exceptions, expected aggregated failure type)
    matrix = [
        ([], FailureType.system_error),
        ([None, None], FailureType.system_error),
        ([transient], FailureType.transient_error),
        ([system], FailureType.system_error),
        ([config], FailureType.config_error),
        ([transient, system], FailureType.system_error),
        ([transient, config, system], FailureType.config_error),
        # non-traced exceptions are ignored when computing the priority but
        # the fallback is still `system_error`.
        ([ValueError("boom")], FailureType.system_error),
    ]

    for exceptions, expected in matrix:
        assert resolve(exceptions) == expected

def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
self,
) -> None:
Expand Down
Loading