feat(concurrent-source): Add stream concurrency group for serial processing #868
Closed
devin-ai-integration wants to merge 1 commit into main from devin/1765820926-stream-concurrency-exclusivity
…essing

Add a concurrency_group property to AbstractStream that allows streams to specify they should be processed serially with respect to other streams in the same group. This is useful for APIs that limit concurrent requests to certain endpoints, such as scroll-based pagination APIs that only allow one active scroll at a time.

Key changes:
- Add concurrency_group property to AbstractStream (returns None by default)
- Add concurrency_group parameter to DefaultStream
- Add concurrency_group field to declarative component schema
- Update ConcurrentReadProcessor to track active concurrency groups
- Defer streams when their concurrency group is already active
- Re-queue deferred streams when their group becomes inactive
- Add unit tests for concurrency group behavior

This feature enables connectors like Intercom to mark streams using scroll endpoints (companies, company_segments) with the same concurrency group so they are processed serially instead of concurrently.

Resolves: airbytehq/oncall#8346

Co-Authored-By: unknown <>
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1765820926-stream-concurrency-exclusivity#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1765820926-stream-concurrency-exclusivity
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
Summary
Adds a concurrency_group property to streams that allows specifying which streams should be processed serially with respect to each other. Streams with the same non-None concurrency group will wait for other streams in that group to complete before starting.
This feature addresses the need for connectors like Intercom where certain API endpoints (e.g., scroll-based pagination) only allow one active request at a time. Assigning streams like companies and company_segments to the same concurrency group (e.g., "scroll") causes them to be processed serially instead of concurrently.
Key changes:
- Add concurrency_group property to AbstractStream (returns None by default)
- Add concurrency_group parameter to DefaultStream
- Add concurrency_group field to declarative component schema
- Update ConcurrentReadProcessor to track active groups and defer streams when their group is busy
Usage in declarative manifests:
Resolves: airbytehq/oncall#8346
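The deferral behavior described in this summary can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the CDK's ConcurrentReadProcessor: the `Stream` and `GroupAwareScheduler` classes and all of their method names are hypothetical. It treats a group as active while any stream in it is still generating partitions or still has partitions running, defers streams whose group is active, and re-queues them once the group goes idle.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional, Set


@dataclass(frozen=True)
class Stream:
    """Hypothetical stand-in for a stream; not the CDK's AbstractStream."""
    name: str
    concurrency_group: Optional[str] = None  # None means no serialization


class GroupAwareScheduler:
    """Toy scheduler that serializes streams sharing a concurrency group."""

    def __init__(self, streams: List[Stream]) -> None:
        self._pending: Deque[Stream] = deque(streams)
        self._deferred: List[Stream] = []
        self._generating: Set[str] = set()        # streams still generating partitions
        self._running: Dict[str, int] = {}        # stream name -> running partition count
        self._group_of = {s.name: s.concurrency_group for s in streams}

    def _is_group_active(self, group: Optional[str]) -> bool:
        # A group is active if any member is generating OR has running partitions.
        if group is None:
            return False
        return any(
            g == group and (name in self._generating or self._running.get(name, 0) > 0)
            for name, g in self._group_of.items()
        )

    def start_next(self) -> Optional[Stream]:
        """Start the next eligible stream, deferring streams whose group is busy."""
        while self._pending:
            stream = self._pending.popleft()
            if self._is_group_active(stream.concurrency_group):
                self._deferred.append(stream)
                continue
            self._generating.add(stream.name)
            return stream
        return None

    def on_partition_started(self, name: str) -> None:
        self._running[name] = self._running.get(name, 0) + 1

    def on_generation_done(self, name: str) -> None:
        self._generating.discard(name)
        self._requeue_deferred()

    def on_partition_done(self, name: str) -> None:
        self._running[name] -= 1
        self._requeue_deferred()

    def _requeue_deferred(self) -> None:
        # Move deferred streams back to the queue once their group is idle.
        still_deferred = []
        for stream in self._deferred:
            if self._is_group_active(stream.concurrency_group):
                still_deferred.append(stream)
            else:
                self._pending.append(stream)
        self._deferred = still_deferred


if __name__ == "__main__":
    scheduler = GroupAwareScheduler([
        Stream("companies", "scroll"),
        Stream("company_segments", "scroll"),
        Stream("contacts"),
    ])
    print(scheduler.start_next().name)  # companies
    print(scheduler.start_next().name)  # contacts (company_segments is deferred)
```

In this model, finishing partition generation alone does not free the group: `company_segments` only becomes eligible after the last running partition of `companies` has also completed, which matches the condition the checklist below asks reviewers to verify.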
Review & Testing Checklist for Human
- In _on_stream_is_done, confirm that _is_concurrency_group_active correctly checks both partition generation AND running partitions before deactivating a group
- Run the companies and company_segments streams and verify they sync serially without API errors
- Confirm that streams without concurrency_group continue to work (streams should run concurrently as before)
Recommended test plan:
Notes