-
-
Notifications
You must be signed in to change notification settings - Fork 469
feat(spring-jakarta): [Queue Instrumentation 4] Add Kafka consumer instrumentation #5255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/queue-instrumentation-producer
Are you sure you want to change the base?
Changes from all commits
6099047
1f00027
2b74bab
be3a2ba
6450f63
f92f47c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
package io.sentry.spring.jakarta.kafka;

import io.sentry.ScopesAdapter;
import io.sentry.SentryLevel;
import java.lang.reflect.Field;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.listener.RecordInterceptor;

/**
 * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory}
 * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate.
 */
@ApiStatus.Internal
public final class SentryKafkaConsumerBeanPostProcessor
    implements BeanPostProcessor, PriorityOrdered {

  @Override
  @SuppressWarnings("unchecked")
  public @NotNull Object postProcessAfterInitialization(
      final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
    // Only Kafka listener container factories are of interest; all other beans pass through.
    if (!(bean instanceof AbstractKafkaListenerContainerFactory)) {
      return bean;
    }

    final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory =
        (AbstractKafkaListenerContainerFactory<?, ?, ?>) bean;
    final @Nullable RecordInterceptor<?, ?> current = getExistingInterceptor(containerFactory);

    // Idempotency guard: never wrap a factory whose interceptor is already Sentry's.
    if (!(current instanceof SentryKafkaRecordInterceptor)) {
      @SuppressWarnings("rawtypes")
      final RecordInterceptor wrapping =
          new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), current);
      containerFactory.setRecordInterceptor(wrapping);
    }
    return bean;
  }

  /**
   * Reads the currently configured {@link RecordInterceptor} from the factory via reflection,
   * since no public getter is available. Returns {@code null} (with a warning log) if the field
   * cannot be read, in which case a pre-existing custom interceptor will not be chained.
   */
  @SuppressWarnings("unchecked")
  private @Nullable RecordInterceptor<?, ?> getExistingInterceptor(
      final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
    try {
      final @NotNull Field interceptorField =
          AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor");
      interceptorField.setAccessible(true);
      return (RecordInterceptor<?, ?>) interceptorField.get(factory);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      ScopesAdapter.getInstance()
          .getOptions()
          .getLogger()
          .log(
              SentryLevel.WARNING,
              "Unable to read existing recordInterceptor from "
                  + "AbstractKafkaListenerContainerFactory via reflection. "
                  + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.",
              e);
      return null;
    }
  }

  @Override
  public int getOrder() {
    // Run as late as possible so user-configured interceptors are already in place when wrapping.
    return Ordered.LOWEST_PRECEDENCE;
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
package io.sentry.spring.jakarta.kafka;

import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ISentryLifecycleToken;
import io.sentry.ITransaction;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.kafka.listener.RecordInterceptor;

/**
 * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
 * records with distributed tracing support.
 *
 * <p>Per-record state (forked scopes lifecycle token + transaction) is kept in a {@link
 * ThreadLocal} between {@link #intercept} and {@link #success}/{@link #failure}, which Spring
 * Kafka invokes on the same consumer thread. The token is explicitly released on every throw path
 * inside {@link #intercept} so it cannot leak when Sentry operations or a delegate interceptor
 * throw (in those cases {@code success}/{@code failure} are never reached for the record).
 */
@ApiStatus.Internal
public final class SentryKafkaRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

  static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer";

  private final @NotNull IScopes scopes;
  // Optional pre-existing interceptor that is chained around Sentry's own logic.
  private final @Nullable RecordInterceptor<K, V> delegate;

  // Holds the lifecycle token + transaction between intercept() and success()/failure().
  private static final @NotNull ThreadLocal<SentryRecordContext> currentContext =
      new ThreadLocal<>();

  public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) {
    this(scopes, null);
  }

  public SentryKafkaRecordInterceptor(
      final @NotNull IScopes scopes, final @Nullable RecordInterceptor<K, V> delegate) {
    this.scopes = scopes;
    this.delegate = delegate;
  }

  @Override
  public @Nullable ConsumerRecord<K, V> intercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (!scopes.getOptions().isEnableQueueTracing()) {
      return delegateIntercept(record, consumer);
    }

    final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
    final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

    try {
      continueTrace(forkedScopes, record);
      final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
      currentContext.set(new SentryRecordContext(lifecycleToken, transaction));
    } catch (Throwable t) {
      // If a Sentry operation throws before the context is stored, success()/failure() will never
      // see it — close the token here so the forked scopes do not leak on this consumer thread.
      lifecycleToken.close();
      throw t;
    }

    // NOTE(review): if the delegate returns null the record is skipped; it is assumed Spring
    // still invokes success()/failure() afterwards — confirm against the RecordInterceptor
    // contract of the targeted spring-kafka version.
    try {
      return delegateIntercept(record, consumer);
    } catch (Throwable t) {
      // A throwing delegate prevents the record from being processed, so success()/failure()
      // will not run for it — finish the transaction and release the token now.
      finishSpan(SpanStatus.INTERNAL_ERROR, t);
      throw t;
    }
  }

  @Override
  public void success(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.success(record, consumer);
      }
    } finally {
      finishSpan(SpanStatus.OK, null);
    }
  }

  @Override
  public void failure(
      final @NotNull ConsumerRecord<K, V> record,
      final @NotNull Exception exception,
      final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.failure(record, exception, consumer);
      }
    } finally {
      finishSpan(SpanStatus.INTERNAL_ERROR, exception);
    }
  }

  @Override
  public void afterRecord(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      delegate.afterRecord(record, consumer);
    }
  }

  /** Forwards to the delegate's intercept if present; otherwise passes the record through. */
  private @Nullable ConsumerRecord<K, V> delegateIntercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      return delegate.intercept(record, consumer);
    }
    return record;
  }

  /** Continues an incoming trace from the record's sentry-trace/baggage headers, if present. */
  private void continueTrace(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
    final @Nullable List<String> baggageHeaders =
        baggage != null ? Collections.singletonList(baggage) : null;
    forkedScopes.continueTrace(sentryTrace, baggageHeaders);
  }

  /**
   * Starts a {@code queue.process} transaction bound to the forked scopes, annotated with
   * messaging data. Returns {@code null} when tracing is disabled or the transaction is sampled
   * out (no-op), so the finish path can skip it.
   */
  private @Nullable ITransaction startTransaction(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    if (!forkedScopes.getOptions().isTracingEnabled()) {
      return null;
    }

    final @NotNull TransactionOptions txOptions = new TransactionOptions();
    txOptions.setOrigin(TRACE_ORIGIN);
    txOptions.setBindToScope(true);

    final @NotNull ITransaction transaction =
        forkedScopes.startTransaction(
            new TransactionContext("queue.process", "queue.process"), txOptions);

    if (transaction.isNoOp()) {
      return null;
    }

    transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());

    final @Nullable String messageId = headerValue(record, "messaging.message.id");
    if (messageId != null) {
      transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
    }

    // Receive latency = now minus the producer-side enqueue timestamp header, when parseable.
    final @Nullable String enqueuedTimeStr =
        headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
    if (enqueuedTimeStr != null) {
      try {
        final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
        final long latencyMs = System.currentTimeMillis() - enqueuedTime;
        // Guard against clock skew between producer and consumer hosts producing negatives.
        if (latencyMs >= 0) {
          transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
        }
      } catch (NumberFormatException ignored) {
        // ignore malformed header
      }
    }

    return transaction;
  }

  /**
   * Finishes the transaction stored for the current thread (if any) with the given status and
   * always closes the lifecycle token, then clears the ThreadLocal slot.
   */
  private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
    final @Nullable SentryRecordContext ctx = currentContext.get();
    if (ctx == null) {
      return;
    }
    currentContext.remove();

    try {
      final @Nullable ITransaction transaction = ctx.transaction;
      if (transaction != null) {
        transaction.setStatus(status);
        if (throwable != null) {
          transaction.setThrowable(throwable);
        }
        transaction.finish();
      }
    } finally {
      // Close the token even if finishing the transaction throws.
      ctx.lifecycleToken.close();
    }
  }

  /** Returns the UTF-8 decoded value of the last header with the given name, or null. */
  private @Nullable String headerValue(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
    final @Nullable Header header = record.headers().lastHeader(headerName);
    if (header == null || header.value() == null) {
      return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
  }

  /** Immutable pair of the lifecycle token and the (possibly absent) transaction per record. */
  private static final class SentryRecordContext {
    final @NotNull ISentryLifecycleToken lifecycleToken;
    final @Nullable ITransaction transaction;

    SentryRecordContext(
        final @NotNull ISentryLifecycleToken lifecycleToken,
        final @Nullable ITransaction transaction) {
      this.lifecycleToken = lifecycleToken;
      this.transaction = transaction;
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
package io.sentry.spring.jakarta.kafka

import kotlin.test.Test
import kotlin.test.assertSame
import kotlin.test.assertTrue
import org.mockito.kotlin.mock
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory

class SentryKafkaConsumerBeanPostProcessorTest {

  // Builds a minimal container factory backed by a mocked ConsumerFactory.
  private fun newFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = mock<ConsumerFactory<String, String>>()
    return factory
  }

  // Reads the private recordInterceptor field, mirroring what the post processor does internally.
  private fun interceptorOf(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>
  ): Any? {
    val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
    field.isAccessible = true
    return field.get(factory)
  }

  @Test
  fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() {
    val factory = newFactory()

    SentryKafkaConsumerBeanPostProcessor()
      .postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")

    assertTrue(interceptorOf(factory) is SentryKafkaRecordInterceptor<*, *>)
  }

  @Test
  fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() {
    val factory = newFactory()
    val processor = SentryKafkaConsumerBeanPostProcessor()

    // First pass installs the interceptor.
    processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
    val first = interceptorOf(factory)

    // Second pass must be idempotent and leave the same instance in place.
    processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
    val second = interceptorOf(factory)

    assertSame(first, second)
  }

  @Test
  fun `does not wrap non-factory beans`() {
    val bean = "not a factory"

    val result =
      SentryKafkaConsumerBeanPostProcessor().postProcessAfterInitialization(bean, "someBean")

    assertSame(bean, result)
  }
}


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason, we are going the composite route in the producer but adding the existing interceptor as a delegate here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a `CompositeRecordInterceptor` in spring-kafka that could be used for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the producer side we don't have a `ThreadLocal` that needs cleanup. On the consumer side, it's possible that our interceptor's `clearThreadState` method is never invoked if another interceptor throws.