-
Notifications
You must be signed in to change notification settings - Fork 608
feat(starlette): Support span streaming #6123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f50e1ab
32286b8
50352ea
6eacecc
943c3d4
4493974
92b4121
29de949
48415ad
3ae7b8a
35c6906
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import asyncio | ||
| import functools | ||
| import json | ||
| import warnings | ||
| from collections.abc import Set | ||
| from copy import deepcopy | ||
|
|
@@ -20,10 +21,12 @@ | |
| ) | ||
| from sentry_sdk.integrations.asgi import SentryAsgiMiddleware | ||
| from sentry_sdk.scope import should_send_default_pii | ||
| from sentry_sdk.traces import NoOpStreamedSpan, StreamedSpan | ||
| from sentry_sdk.tracing import ( | ||
| SOURCE_FOR_STYLE, | ||
| TransactionSource, | ||
| ) | ||
| from sentry_sdk.tracing_utils import has_span_streaming_enabled | ||
| from sentry_sdk.utils import ( | ||
| AnnotatedValue, | ||
| capture_internal_exceptions, | ||
|
|
@@ -147,7 +150,8 @@ async def _create_span_call( | |
| send: "Callable[[Dict[str, Any]], Awaitable[None]]", | ||
| **kwargs: "Any", | ||
| ) -> None: | ||
| integration = sentry_sdk.get_client().get_integration(StarletteIntegration) | ||
| client = sentry_sdk.get_client() | ||
| integration = client.get_integration(StarletteIntegration) | ||
| if integration is None: | ||
| return await old_call(app, scope, receive, send, **kwargs) | ||
|
|
||
|
|
@@ -164,22 +168,38 @@ async def _create_span_call( | |
| return await old_call(app, scope, receive, send, **kwargs) | ||
|
|
||
| middleware_name = app.__class__.__name__ | ||
| is_span_streaming_enabled = has_span_streaming_enabled(client.options) | ||
|
|
||
| def _start_middleware_span(op: str, name: str) -> "Any": | ||
| if is_span_streaming_enabled: | ||
| return sentry_sdk.traces.start_span( | ||
| name=name, | ||
| attributes={ | ||
| "sentry.op": op, | ||
| "sentry.origin": StarletteIntegration.origin, | ||
| "middleware.name": middleware_name, | ||
| }, | ||
| ) | ||
| return sentry_sdk.start_span( | ||
| op=op, | ||
| name=name, | ||
| origin=StarletteIntegration.origin, | ||
| ) | ||
|
|
||
| with sentry_sdk.start_span( | ||
| op=OP.MIDDLEWARE_STARLETTE, | ||
| name=middleware_name, | ||
| origin=StarletteIntegration.origin, | ||
| with _start_middleware_span( | ||
| op=OP.MIDDLEWARE_STARLETTE, name=middleware_name | ||
| ) as middleware_span: | ||
| middleware_span.set_tag("starlette.middleware_name", middleware_name) | ||
| if not is_span_streaming_enabled: | ||
| middleware_span.set_tag("starlette.middleware_name", middleware_name) | ||
|
|
||
| # Creating spans for the "receive" callback | ||
| async def _sentry_receive(*args: "Any", **kwargs: "Any") -> "Any": | ||
| with sentry_sdk.start_span( | ||
| with _start_middleware_span( | ||
| op=OP.MIDDLEWARE_STARLETTE_RECEIVE, | ||
| name=getattr(receive, "__qualname__", str(receive)), | ||
| origin=StarletteIntegration.origin, | ||
| ) as span: | ||
| span.set_tag("starlette.middleware_name", middleware_name) | ||
| if not is_span_streaming_enabled: | ||
| span.set_tag("starlette.middleware_name", middleware_name) | ||
| return await receive(*args, **kwargs) | ||
|
|
||
| receive_name = getattr(receive, "__name__", str(receive)) | ||
|
|
@@ -188,12 +208,12 @@ async def _sentry_receive(*args: "Any", **kwargs: "Any") -> "Any": | |
|
|
||
| # Creating spans for the "send" callback | ||
| async def _sentry_send(*args: "Any", **kwargs: "Any") -> "Any": | ||
| with sentry_sdk.start_span( | ||
| with _start_middleware_span( | ||
| op=OP.MIDDLEWARE_STARLETTE_SEND, | ||
| name=getattr(send, "__qualname__", str(send)), | ||
| origin=StarletteIntegration.origin, | ||
| ) as span: | ||
| span.set_tag("starlette.middleware_name", middleware_name) | ||
| if not is_span_streaming_enabled: | ||
| span.set_tag("starlette.middleware_name", middleware_name) | ||
| return await send(*args, **kwargs) | ||
|
|
||
| send_name = getattr(send, "__name__", str(send)) | ||
|
|
@@ -214,6 +234,16 @@ async def _sentry_send(*args: "Any", **kwargs: "Any") -> "Any": | |
| return middleware_class | ||
|
|
||
|
|
||
| def _serialize_body_data(data: "Any") -> str: | ||
| # data may be a JSON-serializable value, an AnnotatedValue, or a dict with AnnotatedValue values | ||
| def _default(value: "Any") -> "Any": | ||
| if isinstance(value, AnnotatedValue): | ||
| return value.value | ||
| return str(value) | ||
|
|
||
| return json.dumps(data, default=_default) | ||
|
Comment on lines
+237
to
+244
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the data here have I hope we can just get rid of the whole
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It can when a raw value is present, or if the request body exceeds the size limit set by the Both of these occur within the
From this thread, sanitization, at least for the request body, will be the user's responsibility using the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha. In that case looks good -- but in case of Stamping this already to unblock. |
||
|
|
||
|
|
||
| @ensure_integration_enabled(StarletteIntegration) | ||
| def _capture_exception(exception: BaseException, handled: "Any" = False) -> None: | ||
| event, hint = event_from_exception( | ||
|
|
@@ -439,9 +469,8 @@ def _sentry_request_response(func: "Callable[[Any], Any]") -> "ASGIApp": | |
| if is_coroutine: | ||
|
|
||
| async def _sentry_async_func(*args: "Any", **kwargs: "Any") -> "Any": | ||
| integration = sentry_sdk.get_client().get_integration( | ||
| StarletteIntegration | ||
| ) | ||
| client = sentry_sdk.get_client() | ||
| integration = client.get_integration(StarletteIntegration) | ||
| if integration is None: | ||
| return await old_func(*args, **kwargs) | ||
|
|
||
|
|
@@ -481,6 +510,24 @@ def event_processor( | |
| _make_request_event_processor(request, integration) | ||
| ) | ||
|
|
||
| is_span_streaming_enabled = has_span_streaming_enabled(client.options) | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| if is_span_streaming_enabled: | ||
| current_span = sentry_sdk.get_current_span() | ||
|
|
||
| if ( | ||
| info | ||
| and "data" in info | ||
| and isinstance(current_span, StreamedSpan) | ||
| and not isinstance(current_span, NoOpStreamedSpan) | ||
| ): | ||
| data = info["data"] | ||
|
|
||
| with capture_internal_exceptions(): | ||
| current_span._segment.set_attribute( | ||
| "http.request.body.data", | ||
| _serialize_body_data(data), | ||
| ) | ||
|
|
||
| return await old_func(*args, **kwargs) | ||
|
|
||
| func = _sentry_async_func | ||
|
|
@@ -496,7 +543,13 @@ def _sentry_sync_func(*args: "Any", **kwargs: "Any") -> "Any": | |
| return old_func(*args, **kwargs) | ||
|
|
||
| current_scope = sentry_sdk.get_current_scope() | ||
| if current_scope.transaction is not None: | ||
| current_span = current_scope.span | ||
|
|
||
| if isinstance(current_span, StreamedSpan) and not isinstance( | ||
| current_span, NoOpStreamedSpan | ||
| ): | ||
| current_span._segment._update_active_thread() | ||
| elif current_scope.transaction is not None: | ||
| current_scope.transaction.update_active_thread() | ||
|
|
||
| sentry_scope = sentry_sdk.get_isolation_scope() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The associated convention PR changed this from
starlette.middleware.nametomiddleware.name. I left the legacy stream alone because I didn't want to break existing functionality for users.