Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
if (baggage == null) {
return Context.current();
}
return Context.current().with(baggage);
}

private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
Expand Down Expand Up @@ -282,7 +274,6 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand All @@ -308,15 +299,15 @@ void recordFinishedAttempt() {

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
.record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext);
}
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute, otelContext);
.record(outboundWireSize, attribute, attemptsState.otelContext);
}
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute, otelContext);
.record(inboundWireSize, attribute, attemptsState.otelContext);
}
}
}
Expand All @@ -331,6 +322,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
private boolean callEnded;
private final String fullMethodName;
private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
private final Context otelContext;
private Status status;
private long retryDelayNanos;
private long callLatencyNanos;
Expand All @@ -347,11 +339,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
OpenTelemetryMetricsModule module,
String target,
String fullMethodName,
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins, Context otelContext) {
this.module = checkNotNull(module, "module");
this.target = checkNotNull(target, "target");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.callPlugins = checkNotNull(callPlugins, "callPlugins");
this.otelContext = checkNotNull(otelContext, "otelContext");
this.attemptDelayStopwatch = module.stopwatchSupplier.get();
this.callStopWatch = module.stopwatchSupplier.get().start();

Expand All @@ -361,7 +354,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory

// Record here in case mewClientStreamTracer() would never be called.
if (module.resource.clientAttemptCountCounter() != null) {
module.resource.clientAttemptCountCounter().add(1, attribute);
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
}
}

Expand All @@ -385,7 +378,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
TARGET_KEY, target);
if (module.resource.clientAttemptCountCounter() != null) {
module.resource.clientAttemptCountCounter().add(1, attribute);
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
}
}
if (info.isTransparentRetry()) {
Expand Down Expand Up @@ -448,7 +441,6 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -548,6 +540,7 @@ private static final class ServerTracer extends ServerStreamTracer {
private final OpenTelemetryMetricsModule module;
private final String fullMethodName;
private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
private Context otelContext = Context.root();
private volatile boolean isGeneratedMethod;
private volatile int streamClosed;
private final Stopwatch stopwatch;
Expand All @@ -562,19 +555,31 @@ private static final class ServerTracer extends ServerStreamTracer {
this.stopwatch = module.stopwatchSupplier.get().start();
}

@Override
public io.grpc.Context filterContext(io.grpc.Context context) {
Baggage baggage = BAGGAGE_KEY.get(context);
if (baggage != null) {
otelContext = Context.current().with(baggage);
} else {
otelContext = Context.current();
}
return context;
}

@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
isGeneratedMethod = isSampledToLocalTracing;

io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -606,7 +611,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand Down Expand Up @@ -657,7 +661,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
streamPlugins);
}
}

Expand Down Expand Up @@ -694,7 +699,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
OpenTelemetryMetricsModule.this, target,
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
callPlugins);
callPlugins, Context.current());
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
Expand All @@ -717,3 +722,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading
Loading