From 35eff18cf4bbcd063dfc08de7c7a06404b0cd46b Mon Sep 17 00:00:00 2001 From: Dharit Shah Date: Sun, 5 Apr 2026 14:49:32 -0400 Subject: [PATCH] Fix infinite reconnection loop in StreamableHTTP client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _handle_reconnection() unconditionally reset the attempt counter to 0 whenever the SSE stream ended without delivering a complete response, even if the stream had delivered only priming events or nothing at all. This made MAX_RECONNECTION_ATTEMPTS ineffective—a server that accepts connections but drops streams caused the client to retry forever. The fix distinguishes productive reconnections (ones that delivered actual message data like notifications) from unproductive ones (only priming events or nothing). Productive reconnections reset the counter so legitimate multi-close patterns continue working. Unproductive reconnections increment the counter, and once MAX_RECONNECTION_ATTEMPTS is reached the client sends a JSON-RPC error back to the caller (when the pending message is a request) instead of silently returning (which caused the caller to hang). 
Made-with: Cursor Github-Issue: #2393 Made-with: Cursor --- src/mcp/client/streamable_http.py | 26 +++++++++++++--- tests/shared/test_streamable_http.py | 45 ++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c633..da13794a8 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -380,8 +380,19 @@ async def _handle_reconnection( ) -> None: """Reconnect with Last-Event-ID to resume stream after server disconnect.""" # Bail if max retries exceeded - if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover - logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") + if attempt >= MAX_RECONNECTION_ATTEMPTS: + logger.warning(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") + if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch + error_data = ErrorData( + code=INTERNAL_ERROR, + message=( + f"SSE stream disconnected and max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded" + ), + ) + error_msg = SessionMessage( + JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.id, error=error_data) + ) + await ctx.read_stream_writer.send(error_msg) return # Always wait - use server value or default @@ -404,6 +415,7 @@ async def _handle_reconnection( # Track for potential further reconnection reconnect_last_event_id: str = last_event_id reconnect_retry_ms = retry_interval_ms + received_data = False async for sse in event_source.aiter_sse(): if sse.id: # pragma: no branch @@ -421,9 +433,15 @@ async def _handle_reconnection( await event_source.response.aclose() return - # Stream ended again without response - reconnect again (reset attempt counter) + if sse.data: + received_data = True + + # Stream ended without response - reset counter only if we received + # actual message data (not just priming events), otherwise increment + # to prevent 
infinite reconnection loops when the server always drops. + next_attempt = 0 if received_data else attempt + 1 logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) + await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt) except Exception as e: # pragma: no cover logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..8391adcb2 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -225,6 +225,11 @@ async def _handle_list_tools( # pragma: no cover description="Tool that closes standalone GET stream mid-operation", input_schema={"type": "object", "properties": {}}, ), + Tool( + name="tool_with_perpetual_stream_close", + description="Tool that always closes the stream without sending a response", + input_schema={"type": "object", "properties": {}}, + ), ] ) @@ -380,6 +385,16 @@ async def _handle_call_tool( # pragma: no cover return CallToolResult(content=[TextContent(type="text", text="Standalone stream close test done")]) + elif name == "tool_with_perpetual_stream_close": + # Repeatedly close the stream without ever sending a response. + # Used to verify that _handle_reconnection gives up after MAX_RECONNECTION_ATTEMPTS. 
+ for _ in range(10): + if ctx.close_sse_stream: + await ctx.close_sse_stream() + await anyio.sleep(0.3) + # This response should never be reached by the client because reconnection gives up + return CallToolResult(content=[TextContent(type="text", text="Should not reach")]) + return CallToolResult(content=[TextContent(type="text", text=f"Called {name}")]) @@ -1086,7 +1101,7 @@ async def test_streamable_http_client_tool_invocation(initialized_client_session """Test client tool invocation.""" # First list tools tools = await initialized_client_session.list_tools() - assert len(tools.tools) == 10 + assert len(tools.tools) == 11 assert tools.tools[0].name == "test_tool" # Call the tool @@ -1116,7 +1131,7 @@ async def test_streamable_http_client_session_persistence(basic_server: None, ba # Make multiple requests to verify session persistence tools = await session.list_tools() - assert len(tools.tools) == 10 + assert len(tools.tools) == 11 # Read a resource resource = await session.read_resource(uri="foobar://test-persist") @@ -1138,7 +1153,7 @@ async def test_streamable_http_client_json_response(json_response_server: None, # Check tool listing tools = await session.list_tools() - assert len(tools.tools) == 10 + assert len(tools.tools) == 11 # Call a tool and verify JSON response handling result = await session.call_tool("test_tool", {}) @@ -1220,7 +1235,7 @@ async def test_streamable_http_client_session_termination(basic_server: None, ba # Make a request to confirm session is working tools = await session.list_tools() - assert len(tools.tools) == 10 + assert len(tools.tools) == 11 async with create_mcp_http_client(headers=headers) as httpx_client2: async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( @@ -1281,7 +1296,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt # Make a request to confirm session is working tools = await session.list_tools() - assert len(tools.tools) == 10 + assert 
len(tools.tools) == 11 async with create_mcp_http_client(headers=headers) as httpx_client2: async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( @@ -2318,3 +2333,23 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_reconnection_gives_up_after_max_attempts( + event_server: tuple[SimpleEventStore, str], +) -> None: + """Client should stop reconnecting after MAX_RECONNECTION_ATTEMPTS and return an error. + + Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/2393: + _handle_reconnection used to reset the attempt counter to 0 when the stream ended + without a response, causing an infinite retry loop. + """ + _, server_url = event_server + + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + with pytest.raises(MCPError), anyio.fail_after(30): # pragma: no branch + await session.call_tool("tool_with_perpetual_stream_close", {})