Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/mcp/client/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,24 @@ async def post_writer(endpoint_url: str):

async def _send_message(session_message: SessionMessage) -> None:
logger.debug(f"Sending client message: {session_message}")
response = await client.post(
endpoint_url,
json=session_message.message.model_dump(
by_alias=True,
mode="json",
exclude_unset=True,
),
)
response.raise_for_status()
try:
response = await client.post(
endpoint_url,
json=session_message.message.model_dump(
by_alias=True,
mode="json",
exclude_unset=True,
),
)
response.raise_for_status()
except httpx.HTTPError as exc:
# Forward the failure to the caller via the read stream instead of
# letting it surface as a swallowed task-group error, which would
# leave read_stream.receive() blocked forever (#2110). Mirrors the
# stream-error handling in stdio.py and streamable_http.py.
logger.exception("Error sending client message")
await read_stream_writer.send(exc)
return
logger.debug(f"Client message sent successfully: {response.status_code}")

async for session_message in write_stream_reader:
Expand Down
49 changes: 48 additions & 1 deletion tests/shared/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from inline_snapshot import snapshot
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.responses import Response, StreamingResponse
from starlette.routing import Mount, Route

import mcp.client.sse
Expand All @@ -25,6 +25,7 @@
from mcp.server.transport_security import TransportSecuritySettings
from mcp.shared._httpx_utils import McpHttpClientFactory
from mcp.shared.exceptions import MCPError
from mcp.shared.message import SessionMessage
from mcp.types import (
CallToolRequestParams,
CallToolResult,
Expand Down Expand Up @@ -108,6 +109,52 @@ def make_server_app() -> Starlette:
return make_app(Server(SERVER_NAME, on_read_resource=_handle_read_resource))


def make_failing_post_app() -> Starlette:
"""An SSE app that completes the handshake but fails every message POST with 503.

The `/sse` stream announces a valid endpoint (so the client reaches the POST path) and
stays open until the client disconnects; `/messages/` always returns 503. Used to drive
the client's POST-error propagation path (#2110).
"""

async def handle_sse(request: Request) -> Response:
async def event_stream() -> AsyncGenerator[bytes, None]:
yield b"event: endpoint\r\ndata: /messages/\r\n\r\n"
# Hold the stream open with a single, branch-free wait (so coverage is
# deterministic) that comfortably outlasts the in-process POST round trip.
# The client tears the connection down as soon as it receives the error.
await anyio.sleep(1.0)

return StreamingResponse(event_stream(), media_type="text/event-stream")

async def handle_message(request: Request) -> Response:
return Response("upstream exploded", status_code=503)

return Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages/", endpoint=handle_message, methods=["POST"]),
]
)


@pytest.mark.anyio
async def test_sse_client_post_error_propagates_to_caller() -> None:
"""A non-2xx on the message POST surfaces to the caller via the read stream.

Regression test for #2110: the error was previously swallowed by the post_writer task
group and `read_stream.receive()` blocked forever.
"""
factory = in_process_client_factory(make_failing_post_app())
async with sse_client(f"{BASE_URL}/sse", httpx_client_factory=factory) as (read_stream, write_stream):
await write_stream.send(SessionMessage(types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")))
with anyio.fail_after(10):
item = await read_stream.receive()

assert isinstance(item, httpx.HTTPStatusError)
assert item.response.status_code == 503


@pytest.mark.anyio
async def test_raw_sse_connection() -> None:
"""The SSE GET responds 200 with an event-stream content type, announcing the session
Expand Down
Loading