Skip to content

Conversation

@devin-ai-integration
Copy link
Contributor

Summary

Adds a concurrency_group property to streams that allows specifying which streams should be processed serially with respect to each other. Streams with the same non-None concurrency group will wait for other streams in that group to complete before starting.

This feature addresses the need for connectors like Intercom where certain API endpoints (e.g., scroll-based pagination) only allow one active request at a time. By assigning streams like companies and company_segments to the same concurrency group (e.g., "scroll"), they will be processed serially instead of concurrently.

Key changes:

  • Added concurrency_group property to AbstractStream (returns None by default)
  • Added concurrency_group parameter to DefaultStream
  • Added concurrency_group field to declarative component schema
  • Modified ConcurrentReadProcessor to track active groups and defer streams when their group is busy

Usage in declarative manifests:

streams:
  - type: DeclarativeStream
    name: companies
    concurrency_group: "scroll"
    # ...
  - type: DeclarativeStream
    name: company_segments
    concurrency_group: "scroll"
    # ...

Resolves: airbytehq/oncall#8346

Review & Testing Checklist for Human

  • Verify concurrency group deactivation logic: In _on_stream_is_done, confirm that _is_concurrency_group_active correctly checks both partition generation AND running partitions before deactivating a group
  • Verify deferred stream re-queuing: Ensure streams are properly re-queued when their group becomes inactive, and that they don't get "stuck" in the deferred list
  • Test with actual Intercom connector: Apply this change to the Intercom connector's companies and company_segments streams and verify they sync serially without API errors
  • Verify backward compatibility: Confirm existing connectors without concurrency_group continue to work (streams should run concurrently as before)

Recommended test plan:

  1. Create a test connector with 3 streams: 2 with the same concurrency group and 1 without
  2. Run a sync and verify the grouped streams run serially while the ungrouped stream runs concurrently
  3. Check logs for the debug messages about activating/deactivating concurrency groups

Notes

…essing

Add a concurrency_group property to AbstractStream that allows streams
to specify they should be processed serially with respect to other
streams in the same group. This is useful for APIs that limit concurrent
requests to certain endpoints, such as scroll-based pagination APIs that
only allow one active scroll at a time.

Key changes:
- Add concurrency_group property to AbstractStream (returns None by default)
- Add concurrency_group parameter to DefaultStream
- Add concurrency_group field to declarative component schema
- Update ConcurrentReadProcessor to track active concurrency groups
- Defer streams when their concurrency group is already active
- Re-queue deferred streams when their group becomes inactive
- Add unit tests for concurrency group behavior

This feature enables connectors like Intercom to mark streams using
scroll endpoints (companies, company_segments) with the same concurrency
group so they are processed serially instead of concurrently.

Resolves: airbytehq/oncall#8346
Co-Authored-By: unknown <>
@devin-ai-integration
Copy link
Contributor Author

Original prompt from API User
Comment from @agarctfi: /ai-fix Can you create a feature in the CDK that will allow us to specify exclusivity of concurrency for streams?

The code would have to live in the concurrent source, where the stream starts in the CDK.

The option should be defined on the stream itself.



Context: For Intercom, we need to be able to exclude streams from concurrency to prevent issues. The companies and company segments streams both use the scroll endpoints, but the API limits the amount of concurrent scrolls to one. 

So to fix this properly, we would want to add this flag to these streams to they are processed serially instead of concurrently. 
> 🔧 **AI Fix session starting...** Reproducing issue and developing a comprehensive fix. [View playbook](https://github.com/airbytehq/oncall/blob/main/prompts/playbooks/issue_fix.md)
>
> ❌ **Failed to create Devin AI session.**
>
> - ⚙️ [Link to job logs.](https://github.com/airbytehq/oncall/actions/runs/20240959583)
>
> **Please check the logs for details.**\n\nIMPORTANT: The user will expect a response posted back to the PR. You should post exactly one comment back to the respective issue PR. If the user requested a code change or PR, your comment should contain a link to the PR. Assume the user has no access to your session or conversation thread unless/until you respond back to them.\n\nIssue #8346 by @Srooney3: Source Intercom: Companies stream limited to 200 records\n\nIssue URL: https://github.com/airbytehq/oncall/issues/8346\n\nPlease use playbook macro: !issue_fix

PLAYBOOK_md:
# `/ai-fix` Slash Command Playbook

You are AI Fix Devin, an expert at reproducing and fixing Airbyte-related issues.

## Context

You are working on the issue linked above in context. You will also need to pull issue comments for full context.

## Rule: Immediate Issue Comment After PR Creation

**MANDATORY REQUIREMENT**: If you create a PR during an AI Fix workflow, your **first action** after creating the PR must be to create a comment on the ori... (6540 chars truncated...)

@devin-ai-integration
Copy link
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1765820926-stream-concurrency-exclusivity#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1765820926-stream-concurrency-exclusivity

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

@github-actions github-actions bot added the enhancement New feature or request label Dec 15, 2025
@github-actions
Copy link

PyTest Results (Fast)

3 825 tests  +3 403   3 813 ✅ +3 401   6m 36s ⏱️ + 4m 58s
    1 suites ±    0      12 💤 +    3 
    1 files   ±    0       0 ❌  -     1 

Results for commit a004c51. ± Comparison against base commit 452acd1.

@github-actions
Copy link

PyTest Results (Full)

3 828 tests  +5   3 816 ✅ +7   10m 58s ⏱️ +33s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌  - 2 

Results for commit a004c51. ± Comparison against base commit 452acd1.

@agarctfi agarctfi closed this Dec 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants