Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3141,11 +3141,26 @@ export class ContainerRuntime
eventName: "DuplicateBatch",
details: {
batchId: batchStart.batchId,
batchIdExplicit: batchStart.batchId !== undefined,
clientId: batchStart.clientId,
batchStartCsn: batchStart.batchStartCsn,
size: inboundResult.length,
duplicateBatchSequenceNumber: result.otherSequenceNumber,
// Identifying info for the ORIGINAL occurrence of this batch, so we can
// disambiguate the duplicate's source (e.g. resubmit vs fresh submit, same
// vs different wire clientId). Undefined fields indicate the original was
// loaded from a summary snapshot rather than seen at runtime.
otherClientId: result.otherBatchInfo?.clientId,
otherBatchStartCsn: result.otherBatchInfo?.batchStartCsn,
otherBatchIdExplicit: result.otherBatchInfo?.batchIdExplicit,
otherFromSnapshot: result.otherBatchInfo === undefined,
...extractSafePropertiesFromMessage(batchStart.keyMessage),
// For grouped batches, `keyMessage` is one of the sub-messages produced by
// `OpGroupingManager.ungroupOp`, which overwrites `clientSequenceNumber`
// with a synthetic counter (1, 2, 3, ...). Override with the real outer
// envelope's clientSequenceNumber so downstream telemetry doesn't get a
// misleading "fake csn" value.
messageClientSequenceNumber: batchStart.batchStartCsn,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: This override silently changes the meaning of an already-emitted telemetry property. ...extractSafePropertiesFromMessage(batchStart.keyMessage) already emits messageClientSequenceNumber (the post-ungroupOp synthetic counter); the subsequent messageClientSequenceNumber: batchStart.batchStartCsn replaces that value with the outer-envelope CSN.

The PR description confirms the field is in production dashboards today: "Across 50 production events over 7 days, every DuplicateBatch showed ... MessageClientSequenceNumber == 1". Any KQL query, alert, or dashboard tile keyed off DuplicateBatch.MessageClientSequenceNumber == 1 will stop matching once this ships.

Before merging, either:

(a) notify the on-call/telemetry owner (cc kian-thompson — historical reviewer on telemetry shape, see #22454) that the field's semantics are changing, and call out the back-compat impact in the PR description; or

(b) emit the corrected value under a distinct field name (e.g. outerBatchClientSequenceNumber) and leave the inherited spread value untouched.

(a) is preferred — the existing value is known-misleading and is exactly what this PR is trying to stop surfacing as authoritative — but it requires the explicit communication step. Also worth extending the inline comment at the override site to flag that this overrides an inherited field rather than adding a new one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: The rollout/opt-in path for the new diagnostic fields is undocumented. PR #27351 (daesunp, merged 2026-05-20T00:20:38Z — ~2h before this PR was created) reverted #27262 and shifted Fluid.ContainerRuntime.enableBatchIdTracking back to explicit opt-in. The DuplicateBatch event only fires inside the duplicate-detection path that is now opt-in.

The PR description's success criterion — "the next event will tell us which it is" — implicitly assumes the tenant population emitting these events is opted in. The gated path is reachable via Fluid.Container.enableOfflineFull or Fluid.ContainerRuntime.enableBatchIdTracking, so this is a documentation gap, not a code defect. But the diagnostic value of the new fields is contingent on opt-in coverage; reviewers and the on-call investigator need that context.

Suggested fix: add a one-paragraph note to the PR description stating which enablement gate will be set for the affected tenant population and roughly when — either the existing flags will be turned on for those tenants, or this PR is staged ahead of a separate re-enablement.

},
},
error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,43 @@ import type { ITelemetryContext } from "@fluidframework/runtime-definitions/inte
import { getEffectiveBatchId } from "./batchManager.js";
import type { BatchStartInfo } from "./remoteMessageProcessor.js";

/**
* Identifying info for a previously-recorded batch that we can include in DuplicateBatch telemetry
* to help diagnose where the duplicate came from.
*
* @remarks `batchIdExplicit` distinguishes the two main duplicate-source scenarios:
* - `true`: the batchId was stamped on the wire as explicit metadata, indicating a resubmit
* (PendingStateManager.replayPendingStates).
* - `false`: the batchId was derived from the wire `clientId` and `batchStartCsn`, indicating a
* fresh, non-resubmit batch.
*/
export interface RecordedBatchInfo {
/**
* Wire clientId on the message that started the batch (NOT necessarily the `originalClientId`
* encoded in the batchId for resubmits).
*/
readonly clientId: string;
/**
* Wire client sequence number at the start of the batch.
*/
readonly batchStartCsn: number;
/**
* True if the batchId came from explicit metadata on the wire (i.e. a resubmit),
* false if it was derived from clientId + batchStartCsn (i.e. a fresh submit).
*/
readonly batchIdExplicit: boolean;
}

interface RecordedBatch {
readonly batchId: string;
/**
* Identifying info for the batch as observed at runtime.
* `undefined` if the batch was loaded from a summary snapshot (where only the
* `[seqNum, batchId]` pair is persisted).
*/
readonly info: RecordedBatchInfo | undefined;
}

/**
* Detects duplicate batches that can arise from the "parallel fork" scenario:
* Container 1 is serialized, and Containers 2 and 3 are rehydrated from that state.
Expand All @@ -24,31 +61,42 @@ export class DuplicateBatchDetector {
private readonly seqNumByBatchId = new Map<string, number>();

/**
* We map from sequenceNumber to batchId to find which ones we can stop tracking as MSN advances
* Map from sequenceNumber to the recorded batch info. Used to clear out old entries as MSN
* advances, and to report identifying info about the original occurrence when a duplicate
* is detected.
*/
private readonly batchIdsBySeqNum = new Map<number, string>();
private readonly batchesBySeqNum = new Map<number, RecordedBatch>();

/**
* Initialize from snapshot data if provided - otherwise initialize empty
*/
constructor(batchIdsFromSnapshot: [number, string][] | undefined) {
if (batchIdsFromSnapshot) {
for (const [seqNum, batchId] of batchIdsFromSnapshot) {
this.batchIdsBySeqNum.set(seqNum, batchId);
// Entries loaded from a snapshot don't carry the original clientId/csn/explicit-bit;
// we record them with `info: undefined` so duplicate telemetry can indicate that.
this.batchesBySeqNum.set(seqNum, { batchId, info: undefined });
this.seqNumByBatchId.set(batchId, seqNum);
}
}
}

/**
* Records this batch's batchId, and checks if it's a duplicate of a batch we've already seen.
* If it's a duplicate, also return the sequence number of the other batch for logging.
* If it's a duplicate, also return the sequence number of the other batch (and identifying info,
* if the other batch was seen at runtime rather than loaded from snapshot) for logging.
*
* @remarks We also use the minimumSequenceNumber to clear out old batchIds that are no longer at risk for duplicates.
*/
public processInboundBatch(
batchStart: BatchStartInfo,
): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
):
| {
duplicate: true;
otherSequenceNumber: number;
otherBatchInfo: RecordedBatchInfo | undefined;
}
| { duplicate: false } {
const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage;

// Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget.
Expand All @@ -64,21 +112,36 @@ export class DuplicateBatchDetector {
// O(1) duplicate check + get otherSequenceNumber in one lookup
const otherSequenceNumber = this.seqNumByBatchId.get(batchId);
if (otherSequenceNumber !== undefined) {
const other = this.batchesBySeqNum.get(otherSequenceNumber);
assert(
this.batchIdsBySeqNum.get(otherSequenceNumber) === batchId,
other?.batchId === batchId,
0xce0 /* batchIdToSeqNum and seqNumToBatchId should be in sync for duplicate */,
);
return { duplicate: true, otherSequenceNumber };
return {
duplicate: true,
otherSequenceNumber,
otherBatchInfo: other.info,
};
Comment on lines 114 to +124
}

// Now we know it's not a duplicate, so add it to the tracked batchIds and return.
assert(
!this.batchIdsBySeqNum.has(sequenceNumber),
!this.batchesBySeqNum.has(sequenceNumber),
0xce1 /* seqNumToBatchId and batchIdToSeqNum should be in sync */,
);

// Add new batch
this.batchIdsBySeqNum.set(sequenceNumber, batchId);
// Add new batch. Record identifying info so we can report it if a future duplicate matches us.
const info: RecordedBatchInfo | undefined =
batchStart.clientId === undefined || batchStart.batchStartCsn === undefined
? undefined
: {
clientId: batchStart.clientId,
batchStartCsn: batchStart.batchStartCsn,
// True iff the wire carried explicit batchId metadata (resubmit path).
// False indicates the batchId was derived from clientId + batchStartCsn (fresh submit).
batchIdExplicit: batchStart.batchId !== undefined,
};
this.batchesBySeqNum.set(sequenceNumber, { batchId, info });
this.seqNumByBatchId.set(batchId, sequenceNumber);

return { duplicate: false };
Expand All @@ -89,10 +152,10 @@ export class DuplicateBatchDetector {
* since the batch start has been processed by all clients, and local batches are deduped and the forked client would close.
*/
private clearOldBatchIds(msn: number): void {
for (const [sequenceNumber, batchId] of this.batchIdsBySeqNum) {
for (const [sequenceNumber, recorded] of this.batchesBySeqNum) {
if (sequenceNumber < msn) {
this.batchIdsBySeqNum.delete(sequenceNumber);
this.seqNumByBatchId.delete(batchId);
this.batchesBySeqNum.delete(sequenceNumber);
this.seqNumByBatchId.delete(recorded.batchId);
} else {
break;
}
Expand All @@ -108,16 +171,18 @@ export class DuplicateBatchDetector {
public getRecentBatchInfoForSummary(
telemetryContext?: ITelemetryContext,
): [number, string][] | undefined {
if (this.batchIdsBySeqNum.size === 0) {
if (this.batchesBySeqNum.size === 0) {
return undefined;
}

telemetryContext?.set(
"fluid_DuplicateBatchDetector_",
"recentBatchCount",
this.batchIdsBySeqNum.size,
this.batchesBySeqNum.size,
);

return [...this.batchIdsBySeqNum.entries()];
return [...this.batchesBySeqNum.entries()].map(
([seqNum, recorded]) => [seqNum, recorded.batchId] as [number, string],
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,23 @@ import type { BatchStartInfo } from "../../opLifecycle/index.js";

/**
* Helper function to create (enough of) a BatchStartInfo for testing.
* Inbound batch may have explicit batchId, or merely clientId and batchStartCsn and batchId must be computed - allow either as inputs
* Inbound batch may have explicit batchId, or merely clientId and batchStartCsn and batchId must be computed - allow either as inputs.
* Also allows specifying all three together (explicit batchId on a batch that also carries
* clientId + csn at the BatchStartInfo level) to model resubmits.
*/
function makeBatch({
sequenceNumber,
minimumSequenceNumber,
batchId,
clientId,
batchStartCsn,
}: { sequenceNumber: number; minimumSequenceNumber: number } & (
| { batchId: string; clientId?: undefined; batchStartCsn?: undefined }
| { batchId?: undefined; clientId: string; batchStartCsn: number }
)): BatchStartInfo {
}: {
sequenceNumber: number;
minimumSequenceNumber: number;
batchId?: string;
clientId?: string;
batchStartCsn?: number;
}): BatchStartInfo {
Comment on lines 21 to +33
return {
keyMessage: {
sequenceNumber,
Expand All @@ -43,7 +48,7 @@ type PatchedDuplicateBatchDetector = Patch<
DuplicateBatchDetector,
{
seqNumByBatchId: Map<string, number>;
batchIdsBySeqNum: Map<number, string>;
batchesBySeqNum: Map<number, { batchId: string }>;
}
>;

Expand All @@ -62,8 +67,8 @@ describe("DuplicateBatchDetector", () => {
afterEach("validation", () => {
assert.deepEqual(
[...detector.seqNumByBatchId.keys()].sort(),
[...detector.batchIdsBySeqNum.values()].sort(),
"Invariant: seqNumByBatchId and batchIdsBySeqNum should be in sync",
[...detector.batchesBySeqNum.values()].map((r) => r.batchId).sort(),
"Invariant: seqNumByBatchId and batchesBySeqNum should be in sync",
);
});

Expand Down Expand Up @@ -117,7 +122,13 @@ describe("DuplicateBatchDetector", () => {
});
detector.processInboundBatch(inboundBatch1);
const result = detector.processInboundBatch(inboundBatch2);
assert.deepEqual(result, { duplicate: true, otherSequenceNumber: 1 });
// Both inbound batches were created via the test helper with no clientId/batchStartCsn,
// so the recorded info is `undefined`.
assert.deepEqual(result, {
duplicate: true,
otherSequenceNumber: 1,
otherBatchInfo: undefined,
});
});

it("Matching inbound batches, one with batchId one without, are duplicates", () => {
Expand All @@ -134,7 +145,13 @@ describe("DuplicateBatchDetector", () => {
});
detector.processInboundBatch(inboundBatch1);
const result = detector.processInboundBatch(inboundBatch2);
assert.deepEqual(result, { duplicate: true, otherSequenceNumber: 1 });
// The original (inboundBatch1) was recorded without clientId/batchStartCsn (test helper),
// so its recorded info is `undefined`.
assert.deepEqual(result, {
duplicate: true,
otherSequenceNumber: 1,
otherBatchInfo: undefined,
});
});

it("Matching inbound batches are duplicates (roundtrip through summary)", () => {
Expand All @@ -156,7 +173,69 @@ describe("DuplicateBatchDetector", () => {
batchId: "batch1",
});
const result = detector2.processInboundBatch(inboundBatch2);
assert.deepEqual(result, { duplicate: true, otherSequenceNumber: 1 });
// The original came from a snapshot, so otherBatchInfo is `undefined` (and otherFromSnapshot
// would be `true` in the corresponding telemetry).
assert.deepEqual(result, {
duplicate: true,
otherSequenceNumber: 1,
otherBatchInfo: undefined,
});
});

it("Reports runtime info when duplicate's original was seen at runtime", () => {
// Original (fresh submit: no explicit batchId, just clientId + csn)
const original = makeBatch({
sequenceNumber: seqNum++, // 1
minimumSequenceNumber: 0,
clientId: "clientA",
batchStartCsn: 7,
});
// Duplicate arrives with the same effective batchId (clientA_[7]) but explicit metadata
const duplicate = makeBatch({
sequenceNumber: seqNum++, // 2
minimumSequenceNumber: 0,
batchId: "clientA_[7]",
});
detector.processInboundBatch(original);
const result = detector.processInboundBatch(duplicate);
assert.deepEqual(result, {
duplicate: true,
otherSequenceNumber: 1,
otherBatchInfo: {
clientId: "clientA",
batchStartCsn: 7,
batchIdExplicit: false,
},
});
});

it("Detects explicit batchId metadata as such (resubmit-style original)", () => {
// Original was a resubmit: wire stamped explicit batchId metadata.
const original = makeBatch({
sequenceNumber: seqNum++, // 1
minimumSequenceNumber: 0,
clientId: "resubmitter",
batchStartCsn: 42,
batchId: "originalSubmitter_[7]",
});
// Duplicate is the original submitter coming back fresh (no explicit batchId).
const duplicate = makeBatch({
sequenceNumber: seqNum++, // 2
minimumSequenceNumber: 0,
clientId: "originalSubmitter",
batchStartCsn: 7,
});
detector.processInboundBatch(original);
const result = detector.processInboundBatch(duplicate);
assert.deepEqual(result, {
duplicate: true,
otherSequenceNumber: 1,
otherBatchInfo: {
clientId: "resubmitter",
batchStartCsn: 42,
batchIdExplicit: true,
},
});
});

it("should clear old batchIds that are no longer at risk for duplicates", () => {
Expand Down Expand Up @@ -196,7 +275,7 @@ describe("DuplicateBatchDetector", () => {
describe("getStateForSummary", () => {
it("If empty, return undefined", () => {
assert.equal(
detector.batchIdsBySeqNum.size,
detector.batchesBySeqNum.size,
0,
"PRECONDITION: Expected detector to start empty",
);
Expand Down
Loading