
fix(async-job): propagate underlying failure_type through orchestrator wrappers #1002

Draft
devin-ai-integration[bot] wants to merge 1 commit into main from devin/1777296034-async-job-failure-type-propagation

Summary

AsyncJobOrchestrator currently hardcodes failure_type=FailureType.system_error on both wrappers it raises after a partition exhausts its retries:

  • the per-partition wrapper in _process_partitions_with_errors ("Async job failed after exhausting all retry attempts.")
  • the outer wrapper at the end of create_and_get_completed_partitions ("One or more async jobs failed after exhausting all retry attempts.")

This collapses every async-job failure to system_error, even when the underlying cause is a transient_error (rate limits / 429s) or a config_error (missing perms / 403s). Concrete impact observed in airbytehq/oncall#12043: 17 of 23 affected source-amazon-seller-partner connections surfaced as system_error in Sentry's airbyte-python-cdk-prod project even though the underlying cause was Amazon SP-API createReport 429 throttling, which drove unnecessary on-call alerts and misdirected triage.

The original exception is also lost on the trace path: _keep_api_budget_with_failed_job wraps the source-side exception (which carries the correct failure_type), emits it as an AirbyteMessage, then creates a fake FAILED AsyncJob with no reference to it. The wrapper later builds a fresh AirbyteTracedException with system_error regardless of how the underlying job failed.

This PR keeps the intent of airbytehq/airbyte-python-cdk#961 (do not collapse async-job failures to config_error) while restoring correct propagation for transient_error and config_error cases:

  • AsyncJob gains an optional failure_exception slot with failure_exception() / set_failure_exception(...) accessors so a job can carry the originating AirbyteTracedException.
  • _keep_api_budget_with_failed_job now passes the traced exception through to _create_failed_job, which attaches it to the fake job — closing the leak on the start-failure path that oncall#12043 exercises.
  • A new AsyncJobOrchestrator._resolve_failure_type helper aggregates FailureType values from a collection of exceptions with priority config_error > system_error > transient_error, falling back to system_error when no typed exceptions are available (preserves today's safe default for jobs that transition to FAILED on the API side without a captured local exception).
  • Both wrappers use _resolve_failure_type instead of a hardcoded value.
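
The changes listed above can be pictured with a minimal, self-contained sketch. It is not the PR's code: FailureType, TracedError, and Job below are hypothetical stand-ins for the CDK's FailureType, AirbyteTracedException, and AsyncJob, and resolve_failure_type only approximates what _resolve_failure_type is described as doing. In particular, skipping None entries and exceptions that carry no typed failure_type is an assumption.

```python
from enum import Enum
from typing import Iterable, Optional


class FailureType(Enum):
    # Stand-in for the CDK enum; only the three values discussed in this PR.
    system_error = "system_error"
    config_error = "config_error"
    transient_error = "transient_error"


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException."""

    def __init__(self, message: str, failure_type: FailureType = FailureType.system_error):
        super().__init__(message)
        self.failure_type = failure_type


class Job:
    """Hypothetical stand-in for AsyncJob with the new optional slot."""

    def __init__(self, failure_exception: Optional[TracedError] = None):
        self._failure_exception = failure_exception

    def failure_exception(self) -> Optional[TracedError]:
        return self._failure_exception

    def set_failure_exception(self, exception: Optional[TracedError]) -> None:
        self._failure_exception = exception


# Highest priority first: config_error > system_error > transient_error.
_PRIORITY = (
    FailureType.config_error,
    FailureType.system_error,
    FailureType.transient_error,
)


def resolve_failure_type(exceptions: Iterable[Optional[BaseException]]) -> FailureType:
    # Assumption: None entries and exceptions without a typed failure_type are skipped.
    seen = {
        exc.failure_type
        for exc in exceptions
        if isinstance(getattr(exc, "failure_type", None), FailureType)
    }
    for failure_type in _PRIORITY:
        if failure_type in seen:
            return failure_type
    # Safe default when no typed exception was captured.
    return FailureType.system_error


jobs = [
    Job(TracedError("429 throttled", FailureType.transient_error)),
    Job(),  # failed on the API side, no local exception captured
]
resolved = resolve_failure_type(job.failure_exception() for job in jobs)
print(resolved.name)  # transient_error
```

In this sketch a job with no captured exception contributes nothing to the aggregation, so a partition whose only known cause is throttling resolves to transient_error, while an input with no typed exceptions at all still falls back to system_error.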

The regression protection from PR #961 is preserved: _is_breaking_exception still short-circuits genuine config_error exceptions before they reach the wrapper, so the only config_error reaching the aggregation is one the source itself classified that way. For example, an expired SAS URL classified as transient_error at the HTTP layer continues to surface as transient_error, not config_error.

For the oncall#12043 dataset this means roughly 14 connections will surface as transient_error and 3 as config_error instead of all 17 as system_error. Platform sync-retry behaviour is unchanged for transient_error and system_error (both retry); config_error continues to halt retries by design, which is the correct behaviour for genuine permission issues.

Review & Testing Checklist for Human

  • Confirm the priority order config_error > system_error > transient_error matches the platform's intent. Escalating to config_error when even one underlying failure is config-driven means the sync will not auto-retry — this is intentional but worth a sanity check.
  • Sanity-check that no consumer of AsyncJobOrchestrator relied on the wrapper always being system_error (e.g. alerting rules, downstream except filters keyed on failure_type).
  • Verify the per-partition wrapper change is acceptable as-is. After replace_job, only the most recent attempt's failure_exception is visible from partition.jobs, so a partition whose retries alternated between failure types will surface only the last attempt's type. This is strictly better than today's always-system_error, but a fuller fix would require tracking per-attempt history on AsyncPartition. Cross-partition aggregation in the outer wrapper still handles mixed types correctly.
  • Optional follow-up: consider having AsyncJobRepository implementations call job.set_failure_exception(...) from update_jobs_status when an API-reported FAILED job carries a structured reason (e.g. SP-API report processingStatus=FATAL). That would let the per-partition wrapper resolve correctly even when the failure is API-side rather than client-side.

Test plan

  • New unit tests in unit_tests/sources/declarative/async_job/test_job_orchestrator.py:
    • test_resolve_failure_type_priority_matrix — covers empty input, None entries, single-type lists, all priority pairs, and non-traced exceptions.
    • test_given_transient_errors_on_start_when_max_attempts_reached_then_raise_transient_error — end-to-end test replicating the oncall#12043 path (repeated transient start failures).
    • test_given_mixed_transient_and_system_errors_across_partitions_when_max_attempts_reached_then_raise_system_error — outer-wrapper aggregation across partitions.
  • Existing test_given_exception_when_start_job_and_skip_this_exception continues to assert system_error because its raised AirbyteTracedExceptions do not specify a failure_type (default is system_error), so the new aggregation correctly resolves to system_error.
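
The cases named in the test plan can be written as a small table-driven check. This is only a sketch under assumptions: FailureType and TracedError are hypothetical stand-ins for the CDK types, resolve is a compact approximation of the helper (not its actual implementation), and the numeric enum values exist only to encode priority here.

```python
from enum import Enum


class FailureType(Enum):
    # Stand-in for the CDK enum; a lower value means higher priority in this sketch.
    config_error = 0
    system_error = 1
    transient_error = 2


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException; defaults to system_error."""

    def __init__(self, failure_type: FailureType = FailureType.system_error):
        super().__init__(failure_type.name)
        self.failure_type = failure_type


def resolve(exceptions):
    # Assumption: None entries and non-traced exceptions are ignored.
    typed = [e.failure_type for e in exceptions if isinstance(e, TracedError)]
    return min(typed, key=lambda t: t.value, default=FailureType.system_error)


T = FailureType.transient_error
S = FailureType.system_error
C = FailureType.config_error

CASES = [
    ([], S),                                  # empty input
    ([None], S),                              # None entries
    ([ValueError("untyped")], S),             # non-traced exception
    ([TracedError()], S),                     # failure_type not specified
    ([TracedError(T)], T),                    # single-type list
    ([TracedError(T), TracedError(S)], S),    # system_error beats transient_error
    ([TracedError(T), TracedError(C)], C),    # config_error beats transient_error
    ([TracedError(S), TracedError(C)], C),    # config_error beats system_error
]

for exceptions, expected in CASES:
    assert resolve(exceptions) is expected, (exceptions, expected)
```

The fourth row corresponds to the existing test mentioned above: an exception raised without an explicit failure_type carries the system_error default, so the aggregation resolves to system_error.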

poetry run pytest unit_tests/sources/declarative/async_job/, poetry run ruff check, poetry run ruff format --check, and poetry run mypy airbyte_cdk/sources/declarative/async_job/ all pass locally.

Notes

  • Backwards-compatible additive change: AsyncJob.failure_exception() / set_failure_exception(...) are new methods; no public signatures are removed.
  • Behaviour change is intentional: syncs that previously surfaced as system_error will surface their underlying type. This affects Sentry alert volumes (airbyte-python-cdk-prod is fingerprinted off this exception).
  • Related background: airbytehq/oncall#12043, airbytehq/airbyte-python-cdk#961, airbytehq/airbyte-internal-issues#16221.

Link to Devin session: https://app.devin.ai/sessions/127eb6374a1f4a998407587828709f60

…r wrappers

Co-Authored-By: bot_apk <apk@cognition.ai>
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777296034-async-job-failure-type-propagation#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777296034-async-job-failure-type-propagation

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

PyTest Results (Fast)

4 037 tests (+3): 4 026 passed ✅ (+3), 11 skipped 💤 (±0), 0 failed ❌ (±0)
1 suite (±0), 1 file (±0), duration 6m 45s ⏱️ (-1m 17s)

Results for commit 01b2011. ± Comparison against base commit 60bae81.

@github-actions

PyTest Results (Full)

4 040 tests (+3): 4 028 passed ✅ (+3), 12 skipped 💤 (±0), 0 failed ❌ (±0)
1 suite (±0), 1 file (±0), duration 11m 2s ⏱️ (+5s)

Results for commit 01b2011. ± Comparison against base commit 60bae81.
