Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
}

@VisibleForTesting
void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
delegate.configureChannelBuilder(builder);
}

Expand Down Expand Up @@ -115,6 +115,14 @@ public Builder sdk(OpenTelemetry sdk) {
return this;
}

/**
* Enables or disables tracing.
*/
public Builder enableTracing(boolean enable) {
InternalGrpcOpenTelemetry.enableTracing(delegate, enable);
return this;
}

/**
* Adds optionalLabelKey to all the metrics that can provide value for the
* optionalLabelKey.
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exp
opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" }
opentelemetry-api = "io.opentelemetry:opentelemetry-api:1.60.1"
opentelemetry-exporter-prometheus = "io.opentelemetry:opentelemetry-exporter-prometheus:1.60.1-alpha"
opentelemetry-exporter-otlp = "io.opentelemetry:opentelemetry-exporter-otlp:1.60.1"
opentelemetry-gcp-resources = "io.opentelemetry.contrib:opentelemetry-gcp-resources:1.54.0-alpha"
opentelemetry-sdk-extension-autoconfigure = "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.60.1"
opentelemetry-sdk-testing = "io.opentelemetry:opentelemetry-sdk-testing:1.60.1"
Expand Down
1 change: 1 addition & 0 deletions interop-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
libraries.netty.tcnative,
libraries.netty.tcnative.classes,
libraries.opentelemetry.exporter.prometheus, // For xds interop client
libraries.opentelemetry.exporter.otlp,
project(':grpc-googleapis'),
project(':grpc-grpclb'),
project(':grpc-rls')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import io.grpc.gcp.csm.observability.CsmObservability;

/**
* Application that starts a client for the {@link TestServiceGrpc.TestServiceImplBase} and runs
Expand All @@ -99,6 +100,13 @@ public class TestServiceClient {
public static void main(String[] args) throws Exception {
final TestServiceClient client = new TestServiceClient();
client.parseArgs(args);
if (client.enableOpentelemetryTracing) {
io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder()
.sdk(otel)
.build();
gotel.registerGlobal();
}
customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider();
LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider);
client.setUp();
Expand All @@ -107,6 +115,10 @@ public static void main(String[] args) throws Exception {
client.run();
} finally {
client.tearDown();
if (client.enableOpentelemetryTracing) {
System.out.println("Sleeping to flush spans...");
Thread.sleep(2000);
}
}
}

Expand Down Expand Up @@ -136,6 +148,7 @@ public static void main(String[] args) throws Exception {
private int soakResponseSize = 314159;
private int numThreads = 1;
private String additionalMetadata = "";
private boolean enableOpentelemetryTracing = false;
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;

private Tester tester = new Tester();
Expand Down Expand Up @@ -167,6 +180,8 @@ void parseArgs(String[] args) throws Exception {
serverHostOverride = value;
} else if ("server_port".equals(key)) {
serverPort = Integer.parseInt(value);
} else if ("enable_opentelemetry_tracing".equals(key)) {
enableOpentelemetryTracing = Boolean.parseBoolean(value);
} else if ("test_case".equals(key)) {
testCase = value;
} else if ("num_times".equals(key)) {
Expand Down Expand Up @@ -599,6 +614,9 @@ private class Tester extends AbstractInteropTest {
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useGeneric = false;
if (enableOpentelemetryTracing) {
useGeneric = true;
}
ChannelCredentials channelCredentials;
if (customCredentialsType != null) {
useGeneric = true; // Retain old behavior; avoids erroring if incompatible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.testing.integration;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.gcp.csm.observability.CsmObservability;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.BindableService;
import io.grpc.Grpc;
Expand Down Expand Up @@ -46,6 +47,13 @@ public class TestServiceServer {
public static void main(String[] args) throws Exception {
final TestServiceServer server = new TestServiceServer();
server.parseArgs(args);
if (server.enableOpentelemetryTracing) {
io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder()
.sdk(otel)
.build();
gotel.registerGlobal();
}
if (server.useTls) {
System.out.println(
"\nUsing fake CA for TLS certificate. Test clients should expect host\n"
Expand Down Expand Up @@ -75,6 +83,7 @@ public void run() {
private int port = 8080;
private boolean useTls = true;
private boolean useAlts = false;
private boolean enableOpentelemetryTracing = false;

private ScheduledExecutorService executor;
private Server server;
Expand Down Expand Up @@ -106,6 +115,8 @@ void parseArgs(String[] args) {
port = Integer.parseInt(value);
} else if ("use_tls".equals(key)) {
useTls = Boolean.parseBoolean(value);
} else if ("enable_opentelemetry_tracing".equals(key)) {
enableOpentelemetryTracing = Boolean.parseBoolean(value);
} else if ("use_alts".equals(key)) {
useAlts = Boolean.parseBoolean(value);
} else if ("local_handshaker_port".equals(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
Expand All @@ -60,6 +61,7 @@
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.xds.XdsChannelCredentials;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -104,6 +106,7 @@ public final class XdsTestClient {
private long currentRequestId;
private ListeningScheduledExecutorService exec;
private CsmObservability csmObservability;
private OpenTelemetrySdk openTelemetrySdk;

/**
* The main application allowing this client to be launched from the command line.
Expand Down Expand Up @@ -265,14 +268,23 @@ private static RpcType parseRpc(String rpc) {
@IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs
private void run() {
if (enableCsmObservability) {
Map<String, String> props = new HashMap<>();
props.put("otel.logs.exporter", "none");
props.put("otel.metrics.exporter", "otlp");
String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER");
if (tracesExporter != null) {
props.put("otel.traces.exporter", tracesExporter);
} else {
props.put("otel.traces.exporter", "none");
}

AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(() -> props)
.build();
openTelemetrySdk = autoSdk.getOpenTelemetrySdk();
csmObservability = CsmObservability.newBuilder()
.sdk(AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(() -> ImmutableMap.of(
"otel.logs.exporter", "none",
"otel.metrics.exporter", "prometheus",
"otel.traces.exporter", "none"))
.build()
.getOpenTelemetrySdk())
.sdk(openTelemetrySdk)
.enableTracing(!"none".equals(props.get("otel.traces.exporter")))
.build();
csmObservability.registerGlobal();
}
Expand All @@ -289,14 +301,16 @@ private void run() {
try {
statsServer.start();
for (int i = 0; i < numChannels; i++) {
channels.add(
Grpc.newChannelBuilder(
ManagedChannelBuilder<?> builder = Grpc.newChannelBuilder(
server,
secureMode
? XdsChannelCredentials.create(InsecureChannelCredentials.create())
: InsecureChannelCredentials.create())
.enableRetry()
.build());
.enableRetry();
if (enableCsmObservability) {
csmObservability.configureChannelBuilder(builder);
}
channels.add(builder.build());
}
exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
Payload requestPayload = Payload.newBuilder()
Expand Down Expand Up @@ -325,6 +339,9 @@ private void stop() throws InterruptedException {
if (csmObservability != null) {
csmObservability.close();
}
if (openTelemetrySdk != null) {
openTelemetrySdk.close();
}
}


Expand Down Expand Up @@ -373,6 +390,13 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
@Override
public void onHeaders(Metadata headers) {
hostnameRef.set(headers.get(XdsTestServer.HOSTNAME_KEY));
io.opentelemetry.api.trace.Span currentSpan = io.opentelemetry.api.trace.Span.current();
for (String key : config.metadata.keys()) {
String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
if (value != null) {
currentSpan.setAttribute("custom.metadata." + key, value);
}
}
super.onHeaders(headers);
}
},
Expand Down Expand Up @@ -406,44 +430,56 @@ public void onNext(EmptyProtos.Empty response) {}
.setPayload(requestPayload)
.setResponseSize(responseSize)
.build();
stub.unaryCall(
request,
new StreamObserver<SimpleResponse>() {
@Override
public void onCompleted() {
handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers);
}

@Override
public void onError(Throwable t) {
if (printResponse) {
logger.log(Level.WARNING, "Rpc failed", t);
io.opentelemetry.api.baggage.BaggageBuilder baggageBuilder = io.opentelemetry.api.baggage.Baggage.builder();
for (String key : config.metadata.keys()) {
String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
if (value != null) {
baggageBuilder.put(key, value);
}
}
io.opentelemetry.api.baggage.Baggage baggage = baggageBuilder.build();

try (io.opentelemetry.context.Scope scope = io.opentelemetry.context.Context.current().with(baggage).makeCurrent()) {
stub.unaryCall(
request,
new StreamObserver<SimpleResponse>() {
@Override
public void onCompleted() {
handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers);
}
handleRpcError(requestId, config.rpcType, Status.fromThrowable(t),
savedWatchers);
}

@Override
public void onNext(SimpleResponse response) {
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ ", from "
+ clientCallRef
.get()
.getAttributes()
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
@Override
public void onError(Throwable t) {
if (printResponse) {
logger.log(Level.WARNING, "Rpc failed", t);
}
handleRpcError(requestId, config.rpcType, Status.fromThrowable(t),
savedWatchers);
}
// Use the hostname from the response if not present in the metadata.
// TODO(ericgribkoff) Delete when server is deployed that sets metadata value.
if (hostnameRef.get() == null) {
hostnameRef.set(response.getHostname());

@Override
public void onNext(SimpleResponse response) {
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ ", from "
+ clientCallRef
.get()
.getAttributes()
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
// Use the hostname from the response if not present in the metadata.
// TODO(ericgribkoff) Delete when server is deployed that sets metadata value.
if (hostnameRef.get() == null) {
hostnameRef.set(response.getHostname());
}
}
}
});
});
}
} else {
throw new AssertionError("Unknown RPC type: " + config.rpcType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.xds.XdsServerBuilder;
import io.grpc.xds.XdsServerCredentials;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand Down Expand Up @@ -92,6 +95,7 @@ public final class XdsTestServer {
private String host;
private Util.AddressType addressType = Util.AddressType.IPV4_IPV6;
private CsmObservability csmObservability;
private OpenTelemetrySdk openTelemetrySdk;

/**
* The main application allowing this client to be launched from the command line.
Expand Down Expand Up @@ -197,14 +201,23 @@ void parseArgs(String[] args) {
@IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs
void start() throws Exception {
if (enableCsmObservability) {
Map<String, String> props = new HashMap<>();
props.put("otel.logs.exporter", "none");
props.put("otel.metrics.exporter", "otlp");
String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER");
if (tracesExporter != null) {
props.put("otel.traces.exporter", tracesExporter);
} else {
props.put("otel.traces.exporter", "none");
}

AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(() -> props)
.build();
openTelemetrySdk = autoSdk.getOpenTelemetrySdk();
csmObservability = CsmObservability.newBuilder()
.sdk(AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(() -> ImmutableMap.of(
"otel.logs.exporter", "none",
"otel.metrics.exporter", "prometheus",
"otel.traces.exporter", "none"))
.build()
.getOpenTelemetrySdk())
.sdk(openTelemetrySdk)
.enableTracing(!"none".equals(props.get("otel.traces.exporter")))
.build();
csmObservability.registerGlobal();
}
Expand Down Expand Up @@ -301,6 +314,9 @@ void stop() throws Exception {
if (csmObservability != null) {
csmObservability.close();
}
if (openTelemetrySdk != null) {
openTelemetrySdk.close();
}
}

private void blockUntilShutdown() throws InterruptedException {
Expand Down
Loading
Loading