diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
index 336ea73a0ec..66fc04a0396 100644
--- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
@@ -1,13 +1,24 @@
+import org.jetbrains.kotlin.gradle.dsl.JvmTarget
+import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
+
+plugins {
+ id 'org.jetbrains.kotlin.jvm'
+}
+
muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
+ // KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
+ // which is not bundled in spring-messaging but is always present when Spring Kafka is.
+ extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}
apply from: "$rootDir/gradle/java.gradle"
+apply from: "$rootDir/gradle/test-with-kotlin.gradle"
testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
@@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')
["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
+ def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
+ classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
+ }
+}
+
+kotlin {
+ compilerOptions {
+ jvmTarget = JvmTarget.JVM_1_8
+ apiVersion = KotlinVersion.KOTLIN_1_9
+ languageVersion = KotlinVersion.KOTLIN_1_9
}
}
dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
+ compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')
// capture SQS send and receive spans, propagate trace details in messages
@@ -36,6 +58,32 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'
+ // Spring Kafka + embedded Kafka broker for coroutine tests
+ testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
+ exclude group: 'org.apache.kafka'
+ }
+ testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
+ exclude group: 'org.apache.kafka'
+ }
+
+ // KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation
+ testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
+
+ testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
+ testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
+ testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
+ testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
+ testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'
+
+ testImplementation libs.kotlin
+ testImplementation "org.jetbrains.kotlin:kotlin-reflect"
+ testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
+ testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
+ testImplementation "io.projectreactor:reactor-core:3.+"
+
+ testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
+ testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')
+
latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
new file mode 100644
index 00000000000..72f2c181996
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
@@ -0,0 +1,71 @@
+package datadog.trace.instrumentation.springmessaging;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+
+import com.google.auto.service.AutoService;
+import datadog.context.Context;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import org.reactivestreams.Publisher;
+
+/**
+ * Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
+ * Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
+ * it during subscription.
+ *
+ *
When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
+ * KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
+ * listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
+ * AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
+ * SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
+ * Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
+ * and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
+ * retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
+ * up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
+ */
+@AutoService(InstrumenterModule.class)
+public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
+ implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
+
+ public KotlinAwareHandlerInstrumentation() {
+ super("spring-messaging", "spring-messaging-4");
+ }
+
+ @Override
+ public Map contextStore() {
+ return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ }
+
+ @Override
+ public List typeInstrumentations() {
+ return Collections.singletonList(new KotlinAwareHandlerInstrumentation());
+ }
+
+ @Override
+ public String instrumentedType() {
+ return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
+ }
+
+ @Override
+ public void methodAdvice(MethodTransformer transformer) {
+ transformer.applyAdvice(
+ isMethod().and(named("doInvoke")),
+ KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
+ }
+
+ public static class DoInvokeAdvice {
+ @Advice.OnMethodExit(suppress = Throwable.class)
+ public static void onExit(@Advice.Return Object result) {
+ if (result instanceof Publisher) {
+ InstrumentationContext.get(Publisher.class, Context.class)
+ .put((Publisher>) result, Context.current());
+ }
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageAsyncHelper.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageAsyncHelper.java
new file mode 100644
index 00000000000..4185199a5da
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageAsyncHelper.java
@@ -0,0 +1,145 @@
+package datadog.trace.instrumentation.springmessaging;
+
+import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
+
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+public final class SpringMessageAsyncHelper {
+ private SpringMessageAsyncHelper() {}
+
+ private static final ClassValue REACTOR_CALLBACK_METHODS =
+ new ReactorCallbacksClassValue();
+
+ public static Object wrapAsyncResult(Object result, AgentSpan span) {
+ if (result == null) {
+ return null;
+ }
+ SpanFinisher finisher = new SpanFinisher(span);
+ if (result instanceof CompletionStage>) {
+ return ((CompletionStage>) result)
+ .whenComplete(new CompletionStageFinishCallback(finisher));
+ }
+ ReactorCallbackMethods callbackMethods = REACTOR_CALLBACK_METHODS.get(result.getClass());
+ if (!callbackMethods.supported()) {
+ return null;
+ }
+ try {
+ Object wrapped = callbackMethods.doOnError.invoke(result, new ErrorCallback(finisher));
+ wrapped = callbackMethods.doOnTerminate.invoke(wrapped, new FinishCallback(finisher));
+ return callbackMethods.doOnCancel.invoke(wrapped, new FinishCallback(finisher));
+ } catch (Throwable ignored) {
+ return null;
+ }
+ }
+
+ static final class ReactorCallbacksClassValue extends ClassValue {
+ @Override
+ protected ReactorCallbackMethods computeValue(Class> type) {
+ try {
+ Method doOnError = type.getMethod("doOnError", Consumer.class);
+ Method doOnTerminate = type.getMethod("doOnTerminate", Runnable.class);
+ Method doOnCancel = type.getMethod("doOnCancel", Runnable.class);
+ return new ReactorCallbackMethods(doOnError, doOnTerminate, doOnCancel);
+ } catch (Throwable ignored) {
+ return ReactorCallbackMethods.UNSUPPORTED;
+ }
+ }
+ }
+
+ static final class ReactorCallbackMethods {
+ static final ReactorCallbackMethods UNSUPPORTED = new ReactorCallbackMethods(null, null, null);
+
+ final Method doOnError;
+ final Method doOnTerminate;
+ final Method doOnCancel;
+
+ ReactorCallbackMethods(Method doOnError, Method doOnTerminate, Method doOnCancel) {
+ this.doOnError = doOnError;
+ this.doOnTerminate = doOnTerminate;
+ this.doOnCancel = doOnCancel;
+ }
+
+ boolean supported() {
+ return doOnError != null && doOnTerminate != null && doOnCancel != null;
+ }
+ }
+
+ static final class SpanFinisher {
+ private final AgentSpan span;
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+
+ SpanFinisher(AgentSpan span) {
+ this.span = span;
+ }
+
+ void onError(Throwable throwable) {
+ DECORATE.onError(span, throwable);
+ }
+
+ void finish() {
+ if (finished.compareAndSet(false, true)) {
+ DECORATE.beforeFinish(span);
+ span.finish();
+ }
+ }
+ }
+
+ static final class CompletionStageFinishCallback implements BiConsumer