diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle index 5ce7c997f99..b056987e409 100644 --- a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle @@ -8,9 +8,35 @@ muzzle { } apply from: "$rootDir/gradle/java.gradle" +// Use slf4j-simple for tests; logback's synchronized appenders can pin virtual-thread carriers +// when many vthreads log concurrently, which deadlocks this module's vthread21Test suite. +apply from: "$rootDir/gradle/slf4j-simple.gradle" addTestSuiteForDir('latestDepTest', 'test') +// Separate suite for tests that need the JDK 21+ virtual-thread API. Kept apart from the +// default test suite so that the existing OkHttp 3.0 tests keep their Java 1.8 baseline. +addTestSuite('vthread21Test') + +tasks.named("compileVthread21TestJava", JavaCompile) { + configureCompiler(it, 21) +} + +tasks.named("vthread21Test", Test) { + testJvmConstraints { + minJavaVersion = JavaVersion.VERSION_21 + // Cap at 24 to match java-concurrent-21.0, which provides the virtual-thread scope propagation + // (TaskRunner/VirtualThread instrumentation) this suite relies on and is only supported through + // JDK 24. Without this, the suite runs on the JDK 25 / "tip" (26) CI shards and times out + // because that propagation chain isn't active there. + maxJavaVersion = JavaVersion.VERSION_24 + } +} + +tasks.named("check") { + dependsOn "vthread21Test" +} + dependencies { compileOnly(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.0.0') @@ -38,4 +64,14 @@ dependencies { testRuntimeOnly(project(':dd-java-agent:instrumentation:datadog:asm:iast-instrumenter')) testRuntimeOnly(project(':dd-java-agent:instrumentation:java:java-net:java-net-1.8')) + + // vthread21Test inherits testImplementation via addTestSuite's extendsFrom wiring. 
+  vthread21TestImplementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0')
+  vthread21TestImplementation(group: 'com.squareup.okio', name: 'okio', version: '1.14.0')
+
+  // Pull in the JDK-21+ concurrent / lang instrumentations so the test installs the same
+  // TaskRunnerInstrumentation + VirtualThreadInstrumentation chain that profiling-backend
+  // exercises in production.
+  vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0')
+  vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0')
 }
diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java
new file mode 100644
index 00000000000..ac77c39d85f
--- /dev/null
+++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java
@@ -0,0 +1,77 @@
+package datadog.trace.instrumentation.okhttp3;
+
+import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
+import static java.util.Collections.singletonMap;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+
+/**
+ * Captures the active scope at {@code AsyncCall.<init>} (i.e. the moment {@code
+ * Call.enqueue(callback)} was invoked on the user's thread) and stores it in the {@code
+ * ContextStore} shared with the rest of the concurrent instrumentation. {@code
+ * RunnableInstrumentation} then re-activates that scope on {@code AsyncCall.run()} entry, which
+ * overrides whatever scope {@code TaskRunner.run()} (or {@code beforeExecute}) put in place from
+ * the dispatcher's worker thread.
+ *
+ * <p>Without this, {@code TaskRunnerInstrumentation} captures whatever scope happens to be active
+ * on the worker thread when {@code Dispatcher.promoteAndExecute()} dequeues and submits the call —
+ * and when promotion runs from inside {@code Dispatcher.finished()} (i.e. recursively from a
+ * different AsyncCall's run()), that scope belongs to the finishing call, not to the
+ * caller who actually enqueued this AsyncCall. Result: under concurrent OkHttp load, {@code
+ * okhttp.request} spans cross-contaminate between traces.
+ *
+ * <p>{@code AsyncCall} is an inner class of {@code RealCall} and transitively implements {@link
+ * Runnable}. This module targets OkHttp 3.x, where it lives at {@code okhttp3.RealCall$AsyncCall}.
+ * OkHttp 4.x+ relocated it to {@code okhttp3.internal.connection.RealCall$AsyncCall}, which is
+ * handled by the separate {@code okhttp-4.0} module.
+ */
+@AutoService(InstrumenterModule.class)
+public final class AsyncCallInstrumentation extends InstrumenterModule.ContextTracking
+    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
+
+  public AsyncCallInstrumentation() {
+    // Re-use the existing "okhttp" / "okhttp-3" instrumentation names so we don't introduce a
+    // separately-toggleable feature flag (DD_TRACE_OKHTTP_ASYNC_CALL_ENABLED). The capture here
+    // is conceptually part of the OkHttp instrumentation — if you disable OkHttp tracing, you
+    // also disable this capture, which is the right behavior.
+    //
+    // This is a ContextTracking module (like RunnableInstrumentation, which consumes the state we
+    // write) rather than a Tracing module: its sole job is to propagate the captured scope through
+    // the shared ContextStore, not to create spans of its own.
+    super("okhttp", "okhttp-3");
+  }
+
+  @Override
+  public String[] knownMatchingTypes() {
+    return new String[] {
+      "okhttp3.RealCall$AsyncCall", // OkHttp 3.x
+    };
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    // Same Runnable -> State store that RunnableInstrumentation reads from.
+    return singletonMap("java.lang.Runnable", State.class.getName());
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
+  }
+
+  public static final class Construct {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void captureScope(@Advice.This Runnable asyncCall) {
+      // AdviceUtils.capture is a no-op when async propagation is disabled or there's no active
+      // span — same behavior as the rest of the concurrent instrumentation.
+      capture(InstrumentationContext.get(Runnable.class, State.class), asyncCall);
+    }
+  }
+}
diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java
new file mode 100644
index 00000000000..a2558906e4e
--- /dev/null
+++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java
@@ -0,0 +1,307 @@
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.sun.net.httpserver.HttpServer;
+import datadog.trace.agent.test.AbstractInstrumentationTest;
+import datadog.trace.bootstrap.instrumentation.api.AgentScope;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import datadog.trace.core.DDSpan;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.Dispatcher;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end reproduction of profiling-backend PR#8520: swap OkHttp's Dispatcher executor from
+ * {@code Executors.newCachedThreadPool(...)} to {@code
+ * Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(prefix, start).factory())}.
+ *
+ * <p>Each test runs against the same {@link HttpServer} mock. Inside a manually activated "parent"
+ * span it does {@code client.newCall(request).enqueue(callback)} and waits on a latch for the
+ * callback. The agent's OkHttp instrumentation injects {@code TracingInterceptor}, which creates
+ * the {@code okhttp.request} client span using whatever scope is active on the dispatcher worker.
+ * The assertions verify the client span lands under the parent — i.e., the dispatcher's
+ * worker thread saw the propagated scope.
+ *
+ * <p>{@code concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate} is the regression test
+ * for the fix: it forces dispatcher-queue contention so calls are promoted from {@code
+ * Dispatcher.finished()} on a sibling's worker thread, and fails (cross-trace contamination)
+ * without the {@code AsyncCall.<init>} scope capture. {@code virtualThreadPerTaskDispatcher_…} is a
+ * single-shot baseline: it confirms basic propagation through the virtual-thread dispatcher but
+ * does not by itself exercise the contamination path (single-shot requests never queue).
+ */
+class OkHttpVirtualThreadDispatcherTest extends AbstractInstrumentationTest {
+
+  private static HttpServer mockServer;
+  private static String baseUrl;
+
+  @BeforeAll
+  static void startServer() throws IOException {
+    mockServer = HttpServer.create(new InetSocketAddress("localhost", 0), 64);
+    mockServer.createContext(
+        "/ok",
+        exchange -> {
+          byte[] body = "ok".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+          exchange.sendResponseHeaders(200, body.length);
+          exchange.getResponseBody().write(body);
+          exchange.close();
+        });
+    // Default executor is single-threaded — give the server real concurrency so the test isn't
+    // bottlenecked on the mock backend itself.
+    mockServer.setExecutor(Executors.newCachedThreadPool());
+    mockServer.start();
+    baseUrl =
+        "http://"
+            + mockServer.getAddress().getHostString()
+            + ":"
+            + mockServer.getAddress().getPort();
+  }
+
+  @AfterAll
+  static void stopServer() {
+    if (mockServer != null) {
+      mockServer.stop(0);
+      mockServer = null;
+    }
+  }
+
+  /** Activate a manual parent span, run the OkHttp call, wait for the okhttp.request child. */
+  private void runUnderParent(OkHttpClient client, CountDownLatch done) {
+    AgentSpan parentSpan = AgentTracer.startSpan("test", "parent");
+    try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) {
+      Request request = new Request.Builder().url(baseUrl + "/ok").build();
+      client
+          .newCall(request)
+          .enqueue(
+              new Callback() {
+                @Override
+                public void onResponse(Call call, Response response) throws IOException {
+                  response.body().close();
+                  done.countDown();
+                }
+
+                @Override
+                public void onFailure(Call call, IOException e) {
+                  done.countDown();
+                }
+              });
+      if (!done.await(10, TimeUnit.SECONDS)) {
+        throw new AssertionError("timed out waiting for OkHttp callback");
+      }
+      // Wait for the okhttp.request child to finish before closing the parent scope so the trace
+      // collector sees a complete trace.
+      blockUntilChildSpansFinished(1);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    } finally {
+      parentSpan.finish();
+    }
+  }
+
+  @Test
+  void virtualThreadPerTaskDispatcher_parentsOkHttpSpanUnderParent() throws Exception {
+    // Exact shape from profiling-backend PR#8520.
+    ExecutorService dispatcherExecutor =
+        Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-test-", 0).factory());
+    OkHttpClient client = buildClient(dispatcherExecutor);
+    try {
+      runUnderParent(client, new CountDownLatch(1));
+    } finally {
+      dispatcherExecutor.shutdown();
+    }
+
+    assertOkHttpSpanParentedUnderParent();
+  }
+
+  /**
+   * Concurrent stress test: spin up N independent parent traces, each enqueueing M OkHttp requests
+   * through the same virtual-thread dispatcher. Dispatcher capacity is intentionally set below N*M
+   * so that some calls get queued and then promoted from {@code Dispatcher.finished()} running on a
+   * dispatcher worker thread (a different parent's worker).
+   *
+   * <p>If the agent captures the worker's currently-active scope when the promoted call is
+   * submitted — instead of the scope active where the original {@code enqueue()} happened
+   * — the {@code okhttp.request} span will land in the wrong parent's trace. This test fails
+   * loudly on that cross-contamination.
+   */
+  @Test
+  void concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate() throws Exception {
+    int parentCount = 16;
+    int requestsPerParent = 4;
+    int totalRequests = parentCount * requestsPerParent;
+
+    ExecutorService dispatcherExecutor =
+        Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-burst-", 0).factory());
+    OkHttpClient client = buildClient(dispatcherExecutor);
+    // Force queue contention: capacity is much smaller than the total in-flight requests, so
+    // many calls will be promoted from finished() rather than direct from enqueue().
+    Dispatcher dispatcher = client.dispatcher();
+    dispatcher.setMaxRequests(4);
+    dispatcher.setMaxRequestsPerHost(4);
+
+    ExecutorService parentRunner = Executors.newFixedThreadPool(parentCount);
+    CountDownLatch allParentsDone = new CountDownLatch(parentCount);
+    AtomicReference<Throwable> failure = new AtomicReference<>();
+
+    try {
+      for (int i = 0; i < parentCount; i++) {
+        parentRunner.submit(
+            () -> {
+              try {
+                runParentBurst(client, requestsPerParent);
+              } catch (Throwable t) {
+                failure.compareAndSet(null, t);
+              } finally {
+                allParentsDone.countDown();
+              }
+            });
+      }
+      assertTrue(allParentsDone.await(60, TimeUnit.SECONDS), "parent threads timed out");
+    } finally {
+      parentRunner.shutdown();
+      dispatcherExecutor.shutdown();
+    }
+
+    if (failure.get() != null) {
+      throw new AssertionError("a parent-runner thread threw", failure.get());
+    }
+
+    // Each parent produces one trace containing 1 parent span + M okhttp.request spans.
+    writer.waitForTraces(parentCount);
+
+    // Index each collected trace by the span id of its "parent" root span and assert structure.
+    Map<Long, List<DDSpan>> tracesByRoot = new HashMap<>();
+    for (List<DDSpan> trace : writer) {
+      DDSpan parentSpan = findByOp(trace, "parent");
+      assertNotNull(parentSpan, "every collected trace should have a parent span");
+      tracesByRoot.put(parentSpan.getSpanId(), trace);
+    }
+    assertEquals(
+        parentCount,
+        tracesByRoot.size(),
+        "expected one distinct trace per parent burst (no cross-trace contamination)");
+
+    int totalOkhttpSpans = 0;
+    List<String> contamination = new ArrayList<>();
+    for (Map.Entry<Long, List<DDSpan>> entry : tracesByRoot.entrySet()) {
+      long parentSpanId = entry.getKey();
+      List<DDSpan> trace = entry.getValue();
+      int okhttpCountInThisTrace = 0;
+      for (DDSpan span : trace) {
+        if ("okhttp.request".contentEquals(span.getOperationName())) {
+          okhttpCountInThisTrace++;
+          if (span.getParentId() != parentSpanId) {
+            contamination.add(
+                "okhttp.request span "
+                    + span.getSpanId()
+                    + " has parentId="
+                    + span.getParentId()
+                    + " but it lives in the trace rooted at "
+                    + parentSpanId);
+          }
+        }
+      }
+      totalOkhttpSpans += okhttpCountInThisTrace;
+      assertEquals(
+          requestsPerParent,
+          okhttpCountInThisTrace,
+          "trace rooted at parent " + parentSpanId + " has wrong child count");
+    }
+
+    assertEquals(totalRequests, totalOkhttpSpans, "total okhttp.request spans across all traces");
+    assertTrue(
+        contamination.isEmpty(),
+        "found cross-parented okhttp.request spans:\n - " + String.join("\n - ", contamination));
+  }
+
+  /**
+   * One parent burst: M OkHttp requests under a single freshly-started parent span. Deliberately
+   * does not block on per-parent child-span accounting — the whole point of the test
+   * is to detect when children leak to a sibling's trace, and per-parent blocking would just turn
+   * that into a timeout instead of producing a useful assertion message. Wait for the HTTP
+   * callbacks (so the request actually ran), then close the parent.
+   */
+  private void runParentBurst(OkHttpClient client, int requestsPerParent) {
+    AgentSpan parentSpan = AgentTracer.startSpan("test", "parent");
+    try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) {
+      CountDownLatch done = new CountDownLatch(requestsPerParent);
+      for (int i = 0; i < requestsPerParent; i++) {
+        Request request = new Request.Builder().url(baseUrl + "/ok").build();
+        client
+            .newCall(request)
+            .enqueue(
+                new Callback() {
+                  @Override
+                  public void onResponse(Call call, Response response) throws IOException {
+                    response.body().close();
+                    done.countDown();
+                  }
+
+                  @Override
+                  public void onFailure(Call call, IOException e) {
+                    done.countDown();
+                  }
+                });
+      }
+      if (!done.await(30, TimeUnit.SECONDS)) {
+        throw new AssertionError("timed out waiting for OkHttp callbacks");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    } finally {
+      parentSpan.finish();
+    }
+  }
+
+  private static OkHttpClient buildClient(ExecutorService dispatcherExecutor) {
+    Dispatcher dispatcher = new Dispatcher(dispatcherExecutor);
+    return new OkHttpClient.Builder().dispatcher(dispatcher).build();
+  }
+
+  private void assertOkHttpSpanParentedUnderParent() throws Exception {
+    writer.waitForTraces(1);
+    List<DDSpan> trace = writer.get(0);
+    DDSpan parentSpan = findByOp(trace, "parent");
+    DDSpan okhttpSpan = findByOp(trace, "okhttp.request");
+    assertNotNull(parentSpan, "parent span should exist");
+    assertNotNull(
+        okhttpSpan,
+        "okhttp.request client span should exist; if missing, propagation may have produced an"
+            + " orphan trace instead");
+    assertEquals(
+        parentSpan.getTraceId().toLong(),
+        okhttpSpan.getTraceId().toLong(),
+        "okhttp.request span must share the parent's trace id");
+    assertEquals(
+        parentSpan.getSpanId(),
+        okhttpSpan.getParentId(),
+        "okhttp.request span must be parented under the parent span active at enqueue() time");
+  }
+
+  private static DDSpan findByOp(List<DDSpan> spans, String op) {
+    return spans.stream()
+        .filter(s -> op.contentEquals(s.getOperationName()))
+        .findFirst()
+        .orElse(null);
+  }
+}
diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-4.0/build.gradle b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/build.gradle
new file mode 100644
index 00000000000..9352fddbffd
--- /dev/null
+++ b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/build.gradle
@@ -0,0 +1,67 @@
+muzzle {
+  // No assertInverse: the advice references only java.lang.Runnable plus agent-bootstrap helpers,
+  // never an OkHttp type, so muzzle has nothing version-specific to fail on for pre-4.x. The real
+  // version gate is knownMatchingTypes — okhttp3.internal.connection.RealCall$AsyncCall only exists
+  // in 4.x+, so the instrumentation simply doesn't match on 3.x (which okhttp-3.0 handles instead).
+  pass {
+    group = "com.squareup.okhttp3"
+    module = "okhttp"
+    versions = "[4.0,)"
+  }
+}
+
+apply from: "$rootDir/gradle/java.gradle"
+// Use slf4j-simple for tests; logback's synchronized appenders can pin virtual-thread carriers
+// when many vthreads log concurrently, which deadlocks the virtual-thread suites.
+apply from: "$rootDir/gradle/slf4j-simple.gradle"
+
+// The only coverage here is the virtual-thread regression suite, which needs the JDK 21+ API. We
+// run the same test source against both OkHttp 4.x and 5.x:
+// - 4.x lives at com.squareup.okhttp3:okhttp
+// - 5.x became a Kotlin-Multiplatform library; com.squareup.okhttp3:okhttp is a metadata shell and
+//   the JVM classes ship in com.squareup.okhttp3:okhttp-jvm (Gradle resolves this automatically
+//   via variant metadata). The matched type okhttp3.internal.connection.RealCall$AsyncCall is at
+//   the same FQN in both, so a single instrumentation covers both majors.
+// Both suites share src/vthread21Test/java and are kept apart only to bind different OkHttp versions.
+addTestSuiteForDir('vthread21Test4', 'vthread21Test') +addTestSuiteForDir('vthread21Test5', 'vthread21Test') + +['vthread21Test4', 'vthread21Test5'].each { suite -> + tasks.named("compile${suite.capitalize()}Java", JavaCompile) { + configureCompiler(it, 21) + } + tasks.named(suite, Test) { + testJvmConstraints { + minJavaVersion = JavaVersion.VERSION_21 + // Cap at 24 to match java-concurrent-21.0, which provides the virtual-thread scope propagation + // these suites rely on and is only supported through JDK 24. See okhttp-3.0 for details. + maxJavaVersion = JavaVersion.VERSION_24 + } + } + tasks.named("check") { + dependsOn suite + } +} + +dependencies { + compileOnly(group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.0.0') + + // OkHttp 4.x (Kotlin) pulls kotlin-stdlib and okio 2.x transitively. + vthread21Test4Implementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.12.0') + // OkHttp 5.x resolves to okhttp-jvm via Gradle variant metadata; pulls kotlin-stdlib 2.x + okio 3.x. + vthread21Test5Implementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '5.4.0') + + // The core OkHttp tracing (OkHttp3Instrumentation -> TracingInterceptor, which creates the + // okhttp.request span) lives in the okhttp-3.0 module and applies to 4.x/5.x too. Pull it in so + // the tests exercise the full chain: core tracing + this module's AsyncCall scope capture. + vthread21Test4RuntimeOnly project(':dd-java-agent:instrumentation:okhttp:okhttp-3.0') + vthread21Test5RuntimeOnly project(':dd-java-agent:instrumentation:okhttp:okhttp-3.0') + + // Pull in the JDK-21+ concurrent / lang instrumentations so the tests install the same + // TaskRunnerInstrumentation + VirtualThreadInstrumentation chain that profiling-backend + // exercises in production. 
+  vthread21Test4RuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0')
+  vthread21Test4RuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0')
+  vthread21Test5RuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0')
+  vthread21Test5RuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0')
+}
diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/main/java/datadog/trace/instrumentation/okhttp4/AsyncCallInstrumentation.java b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/main/java/datadog/trace/instrumentation/okhttp4/AsyncCallInstrumentation.java
new file mode 100644
index 00000000000..4c71d9a35be
--- /dev/null
+++ b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/main/java/datadog/trace/instrumentation/okhttp4/AsyncCallInstrumentation.java
@@ -0,0 +1,65 @@
+package datadog.trace.instrumentation.okhttp4;
+
+import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
+import static java.util.Collections.singletonMap;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+
+/**
+ * OkHttp 4.x+ variant of the {@code AsyncCall.<init>} scope capture. Identical in behavior to the
+ * {@code okhttp-3.0} module's instrumentation — see that class for the full explanation of the
+ * dispatcher-recursion failure mode — but matches the relocated 4.x type.
+ *
+ * <p>{@code AsyncCall} is an inner class of {@code RealCall} and transitively implements {@link
+ * Runnable}. OkHttp 4.x moved {@code RealCall} from {@code okhttp3} into {@code
+ * okhttp3.internal.connection}, so the nested type is {@code
+ * okhttp3.internal.connection.RealCall$AsyncCall}.
+ */
+@AutoService(InstrumenterModule.class)
+public final class AsyncCallInstrumentation extends InstrumenterModule.ContextTracking
+    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
+
+  public AsyncCallInstrumentation() {
+    // Re-use the existing "okhttp" instrumentation name so this capture is enabled/disabled with
+    // OkHttp tracing as a whole, rather than introducing a separately-toggleable feature flag.
+    //
+    // This is a ContextTracking module (like RunnableInstrumentation, which consumes the state we
+    // write) rather than a Tracing module: its sole job is to propagate the captured scope through
+    // the shared ContextStore, not to create spans of its own.
+    super("okhttp", "okhttp-4");
+  }
+
+  @Override
+  public String[] knownMatchingTypes() {
+    return new String[] {
+      "okhttp3.internal.connection.RealCall$AsyncCall", // OkHttp 4.x+
+    };
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    // Same Runnable -> State store that RunnableInstrumentation reads from.
+    return singletonMap("java.lang.Runnable", State.class.getName());
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
+  }
+
+  public static final class Construct {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void captureScope(@Advice.This Runnable asyncCall) {
+      // AdviceUtils.capture is a no-op when async propagation is disabled or there's no active
+      // span — same behavior as the rest of the concurrent instrumentation.
+      capture(InstrumentationContext.get(Runnable.class, State.class), asyncCall);
+    }
+  }
+}
diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java
new file mode 100644
index 00000000000..a2558906e4e
--- /dev/null
+++ b/dd-java-agent/instrumentation/okhttp/okhttp-4.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java
@@ -0,0 +1,307 @@
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.sun.net.httpserver.HttpServer;
+import datadog.trace.agent.test.AbstractInstrumentationTest;
+import datadog.trace.bootstrap.instrumentation.api.AgentScope;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import datadog.trace.core.DDSpan;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.Dispatcher;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end reproduction of profiling-backend PR#8520: swap OkHttp's Dispatcher executor from
+ * {@code Executors.newCachedThreadPool(...)} to {@code
+ * Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(prefix, start).factory())}.
+ *
+ *

 <p>Each test runs against the same {@link HttpServer} mock. Inside a manually activated "parent" + * span it does {@code client.newCall(request).enqueue(callback)} and waits on a latch for the + * callback. The agent's OkHttp instrumentation injects {@code TracingInterceptor}, which creates + * the {@code okhttp.request} client span using whatever scope is active on the dispatcher worker. + * The assertions verify the client span lands under the parent — i.e., the dispatcher's + * worker thread saw the propagated scope. + * + *

{@code concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate} is the regression test + * for the fix: it forces dispatcher-queue contention so calls are promoted from {@code + * Dispatcher.finished()} on a sibling's worker thread, and fails (cross-trace contamination) + * without the {@code AsyncCall.} scope capture. {@code virtualThreadPerTaskDispatcher_…} is a + * single-shot baseline: it confirms basic propagation through the virtual-thread dispatcher but + * does not by itself exercise the contamination path (single-shot requests never queue). + */ +class OkHttpVirtualThreadDispatcherTest extends AbstractInstrumentationTest { + + private static HttpServer mockServer; + private static String baseUrl; + + @BeforeAll + static void startServer() throws IOException { + mockServer = HttpServer.create(new InetSocketAddress("localhost", 0), 64); + mockServer.createContext( + "/ok", + exchange -> { + byte[] body = "ok".getBytes(); + exchange.sendResponseHeaders(200, body.length); + exchange.getResponseBody().write(body); + exchange.close(); + }); + // Default executor is single-threaded — give the server real concurrency so the test isn't + // bottlenecked on the mock backend itself. + mockServer.setExecutor(Executors.newCachedThreadPool()); + mockServer.start(); + baseUrl = + "http://" + + mockServer.getAddress().getHostString() + + ":" + + mockServer.getAddress().getPort(); + } + + @AfterAll + static void stopServer() { + if (mockServer != null) { + mockServer.stop(0); + mockServer = null; + } + } + + /** Activate a manual parent span, run the OkHttp call, wait for the okhttp.request child. 
*/ + private void runUnderParent(OkHttpClient client, CountDownLatch done) { + AgentSpan parentSpan = AgentTracer.startSpan("test", "parent"); + try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) { + Request request = new Request.Builder().url(baseUrl + "/ok").build(); + client + .newCall(request) + .enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) throws IOException { + response.body().close(); + done.countDown(); + } + + @Override + public void onFailure(Call call, IOException e) { + done.countDown(); + } + }); + if (!done.await(10, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for OkHttp callback"); + } + // Wait for the okhttp.request child to finish before closing the parent scope so the trace + // collector sees a complete trace. + blockUntilChildSpansFinished(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } finally { + parentSpan.finish(); + } + } + + @Test + void virtualThreadPerTaskDispatcher_parentsOkHttpSpanUnderParent() throws Exception { + // Exact shape from profiling-backend PR#8520. + ExecutorService dispatcherExecutor = + Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-test-", 0).factory()); + OkHttpClient client = buildClient(dispatcherExecutor); + try { + runUnderParent(client, new CountDownLatch(1)); + } finally { + dispatcherExecutor.shutdown(); + } + + assertOkHttpSpanParentedUnderParent(); + } + + /** + * Concurrent stress test: spin up N independent parent traces, each enqueueing M OkHttp requests + * through the same virtual-thread dispatcher. Dispatcher capacity is intentionally set below N*M + * so that some calls get queued and then promoted from {@code Dispatcher.finished()} running on a + * dispatcher worker thread (a different parent's worker). + * + *

+   * <p>If the agent captures the worker's currently-active scope when the promoted call is
+   * submitted — instead of the scope active where the original {@code enqueue()} happened
+   * — the {@code okhttp.request} span will land in the wrong parent's trace. This test fails
+   * loudly on that cross-contamination.
+   */
+  @Test
+  void concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate() throws Exception {
+    int parentCount = 16;
+    int requestsPerParent = 4;
+    int totalRequests = parentCount * requestsPerParent;
+
+    ExecutorService dispatcherExecutor =
+        Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-burst-", 0).factory());
+    OkHttpClient client = buildClient(dispatcherExecutor);
+    // Force queue contention: capacity is much smaller than the total in-flight requests, so
+    // many calls will be promoted from finished() rather than direct from enqueue().
+    Dispatcher dispatcher = client.dispatcher();
+    dispatcher.setMaxRequests(4);
+    dispatcher.setMaxRequestsPerHost(4);
+
+    // One platform thread per parent burst; each burst activates its own parent span.
+    ExecutorService parentRunner = Executors.newFixedThreadPool(parentCount);
+    CountDownLatch allParentsDone = new CountDownLatch(parentCount);
+    // Holds the first Throwable thrown by any burst so it can be rethrown on the test thread.
+    AtomicReference<Throwable> failure = new AtomicReference<>();
+
+    try {
+      for (int i = 0; i < parentCount; i++) {
+        parentRunner.submit(
+            () -> {
+              try {
+                runParentBurst(client, requestsPerParent);
+              } catch (Throwable t) {
+                // Keep only the first failure; later ones are dropped.
+                failure.compareAndSet(null, t);
+              } finally {
+                allParentsDone.countDown();
+              }
+            });
+      }
+      assertTrue(allParentsDone.await(60, TimeUnit.SECONDS), "parent threads timed out");
+    } finally {
+      parentRunner.shutdown();
+      dispatcherExecutor.shutdown();
+    }
+
+    if (failure.get() != null) {
+      throw new AssertionError("a parent-runner thread threw", failure.get());
+    }
+
+    // Each parent produces one trace containing 1 parent span + M okhttp.request spans.
+    writer.waitForTraces(parentCount);
+
+    // Map parent-span-id -> spans in that parent's trace, then assert structure.
+    Map<Long, List<DDSpan>> tracesByRoot = new HashMap<>();
+    for (List<DDSpan> trace : writer) {
+      DDSpan parentSpan = findByOp(trace, "parent");
+      assertNotNull(parentSpan, "every collected trace should have a parent span");
+      tracesByRoot.put(parentSpan.getSpanId(), trace);
+    }
+    assertEquals(
+        parentCount,
+        tracesByRoot.size(),
+        "expected one distinct trace per parent burst (no cross-trace contamination)");
+
+    int totalOkhttpSpans = 0;
+    // Collect every mis-parented span so a single failure message lists all of them.
+    List<String> contamination = new ArrayList<>();
+    for (Map.Entry<Long, List<DDSpan>> entry : tracesByRoot.entrySet()) {
+      long parentSpanId = entry.getKey();
+      List<DDSpan> trace = entry.getValue();
+      int okhttpCountInThisTrace = 0;
+      for (DDSpan span : trace) {
+        if ("okhttp.request".contentEquals(span.getOperationName())) {
+          okhttpCountInThisTrace++;
+          if (span.getParentId() != parentSpanId) {
+            contamination.add(
+                "okhttp.request span "
+                    + span.getSpanId()
+                    + " has parentId="
+                    + span.getParentId()
+                    + " but it lives in the trace rooted at "
+                    + parentSpanId);
+          }
+        }
+      }
+      totalOkhttpSpans += okhttpCountInThisTrace;
+      assertEquals(
+          requestsPerParent,
+          okhttpCountInThisTrace,
+          "trace rooted at parent " + parentSpanId + " has wrong child count");
+    }
+
+    assertEquals(totalRequests, totalOkhttpSpans, "total okhttp.request spans across all traces");
+    assertTrue(
+        contamination.isEmpty(),
+        "found cross-parented okhttp.request spans:\n  - " + String.join("\n  - ", contamination));
+  }
+
+  /**
+   * One parent burst: M OkHttp requests under a single freshly-started parent span. Deliberately
+   * does not block on per-parent child-span accounting — the whole point of the test
+   * is to detect when children leak to a sibling's trace, and per-parent blocking would just turn
+   * that into a timeout instead of producing a useful assertion message. Wait for the HTTP
+   * callbacks (so the request actually ran), then close the parent.
+   */
+  private void runParentBurst(OkHttpClient client, int requestsPerParent) {
+    AgentSpan parentSpan = AgentTracer.startSpan("test", "parent");
+    try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) {
+      // One latch count per request enqueued under this parent.
+      CountDownLatch done = new CountDownLatch(requestsPerParent);
+      for (int i = 0; i < requestsPerParent; i++) {
+        Request request = new Request.Builder().url(baseUrl + "/ok").build();
+        client
+            .newCall(request)
+            .enqueue(
+                new Callback() {
+                  @Override
+                  public void onResponse(Call call, Response response) throws IOException {
+                    // Count down in finally so a failure while closing the body does not leave
+                    // the burst blocked until the 30-second latch timeout below.
+                    try {
+                      response.body().close();
+                    } finally {
+                      done.countDown();
+                    }
+                  }
+
+                  @Override
+                  public void onFailure(Call call, IOException e) {
+                    done.countDown();
+                  }
+                });
+      }
+      if (!done.await(30, TimeUnit.SECONDS)) {
+        throw new AssertionError("timed out waiting for OkHttp callbacks");
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt flag before converting to a test failure.
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    } finally {
+      parentSpan.finish();
+    }
+  }
+
+  /**
+   * Builds an {@link OkHttpClient} whose {@link Dispatcher} is constructed around the given
+   * executor.
+   *
+   * @param dispatcherExecutor executor handed to the {@code Dispatcher} constructor
+   * @return a client configured with that dispatcher
+   */
+  private static OkHttpClient buildClient(ExecutorService dispatcherExecutor) {
+    Dispatcher dispatcher = new Dispatcher(dispatcherExecutor);
+    return new OkHttpClient.Builder().dispatcher(dispatcher).build();
+  }
+
+  /**
+   * Waits for one trace, then asserts that it contains both a {@code parent} span and an {@code
+   * okhttp.request} span, that the two share a trace id, and that the {@code okhttp.request}
+   * span's parent id is the {@code parent} span's id.
+   */
+  private void assertOkHttpSpanParentedUnderParent() throws Exception {
+    writer.waitForTraces(1);
+    List<DDSpan> trace = writer.get(0);
+    DDSpan parentSpan = findByOp(trace, "parent");
+    DDSpan okhttpSpan = findByOp(trace, "okhttp.request");
+    assertNotNull(parentSpan, "parent span should exist");
+    assertNotNull(
+        okhttpSpan,
+        "okhttp.request client span should exist; if missing, propagation may have produced an"
+            + " orphan trace instead");
+    assertEquals(
+        parentSpan.getTraceId().toLong(),
+        okhttpSpan.getTraceId().toLong(),
+        "okhttp.request span must share the parent's trace id");
+    assertEquals(
+        parentSpan.getSpanId(),
+        okhttpSpan.getParentId(),
+        "okhttp.request span must be parented under the parent span active at enqueue() time");
+  }
+
+  /**
+   * Returns the first span in {@code spans} whose operation name has the same characters as
+   * {@code op}, or {@code null} when no span matches.
+   */
+  private static DDSpan findByOp(List<DDSpan> spans, String op) {
+    return spans.stream()
+        .filter(s -> op.contentEquals(s.getOperationName()))
+        .findFirst()
+        .orElse(null);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 487f6275cf1..4aea2f69b41 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -500,6 +500,7 @@ include(
   ":dd-java-agent:instrumentation:ognl-appsec-3.3.2",
   ":dd-java-agent:instrumentation:okhttp:okhttp-2.2",
   ":dd-java-agent:instrumentation:okhttp:okhttp-3.0",
+  ":dd-java-agent:instrumentation:okhttp:okhttp-4.0",
   ":dd-java-agent:instrumentation:openai-java:openai-java-3.0",
   ":dd-java-agent:instrumentation:opensearch:opensearch-rest-1.0",
   ":dd-java-agent:instrumentation:opensearch:opensearch-transport-1.0",