Skip to content

Commit c7e10f2

Browse files
committed
fix(core): Replace global interval with trace-specific interval based flushing (#19686)
As discussed yesterday with @cleptric, we don't want a global interval-based flushing but the interval shall be set per trace bucket. This PR makes that change.
1 parent cd57783 commit c7e10f2

File tree

2 files changed

+47
-50
lines changed

2 files changed

+47
-50
lines changed

packages/core/src/tracing/spans/spanBuffer.ts

Lines changed: 45 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ const MAX_SPANS_PER_ENVELOPE = 1000;
1616

1717
const MAX_TRACE_WEIGHT_IN_BYTES = 5_000_000;
1818

19+
interface TraceBucket {
20+
spans: Set<SerializedStreamedSpanWithSegmentSpan>;
21+
size: number;
22+
timeout: ReturnType<typeof setTimeout>;
23+
}
24+
1925
export interface SpanBufferOptions {
2026
/**
2127
* Max spans per trace before auto-flush
@@ -26,7 +32,8 @@ export interface SpanBufferOptions {
2632
maxSpanLimit?: number;
2733

2834
/**
29-
* Flush interval in ms
35+
* Per-trace flush timeout in ms. A timeout is started when a trace bucket is first created
36+
* and fires flush() for that specific trace when it expires.
3037
* Must be greater than 0.
3138
*
3239
* @default 5_000
@@ -44,7 +51,7 @@ export interface SpanBufferOptions {
4451

4552
/**
4653
* A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes.
47-
* Handles interval-based flushing, size thresholds, and graceful shutdown.
54+
* Handles per-trace timeout-based flushing, size thresholds, and graceful shutdown.
4855
* Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet
4956
* frozen onto the segment span.
5057
*
@@ -54,19 +61,16 @@ export interface SpanBufferOptions {
5461
* still active and modifyable when child spans are added to the buffer.
5562
*/
5663
export class SpanBuffer {
57-
/* Bucket spans by their trace id */
58-
private _traceMap: Map<string, Set<SerializedStreamedSpanWithSegmentSpan>>;
59-
private _traceWeightMap: Map<string, number>;
64+
/* Bucket spans by their trace id, along with accumulated size and a per-trace flush timeout */
65+
private _traceBuckets: Map<string, TraceBucket>;
6066

61-
private _flushIntervalId: ReturnType<typeof setInterval> | null;
6267
private _client: Client;
6368
private _maxSpanLimit: number;
6469
private _flushInterval: number;
6570
private _maxTraceWeight: number;
6671

6772
public constructor(client: Client, options?: SpanBufferOptions) {
68-
this._traceMap = new Map();
69-
this._traceWeightMap = new Map();
73+
this._traceBuckets = new Map();
7074
this._client = client;
7175

7276
const { maxSpanLimit, flushInterval, maxTraceWeightInBytes } = options ?? {};
@@ -79,21 +83,17 @@ export class SpanBuffer {
7983
this._maxTraceWeight =
8084
maxTraceWeightInBytes && maxTraceWeightInBytes > 0 ? maxTraceWeightInBytes : MAX_TRACE_WEIGHT_IN_BYTES;
8185

82-
this._flushIntervalId = null;
83-
this._debounceFlushInterval();
84-
8586
this._client.on('flush', () => {
8687
this.drain();
8788
});
8889

8990
this._client.on('close', () => {
9091
// No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()`
9192
// which already invokes the `flush` hook and thus drains the buffer.
92-
if (this._flushIntervalId) {
93-
clearInterval(this._flushIntervalId);
94-
}
95-
this._traceMap.clear();
96-
this._traceWeightMap.clear();
93+
this._traceBuckets.forEach(bucket => {
94+
clearTimeout(bucket.timeout);
95+
});
96+
this._traceBuckets.clear();
9797
});
9898
}
9999

@@ -102,57 +102,62 @@ export class SpanBuffer {
102102
*/
103103
public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void {
104104
const traceId = spanJSON.trace_id;
105-
let traceBucket = this._traceMap.get(traceId);
106-
if (traceBucket) {
107-
traceBucket.add(spanJSON);
108-
} else {
109-
traceBucket = new Set([spanJSON]);
110-
this._traceMap.set(traceId, traceBucket);
105+
let bucket = this._traceBuckets.get(traceId);
106+
107+
if (!bucket) {
108+
bucket = {
109+
spans: new Set(),
110+
size: 0,
111+
timeout: safeUnref(
112+
setTimeout(() => {
113+
this.flush(traceId);
114+
}, this._flushInterval),
115+
),
116+
};
117+
this._traceBuckets.set(traceId, bucket);
111118
}
112119

113-
const newWeight = (this._traceWeightMap.get(traceId) ?? 0) + estimateSerializedSpanSizeInBytes(spanJSON);
114-
this._traceWeightMap.set(traceId, newWeight);
120+
bucket.spans.add(spanJSON);
121+
bucket.size += estimateSerializedSpanSizeInBytes(spanJSON);
115122

116-
if (traceBucket.size >= this._maxSpanLimit || newWeight >= this._maxTraceWeight) {
123+
if (bucket.spans.size >= this._maxSpanLimit || bucket.size >= this._maxTraceWeight) {
117124
this.flush(traceId);
118-
this._debounceFlushInterval();
119125
}
120126
}
121127

122128
/**
123129
* Drain and flush all buffered traces.
124130
*/
125131
public drain(): void {
126-
if (!this._traceMap.size) {
132+
if (!this._traceBuckets.size) {
127133
return;
128134
}
129135

130-
DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`);
136+
DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceBuckets.size} traces`);
131137

132-
this._traceMap.forEach((_, traceId) => {
138+
this._traceBuckets.forEach((_, traceId) => {
133139
this.flush(traceId);
134140
});
135-
this._debounceFlushInterval();
136141
}
137142

138143
/**
139144
* Flush spans of a specific trace.
140-
* In contrast to {@link SpanBuffer.flush}, this method does not flush all traces, but only the one with the given traceId.
145+
* In contrast to {@link SpanBuffer.drain}, this method does not flush all traces, but only the one with the given traceId.
141146
*/
142147
public flush(traceId: string): void {
143-
const traceBucket = this._traceMap.get(traceId);
144-
if (!traceBucket) {
148+
const bucket = this._traceBuckets.get(traceId);
149+
if (!bucket) {
145150
return;
146151
}
147152

148-
if (!traceBucket.size) {
149-
// we should never get here, given we always add a span when we create a new bucket
153+
if (!bucket.spans.size) {
154+
// we should never get here, given we always add a span when we create a new bucket
150155
// and delete the bucket once we flush out the trace
151156
this._removeTrace(traceId);
152157
return;
153158
}
154159

155-
const spans = Array.from(traceBucket);
160+
const spans = Array.from(bucket.spans);
156161

157162
const segmentSpan = spans[0]?._segmentSpan;
158163
if (!segmentSpan) {
@@ -181,18 +186,10 @@ export class SpanBuffer {
181186
}
182187

183188
private _removeTrace(traceId: string): void {
184-
this._traceMap.delete(traceId);
185-
this._traceWeightMap.delete(traceId);
186-
}
187-
188-
private _debounceFlushInterval(): void {
189-
if (this._flushIntervalId) {
190-
clearInterval(this._flushIntervalId);
189+
const bucket = this._traceBuckets.get(traceId);
190+
if (bucket) {
191+
clearTimeout(bucket.timeout);
191192
}
192-
this._flushIntervalId = safeUnref(
193-
setInterval(() => {
194-
this.drain();
195-
}, this._flushInterval),
196-
);
193+
this._traceBuckets.delete(traceId);
197194
}
198195
}

packages/core/test/lib/tracing/spans/spanBuffer.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ describe('SpanBuffer', () => {
7070
expect(sentEnvelopes[1]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace456');
7171
});
7272

73-
it('drains on interval', () => {
73+
it('flushes trace after per-trace timeout', () => {
7474
const buffer = new SpanBuffer(client, { flushInterval: 1000 });
7575

7676
const segmentSpan1 = new SentrySpan({ name: 'segment', sampled: true });
@@ -106,7 +106,7 @@ describe('SpanBuffer', () => {
106106

107107
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
108108

109-
// since the buffer is now empty, it should not send anything anymore
109+
// the trace bucket was removed after flushing, so no timeout remains and no further sends occur
110110
vi.advanceTimersByTime(1000);
111111
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
112112
});

0 commit comments

Comments
 (0)