2626 RequestId ,
2727 jsonrpc_message_adapter ,
2828)
29+ from mcp_types .version import MODERN_PROTOCOL_VERSIONS
2930from pydantic import ValidationError
3031
3132from mcp .client ._transport import TransportStreams
3233from mcp .shared ._compat import resync_tracer
3334from mcp .shared ._context_streams import ContextReceiveStream , ContextSendStream , create_context_streams
3435from mcp .shared ._httpx_utils import create_mcp_http_client
3536from mcp .shared .inbound import MCP_PROTOCOL_VERSION_HEADER
37+ from mcp .shared .jsonrpc_dispatcher import cancelled_request_id_from_params
3638from mcp .shared .message import ClientMessageMetadata , SessionMessage
3739
3840logger = logging .getLogger (__name__ )
@@ -70,6 +72,19 @@ class RequestContext:
7072 read_stream_writer : StreamWriter
7173
7274
75+ @dataclass (slots = True )
76+ class _InFlightPost :
77+ """A request POST in flight: its abort scope and the era it was sent under.
78+
79+ `modern` is the negotiated-version cache as of this request's dequeue, so a
80+ later cancel frame is interpreted under the era the request actually ran
81+ with, not whatever the cache says by then.
82+ """
83+
84+ scope : anyio .CancelScope
85+ modern : bool
86+
87+
7388class StreamableHTTPTransport :
7489 """StreamableHTTP client transport implementation."""
7590
@@ -81,21 +96,28 @@ def __init__(self, url: str) -> None:
8196 """
8297 self .url = url
8398 self .session_id : str | None = None
84- # Captured from each stamped POST's metadata. Reused on outbound HTTP that carries
85- # no per-message header (transport-internal GET/DELETE, and dispatcher-written
86- # response/error/cancel POSTs that bypass the session's stamp). Cleared when an
87- # `initialize` POST goes out so a probe-stamped value cannot leak onto the handshake.
99+ # Captured from each stamped message's metadata, synchronously in the
100+ # post_writer loop so the cache always reflects wire order (a POST task's
101+ # scheduling is arbitrary). Reused on outbound HTTP that carries no
102+ # per-message header (transport-internal GET/DELETE, and dispatcher-written
103+ # response/error POSTs that bypass the session's stamp), and consulted by
104+ # `_consume_modern_cancellation`. Cleared when an `initialize` message is
105+ # dequeued so a probe-stamped value cannot leak onto the handshake.
88106 self ._protocol_version_header : str | None = None
107+ # Every request's POST runs inside one of these so an outbound
108+ # `notifications/cancelled` at 2026 can abort it; see
109+ # `_consume_modern_cancellation`. Keys are verbatim-typed ("1" is not 1).
110+ self ._in_flight_posts : dict [RequestId , _InFlightPost ] = {}
89111
90112 def _prepare_headers (self ) -> dict [str , str ]:
91113 """Build MCP-specific request headers for any outbound HTTP request.
92114
93115 These are merged with the ``httpx.AsyncClient`` defaults (these take
94116 precedence). The cached ``MCP-Protocol-Version`` is included whenever
95117 present so messages that don't pass through the session's stamp —
96- response/error/cancel POSTs, transport-internal GET/DELETE — still
97- carry the negotiated version. Per-message headers are layered on top
98- by the caller.
118+ response/error POSTs, legacy cancel frames, transport-internal
119+ GET/DELETE — still carry the negotiated version. Per-message headers
120+ are layered on top by the caller.
99121 """
100122 headers : dict [str , str ] = {
101123 "accept" : "application/json, text/event-stream" ,
@@ -245,19 +267,57 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
245267 await event_source .response .aclose ()
246268 break
247269
270+ def _consume_modern_cancellation (self , session_message : SessionMessage ) -> bool :
271+ """Translate an outbound `notifications/cancelled` at 2026; True means "do not POST".
272+
273+ The 2026 wire defines no client-to-server notifications over streamable
274+ HTTP: closing a request's response stream IS its cancellation signal.
275+ The dispatcher still emits the courtesy frame as its abandon signal
276+ (every outbound cancel names one of our own request ids - the spec
277+ forbids cancelling a request the sender did not issue), so this
278+ transport translates it: when the named request's POST is in flight,
279+ that POST's own recorded era decides - abort-and-swallow at 2026, POST
280+ the frame below it (where the frame is the signal and a disconnect
281+ explicitly is not). With no POST to consult, the cached negotiated
282+ version decides; at 2026 the frame is swallowed even unmatched, so a
283+ late cancel racing the response cannot leak onto the wire.
284+ """
285+ message = session_message .message
286+ if not (isinstance (message , JSONRPCNotification ) and message .method == "notifications/cancelled" ):
287+ return False
288+ request_id = cancelled_request_id_from_params (message .params )
289+ post = self ._in_flight_posts .get (request_id ) if request_id is not None else None
290+ if post is not None :
291+ if not post .modern :
292+ return False
293+ logger .debug ("aborting in-flight POST for cancelled request %r" , request_id )
294+ post .scope .cancel ()
295+ return True
296+ return self ._protocol_version_header in MODERN_PROTOCOL_VERSIONS
297+
298+ async def _run_request_post (
299+ self ,
300+ post_fn : Callable [[], Awaitable [None ]],
301+ post : _InFlightPost ,
302+ request_id : RequestId ,
303+ ) -> None :
304+ """Run one request's POST inside its abort scope (see `_consume_modern_cancellation`)."""
305+ try :
306+ with post .scope :
307+ await post_fn ()
308+ finally :
309+ # Identity-guarded: a reused id may already have a successor
310+ # registered while this task unwinds - popping by key alone would
311+ # evict the live entry and leave the new POST unabortable.
312+ if self ._in_flight_posts .get (request_id ) is post :
313+ del self ._in_flight_posts [request_id ]
314+
248315 async def _handle_post_request (self , ctx : RequestContext ) -> None :
249316 """Handle a POST request with response processing."""
250317 message = ctx .session_message .message
251- is_initialization = self ._is_initialization_request (message )
252- if is_initialization :
253- # `initialize` is the negotiation, not a "subsequent request" — discard any
254- # probe-stamped value so the discover→fallback path can't leak it onto the handshake.
255- self ._protocol_version_header = None
256318 headers = self ._prepare_headers ()
257319 if ctx .metadata is not None and ctx .metadata .headers is not None :
258320 headers .update (ctx .metadata .headers )
259- if MCP_PROTOCOL_VERSION_HEADER in ctx .metadata .headers :
260- self ._protocol_version_header = ctx .metadata .headers [MCP_PROTOCOL_VERSION_HEADER ]
261321
262322 async with ctx .client .stream (
263323 "POST" ,
@@ -302,7 +362,7 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
302362 await ctx .read_stream_writer .send (session_message )
303363 return
304364
305- if is_initialization :
365+ if self . _is_initialization_request ( message ) :
306366 self ._maybe_extract_session_id_from_response (response )
307367
308368 # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications:
@@ -455,6 +515,8 @@ async def post_writer(
455515
456516 async def _handle_message (session_message : SessionMessage ) -> None :
457517 message = session_message .message
518+ if self ._consume_modern_cancellation (session_message ):
519+ return
458520 metadata = (
459521 session_message .metadata
460522 if isinstance (session_message .metadata , ClientMessageMetadata )
@@ -470,6 +532,15 @@ async def _handle_message(session_message: SessionMessage) -> None:
470532 if self ._is_initialized_notification (message ):
471533 start_get_stream ()
472534
535+ if self ._is_initialization_request (message ):
536+ # `initialize` is the negotiation, not a "subsequent request" — discard any
537+ # probe-stamped value so the discover→fallback path can't leak it onto the handshake.
538+ self ._protocol_version_header = None
539+ elif metadata is not None and metadata .headers is not None :
540+ stamped_version = metadata .headers .get (MCP_PROTOCOL_VERSION_HEADER )
541+ if stamped_version is not None :
542+ self ._protocol_version_header = stamped_version
543+
473544 ctx = RequestContext (
474545 client = client ,
475546 session_id = self .session_id ,
@@ -486,7 +557,15 @@ async def handle_request_async():
486557
487558 # If this is a request, start a new task to handle it
488559 if isinstance (message , JSONRPCRequest ):
489- tg .start_soon (handle_request_async )
560+ # Register the abort scope before the spawn: the next
561+ # message through this loop can already be the abandon
562+ # signal for this id, ahead of the task ever running.
563+ post = _InFlightPost (
564+ scope = anyio .CancelScope (),
565+ modern = self ._protocol_version_header in MODERN_PROTOCOL_VERSIONS ,
566+ )
567+ self ._in_flight_posts [message .id ] = post
568+ tg .start_soon (self ._run_request_post , handle_request_async , post , message .id )
490569 else :
491570 await handle_request_async ()
492571
0 commit comments