Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import timedelta
from typing import (
Any,
Dict,
Generator,
Generic,
Iterable,
Expand Down Expand Up @@ -38,6 +39,29 @@
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

# Precedence used to aggregate the `FailureType` of many non-breaking
# exceptions into a single value. A `config_error` means the user must act
# before retries can succeed, so it dominates. `transient_error` is next
# (retryable). `system_error` is the fallback for genuine internal failures.
_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = (
    FailureType.config_error,  # user action required before any retry can help — dominates
    FailureType.transient_error,  # retryable; outranks only system_error
    FailureType.system_error,  # catch-all for internal failures (also the default bucket)
)

# Deterministic, aggregation-friendly user-facing messages per dominant
# `FailureType`. Counts and raw exception reprs go into `internal_message`
# so that the `message` field stays stable as a log aggregation key.
# NOTE: keys intentionally mirror _FAILURE_TYPE_PRECEDENCE exactly, so a
# lookup by the aggregated failure type can never raise KeyError.
_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE: Mapping[FailureType, str] = {
    FailureType.config_error: (
        "Async jobs failed because the source API rejected the request as unauthorized or forbidden."
    ),
    FailureType.transient_error: (
        "Async jobs failed after exhausting retries for source API rate limit or transient errors."
    ),
    FailureType.system_error: "Async jobs failed after exhausting retry attempts.",
}


class AsyncPartition:
"""
Expand Down Expand Up @@ -481,16 +505,56 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
if self._non_breaking_exceptions:
# We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
# call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
failure_type = self._aggregate_failure_type(self._non_breaking_exceptions)
failure_counts = self._count_failure_types(self._non_breaking_exceptions)
summary = ", ".join(
f"{ft.value}={failure_counts[ft]}"
for ft in _FAILURE_TYPE_PRECEDENCE
if ft in failure_counts
)
raise AirbyteTracedException(
message="One or more async jobs failed after exhausting all retry attempts.",
message=_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE[failure_type],
internal_message="\n".join(
[
[f"Underlying failure breakdown: {summary}."]
+ [
filter_secrets(exception.__repr__())
for exception in self._non_breaking_exceptions
]
),
failure_type=FailureType.system_error,
failure_type=failure_type,
)

@staticmethod
def _aggregate_failure_type(exceptions: List[Exception]) -> FailureType:
    """Collapse many exceptions into the single most actionable `FailureType`.

    Each exception contributes its `failure_type` if it is an
    `AirbyteTracedException` carrying one; anything else counts as
    `system_error` (the `AirbyteTracedException` default). The winner is
    the first type present in `_FAILURE_TYPE_PRECEDENCE` order, i.e.
    `config_error` > `transient_error` > `system_error`.
    """
    observed: Set[FailureType] = set()
    for error in exceptions:
        if isinstance(error, AirbyteTracedException) and error.failure_type is not None:
            observed.add(error.failure_type)
        else:
            # Plain exceptions and traced ones without an explicit type
            # fall into the internal-failure bucket.
            observed.add(FailureType.system_error)
    return next(
        (candidate for candidate in _FAILURE_TYPE_PRECEDENCE if candidate in observed),
        FailureType.system_error,
    )

@staticmethod
def _count_failure_types(exceptions: List[Exception]) -> Dict[FailureType, int]:
    """Tally how many of `exceptions` fall under each `FailureType`.

    Bucketing mirrors `_aggregate_failure_type`: traced exceptions use
    their own `failure_type` when set; everything else (including plain
    exceptions) counts toward `system_error`.
    """
    tally: Dict[FailureType, int] = {}
    for error in exceptions:
        if isinstance(error, AirbyteTracedException) and error.failure_type is not None:
            bucket = error.failure_type
        else:
            bucket = FailureType.system_error
        tally[bucket] = tally.get(bucket, 0) + 1
    return tally

def _handle_non_breaking_error(self, exception: Exception) -> None:
LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
Expand Down
39 changes: 39 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,45 @@ def test_given_exception_when_start_job_and_skip_this_exception(
assert self._message_repository.emit_message.call_count == 3 # one for each traced message
assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException

def test_aggregate_failure_type_gives_config_error_highest_precedence(self) -> None:
    # A single config_error anywhere in the batch must dominate transient,
    # defaulted-traced, and plain exceptions alike.
    batch: List[Exception] = [
        AirbyteTracedException("a", failure_type=FailureType.transient_error),
        AirbyteTracedException("b", failure_type=FailureType.config_error),
        AirbyteTracedException("c"),
        ValueError("d"),
    ]
    result = AsyncJobOrchestrator._aggregate_failure_type(batch)
    assert result == FailureType.config_error

def test_aggregate_failure_type_prefers_transient_over_system(self) -> None:
    # With no config_error present, transient_error outranks the
    # system_error bucket fed by defaulted-traced and plain exceptions.
    batch: List[Exception] = [
        AirbyteTracedException("a"),
        AirbyteTracedException("b", failure_type=FailureType.transient_error),
        ValueError("c"),
    ]
    result = AsyncJobOrchestrator._aggregate_failure_type(batch)
    assert result == FailureType.transient_error

def test_aggregate_failure_type_defaults_to_system_error(self) -> None:
    # Neither a plain exception nor a traced one without an explicit
    # failure_type carries precedence, so the fallback must apply.
    batch: List[Exception] = [
        ValueError("a"),
        AirbyteTracedException("b"),
    ]
    result = AsyncJobOrchestrator._aggregate_failure_type(batch)
    assert result == FailureType.system_error

def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None:
    # Two explicit transient errors; the untyped traced exception and the
    # plain ValueError should both land in the system_error bucket.
    batch: List[Exception] = [
        AirbyteTracedException("a", failure_type=FailureType.transient_error),
        AirbyteTracedException("b", failure_type=FailureType.transient_error),
        AirbyteTracedException("c"),
        ValueError("d"),
    ]
    expected = {
        FailureType.transient_error: 2,
        FailureType.system_error: 2,
    }
    assert AsyncJobOrchestrator._count_failure_types(batch) == expected

@mock.patch(sleep_mock_target)
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
self, mock_sleep: MagicMock
Expand Down
Loading