diff --git a/.github/workflows/test-integrations-web-1.yml b/.github/workflows/test-integrations-web-1.yml index 6ba5e95a5a..2aaf29f9af 100644 --- a/.github/workflows/test-integrations-web-1.yml +++ b/.github/workflows/test-integrations-web-1.yml @@ -24,13 +24,11 @@ jobs: image: postgres env: POSTGRES_PASSWORD: sentry - # Set health checks to wait until postgres has started options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - # Maps tcp port 5432 on service container to the host ports: - 5432:5432 env: diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index f1b3b205bf..317afa53ca 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -13,6 +13,13 @@ }, "python": ">=3.7", }, + "aiomysql": { + "package": "aiomysql", + "deps": { + "*": ["pytest-asyncio", "cryptography"], + }, + "python": ">=3.7", + }, "anthropic": { "package": "anthropic", "deps": { diff --git a/scripts/split_tox_gh_actions/split_tox_gh_actions.py b/scripts/split_tox_gh_actions/split_tox_gh_actions.py index eb987999df..d6b5494515 100755 --- a/scripts/split_tox_gh_actions/split_tox_gh_actions.py +++ b/scripts/split_tox_gh_actions/split_tox_gh_actions.py @@ -46,6 +46,10 @@ "asyncpg", } +FRAMEWORKS_NEEDING_MYSQL = { + "aiomysql", +} + FRAMEWORKS_NEEDING_REDIS = { "celery", } @@ -99,6 +103,7 @@ "gcp", ], "DBs": [ + "aiomysql", "asyncpg", "clickhouse_driver", "pymongo", @@ -330,6 +335,7 @@ def render_template(group, frameworks, py_versions): "frameworks": frameworks, "needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE), "needs_docker": bool(set(frameworks) & FRAMEWORKS_NEEDING_DOCKER), + "needs_mysql": bool(set(frameworks) & FRAMEWORKS_NEEDING_MYSQL), "needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES), "needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS), "needs_java": bool(set(frameworks) & FRAMEWORKS_NEEDING_JAVA), diff --git a/scripts/split_tox_gh_actions/templates/test_group.jinja b/scripts/split_tox_gh_actions/templates/test_group.jinja index b6554f157f..a6d301a22f 100644 --- a/scripts/split_tox_gh_actions/templates/test_group.jinja +++ b/scripts/split_tox_gh_actions/templates/test_group.jinja @@ -8,7 +8,7 @@ python-version: [{{ py_versions|join(",") }}] os: [ubuntu-22.04] - {% if needs_docker or needs_postgres or needs_redis %} + {% if needs_docker or needs_postgres or needs_redis or needs_mysql %} services: {% if needs_docker %} docker: @@ -41,6 +41,20 @@ ports: - 6379:6379 {% endif %} + {% if needs_mysql %} + mysql: + image: mysql + env: + MYSQL_ROOT_PASSWORD: sentry + MYSQL_DATABASE: test_db + options: >- + --health-cmd "mysqladmin ping -h localhost" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 3306:3306 + {% endif %} {% endif %} env: # 3.6/3.7 run in the python:X.Y container; tell uv to use that system Python. @@ -53,6 +67,12 @@ {% if needs_redis %} SENTRY_PYTHON_TEST_REDIS_HOST: {% raw %}${{ (matrix.python-version == '3.6' || matrix.python-version == '3.7') && 'redis' || 'localhost' }}{% endraw %} {% endif %} + {% if needs_mysql %} + SENTRY_PYTHON_TEST_MYSQL_HOST: {% raw %}${{ (matrix.python-version == '3.6' || matrix.python-version == '3.7') && 'mysql' || 'localhost' }}{% endraw %} + SENTRY_PYTHON_TEST_MYSQL_USER: root + SENTRY_PYTHON_TEST_MYSQL_PASSWORD: sentry + SENTRY_PYTHON_TEST_MYSQL_DB: test_db + {% endif %} container: {% raw %}${{ (matrix.python-version == '3.6' || matrix.python-version == '3.7') && format('python:{0}', matrix.python-version) || null }}{% endraw %} steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 diff --git a/sentry_sdk/integrations/__init__.py b/sentry_sdk/integrations/__init__.py index 02123be85b..c0cacc9426 100644 --- a/sentry_sdk/integrations/__init__.py +++ b/sentry_sdk/integrations/__init__.py @@ -118,6 +118,7 @@ def iter_default_integrations( _MIN_VERSIONS = { "aiohttp": (3, 4), + "aiomysql": (0, 1, 1), "anthropic": (0, 16), "ariadne": (0, 20), "arq": (0, 23), diff --git a/sentry_sdk/integrations/aiomysql.py b/sentry_sdk/integrations/aiomysql.py new file mode 100644 index 0000000000..49459268e6 --- /dev/null +++ b/sentry_sdk/integrations/aiomysql.py @@ -0,0 +1,275 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import Any, Awaitable, Callable, TypeVar + +import sentry_sdk +from sentry_sdk.consts import OP, SPANDATA +from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version +from sentry_sdk.traces import StreamedSpan +from sentry_sdk.tracing_utils import ( + add_query_source, + has_span_streaming_enabled, + record_sql_queries, +) +from sentry_sdk.utils import ( + capture_internal_exceptions, + parse_version, +) + +try: + import aiomysql # type: ignore[import-not-found] + from aiomysql.connection import Connection # type: ignore[import-not-found] + from aiomysql.cursors import Cursor # type: ignore[import-not-found] +except ImportError: + raise DidNotEnable("aiomysql not installed.") + + +class AioMySQLIntegration(Integration): + identifier = "aiomysql" + origin = f"auto.db.{identifier}" + _record_params = False + + def __init__(self, *, record_params: bool = False): + AioMySQLIntegration._record_params = record_params + + @staticmethod + def setup_once() -> None: + aiomysql_version = parse_version(aiomysql.__version__) + _check_minimum_version(AioMySQLIntegration, aiomysql_version) + + Cursor.execute = _wrap_execute(Cursor.execute) + Cursor.executemany = _wrap_executemany(Cursor.executemany) + + # Patch Connection._connect — this catches ALL connections: + # - aiomysql.connect() + # - aiomysql.create_pool() (pool.py does `from .connection import connect` + # which ultimately calls Connection._connect) + # - Reconnects + Connection._connect = _wrap_connect(Connection._connect) + + +T = TypeVar("T") + + +def _normalize_query(query: str | bytes | bytearray) -> str: + if isinstance(query, (bytes, bytearray)): + query = query.decode("utf-8", errors="replace") + return " ".join(query.split()) + + +def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: + """Wrap Cursor.execute to capture SQL queries.""" + + async def _inner(*args: Any, **kwargs: Any) -> T: + if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: + return await f(*args, **kwargs) + + cursor = args[0] + + # Skip if flagged by executemany (avoids double-recording). + # Do NOT reset the flag here — it must stay True for the entire + # duration of executemany, which may call execute multiple times + # in a loop (non-INSERT fallback). Only _wrap_executemany's + # finally block should clear it. + if getattr(cursor, "_sentry_skip_next_execute", False): + return await f(*args, **kwargs) + + query = args[1] if len(args) > 1 else kwargs.get("query", "") + query_str = _normalize_query(query) + params = args[2] if len(args) > 2 else kwargs.get("args") + + conn = _get_connection(cursor) + + integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) + params_list = params if integration and integration._record_params else None + param_style = "pyformat" if params_list else None + + with record_sql_queries( + cursor=None, + query=query_str, + params_list=params_list, + paramstyle=param_style, + executemany=False, + span_origin=AioMySQLIntegration.origin, + ) as span: + if conn: + _set_db_data(span, conn) + res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + return res + + return _inner + + +def _wrap_executemany(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: + """Wrap Cursor.executemany to capture SQL queries.""" + + async def _inner(*args: Any, **kwargs: Any) -> T: + if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: + return await f(*args, **kwargs) + + cursor = args[0] + query = args[1] if len(args) > 1 else kwargs.get("query", "") + query_str = _normalize_query(query) + seq_of_params = args[2] if len(args) > 2 else kwargs.get("args") + + conn = _get_connection(cursor) + + integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) + params_list = ( + seq_of_params if integration and integration._record_params else None + ) + param_style = "pyformat" if params_list else None + + # Prevent double-recording: _do_execute_many calls self.execute internally + cursor._sentry_skip_next_execute = True + try: + with record_sql_queries( + cursor=None, + query=query_str, + params_list=params_list, + paramstyle=param_style, + executemany=True, + span_origin=AioMySQLIntegration.origin, + ) as span: + if conn: + _set_db_data(span, conn) + res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + return res + finally: + cursor._sentry_skip_next_execute = False + + return _inner + + +def _get_connection(cursor: Any) -> Any: + """Get the underlying connection from a cursor.""" + return getattr(cursor, "connection", None) + + +def _wrap_connect(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: + """Wrap Connection._connect to capture connection spans.""" + + async def _inner(self: "Connection") -> T: + client = sentry_sdk.get_client() + if client.get_integration(AioMySQLIntegration) is None: + return await f(self) + + if has_span_streaming_enabled(client.options): + breadcrumb_data = _get_connect_data(self, use_streaming_keys=True) + + span_attributes: dict[str, Any] = { + "sentry.op": OP.DB, + "sentry.origin": AioMySQLIntegration.origin, + } | breadcrumb_data + + with sentry_sdk.traces.start_span( + name="connect", attributes=span_attributes + ) as span: + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message="connect", category="query", data=breadcrumb_data + ) + res = await f(self) + else: + connect_data = _get_connect_data(self) + + with sentry_sdk.start_span( + op=OP.DB, + name="connect", + origin=AioMySQLIntegration.origin, + ) as span: + _set_db_data(span, self) + + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message="connect", + category="query", + data=connect_data, + ) + res = await f(self) + + return res + + return _inner + + +def _get_connect_data(conn: Any, *, use_streaming_keys: bool = False) -> dict[str, Any]: + if use_streaming_keys: + db_system = SPANDATA.DB_SYSTEM_NAME + db_name = SPANDATA.DB_NAMESPACE + else: + db_system = SPANDATA.DB_SYSTEM + db_name = SPANDATA.DB_NAME + + data: dict[str, Any] = { + db_system: "mysql", + SPANDATA.DB_DRIVER_NAME: "aiomysql", + } + + host = getattr(conn, "host", None) + if host is not None: + data[SPANDATA.SERVER_ADDRESS] = host + + port = getattr(conn, "port", None) + if port is not None: + data[SPANDATA.SERVER_PORT] = port + + database = getattr(conn, "db", None) + if database is not None: + data[db_name] = database + + user = getattr(conn, "user", None) + if user is not None: + data[SPANDATA.DB_USER] = user + + return data + + +def _set_db_data(span: Any, conn: Any) -> None: + """Set database-related span data from connection object.""" + if isinstance(span, StreamedSpan): + set_value = span.set_attribute + db_system = SPANDATA.DB_SYSTEM_NAME + db_name = SPANDATA.DB_NAMESPACE + else: + # Remove this else block once we've completely migrated to streamed spans + # The use of deprecated attributes here is to ensure backwards compatibility + set_value = span.set_data + db_system = SPANDATA.DB_SYSTEM + db_name = SPANDATA.DB_NAME + + set_value(db_system, "mysql") + set_value(SPANDATA.DB_DRIVER_NAME, "aiomysql") + + host = getattr(conn, "host", None) + if host is not None: + set_value(SPANDATA.SERVER_ADDRESS, host) + + port = getattr(conn, "port", None) + if port is not None: + set_value(SPANDATA.SERVER_PORT, port) + + database = getattr(conn, "db", None) + if database is not None: + set_value(db_name, database) + + user = getattr(conn, "user", None) + if user is not None: + set_value(SPANDATA.DB_USER, user) diff --git a/tests/integrations/aiomysql/__init__.py b/tests/integrations/aiomysql/__init__.py new file mode 100644 index 0000000000..d927c2ddea --- /dev/null +++ b/tests/integrations/aiomysql/__init__.py @@ -0,0 +1,4 @@ +import pytest + +pytest.importorskip("aiomysql") +pytest.importorskip("pytest_asyncio") diff --git a/tests/integrations/aiomysql/test_aiomysql.py b/tests/integrations/aiomysql/test_aiomysql.py new file mode 100644 index 0000000000..53ea226a80 --- /dev/null +++ b/tests/integrations/aiomysql/test_aiomysql.py @@ -0,0 +1,801 @@ +""" +Tests need pytest-asyncio installed. + +Tests need a local MySQL instance running. This can be done using: +```sh +docker run --rm --name some-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test -d -p 3306:3306 mysql:8.0 +``` + +The tests use the following credentials to establish a database connection. +""" + +import datetime +import os +import warnings +from contextlib import contextmanager +from unittest import mock + +import aiomysql +import pytest +import pytest_asyncio + +from sentry_sdk import capture_message, start_transaction +from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.aiomysql import AioMySQLIntegration +from sentry_sdk.tracing_utils import record_sql_queries +from tests.conftest import ApproxDict + +MYSQL_HOST = os.getenv("SENTRY_PYTHON_TEST_MYSQL_HOST", "localhost") +MYSQL_PORT = int(os.getenv("SENTRY_PYTHON_TEST_MYSQL_PORT", "3306")) +MYSQL_USER = os.getenv("SENTRY_PYTHON_TEST_MYSQL_USER", "root") +MYSQL_PASSWORD = os.getenv("SENTRY_PYTHON_TEST_MYSQL_PASSWORD", "root") +MYSQL_DB_BASE = os.getenv("SENTRY_PYTHON_TEST_MYSQL_DB", "test") + + +def _get_db_name(): + pid = os.getpid() + return f"{MYSQL_DB_BASE}_{pid}" + + +MYSQL_DB = _get_db_name() + +CRUMBS_CONNECT = { + "category": "query", + "data": ApproxDict( + { + "db.name": MYSQL_DB, + "db.system": "mysql", + "db.user": MYSQL_USER, + "server.address": MYSQL_HOST, + "server.port": MYSQL_PORT, + } + ), + "message": "connect", + "type": "default", +} + + +@pytest_asyncio.fixture(autouse=True) +async def _clean_mysql(): + conn = await aiomysql.connect( + host=MYSQL_HOST, + port=MYSQL_PORT, + user=MYSQL_USER, + password=MYSQL_PASSWORD, + autocommit=True, + ) + try: + async with conn.cursor() as cur: + # Suppress MySQL warnings about unknown tables / existing databases + with warnings.catch_warnings(): + warnings.simplefilter("ignore", Warning) + await cur.execute(f"CREATE DATABASE IF NOT EXISTS `{MYSQL_DB}`") + await cur.execute(f"USE `{MYSQL_DB}`") + await cur.execute("DROP TABLE IF EXISTS users") + await cur.execute( + """ + CREATE TABLE users( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255), + password VARCHAR(255), + dob DATE + ) + """ + ) + finally: + conn.close() + + +def _connect_args(): + return { + "host": MYSQL_HOST, + "port": MYSQL_PORT, + "user": MYSQL_USER, + "password": MYSQL_PASSWORD, + "db": MYSQL_DB, + "autocommit": True, + } + + +@pytest.mark.asyncio +async def test_connect(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + assert event["breadcrumbs"]["values"] == [CRUMBS_CONNECT] + + +@pytest.mark.asyncio +async def test_execute(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'pw', '1990-12-25')", + ) + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ) + await cur.execute("SELECT * FROM users WHERE name = %s", ("Bob",)) + row = await cur.fetchone() + assert row[1] == "Bob" + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + assert event["breadcrumbs"]["values"] == [ + CRUMBS_CONNECT, + { + "category": "query", + "data": {}, + "message": "INSERT INTO users(name, password, dob) VALUES ('Alice', 'pw', '1990-12-25')", + "type": "default", + }, + { + "category": "query", + "data": {}, + "message": "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + "type": "default", + }, + { + "category": "query", + "data": {}, + "message": "SELECT * FROM users WHERE name = %s", + "type": "default", + }, + ] + + +@pytest.mark.asyncio +async def test_execute_many(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.executemany( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + [ + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ("Alice", "pw", datetime.date(1990, 12, 25)), + ], + ) + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + assert event["breadcrumbs"]["values"] == [ + CRUMBS_CONNECT, + { + "category": "query", + "data": {"db.executemany": True}, + "message": "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + "type": "default", + }, + ] + + +@pytest.mark.asyncio +async def test_execute_many_non_insert(sentry_init, capture_events) -> None: + """Test executemany with non-INSERT queries (falls back to row-by-row).""" + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + # Pre-populate users table + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Alice", "pw1", datetime.date(1990, 1, 1)), + ) + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Bob", "pw2", datetime.date(1991, 2, 2)), + ) + # Non-INSERT executemany — uses row-by-row fallback internally + await cur.executemany( + "UPDATE users SET password = %s WHERE name = %s", + [ + ("new_pw_1", "Alice"), + ("new_pw_2", "Bob"), + ], + ) + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + # Should have: connect + INSERT*2 + single executemany span (no double-recording) + crumbs = event["breadcrumbs"]["values"] + query_crumbs = [c for c in crumbs if c["category"] == "query"] + executemany_crumbs = [ + c for c in query_crumbs if c.get("data", {}).get("db.executemany") + ] + # Only ONE executemany breadcrumb — no duplicates from internal execute calls + assert len(executemany_crumbs) == 1 + assert "UPDATE users SET password = %s" in executemany_crumbs[0]["message"] + + +@pytest.mark.asyncio +async def test_record_params(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration(record_params=True)], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ) + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + assert event["breadcrumbs"]["values"] == [ + CRUMBS_CONNECT, + { + "category": "query", + "data": { + "db.params": ["Bob", "secret_pw", "datetime.date(1984, 3, 1)"], + "db.paramstyle": "format", + }, + "message": "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + "type": "default", + }, + ] + + +@pytest.mark.asyncio +async def test_cursor_context_manager(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.executemany( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + [ + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ("Alice", "pw", datetime.date(1990, 12, 25)), + ], + ) + await cur.execute( + "SELECT * FROM users WHERE dob > %s", + (datetime.date(1970, 1, 1),), + ) + rows = await cur.fetchall() + assert len(rows) == 2 + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + crumbs = event["breadcrumbs"]["values"] + assert crumbs[0] == CRUMBS_CONNECT + assert crumbs[1]["category"] == "query" + assert "INSERT" in crumbs[1]["message"] + assert crumbs[2]["category"] == "query" + assert "SELECT" in crumbs[2]["message"] + + +@pytest.mark.asyncio +async def test_cursor_async_iteration(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Charlie", "pw3", datetime.date(1995, 5, 15)), + ) + await cur.execute("SELECT * FROM users WHERE name = %s", ("Charlie",)) + async for row in cur: + assert row[1] == "Charlie" + + conn.close() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + crumbs = event["breadcrumbs"]["values"] + assert crumbs[0] == CRUMBS_CONNECT + assert len([c for c in crumbs if c["category"] == "query"]) >= 2 + + +@pytest.mark.asyncio +async def test_connection_pool(sentry_init, capture_events) -> None: + sentry_init( + integrations=[AioMySQLIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + pool_size = 2 + + pool = await aiomysql.create_pool( + host=MYSQL_HOST, + port=MYSQL_PORT, + user=MYSQL_USER, + password=MYSQL_PASSWORD, + db=MYSQL_DB, + autocommit=True, + minsize=pool_size, + maxsize=pool_size, + ) + + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES (%s, %s, %s)", + ("Dave", "pw4", datetime.date(1988, 7, 20)), + ) + await cur.execute("SELECT * FROM users WHERE name = %s", ("Dave",)) + row = await cur.fetchone() + assert row is not None + assert row[1] == "Dave" + + pool.close() + await pool.wait_closed() + + capture_message("hi") + + (event,) = events + + for crumb in event["breadcrumbs"]["values"]: + del crumb["timestamp"] + + crumbs = event["breadcrumbs"]["values"] + # Verify queries were captured + query_crumbs = [c for c in crumbs if c["category"] == "query"] + assert len(query_crumbs) >= 2 # INSERT + SELECT + + # Verify connect spans were created for pool connections + connect_crumbs = [c for c in crumbs if c.get("message") == "connect"] + assert len(connect_crumbs) >= pool_size # One connect span per pooled connection + for crumb in connect_crumbs: + assert crumb["data"]["db.system"] == "mysql" + assert crumb["data"]["server.address"] == MYSQL_HOST + + +@pytest.mark.asyncio +async def test_query_source_disabled(sentry_init, capture_events): + sentry_options = { + "integrations": [AioMySQLIntegration()], + "traces_sample_rate": 1.0, + "enable_db_query_source": False, + "db_query_source_threshold_ms": 0, + } + + sentry_init(**sentry_options) + + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + + +@pytest.mark.asyncio +@pytest.mark.parametrize("enable_db_query_source", [None, True]) +async def test_query_source_enabled( + sentry_init, capture_events, enable_db_query_source +): + sentry_options = { + "integrations": [AioMySQLIntegration()], + "traces_sample_rate": 1.0, + "db_query_source_threshold_ms": 0, + } + if enable_db_query_source is not None: + sentry_options["enable_db_query_source"] = enable_db_query_source + + sentry_init(**sentry_options) + + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + +@pytest.mark.asyncio +async def test_query_source(sentry_init, capture_events): + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + ) + + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.aiomysql.test_aiomysql" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/aiomysql/test_aiomysql.py" + ) + + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path + + assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" + + +@pytest.mark.asyncio +async def test_no_query_source_if_duration_too_short(sentry_init, capture_events): + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=100, + ) + + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn = await aiomysql.connect(**_connect_args()) + + @contextmanager + def fake_record_sql_queries(*args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + pass + span.start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) + span.timestamp = datetime.datetime(2024, 1, 1, microsecond=99999) + yield span + + async with conn.cursor() as cur: + with mock.patch( + "sentry_sdk.integrations.aiomysql.record_sql_queries", + fake_record_sql_queries, + ): + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + + +@pytest.mark.asyncio +async def test_query_source_if_duration_over_threshold(sentry_init, capture_events): + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=100, + ) + + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn = await aiomysql.connect(**_connect_args()) + + @contextmanager + def fake_record_sql_queries(*args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + pass + span.start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) + span.timestamp = datetime.datetime(2024, 1, 1, microsecond=100001) + yield span + + async with conn.cursor() as cur: + with mock.patch( + "sentry_sdk.integrations.aiomysql.record_sql_queries", + fake_record_sql_queries, + ): + await cur.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.aiomysql.test_aiomysql" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/aiomysql/test_aiomysql.py" + ) + + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path + + assert ( + data.get(SPANDATA.CODE_FUNCTION) + == "test_query_source_if_duration_over_threshold" + ) + + +@pytest.mark.asyncio +async def test_span_origin(sentry_init, capture_events): + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + ) + + events = capture_events() + + with start_transaction(name="test_transaction"): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + await cur.execute("SELECT 2") + + conn.close() + + (event,) = events + + assert event["contexts"]["trace"]["origin"] == "manual" + + for span in event["spans"]: + assert span["origin"] == "auto.db.aiomysql" + + +@pytest.mark.asyncio +async def test_multiline_query_description_normalized(sentry_init, capture_events): + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + ) + events = capture_events() + + with start_transaction(name="test_transaction"): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT + id, + name + FROM + users + WHERE + name = 'Alice' + """ + ) + + conn.close() + + (event,) = events + + spans = [ + s + for s in event["spans"] + if s["op"] == "db" and "SELECT" in s.get("description", "") + ] + assert len(spans) == 1 + assert spans[0]["description"] == "SELECT id, name FROM users WHERE name = 'Alice'" + + +@pytest.mark.asyncio +async def test_before_send_transaction_sees_normalized_description( + sentry_init, capture_events +): + def before_send_transaction(event, hint): + for span in event.get("spans", []): + desc = span.get("description", "") + if "SELECT id, name FROM users" in desc: + span["description"] = "filtered" + return event + + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + before_send_transaction=before_send_transaction, + ) + events = capture_events() + + with start_transaction(name="test_transaction"): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT + id, + name + FROM + users + """ + ) + + conn.close() + + (event,) = events + spans = [ + s + for s in event["spans"] + if s["op"] == "db" and "filtered" in s.get("description", "") + ] + + assert len(spans) == 1 + assert spans[0]["description"] == "filtered" + + +@pytest.mark.asyncio +async def test_db_data_on_spans(sentry_init, capture_events): + """Test that database connection data is properly set on spans.""" + sentry_init( + integrations=[AioMySQLIntegration()], + traces_sample_rate=1.0, + ) + events = capture_events() + + with start_transaction(name="test_transaction"): + conn = await aiomysql.connect(**_connect_args()) + + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + + conn.close() + + (event,) = events + + db_spans = [s for s in event["spans"] if s["op"] == "db"] + assert len(db_spans) > 0 + + query_span = [s for s in db_spans if "SELECT" in s.get("description", "")][0] + assert query_span["data"].get(SPANDATA.DB_SYSTEM) == "mysql" + assert query_span["data"].get(SPANDATA.SERVER_ADDRESS) == MYSQL_HOST + assert query_span["data"].get(SPANDATA.SERVER_PORT) == MYSQL_PORT + assert query_span["data"].get(SPANDATA.DB_NAME) == MYSQL_DB + assert query_span["data"].get(SPANDATA.DB_USER) == MYSQL_USER