Add Atlas Stream Processing commands#1898
Draft
nickpoindexter wants to merge 8 commits into
Draft
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## v2.x #1898 +/- ##
============================================
- Coverage 87.72% 86.37% -1.36%
- Complexity 3328 3472 +144
============================================
Files 454 467 +13
Lines 6656 6927 +271
============================================
+ Hits 5839 5983 +144
- Misses 817 944 +127
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds first-class driver support for Atlas Stream Processing (ASP), implementing the ASP driver spec. Users currently have to drop down to
Manager::executeCommandagainst a workspace endpoint; this PR adds a dedicated client/handles/operations layer matching the patterns used byClient/Database/Collection.What's new
Public API (
src/)MongoDB\StreamProcessingClient— workspace-scoped client distinct fromMongoDB\Client. Validates the workspace URI (atlas-stream-*.a.query.mongodb.net), enforces TLS, defaultsauthSource=admin, and rejectsmongodb+srv://.MongoDB\StreamProcessors—create()/get()/getInfo()for managing processors in a workspace.MongoDB\StreamProcessor—start()/stop()/drop()/stats()/getStreamProcessorSamples()for a named processor.MongoDB\Model\StreamProcessorInfo— typed accessor over thegetStreamProcessorresponse. Fields the spec marksOptional(e.g.id,pipelineVersion) returnnullwhen the server omits them.MongoDB\Model\StreamProcessorSamples— result ofgetStreamProcessorSamples(), exposescursorId,documents,isExhausted().Operation layer (
src/Operation/StreamProcessing/)One Operation class per wire command:
createStreamProcessor,startStreamProcessor,stopStreamProcessor,dropStreamProcessor,getStreamProcessor,getStreamProcessorStats,startSampleStreamProcessor,getMoreSampleStreamProcessor. Retryable reads (getStreamProcessor,getStreamProcessorStats) useexecuteReadCommand; everything else usesexecuteCommandper the spec's retryability matrix. All commands target theadmindatabase.Notable spec / server alignment
startAfter(inStartStreamProcessorOptions) is intentionally NOT serialized — the spec marks it RESERVED for future use and explicitly forbids drivers from sending it.GetStreamProcessorunwraps{ok: 1, result: {…}}when the server wraps the response.GetMoreSampleStreamProcessoraccepts bothnextBatch(spec) andmessages(current dev server).StreamProcessorInfo::getId()/getPipelineVersion()returnnullwhen the server omits the field.getStreamProcessorSamples()is a single-command dispatch: absent/zerocursorId→startSampleStreamProcessor; non-zerocursorId→getMoreSampleStreamProcessor. Callers stop when the returnedcursorId === 0.tenantID/projectIdare not surfaced in the user-facing API.Test plan
composer check:cs— cleancomposer check:rector— clean (0 rule violations on new files)composer check:psalm— clean against updated baseline. The 11 new baseline entries are allMixedAssignmentfrom reading untypedarray $options/array $info, matching the pattern already established for everyother Operation class.
tests/StreamProcessing/cover constructor-option type validation for every Operation, URI/TLS validation for the client, and model getters. Run withvendor/bin/phpunit --filter StreamProcessing.StreamProcessingFunctionalTestexercises the fullcreate → start → stats → sample → stop → droplifecycle. Self-skips unlessMONGODB_STREAM_PROCESSING_URIis set to a real workspace endpoint —needs an Evergreen variant or a workspace-aware tester to actually exercise.
Out of scope (per spec)
Deferred to a follow-up:
modifyStreamProcessor,listStreamProcessors,listStreamConnections,processStreamProcessor,listWorkspaceDefaults. Users who need these today can still send them viaManager::executeCommand.