Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ muzzle {
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
// Spring Cloud AWS error-handler instrumentations reference ListenerExecutionFailedException.
extraDependency 'io.awspring.cloud:spring-cloud-aws-sqs:3.0.1'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid gating spring-messaging on Spring Cloud AWS

In applications that use Spring Messaging/Spring Kafka but do not include spring-cloud-aws-sqs, adding this AWS dependency to the only spring-messaging muzzle directive makes the whole instrumentation safe only when AWS SQS is also present. The existing InvocableHandlerMethod/Kafka messaging instrumentation in this same module has no muzzleDirective() override, and the project docs note module directives are checked against all instrumentations by default, so non-AWS Spring Messaging apps can lose their existing spring.consume instrumentation instead of only skipping the new AWS-specific instrumenters. Split the AWS instrumenters behind their own directive/module or override muzzleDirective() so the base Spring Messaging instrumentation remains loadable without AWS.

Useful? React with 👍 / 👎.

}
}

Expand Down Expand Up @@ -45,6 +47,7 @@ kotlin {
dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
compileOnly group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.0.1'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')

// capture SQS send and receive spans, propagate trace details in messages
Expand All @@ -69,6 +72,7 @@ dependencies {
// KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1')
testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')

testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package datadog.trace.instrumentation.springmessaging;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import java.util.function.BiConsumer;

public final class SpringCloudAwsErrorHandlerHelper {
private SpringCloudAwsErrorHandlerHelper() {}

public static ListenerExecutionFailedException findListenerExecutionFailedException(
Throwable error) {
Throwable current = error;
while (current != null && !(current instanceof ListenerExecutionFailedException)) {
Throwable cause = current.getCause();
if (cause == current) {
return null;
}
current = cause;
}
return (ListenerExecutionFailedException) current;
}

public static final class CleanupOnError implements BiConsumer<Object, Throwable> {
private final ContextStore<ListenerExecutionFailedException, State> contextStore;

public CleanupOnError(ContextStore<ListenerExecutionFailedException, State> contextStore) {
this.contextStore = contextStore;
}

@Override
public void accept(Object ignored, Throwable error) {
if (error == null) {
return;
}
ListenerExecutionFailedException listenerException =
findListenerExecutionFailedException(error);
if (listenerException != null) {
SpringMessageErrorHandlerHelper.cancelContinuation(contextStore.get(listenerException));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public final class SpringCloudAwsErrorHandlerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public SpringCloudAwsErrorHandlerInstrumentation() {
super("spring-messaging", "spring-messaging-4");
}

@Override
public String instrumentedType() {
return "io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage";
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SpringMessageErrorHandlerHelper",
packageName + ".SpringCloudAwsErrorHandlerHelper"
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName());
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod()
.and(named("handleError"))
.and(takesArguments(2))
.and(takesArgument(1, Throwable.class)),
SpringCloudAwsErrorHandlerInstrumentation.class.getName()
+ "$ActivateErrorHandlerContinuation");
transformer.applyAdvice(
isMethod()
.and(named("handleErrors"))
.and(takesArguments(2))
.and(takesArgument(1, Throwable.class)),
SpringCloudAwsErrorHandlerInstrumentation.class.getName()
+ "$ActivateErrorHandlerContinuation");
transformer.applyAdvice(
isMethod().and(named("process")).and(takesArguments(2)),
SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation");
transformer.applyAdvice(
isMethod().and(named("processMany")).and(takesArguments(2)),
SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation");
}

public static class ActivateErrorHandlerContinuation {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(@Advice.Argument(1) Throwable error) {
ListenerExecutionFailedException listenerException =
SpringCloudAwsErrorHandlerHelper.findListenerExecutionFailedException(error);
if (listenerException == null) {
return null;
}
ContextStore<ListenerExecutionFailedException, State> contextStore =
InstrumentationContext.get(ListenerExecutionFailedException.class, State.class);
State state = contextStore.get(listenerException);
return SpringMessageErrorHandlerHelper.activateContinuation(state);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(@Advice.Enter AgentScope scope) {
if (scope != null) {
scope.close();
}
}
}

public static class CleanupContinuation {

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Return(readOnly = false) java.util.concurrent.CompletableFuture<?> result) {
if (result != null) {
ContextStore<ListenerExecutionFailedException, State> contextStore =
InstrumentationContext.get(ListenerExecutionFailedException.class, State.class);
result =
result.whenComplete(new SpringCloudAwsErrorHandlerHelper.CleanupOnError(contextStore));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public final class SpringCloudAwsListenerAdapterInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public SpringCloudAwsListenerAdapterInstrumentation() {
super("spring-messaging", "spring-messaging-4");
}

@Override
public String instrumentedType() {
return "io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter";
}

@Override
public String[] helperClassNames() {
return new String[] {packageName + ".SpringMessageErrorHandlerHelper"};
}

@Override
public Map<String, String> contextStore() {
return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName());
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("invokeHandler")).and(takesArguments(1)),
SpringCloudAwsListenerAdapterInstrumentation.class.getName() + "$InvokeHandlerAdvice");
transformer.applyAdvice(
isMethod()
.and(named("createListenerException"))
.and(takesArguments(2))
.and(takesArgument(1, Throwable.class)),
SpringCloudAwsListenerAdapterInstrumentation.class.getName()
+ "$CreateListenerExceptionAdvice");
}

public static class InvokeHandlerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
SpringMessageErrorHandlerHelper.enterAwsListenerInvocation();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit() {
SpringMessageErrorHandlerHelper.clearPendingContinuation();
SpringMessageErrorHandlerHelper.exitAwsListenerInvocation();
}
}

public static class CreateListenerExceptionAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return ListenerExecutionFailedException listenerException) {
ContextStore<ListenerExecutionFailedException, State> contextStore =
InstrumentationContext.get(ListenerExecutionFailedException.class, State.class);
State state = contextStore.putIfAbsent(listenerException, State.FACTORY);
SpringMessageErrorHandlerHelper.transferPendingContinuation(state);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;

public final class SpringMessageErrorHandlerHelper {
private static final ThreadLocal<Integer> AWS_LISTENER_DEPTH = ThreadLocal.withInitial(() -> 0);
private static final ThreadLocal<AgentScope.Continuation> PENDING_CONTINUATION =
new ThreadLocal<>();

private SpringMessageErrorHandlerHelper() {}

public static void enterAwsListenerInvocation() {
AWS_LISTENER_DEPTH.set(AWS_LISTENER_DEPTH.get() + 1);
}

public static void exitAwsListenerInvocation() {
int depth = AWS_LISTENER_DEPTH.get() - 1;
if (depth <= 0) {
AWS_LISTENER_DEPTH.remove();
} else {
AWS_LISTENER_DEPTH.set(depth);
}
}

public static boolean isInAwsListenerInvocation() {
return AWS_LISTENER_DEPTH.get() > 0;
}

public static void capturePendingContinuation(AgentSpan span) {
if (span == null) {
return;
}
AgentScope.Continuation existing = PENDING_CONTINUATION.get();
if (existing != null) {
existing.cancel();
}
PENDING_CONTINUATION.set(captureSpan(span));
}

public static void transferPendingContinuation(State state) {
AgentScope.Continuation continuation = PENDING_CONTINUATION.get();
PENDING_CONTINUATION.remove();
if (continuation == null) {
return;
}
if (state == null) {
continuation.cancel();
return;
}
state.setOrCancelContinuation(continuation);
}

public static void clearPendingContinuation() {
AgentScope.Continuation continuation = PENDING_CONTINUATION.get();
PENDING_CONTINUATION.remove();
if (continuation != null) {
continuation.cancel();
}
}

public static AgentScope activateContinuation(State state) {
if (state == null) {
return null;
}
AgentScope.Continuation continuation = state.getAndResetContinuation();
return continuation != null ? continuation.activate() : null;
}

public static void cancelContinuation(State state) {
if (state != null) {
state.closeContinuation();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public String[] helperClassNames() {
packageName + ".SpringMessageDecorator",
packageName + ".SpringMessageExtractAdapter",
packageName + ".SpringMessageExtractAdapter$1",
packageName + ".SpringMessageErrorHandlerHelper",
};
}

Expand Down Expand Up @@ -95,10 +96,13 @@ public static void onExit(
return;
}
AgentSpan span = scope.span();
scope.close();
if (null != error) {
DECORATE.onError(span, error);
if (SpringMessageErrorHandlerHelper.isInAwsListenerInvocation()) {
SpringMessageErrorHandlerHelper.capturePendingContinuation(span);
}
Comment on lines 99 to +103
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate context for asynchronous listener failures

When an @SqsListener returns a CompletableFuture/CompletionStage that completes exceptionally after invoke returns, @Advice.Thrown error is null here, so no pending continuation is captured. Spring Cloud AWS wraps that later async failure in MessageListenerExecutionStage rather than through createListenerException, so the error handler still runs without the spring.consume context for the normal async-failure path; the added test only covers a method that throws before returning its future. Capture/attach the continuation for failed async results as well as synchronous throws.

Useful? React with 👍 / 👎.

}
scope.close();
if (result != null) {
Object wrappedResult =
AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span);
Expand Down
Loading
Loading