Skip to content

Add Atlas Stream Processing commands#1898

Draft
nickpoindexter wants to merge 8 commits into
mongodb:v2.xfrom
nickpoindexter:add_asp_2.x
Draft

Add Atlas Stream Processing commands#1898
nickpoindexter wants to merge 8 commits into
mongodb:v2.xfrom
nickpoindexter:add_asp_2.x

Conversation

@nickpoindexter
Copy link
Copy Markdown

Summary

Adds first-class driver support for Atlas Stream Processing (ASP), implementing the ASP driver spec. Users currently have to drop down to
Manager::executeCommand against a workspace endpoint; this PR adds a dedicated client/handles/operations layer matching the patterns used by Client / Database / Collection.

What's new

Public API (src/)

  • MongoDB\StreamProcessingClient — workspace-scoped client distinct from MongoDB\Client. Validates the workspace URI (atlas-stream-*.a.query.mongodb.net), enforces TLS, defaults authSource=admin, and rejects
    mongodb+srv://.
  • MongoDB\StreamProcessorscreate() / get() / getInfo() for managing processors in a workspace.
  • MongoDB\StreamProcessorstart() / stop() / drop() / stats() / getStreamProcessorSamples() for a named processor.
  • MongoDB\Model\StreamProcessorInfo — typed accessor over the getStreamProcessor response. Fields the spec marks Optional (e.g. id, pipelineVersion) return null when the server omits them.
  • MongoDB\Model\StreamProcessorSamples — result of getStreamProcessorSamples(), exposes cursorId, documents, isExhausted().

Operation layer (src/Operation/StreamProcessing/)
One Operation class per wire command: createStreamProcessor, startStreamProcessor, stopStreamProcessor, dropStreamProcessor, getStreamProcessor, getStreamProcessorStats, startSampleStreamProcessor,
getMoreSampleStreamProcessor. Retryable reads (getStreamProcessor, getStreamProcessorStats) use executeReadCommand; everything else uses executeCommand per the spec's retryability matrix. All commands target the
admin database.

Notable spec / server alignment

  • startAfter (in StartStreamProcessorOptions) is intentionally NOT serialized — the spec marks it RESERVED for future use and explicitly forbids drivers from sending it.
  • Dev-server response shape deviations are accommodated, matching the workarounds in the Python POC:
    • GetStreamProcessor unwraps {ok: 1, result: {…}} when the server wraps the response.
    • GetMoreSampleStreamProcessor accepts both nextBatch (spec) and messages (current dev server).
    • StreamProcessorInfo::getId() / getPipelineVersion() return null when the server omits the field.
  • getStreamProcessorSamples() is a single-command dispatch: absent/zero cursorIdstartSampleStreamProcessor; non-zero cursorIdgetMoreSampleStreamProcessor. Callers stop when the returned cursorId === 0.
  • Internal-only fields like tenantID / projectId are not surfaced in the user-facing API.

Test plan

  • composer check:cs — clean
  • composer check:rector — clean (0 rule violations on new files)
  • composer check:psalm — clean against updated baseline. The 11 new baseline entries are all MixedAssignment from reading untyped array $options / array $info, matching the pattern already established for every
    other Operation class.
  • Unit tests under tests/StreamProcessing/ cover constructor-option type validation for every Operation, URI/TLS validation for the client, and model getters. Run with vendor/bin/phpunit --filter StreamProcessing.
  • Functional smoke test StreamProcessingFunctionalTest exercises the full create → start → stats → sample → stop → drop lifecycle. Self-skips unless MONGODB_STREAM_PROCESSING_URI is set to a real workspace endpoint —
    needs an Evergreen variant or a workspace-aware tester to actually exercise.

Out of scope (per spec)

Deferred to a follow-up: modifyStreamProcessor, listStreamProcessors, listStreamConnections, processStreamProcessor, listWorkspaceDefaults. Users who need these today can still send them via Manager::executeCommand.

Comment thread src/Model/StreamProcessorInfo.php Fixed
Comment thread src/Model/StreamProcessorInfo.php Fixed
Comment thread src/Operation/StreamProcessing/CreateStreamProcessor.php Fixed
Comment thread src/Operation/StreamProcessing/GetMoreSampleStreamProcessor.php Fixed
Comment thread src/Operation/StreamProcessing/GetMoreSampleStreamProcessor.php Fixed
Comment thread src/Operation/StreamProcessing/StartStreamProcessor.php Fixed
Comment thread src/Operation/StreamProcessing/StartStreamProcessor.php Fixed
Comment thread src/StreamProcessingClient.php Fixed
Comment thread src/StreamProcessingClient.php Fixed
Comment thread src/StreamProcessingClient.php Fixed
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 14, 2026

Codecov Report

❌ Patch coverage is 53.87454% with 125 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.37%. Comparing base (2628950) to head (ac657c9).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/StreamProcessor.php 15.62% 27 Missing ⚠️
.../StreamProcessing/GetMoreSampleStreamProcessor.php 14.28% 18 Missing ⚠️
...peration/StreamProcessing/StartStreamProcessor.php 59.09% 18 Missing ⚠️
...eration/StreamProcessing/CreateStreamProcessor.php 39.13% 14 Missing ⚠️
...ation/StreamProcessing/GetStreamProcessorStats.php 20.00% 12 Missing ⚠️
...on/StreamProcessing/StartSampleStreamProcessor.php 20.00% 12 Missing ⚠️
.../Operation/StreamProcessing/GetStreamProcessor.php 0.00% 11 Missing ⚠️
src/StreamProcessors.php 60.00% 6 Missing ⚠️
...Operation/StreamProcessing/DropStreamProcessor.php 0.00% 3 Missing ⚠️
...Operation/StreamProcessing/StopStreamProcessor.php 0.00% 3 Missing ⚠️
... and 1 more
Additional details and impacted files
@@             Coverage Diff              @@
##               v2.x    #1898      +/-   ##
============================================
- Coverage     87.72%   86.37%   -1.36%     
- Complexity     3328     3472     +144     
============================================
  Files           454      467      +13     
  Lines          6656     6927     +271     
============================================
+ Hits           5839     5983     +144     
- Misses          817      944     +127     
Flag Coverage Δ
6.0-replica_set 85.28% <53.87%> (-1.28%) ⬇️
6.0-server 81.49% <53.87%> (-1.13%) ⬇️
6.0-sharded_cluster ?
8.0-replica_set 86.27% <53.87%> (-1.32%) ⬇️
8.0-server 82.20% <53.87%> (-1.16%) ⬇️
8.0-sharded_cluster ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants