diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/muzzle/MuzzleCheck.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/muzzle/MuzzleCheck.java index 2544e2732c8..5491f632940 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/muzzle/MuzzleCheck.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/muzzle/MuzzleCheck.java @@ -31,8 +31,32 @@ public boolean matches(ClassLoader classLoader) { boolean muzzleMatches = muzzle().matches(classLoader); if (muzzleMatches) { InstrumenterState.applyInstrumentation(classLoader, instrumentationId); + if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) { + System.err.println( + "[DD-SPARK-DEBUG] MuzzleCheck PASSED: " + + InstrumenterState.describe(instrumentationId) + + " classloader=" + + classLoader); + } } else { InstrumenterState.blockInstrumentation(classLoader, instrumentationId); + if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) { + final List mismatches = + muzzle.getMismatchedReferenceSources(classLoader); + System.err.println( + "[DD-SPARK-DEBUG] MuzzleCheck FAILED: " + + InstrumenterState.describe(instrumentationId) + + " classloader=" + + classLoader); + for (final Reference.Mismatch mismatch : mismatches) { + System.err.println( + "[DD-SPARK-DEBUG] MuzzleCheck mismatch: " + + InstrumenterState.describe(instrumentationId) + + " muzzle.mismatch=\"" + + mismatch + + "\""); + } + } if (log.isDebugEnabled()) { final List mismatches = muzzle.getMismatchedReferenceSources(classLoader); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 08adbfb1cbc..5f76cdd45b2 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -173,8 +173,17 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp AgentThreadFactory.newAgentThread( AgentThreadFactory.AgentThread.DATA_JOBS_MONITORING_SHUTDOWN_HOOK, () -> { + System.err.println( + "[DD-SPARK-DEBUG] shutdownHook: applicationEnded=" + + applicationEnded + + ", lastJobFailed=" + + lastJobFailed + + ", lastSqlFailed=" + + lastSqlFailed + + ", thread=" + + Thread.currentThread().getName()); if (!applicationEnded) { - log.info("Finishing application trace from shutdown hook"); + System.err.println("[DD-SPARK-DEBUG] shutdownHook: calling finishApplication"); finishApplication(System.currentTimeMillis(), null, 0, null); } })); @@ -319,7 +328,18 @@ private void captureOpenlineageContextIfPresent( * has an error signal even when no job/stage/task events fire. */ public synchronized void onSqlFailure(Throwable throwable) { + System.err.println( + "[DD-SPARK-DEBUG] onSqlFailure: applicationEnded=" + + applicationEnded + + ", throwable=" + + throwable.getClass().getName() + + ": " + + throwable.getMessage() + + ", thread=" + + Thread.currentThread().getName()); + if (applicationEnded) { + System.err.println("[DD-SPARK-DEBUG] onSqlFailure: skipping because applicationEnded=true"); return; } lastSqlFailed = true; @@ -328,13 +348,19 @@ public synchronized void onSqlFailure(Throwable throwable) { StringWriter sw = new StringWriter(); throwable.printStackTrace(new PrintWriter(sw)); lastSqlFailedStackTrace = sw.toString(); + + System.err.println("[DD-SPARK-DEBUG] onSqlFailure: lastSqlFailed set to true"); } @Override public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - log.info( - "Received spark application end event, finish trace on this event: {}", - finishTraceOnApplicationEnd); + System.err.println( + "[DD-SPARK-DEBUG] onApplicationEnd: finishTraceOnApplicationEnd=" + + finishTraceOnApplicationEnd + + ", applicationEnded=" + + applicationEnded + + ", thread=" + + Thread.currentThread().getName()); notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd); if (finishTraceOnApplicationEnd) { @@ -346,9 +372,29 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { // the signature of this function is changed public synchronized void finishApplication( long time, Throwable throwable, int exitCode, String msg) { - log.info("Finishing spark application trace"); + System.err.println( + "[DD-SPARK-DEBUG] finishApplication: thread=" + + Thread.currentThread().getName() + + ", applicationEnded=" + + applicationEnded + + ", throwable=" + + (throwable != null + ? throwable.getClass().getName() + ": " + throwable.getMessage() + : "null") + + ", exitCode=" + + exitCode + + ", msg=" + + msg + + ", lastJobFailed=" + + lastJobFailed + + ", lastSqlFailed=" + + lastSqlFailed + + ", jobCount=" + + jobCount); if (applicationEnded) { + System.err.println( + "[DD-SPARK-DEBUG] finishApplication: skipping because applicationEnded=true (duplicate call)"); return; } applicationEnded = true; @@ -361,9 +407,12 @@ public synchronized void finishApplication( initApplicationSpanIfNotInitialized(); + String errorDecision; if (throwable != null) { + errorDecision = "throwable"; applicationSpan.addThrowable(throwable); } else if (exitCode != 0) { + errorDecision = "exitCode=" + exitCode; applicationSpan.setError(true); applicationSpan.setTag( DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + exitCode); @@ -372,16 +421,21 @@ public synchronized void finishApplication( applicationSpan.setTag(DDTags.ERROR_MSG, errorMessage); applicationSpan.setTag(DDTags.ERROR_STACK, msg); } else if (lastJobFailed) { + errorDecision = "lastJobFailed"; applicationSpan.setError(true); applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed"); applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage); applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace); } else if (lastSqlFailed) { + errorDecision = "lastSqlFailed"; applicationSpan.setError(true); applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed"); applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage); applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace); + } else { + errorDecision = "none (SUCCESS)"; } + System.err.println("[DD-SPARK-DEBUG] finishApplication: errorDecision=" + errorDecision); applicationMetrics.setSpanMetrics(applicationSpan); applicationSpan.setMetric("spark.max_executor_count", maxExecutorCount); @@ -486,6 +540,13 @@ private AgentSpan getOrCreateSqlSpan( @Override public synchronized void onJobStart(SparkListenerJobStart jobStart) { + System.err.println( + "[DD-SPARK-DEBUG] onJobStart: jobId=" + + jobStart.jobId() + + ", stageCount=" + + getStageCount(jobStart) + + ", thread=" + + Thread.currentThread().getName()); jobCount++; if (jobSpans.size() > MAX_COLLECTION_SIZE) { return; @@ -544,8 +605,22 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { @Override public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { + System.err.println( + "[DD-SPARK-DEBUG] onJobEnd: jobId=" + + jobEnd.jobId() + + ", result=" + + jobEnd.jobResult().getClass().getSimpleName() + + ", lastJobFailed=" + + lastJobFailed + + ", lastSqlFailed=" + + lastSqlFailed + + ", thread=" + + Thread.currentThread().getName()); + AgentSpan jobSpan = jobSpans.remove(jobEnd.jobId()); if (jobSpan == null) { + System.err.println( + "[DD-SPARK-DEBUG] onJobEnd: no span found for jobId=" + jobEnd.jobId() + ", skipping"); return; } @@ -569,7 +644,9 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { } } else { lastJobFailed = false; - lastSqlFailed = false; + // Note: we intentionally do NOT reset lastSqlFailed here. A successful Spark job + // should not erase a prior SQL failure (e.g. from cleanup operations after a failed + // spark.sql() call). The SQL failure is only relevant at finishApplication time. } SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId()); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 0afe0db4721..5526a78d6c3 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -89,6 +89,11 @@ public void methodAdvice(MethodTransformer transformer) { public static class PrepareSubmitEnvAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) { + System.err.println( + "[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice.enter: primaryResource=" + + submitArgs.primaryResource() + + ", thread=" + + Thread.currentThread().getName()); // Using pyspark `python script.py`, spark JVM is launched as PythonGatewayServer, which is // exited using System.exit(0), leading to the exit advice not being called @@ -99,6 +104,8 @@ public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) { // prepareSubmitEnvironment might be called before/after runMain depending on spark version AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true; + System.err.println( + "[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice: detected pyspark-shell, set isPysparkShell=true, finishTraceOnApplicationEnd=true"); } } } @@ -106,6 +113,11 @@ public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) { public static class RunMainAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter() { + System.err.println( + "[DD-SPARK-DEBUG] RunMainAdvice.enter: isPysparkShell=" + + AbstractDatadogSparkListener.isPysparkShell + + ", thread=" + + Thread.currentThread().getName()); if (!AbstractDatadogSparkListener.isPysparkShell) { AbstractDatadogSparkListener.finishTraceOnApplicationEnd = false; } @@ -113,6 +125,15 @@ public static void enter() { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) public static void exit(@Advice.Thrown Throwable throwable) { + System.err.println( + "[DD-SPARK-DEBUG] RunMainAdvice.exit: throwable=" + + (throwable != null + ? throwable.getClass().getName() + ": " + throwable.getMessage() + : "null") + + ", listenerNull=" + + (AbstractDatadogSparkListener.listener == null) + + ", thread=" + + Thread.currentThread().getName()); if (AbstractDatadogSparkListener.listener != null) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), throwable, 0, null); @@ -125,6 +146,15 @@ public static void exit(@Advice.Thrown Throwable throwable) { public static class YarnFinishAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) String msg) { + System.err.println( + "[DD-SPARK-DEBUG] YarnFinishAdvice.enter: exitCode=" + + exitCode + + ", msg=" + + msg + + ", listenerNull=" + + (AbstractDatadogSparkListener.listener == null) + + ", thread=" + + Thread.currentThread().getName()); if (AbstractDatadogSparkListener.listener != null) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), null, exitCode, msg); @@ -133,10 +163,42 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S } public static class SparkSqlFailureAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enter(@Advice.Argument(0) String sqlText) { + System.err.println( + "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.enter: thread=" + + Thread.currentThread().getName() + + ", sql=" + + (sqlText != null && sqlText.length() > 200 + ? sqlText.substring(0, 200) + "..." + : sqlText) + + ", listenerNull=" + + (AbstractDatadogSparkListener.listener == null)); + } + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) public static void exit(@Advice.Thrown Throwable throwable) { - if (throwable != null && AbstractDatadogSparkListener.listener != null) { - AbstractDatadogSparkListener.listener.onSqlFailure(throwable); + if (throwable != null) { + System.err.println( + "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread=" + + Thread.currentThread().getName() + + ", throwable=" + + throwable.getClass().getName() + + ": " + + throwable.getMessage() + + ", listenerNull=" + + (AbstractDatadogSparkListener.listener == null)); + if (AbstractDatadogSparkListener.listener != null) { + AbstractDatadogSparkListener.listener.onSqlFailure(throwable); + } else { + System.err.println( + "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: listener is null, cannot record SQL failure"); + } + } else { + System.err.println( + "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread=" + + Thread.currentThread().getName() + + ", success (no throwable)"); } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java index 1306da2b373..f86f9af04fe 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java @@ -14,6 +14,13 @@ public static void enter(@Advice.Argument(0) int exitCode) { .getContextClassLoader() .loadClass("datadog.trace.instrumentation.spark.AbstractDatadogSparkListener"); Object datadogListener = klass.getDeclaredField("listener").get(null); + System.err.println( + "[DD-SPARK-DEBUG] SparkExitAdvice.enter: exitCode=" + + exitCode + + ", listenerNull=" + + (datadogListener == null) + + ", thread=" + + Thread.currentThread().getName()); if (datadogListener != null) { Method method = datadogListener diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 1d69489c7a6..0c9e1bb71a1 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -94,6 +94,13 @@ public static void enter(@Advice.This SparkContext sparkContext) { AbstractDatadogSparkListener.listener = new DatadogSpark212Listener( sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version()); + System.err.println( + "[DD-SPARK-DEBUG] listener registered: class=" + + AbstractDatadogSparkListener.listener.getClass().getName() + + ", appId=" + + sparkContext.applicationId() + + ", thread=" + + Thread.currentThread().getName()); sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener); } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index d9d5b34ae7d..9f07d4c6b7d 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -95,6 +95,13 @@ public static void enter(@Advice.This SparkContext sparkContext) { AbstractDatadogSparkListener.listener = new DatadogSpark213Listener( sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version()); + System.err.println( + "[DD-SPARK-DEBUG] listener registered: class=" + + AbstractDatadogSparkListener.listener.getClass().getName() + + ", appId=" + + sparkContext.applicationId() + + ", thread=" + + Thread.currentThread().getName()); sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener); } }