Add stagingModeMaxBatchOps for staging mode batch control #26577
anthony-murphy-agent wants to merge 4 commits into microsoft:main
… mode

During staging mode, the runtime flushes ops into separate staged batches at every JS turn boundary. As a result, consumers like Word that want to accumulate ops across many turns into fewer, larger batches get fragmented results.

Add a `stagingModeMaxBatchOps` option to `ContainerRuntimeOptionsInternal` that suppresses automatic (turn-based/async) flush scheduling during staging mode until the accumulated batch reaches the specified op count. Incoming ops still break the current batch regardless: they change the reference sequence number and trigger direct flush() calls that bypass scheduleFlush().

Default: 1000 ops. This was chosen based on production telemetry analysis:
- Copy-paste operations routinely produce batches of 1000+ ops (435K instances of >=1000 ops observed over 30 days via GroupLargeBatch telemetry)
- All are non-reentrant single-turn batches from normal user actions
- Receivers on modern Fluid versions (2.74+) handle these without jank (p99 processing duration ~5ms for typical batches)
- 1000 matches the existing "large batch" telemetry threshold in OpGroupingManager
- The threshold only affects cross-turn accumulation; single-turn operations (like paste) are unaffected since all ops are submitted synchronously

Consumers can override: set to Infinity to only break batches on system events, or to a lower value for tighter batch control.

Co-Authored-By: anthony-murphy <anthony.murphy@microsoft.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The option controls when automatic flush scheduling kicks in; it is not a cap on batch size. A batch can contain far more ops than the threshold if a single synchronous turn pushes many ops past it (e.g. paste). The new name makes it clear that only automatic/scheduled flushes are affected, not direct flush calls from incoming ops, connection changes, or exiting staging mode.

Co-Authored-By: anthony-murphy <anthony.murphy@microsoft.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
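The distinction between a scheduling threshold and a size cap can be sketched with a toy model. This is an illustration under stated assumptions, not the Fluid runtime code: `StagingOutboxModel`, `runTurn`, and all field names are hypothetical, and a "turn" is simulated by an explicit `endTurn()` call.

```typescript
// Toy model for illustration only; not the Fluid ContainerRuntime or Outbox code.
class StagingOutboxModel {
	private pendingOps = 0;
	private flushScheduled = false;
	private readonly autoFlushThreshold: number;
	/** Sizes of the staged batches produced so far. */
	public readonly stagedBatchSizes: number[] = [];

	constructor(autoFlushThreshold: number) {
		this.autoFlushThreshold = autoFlushThreshold;
	}

	/** Submit one op; automatic scheduling resumes only once the threshold is reached. */
	public submit(): void {
		this.pendingOps++;
		if (this.pendingOps >= this.autoFlushThreshold) {
			this.flushScheduled = true;
		}
	}

	/** End of a simulated JS turn: a scheduled flush (if any) runs here. */
	public endTurn(): void {
		if (this.flushScheduled) {
			this.flush();
		}
	}

	/** Move everything pending into one staged batch. */
	public flush(): void {
		if (this.pendingOps > 0) {
			this.stagedBatchSizes.push(this.pendingOps);
		}
		this.pendingOps = 0;
		this.flushScheduled = false;
	}
}

/** Submit `opCount` ops synchronously, then end the turn. */
function runTurn(model: StagingOutboxModel, opCount: number): void {
	for (let i = 0; i < opCount; i++) {
		model.submit();
	}
	model.endTurn();
}

const pasteModel = new StagingOutboxModel(3);
runTurn(pasteModel, 2); // under the threshold: nothing is flushed at the turn boundary
runTurn(pasteModel, 5); // a "paste": all 5 ops land before the scheduled flush runs
// pasteModel.stagedBatchSizes is now [7], larger than the threshold of 3
```

In this model the threshold decides only whether a flush is scheduled at all; the batch still contains whatever was submitted by the time that flush runs.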
Address PR feedback:
- Field is now always `number` (not `number | undefined`), with the default applied at construction time
- Add config override via Fluid.ContainerRuntime.StagingModeAutoFlushThreshold for runtime tuning without code changes
- Config override takes precedence over runtime option, which takes precedence over the default (1000)

Co-Authored-By: anthony-murphy <anthony.murphy@microsoft.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
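The precedence described above can be sketched as a small resolver. This is a minimal sketch, assuming the config value and the runtime option have already been read into plain `number | undefined` values; `resolveAutoFlushThreshold` and the constant name are hypothetical, and how the runtime actually reads the config key is not shown in this PR excerpt.

```typescript
// Hypothetical names; only the precedence order and the default (1000) come from the PR text.
const defaultStagingModeAutoFlushThreshold = 1000;

/**
 * Config override wins over the runtime option, which wins over the default.
 * The result is always a number, so callers never handle `undefined`.
 */
function resolveAutoFlushThreshold(
	configOverride: number | undefined,
	runtimeOption: number | undefined,
): number {
	return configOverride ?? runtimeOption ?? defaultStagingModeAutoFlushThreshold;
}

const fromDefault = resolveAutoFlushThreshold(undefined, undefined); // 1000
const fromOption = resolveAutoFlushThreshold(undefined, Infinity); // Infinity
const fromConfig = resolveAutoFlushThreshold(10, 50); // 10
```

Using `??` rather than `||` keeps an explicit `0` from being silently replaced by a lower-precedence value.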
Pull request overview
Adds an internal runtime option to control when automatic flush scheduling resumes during staging mode, allowing cross-turn accumulation of staged ops up to a configurable op-count threshold.
Changes:
- Introduces `stagingModeMaxBatchOps?: number` on `ContainerRuntimeOptionsInternal` with a default of 1000 ops.
- Updates `ContainerRuntime.scheduleFlush()` to suppress turn-based/async flush scheduling in staging mode until the threshold is reached.
- Adds staging-mode threshold tests and excludes the option from doc-schema-affecting runtime options.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| packages/runtime/container-runtime/src/containerRuntime.ts | Adds option + default constant and gates scheduleFlush() during staging mode based on accumulated op count. |
| packages/runtime/container-runtime/src/test/containerRuntime.spec.ts | Adds tests intended to validate staging-mode batching behavior under/at threshold and with incoming ops. |
| packages/runtime/container-runtime/src/containerCompatibility.ts | Omits stagingModeMaxBatchOps from doc-schema affecting runtime options. |
    // (deltaManager "op" handler, process(), connection changes, getPendingLocalState,
    // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check.
    // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects
    // sequence number changes and forces a flush as a safety net.
This comment implies outbox.maybeFlushPartialBatch() will "force a flush" when sequence numbers change, but in the default configuration it throws instead (it only flushes when flushPartialBatches is enabled via Fluid.ContainerRuntime.DisableFlushBeforeProcess). Please clarify this to prevent readers from assuming a flush-based safety net always exists.
Suggested change:

    - // sequence number changes and forces a flush as a safety net.
    + // sequence number changes. By default it throws if unexpected changes are detected; it only
    + // forces a flush as a safety net when partial-batch flushing is enabled via
    + // Fluid.ContainerRuntime.DisableFlushBeforeProcess.
Fixed in a977adc — the comment now reads:
By default it throws if unexpected changes are detected; it only forces a flush as a safety net when partial-batch flushing is enabled via Fluid.ContainerRuntime.DisableFlushBeforeProcess.
This matches the suggested wording.
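The two behaviors the corrected comment distinguishes can be modeled roughly as follows. This is a hypothetical sketch of the described semantics only: `PartialBatchModel`, `checkReferenceSequenceNumber`, and the `flushPartialBatches` parameter are illustrative stand-ins, not the actual `outbox.maybeFlushPartialBatch()` signature or implementation.

```typescript
// Hypothetical model of "throw by default, flush only when partial-batch flushing is enabled".
interface PartialBatchModel {
	pendingOps: number;
	batchReferenceSequenceNumber: number;
}

function checkReferenceSequenceNumber(
	state: PartialBatchModel,
	currentReferenceSequenceNumber: number,
	flushPartialBatches: boolean,
): "unchanged" | "flushed" {
	if (state.pendingOps === 0) {
		// Nothing is batched yet, so the next batch simply starts at the current number.
		state.batchReferenceSequenceNumber = currentReferenceSequenceNumber;
		return "unchanged";
	}
	if (currentReferenceSequenceNumber === state.batchReferenceSequenceNumber) {
		return "unchanged";
	}
	if (!flushPartialBatches) {
		// Default configuration in this model: an unexpected change is an error, not a flush.
		throw new Error("Reference sequence number changed while a batch was open");
	}
	// Safety net, only when partial-batch flushing is enabled.
	state.pendingOps = 0;
	state.batchReferenceSequenceNumber = currentReferenceSequenceNumber;
	return "flushed";
}
```

The point of the model is that a reader cannot rely on a flush-based safety net in the default configuration; the direct `flush()` calls are what keep the batch consistent.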
    it("ops flush when threshold is reached", async () => {
        runtimeWithThreshold = await createRuntimeWithThreshold(3);
        stubChannelCollection(runtimeWithThreshold);
        submittedOps.length = 0;

        runtimeWithThreshold.enterStagingMode();

        // Submit 3 ops — exactly at the threshold
        submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1"));
        submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2"));
        assert.equal(submittedOps.length, 0, "Under threshold, no flush yet");

        submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3"));
        // The 3rd op should trigger scheduleFlush to fall through to normal scheduling
        await Promise.resolve();

        assert.equal(
            submittedOps.length,
            0,
            "Ops should not be submitted while in staging mode (flushed into PSM only)",
        );
This test is named as if the threshold triggers a flush, but it only asserts that nothing was submitted to the wire (which is also true when no flush happens). To actually cover the new behavior, assert an observable effect of the flush at the threshold (e.g., that pending state now contains staged messages, or that the outbox is emptied / a staged batch was created).
Fixed in a977adc — the test now asserts that outbox.mainBatchMessageCount drops to 0 after the threshold is reached, confirming the outbox was actually emptied (ops moved into PendingStateManager as a staged batch).
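The reviewer's point, that a "nothing on the wire" assertion cannot tell a flush from a non-flush in staging mode, can be seen in a small model. The `StagingSnapshot` shape and `snapshotAfterOps` helper are hypothetical; only the idea of checking the outbox count comes from the thread.

```typescript
// Hypothetical snapshot of what a test could observe after submitting ops in staging mode.
interface StagingSnapshot {
	submittedToWire: number; // always 0 while staged
	outboxCount: number; // ops still waiting in the outbox
	stagedBatches: number; // batches moved into pending state
}

function snapshotAfterOps(opCount: number, threshold: number): StagingSnapshot {
	const flushed = opCount >= threshold;
	return {
		submittedToWire: 0,
		outboxCount: flushed ? 0 : opCount,
		stagedBatches: flushed ? 1 : 0,
	};
}

const underThreshold = snapshotAfterOps(2, 3);
const atThreshold = snapshotAfterOps(3, 3);
// Both snapshots report submittedToWire === 0, so that assertion passes either way.
// Only outboxCount (2 vs 0) and stagedBatches (0 vs 1) distinguish the two cases.
```

An assertion on the outbox count (or on the staged batch count) fails when the threshold logic is broken, which is what makes the strengthened test meaningful.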
    it("incoming ops break the batch regardless of threshold", async () => {
        runtimeWithThreshold = await createRuntimeWithThreshold(Infinity);
        stubChannelCollection(runtimeWithThreshold);
        submittedOps.length = 0;

        runtimeWithThreshold.enterStagingMode();

        submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1"));
        submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2"));
        assert.equal(submittedOps.length, 0, "No ops submitted yet");

        // Simulate an incoming op — bumps lastSequenceNumber and emits "op"
        // The deltaManager "op" handler calls this.flush() directly,
        // which moves pending ops from outbox into PSM (as staged batches).
        const mockDeltaManager = mockContext.deltaManager as MockDeltaManager;
        ++mockDeltaManager.lastSequenceNumber;
        mockDeltaManager.emit("op", {
            clientId: mockClientId,
            sequenceNumber: mockDeltaManager.lastSequenceNumber,
            clientSequenceNumber: 1,
            type: MessageType.ClientJoin,
            contents: "test content",
        });

        // Ops are not submitted to the wire during staging mode — they're flushed to PSM
        assert.equal(
            submittedOps.length,
            0,
            "Ops should not be submitted to wire while in staging mode",
        );
This test intends to verify that an incoming op breaks the current staged batch, but it currently only checks that nothing was submitted to the wire (which would be true even if the batch was not broken). Consider asserting that the incoming op caused local pending state to advance (e.g., staged ops were flushed into PendingStateManager as a staged batch, or the outbox count resets).
Fixed in a977adc — the test now asserts that outbox.mainBatchMessageCount drops to 0 after the incoming op, confirming the outbox was flushed into PendingStateManager as a staged batch (not just that nothing was submitted to the wire).
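Why an incoming op breaks the batch even with a threshold of `Infinity` can be sketched as below. This is a simplified model, assuming the incoming-op handler flushes directly before the reference sequence number advances; `StagedState`, `submitOp`, and `onIncomingOp` are hypothetical names, and the scheduled flush is collapsed into an immediate one for brevity.

```typescript
// Toy state; not the real PendingStateManager or Outbox.
interface StagedState {
	pendingOps: number;
	referenceSequenceNumber: number;
	stagedBatchSizes: number[];
}

function flushToStaged(state: StagedState): void {
	if (state.pendingOps > 0) {
		state.stagedBatchSizes.push(state.pendingOps);
		state.pendingOps = 0;
	}
}

/** Automatic path: gated by the threshold (never reached when it is Infinity). */
function submitOp(state: StagedState, threshold: number): void {
	state.pendingOps++;
	if (state.pendingOps >= threshold) {
		flushToStaged(state);
	}
}

/** Direct path: bypasses the threshold check entirely. */
function onIncomingOp(state: StagedState, sequenceNumber: number): void {
	flushToStaged(state);
	state.referenceSequenceNumber = sequenceNumber;
}

const staged: StagedState = { pendingOps: 0, referenceSequenceNumber: 10, stagedBatchSizes: [] };
submitOp(staged, Infinity);
submitOp(staged, Infinity);
onIncomingOp(staged, 11); // the two pending ops become one staged batch
submitOp(staged, Infinity);
onIncomingOp(staged, 12); // the remaining op becomes a second staged batch
// staged.stagedBatchSizes is now [2, 1]
```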
    private scheduleFlush(): void {
        // During staging mode with a batch threshold, suppress automatic flush scheduling.
        // Only flush when the main batch exceeds the threshold.
The comment says the batch will flush when it "exceeds" the threshold, but the condition below resumes scheduling when mainBatchMessageCount is >= threshold (i.e., it flushes when the threshold is reached, not exceeded). Please adjust the wording to match the behavior to avoid confusion when reasoning about off-by-one semantics.
Suggested change:

    - // Only flush when the main batch exceeds the threshold.
    + // Only flush when the main batch reaches or exceeds the threshold.
Fixed in a977adc — the comment now reads "reaches or exceeds the threshold" to accurately reflect the >= condition.
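The off-by-one difference between "reaches" (`>=`) and "exceeds" (`>`) is easy to check with a short counter. `opsUntilAutoFlush` is a hypothetical helper written for this illustration and assumes a finite, positive threshold.

```typescript
type Comparison = "reaches" | "exceeds";

/** Count how many ops must be submitted before automatic scheduling resumes. */
function opsUntilAutoFlush(threshold: number, comparison: Comparison): number {
	let count = 0;
	while (true) {
		count++;
		const resume = comparison === "reaches" ? count >= threshold : count > threshold;
		if (resume) {
			return count;
		}
	}
}

const withReaches = opsUntilAutoFlush(3, "reaches"); // 3: the behavior the PR's >= condition gives
const withExceeds = opsUntilAutoFlush(3, "exceeds"); // 4: what the old comment wording implied
```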
- Fix comment accuracy: scheduleFlush threshold triggers at "reaches or exceeds", not just "exceeds"
- Fix maybeFlushPartialBatch comment: by default it throws on unexpected sequence number changes, only forces a flush when partial-batch flushing is enabled via Fluid.ContainerRuntime.DisableFlushBeforeProcess
- Strengthen threshold and incoming-op tests: assert that the outbox is actually emptied (mainBatchMessageCount drops to 0) rather than only checking that nothing was submitted to the wire

Co-Authored-By: anthony-murphy <anthony.murphy@microsoft.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The following pipeline checks are missing (expected for fork PRs). To trigger them, copy and paste these

    /azp run Build - api-markdown-documenter

No pipelines are associated with this pull request.
    /**
     * Controls automatic batch flushing during staging mode.
     * Normal turn-based/async flush scheduling is suppressed while in staging mode
     * until the accumulated batch reaches this many ops, at which point the batch
     * is flushed. Incoming ops always break the current batch regardless of this setting.
     *
     * Set to Infinity to only break batches on system events (incoming ops).
     *
     * @defaultValue 1000
     */
    readonly stagingModeAutoFlushThreshold?: number;
The PR title/description refers to this option as stagingModeMaxBatchOps, but the code introduces stagingModeAutoFlushThreshold. Please align the naming (either rename the option/tests or update the PR metadata) to avoid confusion for reviewers and future maintainers.
     * @defaultValue 1000
     */
    readonly stagingModeAutoFlushThreshold?: number;
ContainerRuntimeOptionsInternal is documented as a fully-required set of options, but stagingModeAutoFlushThreshold is introduced as optional. Consider making this field required (and populating it with the default when building internalRuntimeOptions), or update the interface documentation to reflect that internal options can now be optional.
Summary
- … `stagingModeMaxBatchOps` option to `ContainerRuntimeOptionsInternal` that controls automatic batch flushing during staging mode
- … `Infinity` for Word's use case)

Default Justification (from production telemetry)
- … `GroupLargeBatch` telemetry)
- … `OpGroupingManager`

Key Design Points
- … `scheduleFlush()`; direct `flush()` calls (incoming ops, connection changes, stashing, exit staging mode) bypass the threshold entirely
- … `ContainerRuntimeOptionsInternal` (not public) since the design is not yet finalized for general use

Test plan
🤖 Generated with Claude Code