Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,32 @@ public boolean matches(ClassLoader classLoader) {
boolean muzzleMatches = muzzle().matches(classLoader);
if (muzzleMatches) {
InstrumenterState.applyInstrumentation(classLoader, instrumentationId);
if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) {
System.err.println(
"[DD-SPARK-DEBUG] MuzzleCheck PASSED: "
+ InstrumenterState.describe(instrumentationId)
+ " classloader="
+ classLoader);
}
} else {
InstrumenterState.blockInstrumentation(classLoader, instrumentationId);
if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) {
final List<Reference.Mismatch> mismatches =
muzzle.getMismatchedReferenceSources(classLoader);
System.err.println(
"[DD-SPARK-DEBUG] MuzzleCheck FAILED: "
+ InstrumenterState.describe(instrumentationId)
+ " classloader="
+ classLoader);
for (final Reference.Mismatch mismatch : mismatches) {
System.err.println(
"[DD-SPARK-DEBUG] MuzzleCheck mismatch: "
+ InstrumenterState.describe(instrumentationId)
+ " muzzle.mismatch=\""
+ mismatch
+ "\"");
}
}
if (log.isDebugEnabled()) {
final List<Reference.Mismatch> mismatches =
muzzle.getMismatchedReferenceSources(classLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,17 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
AgentThreadFactory.newAgentThread(
AgentThreadFactory.AgentThread.DATA_JOBS_MONITORING_SHUTDOWN_HOOK,
() -> {
System.err.println(
"[DD-SPARK-DEBUG] shutdownHook: applicationEnded="
+ applicationEnded
+ ", lastJobFailed="
+ lastJobFailed
+ ", lastSqlFailed="
+ lastSqlFailed
+ ", thread="
+ Thread.currentThread().getName());
if (!applicationEnded) {
log.info("Finishing application trace from shutdown hook");
System.err.println("[DD-SPARK-DEBUG] shutdownHook: calling finishApplication");
finishApplication(System.currentTimeMillis(), null, 0, null);
}
}));
Expand Down Expand Up @@ -319,7 +328,18 @@ private void captureOpenlineageContextIfPresent(
* has an error signal even when no job/stage/task events fire.
*/
public synchronized void onSqlFailure(Throwable throwable) {
System.err.println(
"[DD-SPARK-DEBUG] onSqlFailure: applicationEnded="
+ applicationEnded
+ ", throwable="
+ throwable.getClass().getName()
+ ": "
+ throwable.getMessage()
+ ", thread="
+ Thread.currentThread().getName());

if (applicationEnded) {
System.err.println("[DD-SPARK-DEBUG] onSqlFailure: skipping because applicationEnded=true");
return;
}
lastSqlFailed = true;
Expand All @@ -328,13 +348,19 @@ public synchronized void onSqlFailure(Throwable throwable) {
StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
lastSqlFailedStackTrace = sw.toString();

System.err.println("[DD-SPARK-DEBUG] onSqlFailure: lastSqlFailed set to true");
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
log.info(
"Received spark application end event, finish trace on this event: {}",
finishTraceOnApplicationEnd);
System.err.println(
"[DD-SPARK-DEBUG] onApplicationEnd: finishTraceOnApplicationEnd="
+ finishTraceOnApplicationEnd
+ ", applicationEnded="
+ applicationEnded
+ ", thread="
+ Thread.currentThread().getName());
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);

if (finishTraceOnApplicationEnd) {
Expand All @@ -346,9 +372,29 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
// the signature of this function is changed
public synchronized void finishApplication(
long time, Throwable throwable, int exitCode, String msg) {
log.info("Finishing spark application trace");
System.err.println(
"[DD-SPARK-DEBUG] finishApplication: thread="
+ Thread.currentThread().getName()
+ ", applicationEnded="
+ applicationEnded
+ ", throwable="
+ (throwable != null
? throwable.getClass().getName() + ": " + throwable.getMessage()
: "null")
+ ", exitCode="
+ exitCode
+ ", msg="
+ msg
+ ", lastJobFailed="
+ lastJobFailed
+ ", lastSqlFailed="
+ lastSqlFailed
+ ", jobCount="
+ jobCount);

if (applicationEnded) {
System.err.println(
"[DD-SPARK-DEBUG] finishApplication: skipping because applicationEnded=true (duplicate call)");
return;
}
applicationEnded = true;
Expand All @@ -361,9 +407,12 @@ public synchronized void finishApplication(

initApplicationSpanIfNotInitialized();

String errorDecision;
if (throwable != null) {
errorDecision = "throwable";
applicationSpan.addThrowable(throwable);
} else if (exitCode != 0) {
errorDecision = "exitCode=" + exitCode;
applicationSpan.setError(true);
applicationSpan.setTag(
DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + exitCode);
Expand All @@ -372,16 +421,21 @@ public synchronized void finishApplication(
applicationSpan.setTag(DDTags.ERROR_MSG, errorMessage);
applicationSpan.setTag(DDTags.ERROR_STACK, msg);
} else if (lastJobFailed) {
errorDecision = "lastJobFailed";
applicationSpan.setError(true);
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
} else if (lastSqlFailed) {
errorDecision = "lastSqlFailed";
applicationSpan.setError(true);
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace);
} else {
errorDecision = "none (SUCCESS)";
}
System.err.println("[DD-SPARK-DEBUG] finishApplication: errorDecision=" + errorDecision);

applicationMetrics.setSpanMetrics(applicationSpan);
applicationSpan.setMetric("spark.max_executor_count", maxExecutorCount);
Expand Down Expand Up @@ -486,6 +540,13 @@ private AgentSpan getOrCreateSqlSpan(

@Override
public synchronized void onJobStart(SparkListenerJobStart jobStart) {
System.err.println(
"[DD-SPARK-DEBUG] onJobStart: jobId="
+ jobStart.jobId()
+ ", stageCount="
+ getStageCount(jobStart)
+ ", thread="
+ Thread.currentThread().getName());
jobCount++;
if (jobSpans.size() > MAX_COLLECTION_SIZE) {
return;
Expand Down Expand Up @@ -544,8 +605,22 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {

@Override
public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
System.err.println(
"[DD-SPARK-DEBUG] onJobEnd: jobId="
+ jobEnd.jobId()
+ ", result="
+ jobEnd.jobResult().getClass().getSimpleName()
+ ", lastJobFailed="
+ lastJobFailed
+ ", lastSqlFailed="
+ lastSqlFailed
+ ", thread="
+ Thread.currentThread().getName());

AgentSpan jobSpan = jobSpans.remove(jobEnd.jobId());
if (jobSpan == null) {
System.err.println(
"[DD-SPARK-DEBUG] onJobEnd: no span found for jobId=" + jobEnd.jobId() + ", skipping");
return;
}

Expand All @@ -569,7 +644,9 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
}
} else {
lastJobFailed = false;
lastSqlFailed = false;
// Note: we intentionally do NOT reset lastSqlFailed here. A successful Spark job
// should not erase a prior SQL failure (e.g. from cleanup operations after a failed
// spark.sql() call). The SQL failure is only relevant at finishApplication time.
}

SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PrepareSubmitEnvAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {
System.err.println(
"[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice.enter: primaryResource="
+ submitArgs.primaryResource()
+ ", thread="
+ Thread.currentThread().getName());

// Using pyspark `python script.py`, spark JVM is launched as PythonGatewayServer, which is
// exited using System.exit(0), leading to the exit advice not being called
Expand All @@ -99,20 +104,36 @@ public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {

// prepareSubmitEnvironment might be called before/after runMain depending on spark version
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true;
System.err.println(
"[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice: detected pyspark-shell, set isPysparkShell=true, finishTraceOnApplicationEnd=true");
}
}
}

public static class RunMainAdvice {
/**
 * Entry advice for the instrumented "run main" method.
 *
 * <p>Writes a diagnostic line to stderr with the current {@code isPysparkShell} flag and the
 * calling thread's name, then clears {@code finishTraceOnApplicationEnd} unless the pyspark-shell
 * flag is set.
 */
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter() {
  // Temporary diagnostics: record which thread reached runMain and what the flag was.
  final String debugLine =
      "[DD-SPARK-DEBUG] RunMainAdvice.enter: isPysparkShell="
          + AbstractDatadogSparkListener.isPysparkShell
          + ", thread="
          + Thread.currentThread().getName();
  System.err.println(debugLine);

  // Leave finishTraceOnApplicationEnd untouched when the pyspark-shell flag is set.
  // NOTE(review): presumably the flag is set by PrepareSubmitEnvAdvice, which may run before
  // or after this advice depending on the Spark version - confirm.
  final boolean keepFinishOnApplicationEnd = AbstractDatadogSparkListener.isPysparkShell;
  if (!keepFinishOnApplicationEnd) {
    AbstractDatadogSparkListener.finishTraceOnApplicationEnd = false;
  }
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Thrown Throwable throwable) {
System.err.println(
"[DD-SPARK-DEBUG] RunMainAdvice.exit: throwable="
+ (throwable != null
? throwable.getClass().getName() + ": " + throwable.getMessage()
: "null")
+ ", listenerNull="
+ (AbstractDatadogSparkListener.listener == null)
+ ", thread="
+ Thread.currentThread().getName());
if (AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.finishApplication(
System.currentTimeMillis(), throwable, 0, null);
Expand All @@ -125,6 +146,15 @@ public static void exit(@Advice.Thrown Throwable throwable) {
public static class YarnFinishAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) String msg) {
System.err.println(
"[DD-SPARK-DEBUG] YarnFinishAdvice.enter: exitCode="
+ exitCode
+ ", msg="
+ msg
+ ", listenerNull="
+ (AbstractDatadogSparkListener.listener == null)
+ ", thread="
+ Thread.currentThread().getName());
if (AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.finishApplication(
System.currentTimeMillis(), null, exitCode, msg);
Expand All @@ -133,10 +163,42 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
}

public static class SparkSqlFailureAdvice {
/**
 * Entry advice that traces each instrumented SQL call to stderr.
 *
 * <p>The line contains the calling thread's name, a preview of the SQL text and whether the
 * shared Datadog Spark listener is currently {@code null}.
 *
 * @param sqlText the first argument of the instrumented method; may be {@code null}, in which
 *     case the literal text {@code null} is printed
 */
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.Argument(0) String sqlText) {
  // Echo at most the first 200 characters of the statement, marking truncation with "...".
  String sqlPreview = sqlText;
  if (sqlText != null && sqlText.length() > 200) {
    sqlPreview = sqlText.substring(0, 200) + "...";
  }

  System.err.println(
      "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.enter: thread="
          + Thread.currentThread().getName()
          + ", sql="
          + sqlPreview
          + ", listenerNull="
          + (AbstractDatadogSparkListener.listener == null));
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Thrown Throwable throwable) {
if (throwable != null && AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
if (throwable != null) {
System.err.println(
"[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread="
+ Thread.currentThread().getName()
+ ", throwable="
+ throwable.getClass().getName()
+ ": "
+ throwable.getMessage()
+ ", listenerNull="
+ (AbstractDatadogSparkListener.listener == null));
if (AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
} else {
System.err.println(
"[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: listener is null, cannot record SQL failure");
}
} else {
System.err.println(
"[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread="
+ Thread.currentThread().getName()
+ ", success (no throwable)");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ public static void enter(@Advice.Argument(0) int exitCode) {
.getContextClassLoader()
.loadClass("datadog.trace.instrumentation.spark.AbstractDatadogSparkListener");
Object datadogListener = klass.getDeclaredField("listener").get(null);
System.err.println(
"[DD-SPARK-DEBUG] SparkExitAdvice.enter: exitCode="
+ exitCode
+ ", listenerNull="
+ (datadogListener == null)
+ ", thread="
+ Thread.currentThread().getName());
if (datadogListener != null) {
Method method =
datadogListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public static void enter(@Advice.This SparkContext sparkContext) {
AbstractDatadogSparkListener.listener =
new DatadogSpark212Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
System.err.println(
"[DD-SPARK-DEBUG] listener registered: class="
+ AbstractDatadogSparkListener.listener.getClass().getName()
+ ", appId="
+ sparkContext.applicationId()
+ ", thread="
+ Thread.currentThread().getName());
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public static void enter(@Advice.This SparkContext sparkContext) {
AbstractDatadogSparkListener.listener =
new DatadogSpark213Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
System.err.println(
"[DD-SPARK-DEBUG] listener registered: class="
+ AbstractDatadogSparkListener.listener.getClass().getName()
+ ", appId="
+ sparkContext.applicationId()
+ ", thread="
+ Thread.currentThread().getName());
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}
Expand Down
Loading