Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions postgresql_proxy/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,20 @@ def intercept(self, packet_type, data):
# Query, ends with b'\x00'
data = self._intercept_query(data, ic_queries)
elif packet_type == b"P":
# Statement that needs parsing.
# First byte of the body is some Statement flag. Ignore, don't lose
# Next is the query, same as above, ends with an b'\x00'
# Last 2 bytes are the number of parameters. Ignore, don't lose
statement = data[0:1]
query = self._intercept_query(data[1:-2], ic_queries)
params = data[-2:]
data = statement + query + params
# Parse packet body:
# statement_name\x00 + query\x00 + int16(param_count) + uint32[]
# Keep the binary suffix untouched (count + OID array).
statement_end = data.find(b"\x00")
if statement_end != -1:
query_start = statement_end + 1
query_end = data.find(b"\x00", query_start)
if query_end != -1:
statement = data[:query_start]
query = self._intercept_query(
data[query_start : query_end + 1], ic_queries
)
params = data[query_end + 1 :]
data = statement + query + params

if packet_type == b"":
# Connection request / context. Ignore the first 4 bytes, keep it
Expand Down Expand Up @@ -101,7 +107,8 @@ def _intercept_context_data(self, data):
def _intercept_query(self, query, interceptors):
logging.getLogger("intercept").debug("intercepting query\n%s", query)
# Remove zero byte at the end
query = query[:-1].decode("utf-8")
codec = self.get_codec()
query = query[:-1].decode(codec)
for interceptor in interceptors:
func = self._get_plugin_interceptor_function(interceptor)
query = func(query, self.context)
Expand All @@ -113,7 +120,7 @@ def _intercept_query(self, query, interceptors):
)

# Append the zero byte at the end
return query.encode("utf-8") + b"\x00"
return query.encode(codec) + b"\x00"


class ResponseInterceptor(Interceptor):
Expand Down
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pytest==9.0.3
pytest-timeout==2.4.0
psycopg[binary]==3.3.4
36 changes: 36 additions & 0 deletions tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import threading
import time

import psycopg
import psycopg2
import pytest

Expand Down Expand Up @@ -332,3 +333,38 @@ def test_psql_ssl_file_batch_stress_no_hang(postgres_settings, ssl_proxy_port):
"psql -f batch succeeded but expected marker missing "
f"(run={run_idx + 1}, {elapsed=:.2f}s) stdout_tail={out_tail}"
)


def test_extended_query_protocol_parse_packet_with_high_oid_params_passes_through_proxy(
    postgres_settings, plain_proxy_port
):
    """Regression: proxy must not corrupt Extended Query Protocol Parse packets.

    psycopg v3 sends Parse → Bind → Execute for parameterized queries. The Parse
    body ends with binary uint32 OIDs; jsonb OID 3802 (0x00000EDA) contains 0xDA
    which is not valid UTF-8. The old interceptor sliced the body incorrectly and
    crashed on decode, causing the connection to hang or drop.

    Args:
        postgres_settings: Mapping read for the "user", "password" and "dbname"
            keys used to open the connection (presumably a pytest fixture
            defined elsewhere in the test suite).
        plain_proxy_port: Port on 127.0.0.1 that the connection is opened
            against (presumably the non-SSL proxy listener fixture).
    """
    # Import Jsonb from its documented module instead of relying on
    # `psycopg.types.json` happening to be loaded as a side effect of
    # `import psycopg`.
    from psycopg.types.json import Jsonb

    with psycopg.connect(
        host="127.0.0.1",
        port=plain_proxy_port,
        user=postgres_settings["user"],
        password=postgres_settings["password"],
        dbname=postgres_settings["dbname"],
        sslmode="disable",
    ) as conn:
        with conn.cursor() as cur:
            # Recreate the scratch table so the test is repeatable across runs.
            cur.execute(
                "DROP TABLE IF EXISTS _test_jsonb_proxy_params;"
                "CREATE TABLE _test_jsonb_proxy_params "
                "(id serial PRIMARY KEY, data jsonb, label text);"
            )

            # A parameterized statement is what makes the client send a Parse
            # packet carrying the jsonb parameter OID through the proxy.
            cur.execute(
                "INSERT INTO _test_jsonb_proxy_params (data, label) "
                "VALUES (%s, %s) RETURNING id",
                (Jsonb({"key": "value"}), "hello"),
            )
            row = cur.fetchone()

    # Getting a generated id back means the packet reached the server intact.
    assert row is not None and row[0] >= 1
Loading