fix(async-job): propagate underlying failure_type through orchestrator wrappers#1002
Draft
devin-ai-integration[bot] wants to merge 1 commit intomainfrom
Draft
Conversation
…r wrappers Co-Authored-By: bot_apk <apk@cognition.ai>
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777296034-async-job-failure-type-propagation#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777296034-async-job-failure-type-propagationPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
AsyncJobOrchestratorcurrently hardcodesfailure_type=FailureType.system_erroron both wrappers it raises after a partition exhausts its retries:_process_partitions_with_errors(Async job failed after exhausting all retry attempts.)create_and_get_completed_partitions(One or more async jobs failed after exhausting all retry attempts.)This collapses every async-job failure to
system_error, even when the underlying cause is atransient_error(rate limits / 429s) or aconfig_error(missing perms / 403s). Concrete impact observed in airbytehq/oncall#12043: 17 of 23 affectedsource-amazon-seller-partnerconnections surfaced assystem_errorin Sentry'sairbyte-python-cdk-prodproject even though the underlying cause was Amazon SP-APIcreateReport429 throttling — driving unnecessary on-call alerts and triage misdirection.The original exception is also lost on the trace path:
_keep_api_budget_with_failed_jobwraps the source-side exception (which carries the correctfailure_type), emits it as anAirbyteMessage, then creates a fake FAILEDAsyncJobwith no reference to it. The wrapper later builds a freshAirbyteTracedExceptionwithsystem_errorregardless of how the underlying job failed.This PR keeps the intent of airbytehq/airbyte-python-cdk#961 (do not collapse async-job failures to
config_error) while restoring correct propagation fortransient_errorandconfig_errorcases:AsyncJobgains an optionalfailure_exceptionslot withfailure_exception()/set_failure_exception(...)accessors so a job can carry the originatingAirbyteTracedException._keep_api_budget_with_failed_jobnow passes the traced exception through to_create_failed_job, which attaches it to the fake job — closing the leak on the start-failure path that oncall#12043 exercises.AsyncJobOrchestrator._resolve_failure_typehelper aggregatesFailureTypevalues from a collection of exceptions with priorityconfig_error>system_error>transient_error, falling back tosystem_errorwhen no typed exceptions are available (preserves today's safe default for jobs that transition to FAILED on the API side without a captured local exception)._resolve_failure_typeinstead of a hardcoded value.PR #961's regression is preserved:
_is_breaking_exceptionstill short-circuits genuineconfig_errorexceptions before they reach the wrapper, so the onlyconfig_errorreaching the aggregation is one the source itself classified that way (e.g. expired SAS URL classified astransient_errorat the http layer continues to surface astransient_error, notconfig_error).For the oncall#12043 dataset this means roughly 14 connections will surface as
transient_errorand 3 asconfig_errorinstead of all 17 assystem_error. Platform sync-retry behaviour is unchanged fortransient_errorandsystem_error(both retry);config_errorcontinues to halt retries by design, which is the correct behaviour for genuine permission issues.Review & Testing Checklist for Human
config_error>system_error>transient_errormatches the platform's intent. Escalating toconfig_errorwhen even one underlying failure is config-driven means the sync will not auto-retry — this is intentional but worth a sanity check.AsyncJobOrchestratorrelied on the wrapper always beingsystem_error(e.g. alerting rules, downstreamexceptfilters keyed onfailure_type).replace_job, only the most recent attempt'sfailure_exceptionis visible frompartition.jobs, so a partition whose retries alternated between failure types will surface only the last attempt's type. This is strictly better than today's always-system_error, but a fuller fix would require tracking per-attempt history onAsyncPartition. Cross-partition aggregation in the outer wrapper still handles mixed types correctly.AsyncJobRepositoryimplementations calljob.set_failure_exception(...)fromupdate_jobs_statuswhen an API-reported FAILED job carries a structured reason (e.g. SP-API reportprocessingStatus=FATAL). That would let the per-partition wrapper resolve correctly even when the failure is API-side rather than client-side.Test plan
unit_tests/sources/declarative/async_job/test_job_orchestrator.py:test_resolve_failure_type_priority_matrix— covers empty input,Noneentries, single-type lists, all priority pairs, and non-traced exceptions.test_given_transient_errors_on_start_when_max_attempts_reached_then_raise_transient_error— end-to-end test replicating the oncall#12043 path (repeated transientstartfailures).test_given_mixed_transient_and_system_errors_across_partitions_when_max_attempts_reached_then_raise_system_error— outer-wrapper aggregation across partitions.test_given_exception_when_start_job_and_skip_this_exceptioncontinues to assertsystem_errorbecause its raisedAirbyteTracedExceptions do not specify afailure_type(default issystem_error), so the new aggregation correctly resolves tosystem_error.poetry run pytest unit_tests/sources/declarative/async_job/,poetry run ruff check,poetry run ruff format --check, andpoetry run mypy airbyte_cdk/sources/declarative/async_job/all pass locally.Notes
AsyncJob.failure_exception()/set_failure_exception(...)are new methods; no public signatures are removed.system_errorwill surface their underlying type. This affects Sentry alert volumes (airbyte-python-cdk-prodis fingerprinted off this exception).Link to Devin session: https://app.devin.ai/sessions/127eb6374a1f4a998407587828709f60