Commit 5ea8d4e

feat(core): Add SpanBuffer implementation (#19204)
This PR adds a simple span buffer implementation to be used for buffering streamed spans.

Behaviour:
- buckets incoming spans by `traceId`, as we must not mix up spans of different traces in one envelope
- flushes the entire buffer every 5s by default
- flushes the specific trace bucket if the max span limit (1000) is reached; Relay accepts at most 1000 spans per envelope
- computes the DSC when flushing the first span of a trace. This is the latest point at which we can do it, as once we have flushed, we have to freeze the DSC for Dynamic Sampling consistency
- debounces the flush interval whenever we flush
- flushes the entire buffer if `Sentry.flush()` is called
- shuts down the interval-based flushing when `Sentry.close()` is called
- [implicit] client report generation for dropped envelopes is handled in the transport

Methods:
- `add` accepts a new span to be enqueued into the buffer
- `drain` flushes the entire buffer
- `flush(traceId)` flushes a specific `traceId` bucket. This can be used by e.g. the browser span streaming implementation to flush out the trace of a segment span directly once it ends.

Options:
- `maxSpanLimit` - allows configuring a custom span limit in the range 0 < maxSpanLimit <= 1000. Useful for testing, but we could also expose this to users if we see a need.
- `flushInterval` - allows configuring a flush interval greater than 0 (in ms)

Limitations/edge cases:
- No maximum limit on concurrently buffered traces. I'd tend to accept this for now and see where this leads us in terms of memory pressure, but at the end of the day, the interval-based flushing, in combination with our promise buffer, _should_ avoid an ever-growing map of trace buckets. Happy to change this if reviewers have strong opinions or I'm missing something important!
- There's no priority-based scheduling relative to other telemetry items, just like with our other log and metric buffers.
- Since `Map` is [insertion order preserving](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map#description), we apply a FIFO strategy when `drain`ing the trace buckets. This is in line with our [develop spec](https://develop.sentry.dev/sdk/telemetry/telemetry-processor/backend-telemetry-processor/#:~:text=The%20span%20buffer,in%20the%20buffer.) for the telemetry processor, but might lead to cases where new traces are dropped by the promise buffer if a lot of concurrently running traces are flushed. I think that's a fine trade-off.

ref #19119
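The FIFO `drain` order described above follows directly from `Map`'s insertion-order guarantee. A minimal standalone sketch (with made-up trace ids, not the actual buffer class) of how the ordering works:

```typescript
// JS/TS Maps iterate keys in insertion order, so the oldest trace bucket
// is always drained first (FIFO). Trace ids below are made up.
const buckets = new Map<string, string[]>();
buckets.set('trace-a', ['span-1']);
buckets.set('trace-b', ['span-2']);
// Re-setting an existing key updates the value but keeps its original position:
buckets.set('trace-a', ['span-1', 'span-3']);

const drainOrder = [...buckets.keys()];
console.log(drainOrder); // ['trace-a', 'trace-b']
```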
1 parent 1ca46f9 commit 5ea8d4e

File tree: 5 files changed, +441 -2 lines changed

packages/core/src/index.ts

Lines changed: 3 additions & 0 deletions

```diff
@@ -180,6 +180,9 @@ export type {
   GoogleGenAIOptions,
   GoogleGenAIIstrumentedMethod,
 } from './tracing/google-genai/types';
+
+export { SpanBuffer } from './tracing/spans/spanBuffer';
+
 export type { FeatureFlag } from './utils/featureFlags';

 export {
```

packages/core/src/tracing/dynamicSamplingContext.ts

Lines changed: 2 additions & 1 deletion

```diff
@@ -119,7 +119,8 @@ export function getDynamicSamplingContextFromSpan(span: Span): Readonly<Partial<
   const dsc = getDynamicSamplingContextFromClient(span.spanContext().traceId, client);

   // We don't want to have a transaction name in the DSC if the source is "url" because URLs might contain PII
-  const source = rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE];
+  // TODO(v11): Only read `SEMANTIC_ATTRIBUTE_SENTRY_SOURCE` again, once we renamed it to `sentry.span.source`
+  const source = rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE] ?? rootSpanAttributes['sentry.span.source'];

   // after JSON conversion, txn.name becomes jsonSpan.description
   const name = rootSpanJson.description;
```
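The attribute fallback introduced above relies on nullish coalescing, so a value under the old semantic-attribute key always wins and the new key is only consulted when the old one is absent. A small sketch with a made-up attributes object (the constant's string value is an assumption here):

```typescript
// Made-up attributes object illustrating the fallback behaviour:
// `??` only falls through on undefined/null, so an existing value under the
// old key takes precedence over the new `sentry.span.source` key.
const SEMANTIC_ATTRIBUTE_SENTRY_SOURCE = 'sentry.source'; // assumed constant value
const rootSpanAttributes: Record<string, string | undefined> = {
  'sentry.span.source': 'route',
};

const source = rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE] ?? rootSpanAttributes['sentry.span.source'];
console.log(source); // 'route'
```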

packages/core/src/tracing/spans/captureSpan.ts

Lines changed: 1 addition & 1 deletion

```diff
@@ -25,7 +25,7 @@ import {
 } from '../../utils/spanUtils';
 import { getCapturedScopesOnSpan } from '../utils';

-type SerializedStreamedSpanWithSegmentSpan = SerializedStreamedSpan & {
+export type SerializedStreamedSpanWithSegmentSpan = SerializedStreamedSpan & {
   _segmentSpan: Span;
 };
```

packages/core/src/tracing/spans/spanBuffer.ts

Lines changed: 173 additions & 0 deletions

```ts
import type { Client } from '../../client';
import { DEBUG_BUILD } from '../../debug-build';
import type { SerializedStreamedSpan } from '../../types-hoist/span';
import { debug } from '../../utils/debug-logger';
import { safeUnref } from '../../utils/timer';
import { getDynamicSamplingContextFromSpan } from '../dynamicSamplingContext';
import type { SerializedStreamedSpanWithSegmentSpan } from './captureSpan';
import { createStreamedSpanEnvelope } from './envelope';

/**
 * We must not send more than 1000 spans in one envelope.
 * Otherwise the envelope is dropped by Relay.
 */
const MAX_SPANS_PER_ENVELOPE = 1000;

export interface SpanBufferOptions {
  /**
   * Max spans per trace before auto-flush.
   * Must not exceed 1000.
   *
   * @default 1_000
   */
  maxSpanLimit?: number;

  /**
   * Flush interval in ms.
   * Must be greater than 0.
   *
   * @default 5_000
   */
  flushInterval?: number;
}

/**
 * A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes.
 * Handles interval-based flushing, size thresholds, and graceful shutdown.
 * Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet
 * frozen onto the segment span.
 *
 * For this, we need the reference to the segment span instance, from
 * which we compute the DSC. Doing this in the buffer ensures that we compute the DSC as late as possible,
 * allowing span name and data updates up to this point. Worth noting here that the segment span is likely
 * still active and modifiable when child spans are added to the buffer.
 */
export class SpanBuffer {
  /* Bucket spans by their trace id */
  private _traceMap: Map<string, Set<SerializedStreamedSpanWithSegmentSpan>>;

  private _flushIntervalId: ReturnType<typeof setInterval> | null;
  private _client: Client;
  private _maxSpanLimit: number;
  private _flushInterval: number;

  public constructor(client: Client, options?: SpanBufferOptions) {
    this._traceMap = new Map();
    this._client = client;

    const { maxSpanLimit, flushInterval } = options ?? {};

    this._maxSpanLimit =
      maxSpanLimit && maxSpanLimit > 0 && maxSpanLimit <= MAX_SPANS_PER_ENVELOPE
        ? maxSpanLimit
        : MAX_SPANS_PER_ENVELOPE;
    this._flushInterval = flushInterval && flushInterval > 0 ? flushInterval : 5_000;

    this._flushIntervalId = null;
    this._debounceFlushInterval();

    this._client.on('flush', () => {
      this.drain();
    });

    this._client.on('close', () => {
      // No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()`
      // which already invokes the `flush` hook and thus drains the buffer.
      if (this._flushIntervalId) {
        clearInterval(this._flushIntervalId);
      }
      this._traceMap.clear();
    });
  }

  /**
   * Add a span to the buffer.
   */
  public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void {
    const traceId = spanJSON.trace_id;
    let traceBucket = this._traceMap.get(traceId);
    if (traceBucket) {
      traceBucket.add(spanJSON);
    } else {
      traceBucket = new Set([spanJSON]);
      this._traceMap.set(traceId, traceBucket);
    }

    if (traceBucket.size >= this._maxSpanLimit) {
      this.flush(traceId);
      this._debounceFlushInterval();
    }
  }

  /**
   * Drain and flush all buffered traces.
   */
  public drain(): void {
    if (!this._traceMap.size) {
      return;
    }

    DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`);

    this._traceMap.forEach((_, traceId) => {
      this.flush(traceId);
    });
    this._debounceFlushInterval();
  }

  /**
   * Flush spans of a specific trace.
   * In contrast to {@link SpanBuffer.drain}, this method does not flush all traces, but only the one with the given traceId.
   */
  public flush(traceId: string): void {
    const traceBucket = this._traceMap.get(traceId);
    if (!traceBucket) {
      return;
    }

    if (!traceBucket.size) {
      // we should never get here, given we always add a span when we create a new bucket
      // and delete the bucket once we flush out the trace
      this._traceMap.delete(traceId);
      return;
    }

    const spans = Array.from(traceBucket);

    const segmentSpan = spans[0]?._segmentSpan;
    if (!segmentSpan) {
      DEBUG_BUILD && debug.warn('No segment span reference found on span JSON, cannot compute DSC');
      this._traceMap.delete(traceId);
      return;
    }

    const dsc = getDynamicSamplingContextFromSpan(segmentSpan);

    const cleanedSpans: SerializedStreamedSpan[] = spans.map(spanJSON => {
      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      const { _segmentSpan, ...cleanSpanJSON } = spanJSON;
      return cleanSpanJSON;
    });

    const envelope = createStreamedSpanEnvelope(cleanedSpans, dsc, this._client);

    DEBUG_BUILD && debug.log(`Sending span envelope for trace ${traceId} with ${cleanedSpans.length} spans`);

    this._client.sendEnvelope(envelope).then(null, reason => {
      DEBUG_BUILD && debug.error('Error while sending streamed span envelope:', reason);
    });

    this._traceMap.delete(traceId);
  }

  private _debounceFlushInterval(): void {
    if (this._flushIntervalId) {
      clearInterval(this._flushIntervalId);
    }
    this._flushIntervalId = safeUnref(
      setInterval(() => {
        this.drain();
      }, this._flushInterval),
    );
  }
}
```
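The size-threshold behaviour of `add` above (a trace bucket that reaches `maxSpanLimit` is flushed on its own and removed from the map) can be modelled with a small toy stand-in. This is not the SDK's `SpanBuffer`; it replaces envelope sending with an in-memory `flushed` array so the control flow is runnable in isolation:

```typescript
// Toy stand-in (NOT the SDK class) for the size-threshold flush in `add`:
// once a trace bucket reaches maxSpanLimit, it is flushed and deleted.
class ToyBuffer {
  public flushed: string[][] = [];
  private _traceMap = new Map<string, Set<string>>();
  private _maxSpanLimit: number;

  public constructor(maxSpanLimit: number) {
    this._maxSpanLimit = maxSpanLimit;
  }

  public add(traceId: string, span: string): void {
    let bucket = this._traceMap.get(traceId);
    if (bucket) {
      bucket.add(span);
    } else {
      bucket = new Set([span]);
      this._traceMap.set(traceId, bucket);
    }

    if (bucket.size >= this._maxSpanLimit) {
      // stands in for flush(traceId): emit the bucket and drop it from the map
      this.flushed.push(Array.from(bucket));
      this._traceMap.delete(traceId);
    }
  }
}

const buffer = new ToyBuffer(2);
buffer.add('trace-a', 'span-1');
buffer.add('trace-b', 'span-2');
buffer.add('trace-a', 'span-3'); // trace-a hits the limit and is flushed on its own
console.log(buffer.flushed); // [['span-1', 'span-3']]
```

Note how only the full bucket is flushed; `trace-b` stays buffered until the interval-based drain would pick it up.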
