Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions containers/api-proxy/token-persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,54 @@ function validateTokenUsageRecord(record) {
return true;
}

/**
* Build a token usage record for JSONL persistence.
*
* @param {object} normalized - Normalized usage object from normalizeUsage()
* @param {object} opts
* @param {string} opts.requestId
* @param {string} opts.provider
* @param {string|null} opts.model
* @param {string} opts.reqPath
* @param {number} opts.status
* @param {boolean} opts.streaming
* @param {number} opts.duration
* @param {number} opts.responseBytes
* @returns {object}
*/
function buildTokenUsageRecord(normalized, opts) {
const { requestId, provider, model, reqPath, status, streaming, duration, responseBytes } = opts;
return {
_schema: TOKEN_USAGE_SCHEMA,
timestamp: new Date().toISOString(),
request_id: requestId,
provider,
model: model || 'unknown',
path: reqPath,
status,
streaming,
input_tokens: normalized.input_tokens,
output_tokens: normalized.output_tokens,
cache_read_tokens: normalized.cache_read_tokens,
cache_write_tokens: normalized.cache_write_tokens,
duration_ms: duration,
response_bytes: responseBytes,
};
}

/**
* Increment token usage metrics when a metrics sink is available.
*
* @param {object|null} metricsRef
* @param {string} provider
* @param {object} normalized
*/
function incrementTokenMetrics(metricsRef, provider, normalized) {
if (!metricsRef) return;
metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens);
metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens);
}

/**
* Write a token usage record to the JSONL log file.
* Validates the record against the token-usage schema before writing.
Expand Down Expand Up @@ -181,6 +229,8 @@ module.exports = {
TOKEN_LOG_FILE,
TOKEN_USAGE_SCHEMA,
diag,
buildTokenUsageRecord,
incrementTokenMetrics,
validateTokenUsageRecord,
writeTokenUsage,
closeLogStream,
Expand Down
32 changes: 14 additions & 18 deletions containers/api-proxy/token-tracker-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ const {
extractUsageFromJson,
normalizeUsage,
} = require('./token-parsers');
const { writeTokenUsage, TOKEN_USAGE_SCHEMA, diag } = require('./token-persistence');
const {
writeTokenUsage,
buildTokenUsageRecord,
incrementTokenMetrics,
diag,
} = require('./token-persistence');

// Max response body to buffer for non-streaming usage extraction (5 MB).
// Responses larger than this are still forwarded but usage is not extracted.
Expand Down Expand Up @@ -238,28 +243,19 @@ function trackTokenUsage(proxyRes, opts) {
}

// Update metrics
if (metricsRef) {
metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens);
metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens);
}
incrementTokenMetrics(metricsRef, provider, normalized);

// Build log record
const record = {
_schema: TOKEN_USAGE_SCHEMA,
timestamp: new Date().toISOString(),
request_id: requestId,
const record = buildTokenUsageRecord(normalized, {
requestId,
provider,
model: model || 'unknown',
path: reqPath,
model,
reqPath,
status: proxyRes.statusCode,
streaming,
input_tokens: normalized.input_tokens,
output_tokens: normalized.output_tokens,
cache_read_tokens: normalized.cache_read_tokens,
cache_write_tokens: normalized.cache_write_tokens,
duration_ms: duration,
response_bytes: totalBytes,
};
duration,
responseBytes: totalBytes,
});

// Include billing/quota info when available (Copilot PRU tracking)
if (initiatorSent) record.x_initiator = initiatorSent;
Expand Down
32 changes: 14 additions & 18 deletions containers/api-proxy/token-tracker-ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

const { logRequest } = require('./logging');
const { extractUsageFromSseLine, normalizeUsage } = require('./token-parsers');
const { writeTokenUsage, TOKEN_USAGE_SCHEMA, diag } = require('./token-persistence');
const {
writeTokenUsage,
buildTokenUsageRecord,
incrementTokenMetrics,
diag,
} = require('./token-persistence');

/**
* Parse WebSocket frames from a buffer (server→client direction, unmasked).
Expand Down Expand Up @@ -198,27 +203,18 @@ function trackWebSocketTokenUsage(upstreamSocket, opts) {
}
}

if (metricsRef) {
metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens);
metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens);
}
incrementTokenMetrics(metricsRef, provider, normalized);

const record = {
_schema: TOKEN_USAGE_SCHEMA,
timestamp: new Date().toISOString(),
request_id: requestId,
const record = buildTokenUsageRecord(normalized, {
requestId,
provider,
model: streamingModel || 'unknown',
path: reqPath,
model: streamingModel,
reqPath,
status: 101,
streaming: true,
input_tokens: normalized.input_tokens,
output_tokens: normalized.output_tokens,
cache_read_tokens: normalized.cache_read_tokens,
cache_write_tokens: normalized.cache_write_tokens,
duration_ms: duration,
response_bytes: totalBytes - headerBytes,
};
duration,
responseBytes: totalBytes - headerBytes,
});

writeTokenUsage(record);

Expand Down
43 changes: 43 additions & 0 deletions containers/api-proxy/token-tracker.schema.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
writeTokenUsage,
closeLogStream,
} = require('./token-tracker');
const { buildTokenUsageRecord, incrementTokenMetrics } = require('./token-persistence');
const { EventEmitter } = require('events');

afterAll(async () => {
Expand Down Expand Up @@ -96,6 +97,48 @@ describe('validateTokenUsageRecord', () => {
});
});

describe('shared token usage helpers', () => {
test('buildTokenUsageRecord returns schema-compatible record shape', () => {
const record = buildTokenUsageRecord({
input_tokens: 10,
output_tokens: 5,
cache_read_tokens: 2,
cache_write_tokens: 1,
}, {
requestId: 'helper-record-test',
provider: 'openai',
model: null,
reqPath: '/v1/chat/completions',
status: 200,
streaming: false,
duration: 123,
responseBytes: 456,
});

expect(record).toMatchObject({
request_id: 'helper-record-test',
provider: 'openai',
model: 'unknown',
path: '/v1/chat/completions',
status: 200,
streaming: false,
input_tokens: 10,
output_tokens: 5,
cache_read_tokens: 2,
cache_write_tokens: 1,
duration_ms: 123,
response_bytes: 456,
});
expect(validateTokenUsageRecord(record)).toBe(true);
});

test('incrementTokenMetrics is a no-op when metrics sink is missing', () => {
expect(() => {
incrementTokenMetrics(null, 'anthropic', { input_tokens: 1, output_tokens: 2 });
}).not.toThrow();
});
});

// ── JSONL records include _schema field ───────────────────────────────

/**
Expand Down
Loading