-
Notifications
You must be signed in to change notification settings - Fork 255
feat(CLDSRV-884): Add OpenTelemetry tracing instrumentation #6140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.4
Are you sure you want to change the base?
Changes from all commits
837c50f
821c375
bd2bcb6
6076860
ad01b83
f304ef5
396404d
18e9992
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,4 +7,9 @@ require('werelogs').stderrUtils.catchAndTimestampStderr( | |||||||
| require('cluster').isPrimary ? 1 : null, | ||||||||
| ); | ||||||||
|
|
||||||||
| // Start tracing before requiring anything that hooks into HTTP, MongoDB, | ||||||||
| // or ioredis — instrumentation patches modules on require, so anything | ||||||||
| // loaded earlier than init() would run unpatched. | ||||||||
|
Comment on lines
+10
to
+12
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This is purely standard behaviour in tracing / monitoring. All package that do that (datadog, optl, newrelic, ...) are doing the same things and explain it already. |
||||||||
| require('./lib/tracing').init(); | ||||||||
|
delthas marked this conversation as resolved.
|
||||||||
|
|
||||||||
| require('./lib/server.js')(); | ||||||||
Large diffs are not rendered by default.
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since both module are inter-dependent (tracing requires this one & vice-versa), they should probably indeed go into the same module... |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| 'use strict'; | ||
|
|
||
| const tracing = require('../tracing'); | ||
|
|
||
| let tracer = null; | ||
| function getTracer() { | ||
| if (tracer) { | ||
| return tracer; | ||
| } | ||
| // Lazy require: this module is loaded eagerly by lib/api/api.js at boot | ||
| // to wrap S3 handlers, but @opentelemetry/api itself is only needed | ||
| // when an instrumented call actually runs. Keeping the require inside | ||
| // getTracer() means a process with ENABLE_OTEL=false never pulls the | ||
| // OTEL packages into memory. | ||
| const { trace } = require('@opentelemetry/api'); | ||
|
delthas marked this conversation as resolved.
|
||
| const { version } = require('../../package.json'); | ||
| tracer = trace.getTracer('cloudserver-api', version); | ||
| return tracer; | ||
| } | ||
|
delthas marked this conversation as resolved.
delthas marked this conversation as resolved.
|
||
|
|
||
| function resetTracer() { | ||
| tracer = null; | ||
| } | ||
|
|
||
| function endSpan(api, span, err) { | ||
| if (err) { | ||
| span.recordException(err); | ||
| span.setStatus({ code: api.SpanStatusCode.ERROR }); | ||
| if (err.code) { | ||
| span.setAttribute('cloudserver.error_code', err.code); | ||
| } | ||
| } else { | ||
| span.setStatus({ code: api.SpanStatusCode.OK }); | ||
| } | ||
| span.end(); | ||
| } | ||
|
|
||
| function instrumentCallbackHandler(self, apiMethod, spanName, args, callbackIndex) { | ||
| const api = require('@opentelemetry/api'); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there any penalty with repeatedly require'ing the same api module (i.e. extra load time on startup, more memory usage...), or is this somehow memoized by node.js anyway and thus essentially free (but for the syntaxic weight, which stems from line 15) ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, this may be done in api.js or at the top level of module: const OTEL_ENABLED = process.env.get('OTEL_ENABLED');
const api = OTEL_ENABLED ? require('api') : null;this would avoid the extra
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. About the repeat require, it's a no cost. Node have a cache |
||
| const span = getTracer().startSpan(spanName, { kind: api.SpanKind.INTERNAL }); | ||
|
|
||
| const wrappedArgs = [...args]; | ||
| const originalCallback = args[callbackIndex]; | ||
| wrappedArgs[callbackIndex] = function wrappedCallback(err, ...results) { | ||
| endSpan(api, span, err); | ||
| return originalCallback.call(this, err, ...results); | ||
| }; | ||
|
|
||
| const ctx = api.trace.setSpan(api.context.active(), span); | ||
| try { | ||
| return api.context.with(ctx, () => apiMethod.apply(self, wrappedArgs)); | ||
| } catch (err) { | ||
| endSpan(api, span, err); | ||
| throw err; | ||
| } | ||
|
delthas marked this conversation as resolved.
|
||
| } | ||
|
delthas marked this conversation as resolved.
|
||
|
|
||
| function instrumentAsyncHandler(self, apiMethod, spanName, args) { | ||
| const api = require('@opentelemetry/api'); | ||
| const span = getTracer().startSpan(spanName, { kind: api.SpanKind.INTERNAL }); | ||
| const ctx = api.trace.setSpan(api.context.active(), span); | ||
|
|
||
| let result; | ||
| try { | ||
| result = api.context.with(ctx, () => apiMethod.apply(self, args)); | ||
| } catch (err) { | ||
| endSpan(api, span, err); | ||
| throw err; | ||
| } | ||
|
|
||
| // Preserve return shape: only chain endSpan onto the result if it | ||
| // is actually a Promise (async/await migration shape). For sync | ||
| // handlers we end the span immediately and return the raw value. | ||
| if (result && typeof result.then === 'function') { | ||
| return (async () => { | ||
| let value; | ||
| try { | ||
| value = await result; | ||
| } catch (err) { | ||
| endSpan(api, span, err); | ||
| throw err; | ||
| } | ||
| endSpan(api, span); | ||
| return value; | ||
| })(); | ||
| } | ||
| endSpan(api, span); | ||
| return result; | ||
| } | ||
|
|
||
| function instrumentApiMethod(apiMethod, methodName) { | ||
| if (!tracing.isEnabled()) { | ||
| return apiMethod; | ||
| } | ||
| const spanName = `api.${methodName}`; | ||
| return function instrumented(...args) { | ||
| const callbackIndex = args.findLastIndex(a => typeof a === 'function'); | ||
|
delthas marked this conversation as resolved.
|
||
| if (callbackIndex !== -1) { | ||
| return instrumentCallbackHandler(this, apiMethod, spanName, args, callbackIndex); | ||
| } | ||
| return instrumentAsyncHandler(this, apiMethod, spanName, args); | ||
| }; | ||
| } | ||
|
|
||
| module.exports = { instrumentApiMethod, resetTracer }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,13 @@ | ||
| const http = require('http'); | ||
| const https = require('https'); | ||
| const cluster = require('cluster'); | ||
| const { promisify } = require('util'); | ||
| const { series } = require('async'); | ||
| const arsenal = require('arsenal'); | ||
| const { setServerHeader } = arsenal.s3routes.routesUtils; | ||
| const { RedisClient, StatsClient } = arsenal.metrics; | ||
| const monitoringClient = require('./utilities/monitoringHandler'); | ||
| const tracing = require('./tracing'); | ||
|
|
||
| const logger = require('./utilities/logger'); | ||
| const { internalHandlers } = require('./utilities/internalHandlers'); | ||
|
|
@@ -15,15 +17,11 @@ const { blacklistedPrefixes } = require('../constants'); | |
| const api = require('./api/api'); | ||
| const dataWrapper = require('./data/wrapper'); | ||
| const kms = require('./kms/wrapper'); | ||
| const locationStorageCheck = | ||
| require('./api/apiUtils/object/locationStorageCheck'); | ||
| const locationStorageCheck = require('./api/apiUtils/object/locationStorageCheck'); | ||
| const vault = require('./auth/vault'); | ||
| const metadata = require('./metadata/wrapper'); | ||
| const { initManagement } = require('./management'); | ||
| const { | ||
| initManagementClient, | ||
| isManagementAgentUsed, | ||
| } = require('./management/agentClient'); | ||
| const { initManagementClient, isManagementAgentUsed } = require('./management/agentClient'); | ||
| const { startCleanupJob } = require('./api/apiUtils/rateLimit/cleanup'); | ||
| const { startRefillJob, stopRefillJob } = require('./api/apiUtils/rateLimit/refillJob'); | ||
|
|
||
|
|
@@ -46,8 +44,7 @@ updateAllEndpoints(); | |
| _config.on('location-constraints-update', () => { | ||
| if (implName === 'multipleBackends') { | ||
| const clients = parseLC(_config, vault); | ||
| client = new MultipleBackendGateway( | ||
| clients, metadata, locationStorageCheck); | ||
| client = new MultipleBackendGateway(clients, metadata, locationStorageCheck); | ||
| } | ||
| }); | ||
|
|
||
|
|
@@ -59,8 +56,7 @@ if (_config.localCache) { | |
| // stats client | ||
| const STATS_INTERVAL = 5; // 5 seconds | ||
| const STATS_EXPIRY = 30; // 30 seconds | ||
| const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, | ||
| STATS_EXPIRY); | ||
| const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, STATS_EXPIRY); | ||
| const enableRemoteManagement = true; | ||
|
|
||
| class S3Server { | ||
|
|
@@ -84,7 +80,7 @@ class S3Server { | |
| process.on('SIGHUP', this.cleanUp.bind(this)); | ||
| process.on('SIGQUIT', this.cleanUp.bind(this)); | ||
| process.on('SIGTERM', this.cleanUp.bind(this)); | ||
| process.on('SIGPIPE', () => { }); | ||
| process.on('SIGPIPE', () => {}); | ||
| // This will pick up exceptions up the stack | ||
| process.on('uncaughtException', err => { | ||
| // If just send the error object results in empty | ||
|
|
@@ -95,7 +91,7 @@ class S3Server { | |
| workerId: this.worker ? this.worker.id : undefined, | ||
| workerPid: this.worker ? this.worker.process.pid : undefined, | ||
| }); | ||
| this.caughtExceptionShutdown(); | ||
| void this.caughtExceptionShutdown(); | ||
| }); | ||
| this.started = false; | ||
| } | ||
|
|
@@ -130,9 +126,10 @@ class S3Server { | |
| const requestStartTime = process.hrtime.bigint(); | ||
|
|
||
| // Skip server access logs for heartbeat. | ||
| const isLoggingEnabled = _config.serverAccessLogs | ||
| && (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY | ||
| || _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED); | ||
| const isLoggingEnabled = | ||
| _config.serverAccessLogs && | ||
| (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY || | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED); | ||
| const isInternalRoute = req.url.startsWith('/_'); | ||
| const isBackbeatRoute = req.url.startsWith('/_/backbeat/'); | ||
| if (isLoggingEnabled && (!isInternalRoute || isBackbeatRoute)) { | ||
|
|
@@ -176,9 +173,7 @@ class S3Server { | |
| labels.action = req.apiMethod; | ||
| } | ||
| monitoringClient.httpRequestsTotal.labels(labels).inc(); | ||
| monitoringClient.httpRequestDurationSeconds | ||
| .labels(labels) | ||
| .observe(responseTimeInNs / 1e9); | ||
| monitoringClient.httpRequestDurationSeconds.labels(labels).observe(responseTimeInNs / 1e9); | ||
| monitoringClient.httpActiveRequests.dec(); | ||
| }; | ||
| res.on('close', monitorEndOfRequest); | ||
|
|
@@ -231,14 +226,13 @@ class S3Server { | |
| }; | ||
|
|
||
| let reqUids = req.headers['x-scal-request-uids']; | ||
| if (reqUids !== undefined && !/*isValidReqUids*/(reqUids.length < 128)) { | ||
| if (reqUids !== undefined && !(/*isValidReqUids*/ (reqUids.length < 128))) { | ||
| // simply ignore invalid id (any user can provide an | ||
| // invalid request ID through a crafted header) | ||
| reqUids = undefined; | ||
| } | ||
| const log = (reqUids !== undefined ? | ||
| logger.newRequestLoggerFromSerializedUids(reqUids) : | ||
| logger.newRequestLogger()); | ||
| const log = | ||
| reqUids !== undefined ? logger.newRequestLoggerFromSerializedUids(reqUids) : logger.newRequestLogger(); | ||
| log.end().addDefaultFields(clientInfo); | ||
|
|
||
| log.debug('received admin request', clientInfo); | ||
|
|
@@ -292,8 +286,7 @@ class S3Server { | |
| server.requestTimeout = 0; // disabling request timeout | ||
|
|
||
| server.on('connection', socket => { | ||
| socket.on('error', err => logger.info('request rejected', | ||
| { error: err })); | ||
| socket.on('error', err => logger.info('request rejected', { error: err })); | ||
| }); | ||
|
|
||
| // https://nodejs.org/dist/latest-v6.x/ | ||
|
|
@@ -309,8 +302,11 @@ class S3Server { | |
| }; | ||
| const { address } = addr; | ||
| logger.info('server started', { | ||
| address, port, | ||
| pid: process.pid, serverIP: address, serverPort: port | ||
| address, | ||
| port, | ||
| pid: process.pid, | ||
| serverIP: address, | ||
| serverPort: port, | ||
| }); | ||
| }); | ||
|
|
||
|
|
@@ -323,32 +319,40 @@ class S3Server { | |
| this.servers.push(server); | ||
| } | ||
|
|
||
| /* | ||
| * This exits the running process properly. | ||
| */ | ||
| cleanUp() { | ||
| async cleanUp() { | ||
| logger.info('server shutting down'); | ||
| // Stop token refill job if running | ||
| if (this.config.rateLimiting?.enabled) { | ||
| stopRefillJob(logger); | ||
| } | ||
| Promise.all(this.servers.map(server => | ||
| new Promise(resolve => server.close(resolve)) | ||
| )).then(() => process.exit(0)); | ||
| try { | ||
| await Promise.all(this.servers.map(server => promisify(server.close.bind(server))())); | ||
| await tracing.close(); | ||
| } finally { | ||
| process.exit(0); | ||
| } | ||
| } | ||
|
|
||
| caughtExceptionShutdown() { | ||
| async caughtExceptionShutdown() { | ||
|
delthas marked this conversation as resolved.
|
||
| if (!this.cluster) { | ||
| process.exit(1); | ||
| try { | ||
| await tracing.close(); | ||
| } finally { | ||
| process.exit(1); | ||
| } | ||
| } | ||
| logger.error('shutdown of worker due to exception', { | ||
| workerId: this.worker ? this.worker.id : undefined, | ||
| workerPid: this.worker ? this.worker.process.pid : undefined, | ||
| }); | ||
| // Will close all servers, cause disconnect event on primary and kill | ||
| // worker process with 'SIGTERM'. | ||
| // worker.kill() is graceful (closes servers, disconnects IPC) but | ||
| // does not fire our SIGTERM handler, so the BatchSpanProcessor | ||
| // would lose buffered spans without an explicit flush here. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we/can we add tests for verify behavior on shutdown? |
||
| if (this.worker) { | ||
| this.worker.kill(); | ||
| try { | ||
| await tracing.close(); | ||
| } finally { | ||
| this.worker.kill(); | ||
| } | ||
| } | ||
| } | ||
|
delthas marked this conversation as resolved.
delthas marked this conversation as resolved.
|
||
|
|
||
|
|
@@ -363,10 +367,7 @@ class S3Server { | |
| } | ||
|
|
||
| initiateStartup(log) { | ||
| series([ | ||
| next => metadata.setup(next), | ||
| next => clientCheck(true, log, next), | ||
| ], (err, results) => { | ||
| series([next => metadata.setup(next), next => clientCheck(true, log, next)], (err, results) => { | ||
|
delthas marked this conversation as resolved.
delthas marked this conversation as resolved.
|
||
| if (err) { | ||
| log.warn('initial health check failed, delaying startup', { | ||
| error: err, | ||
|
|
@@ -417,8 +418,10 @@ class S3Server { | |
|
|
||
| try { | ||
| logger.info('ServerAccessLogger config', { config: _config.serverAccessLogs }); | ||
| if (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY | ||
| || _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED) { | ||
| if ( | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY || | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED | ||
| ) { | ||
| var serverAccessLogger = new ServerAccessLogger( | ||
| _config.serverAccessLogs.outputFile, | ||
| _config.serverAccessLogs.highWaterMarkBytes, | ||
|
|
@@ -434,7 +437,6 @@ class S3Server { | |
| logger.error('ServerAccessLogger creation error', error); | ||
| } | ||
|
|
||
|
|
||
| this.started = true; | ||
| }); | ||
| } | ||
|
|
@@ -490,8 +492,7 @@ function main() { | |
| }); | ||
|
|
||
| const metricServer = new S3Server(_config); | ||
| metricServer.startServer(_config.metricsListenOn, | ||
| _config.metricsPort, metricServer.routeAdminRequest); | ||
| metricServer.startServer(_config.metricsListenOn, _config.metricsPort, metricServer.routeAdminRequest); | ||
| } | ||
| if (_config.isCluster && cluster.isWorker) { | ||
| const server = new S3Server(_config, cluster.worker); | ||
|
|
||
|
delthas marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| 'use strict'; | ||
|
|
||
| // Probe + scrape paths that should never produce a span. Filtered at | ||
| // ingest (not at the trace backend) because probe rate × pod count × | ||
| // always-on sampling overwhelms the exporter and storage with traffic | ||
| // nobody queries. | ||
| const HEALTH_PATHS = new Set(['/live', '/ready', '/_/healthcheck', '/_/healthcheck/deep', '/metrics']); | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should health path filtering be configurable, i.e. allow enabling it (possibly with an extra env variable)? |
||
| function isHealthPath(url) { | ||
|
delthas marked this conversation as resolved.
|
||
| if (typeof url !== 'string' || url.length === 0) { | ||
| return false; | ||
| } | ||
| const qIdx = url.indexOf('?'); | ||
| const path = qIdx === -1 ? url : url.slice(0, qIdx); | ||
| return HEALTH_PATHS.has(path); | ||
| } | ||
|
|
||
| module.exports = { isHealthPath }; | ||
Uh oh!
There was an error while loading. Please reload this page.