Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 70 additions & 10 deletions packages/core/src/__tests__/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ describe('#sendEvents', () => {
.mockReturnValue('2001-01-01T00:00:00.000Z');
});

async function sendAnEventPer(writeKey: string, toUrl: string) {
async function sendAnEventPer(
writeKey: string,
toUrl: string,
retryCount?: number
) {
const mockResponse = Promise.resolve('MANOS');
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Expand Down Expand Up @@ -60,9 +64,19 @@ describe('#sendEvents', () => {
writeKey: writeKey,
url: toUrl,
events: [event],
retryCount,
});

expect(fetch).toHaveBeenCalledWith(toUrl, {
return event;
}

it('sends an event', async () => {
const toSegmentBatchApi = 'https://api.segment.io/v1.b';
const writeKey = 'SEGMENT_KEY';

const event = await sendAnEventPer(writeKey, toSegmentBatchApi);

expect(fetch).toHaveBeenCalledWith(toSegmentBatchApi, {
method: 'POST',
keepalive: true,
body: JSON.stringify({
Expand All @@ -72,21 +86,67 @@ describe('#sendEvents', () => {
}),
headers: {
'Content-Type': 'application/json; charset=utf-8',
'X-Retry-Count': '0',
},
});
}

it('sends an event', async () => {
const toSegmentBatchApi = 'https://api.segment.io/v1.b';
const writeKey = 'SEGMENT_KEY';

await sendAnEventPer(writeKey, toSegmentBatchApi);
});

it('sends an event to proxy', async () => {
const toProxyUrl = 'https://myprox.io/b';
const writeKey = 'SEGMENT_KEY';

await sendAnEventPer(writeKey, toProxyUrl);
const event = await sendAnEventPer(writeKey, toProxyUrl);

expect(fetch).toHaveBeenCalledWith(toProxyUrl, {
method: 'POST',
body: JSON.stringify({
batch: [event],
sentAt: '2001-01-01T00:00:00.000Z',
writeKey: 'SEGMENT_KEY',
}),
headers: {
'Content-Type': 'application/json; charset=utf-8',
'X-Retry-Count': '0',
},
keepalive: true,
});
});

it('sends X-Retry-Count header with default value 0', async () => {
const url = 'https://api.segment.io/v1.b';
await sendAnEventPer('KEY', url);

expect(fetch).toHaveBeenCalledWith(
url,
expect.objectContaining({
headers: expect.objectContaining({
'X-Retry-Count': '0',
}),
})
);
});

it('sends X-Retry-Count header with provided retry count', async () => {
const url = 'https://api.segment.io/v1.b';
await sendAnEventPer('KEY', url, 5);

expect(fetch).toHaveBeenCalledWith(
url,
expect.objectContaining({
headers: expect.objectContaining({
'X-Retry-Count': '5',
}),
})
);
});

it('sends X-Retry-Count as string format', async () => {
const url = 'https://api.segment.io/v1.b';
await sendAnEventPer('KEY', url, 42);

const callArgs = (fetch as jest.Mock).mock.calls[0];
const headers = callArgs[1].headers;
expect(typeof headers['X-Retry-Count']).toBe('string');
expect(headers['X-Retry-Count']).toBe('42');
});
});
90 changes: 87 additions & 3 deletions packages/core/src/__tests__/internal/fetchSettings.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SegmentClient } from '../../analytics';
import { settingsCDN } from '../../constants';
import { settingsCDN, defaultHttpConfig } from '../../constants';
import { SEGMENT_DESTINATION_KEY } from '../../plugins/SegmentDestination';
import { getMockLogger, MockSegmentStore } from '../../test-helpers';
import { getURL } from '../../util';
Expand Down Expand Up @@ -436,6 +436,80 @@ describe('internal #getSettings', () => {
});
});

describe('CDN integrations validation', () => {
it('treats null integrations as empty (no integrations configured)', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ integrations: null }),
status: 200,
} as Response);

const client = new SegmentClient(clientArgs);
await client.fetchSettings();

expect(setSettingsSpy).toHaveBeenCalledWith({});
});

it('treats missing integrations as empty (no integrations configured)', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({}),
status: 200,
} as Response);

const client = new SegmentClient(clientArgs);
await client.fetchSettings();

expect(setSettingsSpy).toHaveBeenCalledWith({});
});

it('falls back to defaults when CDN returns integrations as an array', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ integrations: ['invalid'] }),
status: 200,
} as Response);

const client = new SegmentClient(clientArgs);
await client.fetchSettings();

expect(setSettingsSpy).toHaveBeenCalledWith(
defaultIntegrationSettings.integrations
);
});

it('falls back to defaults when CDN returns integrations as a string', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ integrations: 'invalid' }),
status: 200,
} as Response);

const client = new SegmentClient(clientArgs);
await client.fetchSettings();

expect(setSettingsSpy).toHaveBeenCalledWith(
defaultIntegrationSettings.integrations
);
});

it('stores empty integrations when CDN returns null integrations and no defaults', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ integrations: null }),
status: 200,
} as Response);

const client = new SegmentClient({
...clientArgs,
config: { ...clientArgs.config, defaultSettings: undefined },
});
await client.fetchSettings();

expect(setSettingsSpy).toHaveBeenCalledWith({});
});
});

describe('httpConfig extraction', () => {
it('extracts httpConfig from CDN response and merges with defaults', async () => {
const serverHttpConfig = {
Expand Down Expand Up @@ -483,7 +557,7 @@ describe('internal #getSettings', () => {
expect(result?.backoffConfig?.jitterPercent).toBe(20);
});

it('returns undefined httpConfig when CDN has no httpConfig', async () => {
it('returns defaultHttpConfig when CDN has no httpConfig', async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve(defaultIntegrationSettings),
Expand All @@ -496,7 +570,17 @@ describe('internal #getSettings', () => {
});

await anotherClient.fetchSettings();
expect(anotherClient.getHttpConfig()).toBeUndefined();
const result = anotherClient.getHttpConfig();
expect(result).toBeDefined();
expect(result?.rateLimitConfig?.enabled).toBe(
defaultHttpConfig.rateLimitConfig!.enabled
);
expect(result?.backoffConfig?.enabled).toBe(
defaultHttpConfig.backoffConfig!.enabled
);
expect(result?.backoffConfig?.statusCodeOverrides).toEqual(
defaultHttpConfig.backoffConfig!.statusCodeOverrides
);
});

it('returns undefined httpConfig when fetch fails', async () => {
Expand Down
55 changes: 32 additions & 23 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ export class SegmentClient {
}

/**
* Retrieves the server-side httpConfig from CDN settings.
* Returns undefined if the CDN did not provide httpConfig (retry features disabled).
* Retrieves the merged httpConfig (defaultHttpConfig ← CDN ← config overrides).
* Returns undefined only if settings have not yet been fetched.
*/
getHttpConfig(): HttpConfig | undefined {
return this.httpConfig;
Expand Down Expand Up @@ -406,8 +406,13 @@ export class SegmentClient {
const resJson: SegmentAPISettings =
(await res.json()) as SegmentAPISettings;

// A valid 200 with missing integrations means "no integrations configured"
// Only fall back to defaults for truly malformed types (non-object or array)
if (resJson.integrations == null) {
resJson.integrations = {};
}

if (
resJson.integrations == null ||
typeof resJson.integrations !== 'object' ||
Array.isArray(resJson.integrations)
) {
Expand All @@ -429,25 +434,27 @@ export class SegmentClient {
resJson.middlewareSettings?.routingRules ?? []
);

// Extract httpConfig from CDN, merge with defaults, validate and clamp
if (resJson.httpConfig) {
const mergedRateLimit = resJson.httpConfig.rateLimitConfig
? {
...defaultHttpConfig.rateLimitConfig!,
...resJson.httpConfig.rateLimitConfig,
}
: defaultHttpConfig.rateLimitConfig!;

const mergedBackoff = resJson.httpConfig.backoffConfig
? {
...defaultHttpConfig.backoffConfig!,
...resJson.httpConfig.backoffConfig,
statusCodeOverrides: {
...defaultHttpConfig.backoffConfig!.statusCodeOverrides,
...resJson.httpConfig.backoffConfig.statusCodeOverrides,
},
}
: defaultHttpConfig.backoffConfig!;
// Merge httpConfig: defaultHttpConfig ← CDN ← config overrides
{
const cdnConfig = resJson.httpConfig ?? {};
const clientConfig = this.config.httpConfig ?? {};

const mergedRateLimit = {
...defaultHttpConfig.rateLimitConfig!,
...(cdnConfig.rateLimitConfig ?? {}),
...(clientConfig.rateLimitConfig ?? {}),
};

const mergedBackoff = {
...defaultHttpConfig.backoffConfig!,
...(cdnConfig.backoffConfig ?? {}),
...(clientConfig.backoffConfig ?? {}),
statusCodeOverrides: {
...defaultHttpConfig.backoffConfig!.statusCodeOverrides,
...(cdnConfig.backoffConfig?.statusCodeOverrides ?? {}),
...(clientConfig.backoffConfig?.statusCodeOverrides ?? {}),
},
};

const validatedRateLimit = validateRateLimitConfig(
mergedRateLimit,
Expand All @@ -461,7 +468,9 @@ export class SegmentClient {
validatedRateLimit
),
};
this.logger.info('Loaded httpConfig from CDN settings.');
if (resJson.httpConfig) {
this.logger.info('Loaded httpConfig from CDN settings.');
}
}

this.logger.info('Received settings from Segment succesfully.');
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ export const uploadEvents = async ({
writeKey,
url,
events,
retryCount = 0,
}: {
writeKey: string;
url: string;
events: SegmentEvent[];
retryCount?: number;
}) => {
return await fetch(url, {
method: 'POST',
Expand All @@ -19,6 +21,7 @@ export const uploadEvents = async ({
}),
headers: {
'Content-Type': 'application/json; charset=utf-8',
'X-Retry-Count': retryCount.toString(),
},
});
};
19 changes: 7 additions & 12 deletions packages/core/src/backoff/RetryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ export type RetryResult = 'rate_limited' | 'backed_off' | 'limit_exceeded';
* - BACKING_OFF: transient error; exponential backoff until wait expires
*
* Designed for concurrent batch uploads (Promise.all). Multiple batches can
* fail simultaneously with different errors or partially succeed. The retry
* strategy (eager/lazy) controls how concurrent wait times are consolidated.
* fail simultaneously with different errors or partially succeed. When
* concurrent wait times conflict, the shorter wait is used (eager strategy)
* to retry sooner.
*
* Uses a global retry counter since batches are re-chunked from the event
* queue on each flush and have no stable identities.
Expand All @@ -40,20 +41,17 @@ export class RetryManager {
private rateLimitConfig?: RateLimitConfig;
private backoffConfig?: BackoffConfig;
private logger?: LoggerType;
private retryStrategy: 'eager' | 'lazy';

constructor(
storeId: string,
persistor: Persistor | undefined,
rateLimitConfig?: RateLimitConfig,
backoffConfig?: BackoffConfig,
logger?: LoggerType,
retryStrategy: 'eager' | 'lazy' = 'lazy'
logger?: LoggerType
) {
this.rateLimitConfig = rateLimitConfig;
this.backoffConfig = backoffConfig;
this.logger = logger;
this.retryStrategy = retryStrategy;

try {
this.store = createStore<RetryStateData>(
Expand Down Expand Up @@ -293,14 +291,11 @@ export class RetryManager {
}

/**
* Consolidate two wait-until times based on retry strategy.
* - 'lazy': take the longer wait (most conservative, default)
* - 'eager': take the shorter wait (retry sooner)
* Consolidate two wait-until times using the eager strategy:
* take the shorter wait to retry sooner.
*/
private applyRetryStrategy(existing: number, incoming: number): number {
return this.retryStrategy === 'eager'
? Math.min(existing, incoming)
: Math.max(existing, incoming);
return Math.min(existing, incoming);
}

private async transitionToReady(): Promise<void> {
Expand Down
Loading