Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
poetry-version: ["1.8.2"]

steps:
Expand Down
58 changes: 21 additions & 37 deletions aws_advanced_python_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from logging import DEBUG, getLogger

from aws_advanced_python_wrapper.pep249 import (DatabaseError, DataError,
Error, IntegrityError,
InterfaceError, InternalError,
NotSupportedError,
OperationalError,
ProgrammingError)
from .cleanup import release_resources
from .driver_info import DriverInfo
from .utils.utils import LogUtils
from .wrapper import AwsWrapperConnection

# PEP249 compliance
connect = AwsWrapperConnection.connect
apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"

# Public API
__all__ = [
'connect',
'AwsWrapperConnection',
'release_resources',
'set_logger',
'apilevel',
'threadsafety',
'paramstyle',
'Error',
'InterfaceError',
'DatabaseError',
'DataError',
'OperationalError',
'IntegrityError',
'InternalError',
'ProgrammingError',
'NotSupportedError'
]
from aws_advanced_python_wrapper import _dbapi
from aws_advanced_python_wrapper.cleanup import release_resources
from aws_advanced_python_wrapper.driver_info import DriverInfo
from aws_advanced_python_wrapper.utils.utils import LogUtils
from aws_advanced_python_wrapper.wrapper import AwsWrapperConnection

# Populate the full PEP 249 module surface (exceptions, type ctors/singletons,
# apilevel/threadsafety/paramstyle). `connect` stays bound to
# AwsWrapperConnection.connect for back-compat with existing callers.
_dbapi.install(sys.modules[__name__].__dict__, connect=AwsWrapperConnection.connect)

__version__ = DriverInfo.DRIVER_VERSION


def set_logger(name='aws_advanced_python_wrapper', level=DEBUG, format_string=None):
def set_logger(name="aws_advanced_python_wrapper", level=DEBUG, format_string=None):
LogUtils.setup_logger(getLogger(name), level, format_string)


__all__ = (
"AwsWrapperConnection",
"release_resources",
"set_logger",
*_dbapi._PEP249_NAMES,
"connect",
)
135 changes: 135 additions & 0 deletions aws_advanced_python_wrapper/_dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Canonical PEP 249 module surface shared by the top-level wrapper module
and the per-driver DBAPI submodules (`aws_advanced_python_wrapper.psycopg`,
`aws_advanced_python_wrapper.mysql_connector`).

Consumers should NOT import from this module directly. The public DBAPI
module surface lives on the top-level module and the per-driver submodules,
populated via `install()`.
"""

from __future__ import annotations

from datetime import date as Date # noqa: N812
from datetime import datetime as Timestamp # noqa: N812
from datetime import time as Time # noqa: N812
from time import localtime
from typing import Callable, Optional

from aws_advanced_python_wrapper.pep249 import DatabaseError # noqa: F401
from aws_advanced_python_wrapper.pep249 import DataError # noqa: F401
from aws_advanced_python_wrapper.pep249 import Error # noqa: F401
from aws_advanced_python_wrapper.pep249 import IntegrityError # noqa: F401
from aws_advanced_python_wrapper.pep249 import InterfaceError # noqa: F401
from aws_advanced_python_wrapper.pep249 import InternalError # noqa: F401
from aws_advanced_python_wrapper.pep249 import NotSupportedError # noqa: F401
from aws_advanced_python_wrapper.pep249 import OperationalError # noqa: F401
from aws_advanced_python_wrapper.pep249 import ProgrammingError # noqa: F401
from aws_advanced_python_wrapper.pep249 import Warning # noqa: F401

apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"


def Binary(data: bytes) -> bytes: # noqa: N802
return bytes(data)


def DateFromTicks(ticks: float) -> Date: # noqa: N802
return Date(*localtime(ticks)[:3])


def TimeFromTicks(ticks: float) -> Time: # noqa: N802
return Time(*localtime(ticks)[3:6])


def TimestampFromTicks(ticks: float) -> Timestamp: # noqa: N802
return Timestamp(*localtime(ticks)[:6])


class _DBAPISet(frozenset):
"""Type-object singleton per PEP 249: compares equal to any contained type code."""

def __eq__(self, other: object) -> bool:
if isinstance(other, (int, str)):
return other in self
return super().__eq__(other)

def __ne__(self, other: object) -> bool:
return not self.__eq__(other)

def __hash__(self) -> int:
return super().__hash__()


# Type-code sources:
# PG: psycopg.postgres.types (OIDs)
# MySQL: mysql.connector.FieldType (ints)
# Union both into each singleton.

# PG text-like OIDs: text(25), varchar(1043), bpchar(1042), char(18),
# name(19), json(114), jsonb(3802)
# MySQL FieldType string-like: VAR_STRING(253), STRING(254), VARCHAR(15)
STRING = _DBAPISet([25, 1043, 1042, 18, 19, 114, 3802, 253, 254, 15])

# PG binary: bytea(17)
# MySQL FieldType BLOB family: TINY_BLOB(249), MEDIUM_BLOB(250),
# LONG_BLOB(251), BLOB(252)
BINARY = _DBAPISet([17, 249, 250, 251, 252])

# PG numeric: int2(21), int4(23), int8(20), float4(700), float8(701),
# numeric(1700), money(790)
# MySQL FieldType numeric: DECIMAL(0), TINY(1), SHORT(2), LONG(3),
# FLOAT(4), DOUBLE(5), LONGLONG(8), INT24(9),
# NEWDECIMAL(246)
NUMBER = _DBAPISet([21, 23, 20, 700, 701, 1700, 790, 0, 1, 2, 3, 4, 5, 8, 9, 246])

# PG datetime: date(1082), time(1083), timestamp(1114), timestamptz(1184),
# timetz(1266), interval(1186)
# MySQL FieldType datetime: DATE(10), TIME(11), DATETIME(12), YEAR(13),
# NEWDATE(14), TIMESTAMP(7)
DATETIME = _DBAPISet([1082, 1083, 1114, 1184, 1266, 1186, 10, 11, 12, 13, 14, 7])

# PG rowid: oid(26). MySQL has no ROWID equivalent; left PG-only.
ROWID = _DBAPISet([26])


_PEP249_NAMES = (
"Warning", "Error", "InterfaceError", "DatabaseError",
"DataError", "OperationalError", "IntegrityError",
"InternalError", "ProgrammingError", "NotSupportedError",
"Date", "Time", "Timestamp",
"DateFromTicks", "TimeFromTicks", "TimestampFromTicks",
"Binary", "STRING", "BINARY", "NUMBER", "DATETIME", "ROWID",
"apilevel", "threadsafety", "paramstyle",
)


def install(module_ns: dict, connect: Optional[Callable] = None) -> None:
"""Populate `module_ns` with the PEP 249 module surface.

If `connect` is provided, `module_ns['connect']` is set to it and 'connect'
is added to `module_ns['__all__']`.
"""
source = globals()
for name in _PEP249_NAMES:
module_ns[name] = source[name]
if connect is not None:
module_ns["connect"] = connect
module_ns["__all__"] = (*_PEP249_NAMES, "connect")
else:
module_ns["__all__"] = tuple(_PEP249_NAMES)
9 changes: 7 additions & 2 deletions aws_advanced_python_wrapper/driver_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def execute(
exec_func: Callable,
*args: Any,
exec_timeout: Optional[float] = None,
conn: Optional[Connection] = None,
**kwargs: Any) -> Cursor:
if DbApiMethod.ALL.method_name not in self.network_bound_methods and method_name not in self.network_bound_methods:
return exec_func()
Expand All @@ -138,7 +139,11 @@ def execute(

if exec_timeout > 0:
try:
execute_with_timeout = timeout(self._thread_pool, exec_timeout)(exec_func)
# Pass conn so that, on timeout, the abandoned operation's socket is
# shut down and its worker thread is awaited before we propagate --
# otherwise a later close/reuse of conn races the still-running
# operation (cross-thread use-after-free in the driver, env-4 SIGSEGV).
execute_with_timeout = timeout(self._thread_pool, exec_timeout, conn)(exec_func)
return execute_with_timeout()
except TimeoutError as e:
raise QueryTimeoutError(Messages.get_formatted("DriverDialect.ExecuteTimeout", method_name)) from e
Expand All @@ -161,7 +166,7 @@ def ping(self, conn: Connection) -> bool:
try:
with conn.cursor() as cursor:
query = DriverDialect._QUERY
self.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=10)
self.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=10, conn=conn)
cursor.fetchone()
return True
except Exception:
Expand Down
23 changes: 17 additions & 6 deletions aws_advanced_python_wrapper/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from typing import Optional

from .pep249 import Error
from .pep249 import Error, InterfaceError, NotSupportedError, OperationalError


class AwsWrapperError(Error):
Expand All @@ -30,15 +30,15 @@ def __init__(self, message: str = "", original_error: Optional[Exception] = None
self.driver_error = original_error


class UnsupportedOperationError(AwsWrapperError):
class UnsupportedOperationError(AwsWrapperError, NotSupportedError):
__module__ = "aws_advanced_python_wrapper"


class QueryTimeoutError(AwsWrapperError):
class QueryTimeoutError(AwsWrapperError, OperationalError):
__module__ = "aws_advanced_python_wrapper"


class FailoverError(Error):
class FailoverError(OperationalError):
__module__ = "aws_advanced_python_wrapper"


Expand All @@ -51,12 +51,23 @@ class FailoverFailedError(FailoverError):


class FailoverSuccessError(FailoverError):
# SA classification is handled at the dialect boundary by
# ``sqlalchemy_dialects._exception_handling._FailoverSuccessRewrapMixin``,
# which catches FailoverSuccessError in ``do_execute`` /
# ``do_executemany`` and re-raises as the dialect's native
# OperationalError. Do NOT add driver-native OperationalError classes
# (psycopg / mysql.connector) as bases here: Django's
# ``wrap_database_errors`` walks ``issubclass`` against the driver's
# own error module and would swallow FailoverSuccessError before any
# user ``except FailoverSuccessError:`` handler could see it
# (regression seen in tests/integration/container/django/
# test_django_plugins.py::test_django_failover_during_query).
__module__ = "aws_advanced_python_wrapper"


class ReadWriteSplittingError(AwsWrapperError):
class ReadWriteSplittingError(AwsWrapperError, InterfaceError):
__module__ = "aws_advanced_python_wrapper"


class AwsConnectError(AwsWrapperError):
class AwsConnectError(AwsWrapperError, OperationalError):
__module__ = "aws_advanced_python_wrapper"
11 changes: 11 additions & 0 deletions aws_advanced_python_wrapper/exception_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ def is_network_exception(self, error: Optional[Exception] = None, sql_state: Opt
def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
"""
Checks whether the given error is caused by failing to authenticate the user.

Note for subclassers: some callers (notably ``HostMonitor`` in
``cluster_topology_monitor`` since commit ``724de17``) treat a
``True`` result as **bounded transient** — they retry on Aurora's
PAM-service-restart window during writer promotion before giving up.
If you override this method and intend a fast-fail "credentials are
permanently bad" signal, be aware your override may be retried up
to ~5 seconds before propagating. Callers that need fail-fast
semantics should classify those errors elsewhere (e.g. as a
dedicated non-network non-login exception).

:param error: The error raised by the target driver.
:param sql_state: The SQL State associated with the error.
:return: True if the error is caused by a login issue, False otherwise.
Expand Down
66 changes: 66 additions & 0 deletions aws_advanced_python_wrapper/mysql_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""PEP 249 DBAPI module bound to mysql-connector-python.

Enables SQLAlchemy's creator-pattern:

from sqlalchemy import create_engine
from aws_advanced_python_wrapper.mysql_connector import connect

engine = create_engine(
"mysql+mysqlconnector://",
creator=lambda: connect(
"host=... user=... database=...",
wrapper_dialect="aurora-mysql",
),
)
"""

from __future__ import annotations

import sys
from typing import Any

import mysql.connector as _mysql_connector
from mysql.connector import connect as _mysql_connect

from aws_advanced_python_wrapper import _dbapi
from aws_advanced_python_wrapper.wrapper import AwsWrapperConnection


def __getattr__(name: str) -> Any:
"""Forward missing attributes to the underlying mysql.connector module.

See ``aws_advanced_python_wrapper/psycopg.py`` for the rationale.
"""
try:
return getattr(_mysql_connector, name)
except AttributeError:
raise AttributeError(
f"module 'aws_advanced_python_wrapper.mysql_connector' has no attribute {name!r}"
) from None


def connect(conninfo: str = "", **kwargs: Any) -> AwsWrapperConnection:
"""PEP 249 `connect`, target-driver-bound to mysql-connector-python.

Equivalent to::

AwsWrapperConnection.connect(mysql.connector.connect, conninfo, **kwargs)
"""
return AwsWrapperConnection.connect(_mysql_connect, conninfo, **kwargs)


_dbapi.install(sys.modules[__name__].__dict__, connect=connect)
Loading