Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ require('werelogs').stderrUtils.catchAndTimestampStderr(
require('cluster').isPrimary ? 1 : null,
);

// Start tracing before requiring anything that hooks into HTTP, MongoDB,
// or ioredis — instrumentation patches modules on require, so anything
// loaded earlier than init() would run unpatched.
Comment thread
delthas marked this conversation as resolved.
Comment on lines +10 to +12
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Start tracing before requiring anything that hooks into HTTP, MongoDB,
// or ioredis — instrumentation patches modules on require, so anything
// loaded earlier than init() would run unpatched.

This is purely standard behaviour in tracing / monitoring. All packages that do this (datadog, otel, newrelic, ...) work the same way and already explain it.

require('./lib/tracing').init();
Comment thread
delthas marked this conversation as resolved.

require('./lib/server.js')();
335 changes: 178 additions & 157 deletions lib/api/api.js

Large diffs are not rendered by default.

105 changes: 105 additions & 0 deletions lib/instrumentation/simple.js
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simple.js is a really weird name: can we find a better one?
I don't have many ideas if it stays in its own module/folder (other than instrumentation/instrumentation.js); maybe tracing/instrumentation.js would be better?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since both modules are interdependent (tracing requires this one and vice versa), they should probably indeed go into the same module...

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
'use strict';

const tracing = require('../tracing');

let tracer = null;
/**
 * Return the tracer used for API spans, creating and caching it in the
 * module-level `tracer` variable on first use.
 *
 * @returns {object} the value returned by
 *   `trace.getTracer('cloudserver-api', version)`
 */
function getTracer() {
if (tracer) {
return tracer;
}
// Lazy require: this module is loaded eagerly by lib/api/api.js at boot
// to wrap S3 handlers, but @opentelemetry/api itself is only needed
// when an instrumented call actually runs. Keeping the require inside
// getTracer() means a process with ENABLE_OTEL=false never pulls the
// OTEL packages into memory.
const { trace } = require('@opentelemetry/api');
Comment thread
delthas marked this conversation as resolved.
// The tracer is registered under the version declared in package.json.
const { version } = require('../../package.json');
tracer = trace.getTracer('cloudserver-api', version);
return tracer;
}
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.

/**
 * Clear the cached tracer so that the next getTracer() call obtains a
 * new one from `@opentelemetry/api`.
 */
function resetTracer() {
tracer = null;
}

/**
 * Record the outcome of an operation on a span, then end the span.
 *
 * On success the span status is set to OK. On failure the error is
 * recorded as an exception, the status is set to ERROR and, when the error
 * carries a truthy `code`, that code is attached as the
 * `cloudserver.error_code` attribute.
 *
 * @param {object} api - the `@opentelemetry/api` module; only
 *   `SpanStatusCode` is read from it
 * @param {object} span - span to finalize
 * @param {*} [err] - error that ended the operation; any falsy value
 *   means success
 * @returns {undefined}
 */
function endSpan(api, span, err) {
    const { SpanStatusCode } = api;

    // Success path: nothing to record besides the status.
    if (!err) {
        span.setStatus({ code: SpanStatusCode.OK });
        span.end();
        return;
    }

    span.recordException(err);
    span.setStatus({ code: SpanStatusCode.ERROR });
    if (err.code) {
        span.setAttribute('cloudserver.error_code', err.code);
    }
    span.end();
}

function instrumentCallbackHandler(self, apiMethod, spanName, args, callbackIndex) {
const api = require('@opentelemetry/api');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any penalty for repeatedly requiring the same api module (i.e. extra load time on startup, more memory usage...), or is this somehow memoized by Node.js anyway and thus essentially free (apart from the syntactic weight, which stems from line 15)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, this may be done in api.js or at the top level of module:

const OTEL_ENABLED = process.env.get('OTEL_ENABLED');
const api = OTEL_ENABLED ? require('api') : null;

This would avoid the extra api parameter in endSpan and the repeated require/api variable, while still not loading the OTEL module unless it is enabled.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the repeated require: it has no cost, since Node caches modules after the first load.

const span = getTracer().startSpan(spanName, { kind: api.SpanKind.INTERNAL });

const wrappedArgs = [...args];
const originalCallback = args[callbackIndex];
wrappedArgs[callbackIndex] = function wrappedCallback(err, ...results) {
endSpan(api, span, err);
return originalCallback.call(this, err, ...results);
};

const ctx = api.trace.setSpan(api.context.active(), span);
try {
return api.context.with(ctx, () => apiMethod.apply(self, wrappedArgs));
} catch (err) {
endSpan(api, span, err);
throw err;
}
Comment thread
delthas marked this conversation as resolved.
}
Comment thread
delthas marked this conversation as resolved.

/**
 * Invoke a handler that takes no callback inside a new INTERNAL span.
 *
 * The handler runs with the new span set on the active context. The span
 * is ended with an error if the handler throws synchronously or if the
 * promise it returns rejects, and ended successfully otherwise.
 *
 * @param {*} self - `this` value to apply the handler with
 * @param {function} apiMethod - handler to invoke
 * @param {string} spanName - name given to the span
 * @param {Array} args - arguments forwarded to the handler
 * @returns {*} the handler's raw return value when it is not thenable,
 *   otherwise a promise settling the same way as the handler's result
 * @throws rethrows whatever the handler throws synchronously
 */
function instrumentAsyncHandler(self, apiMethod, spanName, args) {
    const otel = require('@opentelemetry/api');
    const span = getTracer().startSpan(spanName, { kind: otel.SpanKind.INTERNAL });
    const spanContext = otel.trace.setSpan(otel.context.active(), span);
    const invokeHandler = () => apiMethod.apply(self, args);

    let outcome;
    try {
        outcome = otel.context.with(spanContext, invokeHandler);
    } catch (err) {
        endSpan(otel, span, err);
        throw err;
    }

    // Synchronous handlers keep their return shape: close the span right
    // away and hand the raw value back untouched.
    const isThenable = Boolean(outcome) && typeof outcome.then === 'function';
    if (!isThenable) {
        endSpan(otel, span);
        return outcome;
    }

    // Promise-returning handlers: the span stays open until the result
    // settles, and the settlement is propagated to the caller.
    const finishWhenSettled = async () => {
        let value;
        try {
            value = await outcome;
        } catch (err) {
            endSpan(otel, span, err);
            throw err;
        }
        endSpan(otel, span);
        return value;
    };
    return finishWhenSettled();
}

/**
 * Wrap an API method so that each call runs inside a span named
 * `api.<methodName>`.
 *
 * @param {function} apiMethod - method to wrap
 * @param {string} methodName - suffix used to build the span name
 * @returns {function} `apiMethod` itself when `tracing.isEnabled()` is
 *   false, otherwise the instrumented wrapper
 */
function instrumentApiMethod(apiMethod, methodName) {
// The enabled check happens once, at wrap time, not on every call.
if (!tracing.isEnabled()) {
return apiMethod;
}
const spanName = `api.${methodName}`;
return function instrumented(...args) {
// The last function-typed argument is treated as the completion callback.
const callbackIndex = args.findLastIndex(a => typeof a === 'function');
Comment thread
delthas marked this conversation as resolved.
if (callbackIndex !== -1) {
return instrumentCallbackHandler(this, apiMethod, spanName, args, callbackIndex);
}
// No function argument: handled as a sync or promise-returning handler.
return instrumentAsyncHandler(this, apiMethod, spanName, args);
};
}

module.exports = { instrumentApiMethod, resetTracer };
97 changes: 49 additions & 48 deletions lib/server.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
const http = require('http');
const https = require('https');
const cluster = require('cluster');
const { promisify } = require('util');
const { series } = require('async');
const arsenal = require('arsenal');
const { setServerHeader } = arsenal.s3routes.routesUtils;
const { RedisClient, StatsClient } = arsenal.metrics;
const monitoringClient = require('./utilities/monitoringHandler');
const tracing = require('./tracing');

const logger = require('./utilities/logger');
const { internalHandlers } = require('./utilities/internalHandlers');
Expand All @@ -15,15 +17,11 @@ const { blacklistedPrefixes } = require('../constants');
const api = require('./api/api');
const dataWrapper = require('./data/wrapper');
const kms = require('./kms/wrapper');
const locationStorageCheck =
require('./api/apiUtils/object/locationStorageCheck');
const locationStorageCheck = require('./api/apiUtils/object/locationStorageCheck');
const vault = require('./auth/vault');
const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management');
const {
initManagementClient,
isManagementAgentUsed,
} = require('./management/agentClient');
const { initManagementClient, isManagementAgentUsed } = require('./management/agentClient');
const { startCleanupJob } = require('./api/apiUtils/rateLimit/cleanup');
const { startRefillJob, stopRefillJob } = require('./api/apiUtils/rateLimit/refillJob');

Expand All @@ -46,8 +44,7 @@ updateAllEndpoints();
_config.on('location-constraints-update', () => {
if (implName === 'multipleBackends') {
const clients = parseLC(_config, vault);
client = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
client = new MultipleBackendGateway(clients, metadata, locationStorageCheck);
}
});

Expand All @@ -59,8 +56,7 @@ if (_config.localCache) {
// stats client
const STATS_INTERVAL = 5; // 5 seconds
const STATS_EXPIRY = 30; // 30 seconds
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
STATS_EXPIRY);
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, STATS_EXPIRY);
const enableRemoteManagement = true;

class S3Server {
Expand All @@ -84,7 +80,7 @@ class S3Server {
process.on('SIGHUP', this.cleanUp.bind(this));
process.on('SIGQUIT', this.cleanUp.bind(this));
process.on('SIGTERM', this.cleanUp.bind(this));
process.on('SIGPIPE', () => { });
process.on('SIGPIPE', () => {});
// This will pick up exceptions up the stack
process.on('uncaughtException', err => {
// If just send the error object results in empty
Expand All @@ -95,7 +91,7 @@ class S3Server {
workerId: this.worker ? this.worker.id : undefined,
workerPid: this.worker ? this.worker.process.pid : undefined,
});
this.caughtExceptionShutdown();
void this.caughtExceptionShutdown();
});
this.started = false;
}
Expand Down Expand Up @@ -130,9 +126,10 @@ class S3Server {
const requestStartTime = process.hrtime.bigint();

// Skip server access logs for heartbeat.
const isLoggingEnabled = _config.serverAccessLogs
&& (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY
|| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED);
const isLoggingEnabled =
_config.serverAccessLogs &&
(_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY ||
_config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED);
const isInternalRoute = req.url.startsWith('/_');
const isBackbeatRoute = req.url.startsWith('/_/backbeat/');
if (isLoggingEnabled && (!isInternalRoute || isBackbeatRoute)) {
Expand Down Expand Up @@ -176,9 +173,7 @@ class S3Server {
labels.action = req.apiMethod;
}
monitoringClient.httpRequestsTotal.labels(labels).inc();
monitoringClient.httpRequestDurationSeconds
.labels(labels)
.observe(responseTimeInNs / 1e9);
monitoringClient.httpRequestDurationSeconds.labels(labels).observe(responseTimeInNs / 1e9);
monitoringClient.httpActiveRequests.dec();
};
res.on('close', monitorEndOfRequest);
Expand Down Expand Up @@ -231,14 +226,13 @@ class S3Server {
};

let reqUids = req.headers['x-scal-request-uids'];
if (reqUids !== undefined && !/*isValidReqUids*/(reqUids.length < 128)) {
if (reqUids !== undefined && !(/*isValidReqUids*/ (reqUids.length < 128))) {
// simply ignore invalid id (any user can provide an
// invalid request ID through a crafted header)
reqUids = undefined;
}
const log = (reqUids !== undefined ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger());
const log =
reqUids !== undefined ? logger.newRequestLoggerFromSerializedUids(reqUids) : logger.newRequestLogger();
log.end().addDefaultFields(clientInfo);

log.debug('received admin request', clientInfo);
Expand Down Expand Up @@ -292,8 +286,7 @@ class S3Server {
server.requestTimeout = 0; // disabling request timeout

server.on('connection', socket => {
socket.on('error', err => logger.info('request rejected',
{ error: err }));
socket.on('error', err => logger.info('request rejected', { error: err }));
});

// https://nodejs.org/dist/latest-v6.x/
Expand All @@ -309,8 +302,11 @@ class S3Server {
};
const { address } = addr;
logger.info('server started', {
address, port,
pid: process.pid, serverIP: address, serverPort: port
address,
port,
pid: process.pid,
serverIP: address,
serverPort: port,
});
});

Expand All @@ -323,32 +319,40 @@ class S3Server {
this.servers.push(server);
}

/*
* This exits the running process properly.
*/
cleanUp() {
async cleanUp() {
logger.info('server shutting down');
// Stop token refill job if running
if (this.config.rateLimiting?.enabled) {
stopRefillJob(logger);
}
Promise.all(this.servers.map(server =>
new Promise(resolve => server.close(resolve))
)).then(() => process.exit(0));
try {
await Promise.all(this.servers.map(server => promisify(server.close.bind(server))()));
await tracing.close();
} finally {
process.exit(0);
}
}

caughtExceptionShutdown() {
async caughtExceptionShutdown() {
Comment thread
delthas marked this conversation as resolved.
if (!this.cluster) {
process.exit(1);
try {
await tracing.close();
} finally {
process.exit(1);
}
}
logger.error('shutdown of worker due to exception', {
workerId: this.worker ? this.worker.id : undefined,
workerPid: this.worker ? this.worker.process.pid : undefined,
});
// Will close all servers, cause disconnect event on primary and kill
// worker process with 'SIGTERM'.
// worker.kill() is graceful (closes servers, disconnects IPC) but
// does not fire our SIGTERM handler, so the BatchSpanProcessor
// would lose buffered spans without an explicit flush here.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we/can we add tests to verify the behavior on shutdown?

if (this.worker) {
this.worker.kill();
try {
await tracing.close();
} finally {
this.worker.kill();
}
}
}
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.

Expand All @@ -363,10 +367,7 @@ class S3Server {
}

initiateStartup(log) {
series([
next => metadata.setup(next),
next => clientCheck(true, log, next),
], (err, results) => {
series([next => metadata.setup(next), next => clientCheck(true, log, next)], (err, results) => {
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
if (err) {
log.warn('initial health check failed, delaying startup', {
error: err,
Expand Down Expand Up @@ -417,8 +418,10 @@ class S3Server {

try {
logger.info('ServerAccessLogger config', { config: _config.serverAccessLogs });
if (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY
|| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED) {
if (
_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY ||
_config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED
) {
var serverAccessLogger = new ServerAccessLogger(
_config.serverAccessLogs.outputFile,
_config.serverAccessLogs.highWaterMarkBytes,
Expand All @@ -434,7 +437,6 @@ class S3Server {
logger.error('ServerAccessLogger creation error', error);
}


this.started = true;
});
}
Expand Down Expand Up @@ -490,8 +492,7 @@ function main() {
});

const metricServer = new S3Server(_config);
metricServer.startServer(_config.metricsListenOn,
_config.metricsPort, metricServer.routeAdminRequest);
metricServer.startServer(_config.metricsListenOn, _config.metricsPort, metricServer.routeAdminRequest);
}
if (_config.isCluster && cluster.isWorker) {
const server = new S3Server(_config, cluster.worker);
Expand Down
18 changes: 18 additions & 0 deletions lib/tracing/healthPaths.js
Comment thread
delthas marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

// Probe + scrape paths that should never produce a span. Filtered at
// ingest (not at the trace backend) because probe rate × pod count ×
// always-on sampling overwhelms the exporter and storage with traffic
// nobody queries.
const HEALTH_PATHS = new Set(['/live', '/ready', '/_/healthcheck', '/_/healthcheck/deep', '/metrics']);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should health path filtering be configurable, i.e. allow enabling it (possibly with an extra env variable)?

/**
 * Tell whether a request URL targets one of the HEALTH_PATHS entries.
 *
 * @param {*} url - request URL, optionally followed by a query string
 * @returns {boolean} true when the part of `url` before the first `?`
 *   exactly matches an entry of HEALTH_PATHS; false otherwise, including
 *   for non-string or empty input
 */
function isHealthPath(url) {
Comment thread
delthas marked this conversation as resolved.
if (typeof url !== 'string' || url.length === 0) {
return false;
}
// Drop the query string, if any, before the lookup.
const qIdx = url.indexOf('?');
const path = qIdx === -1 ? url : url.slice(0, qIdx);
return HEALTH_PATHS.has(path);
}

module.exports = { isHealthPath };
Loading