diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 93b8969e6..0af8b5008 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -34,6 +34,7 @@ ADCPError, ADCPFeatureUnsupportedError, ADCPProtocolError, + ADCPSigningRequiredError, ADCPTaskError, ADCPTimeoutError, ADCPToolNotFoundError, @@ -700,6 +701,7 @@ def get_adcp_version() -> str: "ADCPTimeoutError", "ADCPProtocolError", "ADCPToolNotFoundError", + "ADCPSigningRequiredError", "ADCPWebhookError", "ADCPWebhookSignatureError", "AdagentsValidationError", diff --git a/src/adcp/client.py b/src/adcp/client.py index 83ef63cbe..36479b3b2 100644 --- a/src/adcp/client.py +++ b/src/adcp/client.py @@ -11,16 +11,27 @@ import time from collections.abc import Callable, Iterator from datetime import datetime, timezone -from typing import Any +from typing import TYPE_CHECKING, Any from a2a.types import Task, TaskStatusUpdateEvent from pydantic import BaseModel +if TYPE_CHECKING: + import httpx + from adcp.capabilities import TASK_FEATURE_MAP, FeatureResolver from adcp.exceptions import ADCPError, ADCPWebhookSignatureError from adcp.protocols.a2a import A2AAdapter from adcp.protocols.base import ProtocolAdapter from adcp.protocols.mcp import MCPAdapter +from adcp.signing.autosign import ( + SigningConfig, + operation_needs_signing, +) +from adcp.signing.autosign import ( + current_operation as _signing_current_operation, +) +from adcp.signing.signer import sign_request from adcp.types import ( ActivateSignalRequest, ActivateSignalResponse, @@ -293,6 +304,7 @@ def __init__( capabilities_ttl: float = 3600.0, validate_features: bool = False, strict_idempotency: bool = False, + signing: SigningConfig | None = None, ): """ Initialize ADCP client for a single agent. @@ -316,6 +328,17 @@ def __init__( ``IdempotencyUnsupportedError`` if the declaration is missing — sellers that don't declare it provide no retry-safety guarantee per AdCP #2315. Defaults to False for backward compatibility. + signing: Optional RFC 9421 request-signing config. When provided, + the client automatically attaches ``Signature`` / + ``Signature-Input`` / ``Content-Digest`` headers to operations + the seller's ``request_signing`` capability lists in + ``required_for``, ``warn_for``, or ``supported_for``. The + seller's ``covers_content_digest`` policy determines whether + the body is bound to the signature. Generate a key with + ``adcp-keygen`` and publish the public JWK at your + ``jwks_uri``. Supported on both A2A and MCP + (``mcp_transport="streamable_http"``); SSE-transport MCP + logs a warning and falls through unsigned. """ self.agent_config = agent_config self.webhook_url_template = webhook_url_template @@ -325,6 +348,7 @@ def __init__( self.capabilities_ttl = capabilities_ttl self.validate_features = validate_features self.strict_idempotency = strict_idempotency + self.signing = signing # Capabilities cache self._capabilities: GetAdcpCapabilitiesResponse | None = None @@ -349,6 +373,8 @@ def __init__( self.adapter.idempotency_client_token = self._idempotency_client_token if strict_idempotency: self.adapter.idempotency_capability_check = self._ensure_idempotency_capability + if signing is not None: + self.adapter.signing_request_hook = self._sign_outgoing_request # Initialize simple API accessor (lazy import to avoid circular dependency) from adcp.simple import SimpleAPI @@ -392,6 +418,89 @@ async def _ensure_idempotency_capability(self) -> None: self._idempotency_capability_verified = False raise + async def _sign_outgoing_request(self, request: httpx.Request) -> None: + """httpx request event hook that attaches RFC 9421 signature headers. + + Installed on the protocol adapter's httpx client when a + ``SigningConfig`` was passed to ``ADCPClient``. Consults the + seller's advertised ``request_signing`` capability and signs only + the operations the seller listed in ``required_for``, ``warn_for``, + or ``supported_for`` — other requests (including the agent-card + fetch and ``get_adcp_capabilities`` itself) pass through unsigned. + The ``covers_content_digest`` tri-state determines whether the + body is bound to the signature. + """ + if self.signing is None: + return + operation = _signing_current_operation.get() + # Unset ContextVar → out-of-band call (agent-card fetch, session + # initialize, etc). Skip without fetching capabilities. + # + # get_adcp_capabilities → bootstrap carve-out: signing it would + # require capabilities we don't have yet, and if a pathological + # seller listed this op in its own required_for we'd recurse. + # Keep this check narrow — only operations strictly required to + # *obtain* capabilities belong here. Today that's just + # get_adcp_capabilities. A future adapter that adds another + # capabilities-precondition op MUST extend this guard. + if operation is None or operation == "get_adcp_capabilities": + return + + caps = await self.fetch_capabilities() + req_signing = getattr(caps, "request_signing", None) + + # Detect and surface a malformed seller config: supported=False is + # "signatures are ignored", but populating required_for alongside + # it is contradictory. The classifier correctly skips (matches + # verifier behavior) but the silent downgrade hides a config bug + # that will bite pilots. + if ( + req_signing is not None + and not req_signing.supported + and (req_signing.required_for or req_signing.warn_for) + ): + logger.warning( + "Seller %s advertises request_signing.supported=false but " + "populates required_for/warn_for — treating as unsupported " + "per spec. Verify the seller's capability advertisement.", + self.agent_config.id, + ) + + decision = operation_needs_signing(req_signing, operation) + if decision == "skip": + return + + covers_policy: str | None = None + if req_signing is not None and req_signing.covers_content_digest is not None: + covers_policy = req_signing.covers_content_digest.value + if covers_policy == "forbidden": + cover_digest = False + elif covers_policy == "required": + cover_digest = True + else: + # "either" or absent — signer's choice; default stricter. + cover_digest = True + + body = request.content + signed = sign_request( + method=request.method, + url=str(request.url), + headers=dict(request.headers), + body=body, + private_key=self.signing.private_key, + key_id=self.signing.key_id, + alg=self.signing.alg, + cover_content_digest=cover_digest, + tag=self.signing.tag, + ) + # pop-then-set ensures our signed values are authoritative even if + # another hook or earlier layer added a same-named header. httpx + # headers are a case-insensitive MultiDict, so a naive assignment + # could leave a duplicate value in a different case. + for header_name, header_value in signed.as_dict().items(): + request.headers.pop(header_name, None) + request.headers[header_name] = header_value + def get_webhook_url(self, task_type: str, operation_id: str) -> str: """Generate webhook URL for a task.""" if not self.webhook_url_template: @@ -3623,6 +3732,7 @@ def __init__( webhook_secret: str | None = None, on_activity: Callable[[Activity], None] | None = None, handlers: dict[str, Callable[..., Any]] | None = None, + signing: SigningConfig | None = None, ): """ Initialize multi-agent client. @@ -3633,6 +3743,9 @@ def __init__( webhook_secret: Secret for webhook verification on_activity: Callback for activity events handlers: Task completion handlers + signing: Optional RFC 9421 signing config forwarded to every + per-agent ADCPClient. The same identity signs traffic to + all agents. See ADCPClient.__init__ for details. """ self.agents = { agent.id: ADCPClient( @@ -3640,6 +3753,7 @@ def __init__( webhook_url_template=webhook_url_template, webhook_secret=webhook_secret, on_activity=on_activity, + signing=signing, ) for agent in agents } diff --git a/src/adcp/exceptions.py b/src/adcp/exceptions.py index 480aa2f88..e7a4fef5c 100644 --- a/src/adcp/exceptions.py +++ b/src/adcp/exceptions.py @@ -217,6 +217,34 @@ def __init__( super().__init__(message, agent_id, agent_uri, suggestion) +class ADCPSigningRequiredError(ADCPError): + """Raised when an operation in the seller's ``request_signing.required_for`` + is called without a ``SigningConfig`` on the client. + + Signing a ``required_for`` operation is mandatory — sending it unsigned + would produce a ``request_signature_required`` rejection from the seller. + Raising locally before the wire call saves a round-trip and gives the + caller a clear, actionable error. + """ + + def __init__( + self, + operation: str, + agent_id: str | None = None, + agent_uri: str | None = None, + ): + self.operation = operation + message = ( + f"Operation {operation!r} is in the seller's request_signing.required_for " + f"list; signing is mandatory but no SigningConfig was provided" + ) + suggestion = ( + "Pass signing=SigningConfig(private_key=..., key_id=...) when " + "constructing ADCPClient. See adcp-keygen for key generation." + ) + super().__init__(message, agent_id, agent_uri, suggestion) + + class AdagentsValidationError(ADCPError): """Base error for adagents.json validation issues.""" diff --git a/src/adcp/protocols/a2a.py b/src/adcp/protocols/a2a.py index 8887e48bb..a29baf50d 100644 --- a/src/adcp/protocols/a2a.py +++ b/src/adcp/protocols/a2a.py @@ -29,6 +29,7 @@ IdempotencyExpiredError, ) from adcp.protocols.base import ProtocolAdapter +from adcp.signing.autosign import current_operation as _signing_operation from adcp.types.core import AgentConfig, DebugInfo, TaskResult, TaskStatus logger = logging.getLogger(__name__) @@ -59,11 +60,33 @@ async def _get_httpx_client(self) -> httpx.AsyncClient: else: headers[self.agent_config.auth_header] = self.agent_config.auth_token - self._httpx_client = httpx.AsyncClient( - limits=limits, - headers=headers, - timeout=self.agent_config.timeout, - ) + # When ADCPClient installed a signing_request_hook, register it as + # an httpx request event hook so RFC 9421 signature headers are + # attached transparently to every outgoing request. The hook is + # a bound method, so each call reads the owning client's live + # state (signing config, cached capabilities) — it is *not* a + # snapshot. Out-of-band calls (e.g. agent-card fetch) no-op + # inside the hook because the autosign ContextVar isn't set. + # + # follow_redirects is forced off whenever signing is active: RFC + # 9421 binds the signature to the original `@authority`, so a 302 + # would forward stale signature bytes to a new host. httpx's + # current default is False already, but pinning it matches the + # MCP factory's invariant and protects against future upstream + # changes or a2a-sdk overrides. + event_hooks: dict[str, list[Any]] = {} + client_kwargs: dict[str, Any] = { + "limits": limits, + "headers": headers, + "timeout": self.agent_config.timeout, + } + if self.signing_request_hook is not None: + event_hooks["request"] = [self.signing_request_hook] + client_kwargs["follow_redirects"] = False + if event_hooks: + client_kwargs["event_hooks"] = event_hooks + + self._httpx_client = httpx.AsyncClient(**client_kwargs) logger.debug( f"Created HTTP client with connection pooling for agent {self.agent_config.id}" ) @@ -196,6 +219,12 @@ async def _call_a2a_tool( "params": _idempotency.redact_params(params), } + # Stamp the AdCP operation name so the httpx request event hook + # installed by ADCPClient for RFC 9421 auto-signing can look up the + # right signing policy. Set only around send_message so out-of-band + # httpx calls (the agent-card fetch above, or unrelated work on + # sibling tasks) stay outside the signing scope. + signing_token = _signing_operation.set(tool_name) try: # Use official A2A client sdk_response = await a2a_client.send_message(request) @@ -320,6 +349,8 @@ async def _call_a2a_tool( debug_info=debug_info, idempotency_key=idempotency_key, ) + finally: + _signing_operation.reset(signing_token) def _process_task_response(self, task: Task, debug_info: DebugInfo | None) -> TaskResult[Any]: """Process a Task response from A2A into our TaskResult format.""" diff --git a/src/adcp/protocols/base.py b/src/adcp/protocols/base.py index 55c732a67..a015430fd 100644 --- a/src/adcp/protocols/base.py +++ b/src/adcp/protocols/base.py @@ -4,13 +4,16 @@ from abc import ABC, abstractmethod from collections.abc import Awaitable, Callable -from typing import Any, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar from pydantic import BaseModel from adcp.types.core import AgentConfig, TaskResult, TaskStatus from adcp.utils.response_parser import parse_json_or_text, parse_mcp_content +if TYPE_CHECKING: + import httpx + T = TypeVar("T", bound=BaseModel) @@ -34,6 +37,13 @@ def __init__(self, agent_config: AgentConfig): # ``use_idempotency_key`` so a key pinned on one client does not bleed # to sibling clients (cross-seller correlation risk per AdCP #2315). self.idempotency_client_token: str | None = None + # Optional httpx request event hook. ADCPClient installs one when a + # SigningConfig is present; the hook attaches RFC 9421 Signature-Input + # / Signature / Content-Digest headers to outgoing requests that the + # seller's capability policy says should be signed. A2A consumes this + # via its httpx client's event_hooks; MCP consumes it via a custom + # httpx_client_factory passed to streamablehttp_client. + self.signing_request_hook: Callable[[httpx.Request], Awaitable[None]] | None = None # ======================================================================== # Helper methods for response parsing diff --git a/src/adcp/protocols/mcp.py b/src/adcp/protocols/mcp.py index 1c4d21b41..e10b5b014 100644 --- a/src/adcp/protocols/mcp.py +++ b/src/adcp/protocols/mcp.py @@ -5,6 +5,7 @@ import asyncio import logging import time +from collections.abc import Awaitable, Callable from contextlib import AsyncExitStack from typing import TYPE_CHECKING, Any from urllib.parse import urlparse @@ -22,6 +23,7 @@ logger = logging.getLogger(__name__) if TYPE_CHECKING: + import httpx from mcp import ClientSession try: @@ -34,12 +36,14 @@ MCP_AVAILABLE = False try: + import httpx as _httpx from httpx import HTTPStatusError HTTPX_AVAILABLE = True except ImportError: HTTPX_AVAILABLE = False HTTPStatusError = None # type: ignore[assignment, misc] + _httpx = None # type: ignore[assignment] import json @@ -51,6 +55,7 @@ IdempotencyExpiredError, ) from adcp.protocols.base import ProtocolAdapter +from adcp.signing.autosign import current_operation as _signing_operation from adcp.types.core import DebugInfo, TaskResult, TaskStatus # Spec-defined limits from docs/building/implementation/mcp-response-extraction.mdx @@ -60,6 +65,43 @@ _MAX_ERROR_CODE_LEN = 64 +def _make_signing_http_factory( + hook: Callable[[httpx.Request], Awaitable[None]], +) -> Callable[..., httpx.AsyncClient]: + """Build an ``httpx_client_factory`` that installs a signing request hook. + + ``streamablehttp_client`` accepts a factory with signature + ``(headers, timeout, auth) -> httpx.AsyncClient``. Our factory forwards + those kwargs, registers the signing hook as an httpx request event + hook, and disables redirect following: an RFC 9421 signature binds the + original ``@authority``, so a 302 to a different host would send the + signed request onward with a stale authority and fail verification. + """ + + def factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + **extra: Any, + ) -> httpx.AsyncClient: + # Forward any future MCP-SDK kwargs (e.g. verify=, cert=) verbatim + # so adding a new factory parameter upstream doesn't break signing. + kwargs: dict[str, Any] = { + "follow_redirects": False, + "event_hooks": {"request": [hook]}, + **extra, + } + if timeout is not None: + kwargs["timeout"] = timeout + if headers is not None: + kwargs["headers"] = headers + if auth is not None: + kwargs["auth"] = auth + return _httpx.AsyncClient(**kwargs) + + return factory + + def _text_of(item: Any) -> str | None: """Return the text payload of an MCP content item, or None if not a text item.""" if isinstance(item, dict): @@ -302,6 +344,24 @@ async def _get_session(self) -> ClientSession: base_uri = self.agent_config.agent_uri.rstrip("/") urls_to_try.append(f"{base_uri}/mcp") + # RFC 9421 auto-signing: if ADCPClient installed a signing request + # hook, wire it into streamable_http via a custom httpx client + # factory. SSE transport has no equivalent knob — warn the user + # and fall through to unsigned SSE. + streamable_http_extra: dict[str, Any] = {} + if self.signing_request_hook is not None: + if self.agent_config.mcp_transport == "streamable_http": + streamable_http_extra["httpx_client_factory"] = ( + _make_signing_http_factory(self.signing_request_hook) + ) + else: + logger.warning( + "RFC 9421 auto-signing is not supported on MCP SSE " + "transport for agent %s; use mcp_transport='streamable_http' " + "to sign outgoing requests.", + self.agent_config.id, + ) + last_error = None for url in urls_to_try: try: @@ -310,7 +370,10 @@ async def _get_session(self) -> ClientSession: # Use streamable HTTP transport (newer, bidirectional) read, write, _get_session_id = await self._exit_stack.enter_async_context( streamablehttp_client( - url, headers=headers, timeout=self.agent_config.timeout + url, + headers=headers, + timeout=self.agent_config.timeout, + **streamable_http_extra, ) ) else: @@ -443,8 +506,17 @@ async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskRe "transport": self.agent_config.mcp_transport, } - # Call the tool using MCP client session - result = await session.call_tool(tool_name, params) + # Stamp the AdCP operation name so the httpx request event hook + # installed by ADCPClient (when a SigningConfig is present) can + # look up the seller's signing policy for this call. Scoped + # tightly around call_tool so session.initialize() above and + # other out-of-band traffic stay outside the signing scope. + signing_token = _signing_operation.set(tool_name) + try: + # Call the tool using MCP client session + result = await session.call_tool(tool_name, params) + finally: + _signing_operation.reset(signing_token) # Check if this is an error response is_error = hasattr(result, "isError") and result.isError diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 00d0bd8a4..abf6ac49d 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -6,6 +6,11 @@ from __future__ import annotations +from adcp.signing.autosign import ( + SigningConfig, + SigningDecision, + operation_needs_signing, +) from adcp.signing.canonical import ( SignatureInputLabel, build_signature_base, @@ -121,6 +126,8 @@ "SignatureInputLabel", "SignatureVerificationError", "SignedHeaders", + "SigningConfig", + "SigningDecision", "StaticJwksResolver", "VerifiedSigner", "VerifierCapability", @@ -136,6 +143,7 @@ "default_jwks_fetcher", "extract_signature_bytes", "format_signature_header", + "operation_needs_signing", "parse_signature_input_header", "private_key_from_jwk", "public_key_from_jwk", diff --git a/src/adcp/signing/autosign.py b/src/adcp/signing/autosign.py new file mode 100644 index 000000000..575142964 --- /dev/null +++ b/src/adcp/signing/autosign.py @@ -0,0 +1,170 @@ +"""Client-side orchestration for auto-signing outgoing AdCP requests. + +`SigningConfig` bundles the private key and key-id a client uses to sign. +`operation_needs_signing` reads the seller's advertised ``request_signing`` +capability block and classifies each outgoing operation as required, +optional, or skip — letting the caller decide whether to invoke +`sign_request` and what to do if no config is available. + +The pure `sign_request` primitive lives in `adcp.signing.signer`; this +module is the thin glue between capabilities and that primitive. +""" + +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Literal + +from adcp.signing.constants import DEFAULT_TAG +from adcp.signing.crypto import ALG_ED25519, ALLOWED_ALGS, PrivateKey + +if TYPE_CHECKING: + from adcp.types.generated_poc.protocol.get_adcp_capabilities_response import ( + RequestSigning, + ) + + +# The operation name the adapter is currently dispatching. The A2A / MCP +# adapters set this before invoking their respective SDKs so the shared +# httpx request event hook can look up the seller's signing policy for the +# right operation name. Unset (default None) during out-of-band HTTP calls +# like A2A agent-card fetches, which the hook then skips. +# +# Invariant: do NOT spawn background tasks that make unrelated httpx +# calls inside the scope where this var is set. ContextVar values copy +# into child tasks on ``asyncio.create_task``, so a background task +# spawned inside a signing scope would inherit the operation name and +# sign a request the caller didn't intend to sign. +current_operation: ContextVar[str | None] = ContextVar( + "adcp_signing_current_operation", default=None +) + + +SigningDecision = Literal["required", "optional", "skip"] +"""Outcome of classifying an operation against a seller's signing policy. + +* ``"required"`` — the seller has listed the operation in ``required_for``. + The client MUST sign; if no ``SigningConfig`` is available the caller + should raise rather than send an unsigned request the seller will reject. +* ``"optional"`` — the seller has listed the operation in ``warn_for`` or + ``supported_for``. Sign if a ``SigningConfig`` is available; skip without + error otherwise (per spec: ``warn_for`` logs failures but does not reject; + ``supported_for`` accepts both signed and unsigned). +* ``"skip"`` — the operation is not in any list, or the seller does not + advertise signing support at all. Do not sign. +""" + + +@dataclass(frozen=True) +class SigningConfig: + """Client-side signing credentials for RFC 9421 request signing. + + Passed to ``ADCPClient`` / ``ADCPMultiAgentClient`` at construction. + The same config signs traffic to every agent the client talks to — + the buyer is one identity from the seller's point of view. + + Parameters + ---------- + private_key: + The signing key (``Ed25519PrivateKey`` or ``EllipticCurvePrivateKey`` + on P-256). Generated via the ``adcp-keygen`` CLI or loaded from an + existing PEM via ``cryptography.hazmat.primitives.serialization``. + key_id: + The ``kid`` value the verifier will look up in the seller-side JWKS. + Must match the ``kid`` of the public key published at the buyer's + ``jwks_uri`` (advertised in the buyer's ``brand.json``). + alg: + RFC 9421 algorithm identifier. One of ``"ed25519"`` or + ``"ecdsa-p256-sha256"``. Defaults to ``"ed25519"``. + tag: + Signature tag. Defaults to the AdCP request-signing tag and should + not need to be overridden. + """ + + private_key: PrivateKey + key_id: str + alg: str = ALG_ED25519 + tag: str = DEFAULT_TAG + + def __post_init__(self) -> None: + if self.alg not in ALLOWED_ALGS: + raise ValueError( + f"alg must be one of {sorted(ALLOWED_ALGS)}, got {self.alg!r}" + ) + if not self.key_id: + raise ValueError("key_id must be a non-empty string") + + def __repr__(self) -> str: + # Redact the private key from string representations so accidental + # logging of the config (or of an exception that closes over it) + # doesn't surface key material even in summary form. + return ( + f"SigningConfig(key_id={self.key_id!r}, alg={self.alg!r}, " + f"tag={self.tag!r}, private_key=)" + ) + + +@dataclass(frozen=True) +class _OperationLists: + """Normalized operation lists extracted from a RequestSigning block. + + The generated pydantic model uses ``list[str] | None`` with ``None`` + and ``[]`` both representing "no operations". This helper flattens to + frozensets for O(1) membership checks. + """ + + required: frozenset[str] = field(default_factory=frozenset) + warn: frozenset[str] = field(default_factory=frozenset) + supported: frozenset[str] = field(default_factory=frozenset) + + +def _extract(capability: RequestSigning) -> _OperationLists: + return _OperationLists( + required=frozenset(capability.required_for or ()), + warn=frozenset(capability.warn_for or ()), + supported=frozenset(capability.supported_for or ()), + ) + + +def operation_needs_signing( + capability: RequestSigning | None, + operation: str, +) -> SigningDecision: + """Classify whether to sign an outgoing operation against a seller's policy. + + Precedence follows the spec: ``required_for`` > ``warn_for`` > + ``supported_for``. An operation named in ``required_for`` is classified + ``"required"`` even if it also appears in the weaker lists. + + If ``capability`` is ``None`` (the seller doesn't advertise a + ``request_signing`` block at all) or ``capability.supported`` is + ``False``, always returns ``"skip"``. + + Parameters + ---------- + capability: + The ``request_signing`` block from the seller's + ``get_adcp_capabilities`` response, or ``None``. + operation: + The AdCP protocol operation name, e.g. ``"create_media_buy"``. Not + an MCP tool name or A2A skill name — the verifier compares against + the protocol names. + """ + if capability is None or not capability.supported: + return "skip" + + lists = _extract(capability) + if operation in lists.required: + return "required" + if operation in lists.warn or operation in lists.supported: + return "optional" + return "skip" + + +__all__ = [ + "SigningConfig", + "SigningDecision", + "current_operation", + "operation_needs_signing", +] diff --git a/tests/conformance/signing/test_autosign.py b/tests/conformance/signing/test_autosign.py new file mode 100644 index 000000000..ef5b684af --- /dev/null +++ b/tests/conformance/signing/test_autosign.py @@ -0,0 +1,140 @@ +"""Unit tests for the autosign capability classifier + SigningConfig. + +These test the client-side orchestration layer — no wire calls, no +verifier interaction. They establish the precedence rules documented in +the AdCP security profile (required_for > warn_for > supported_for) and +the invariants on SigningConfig. +""" + +from __future__ import annotations + +import pytest +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 + +from adcp.signing import SigningConfig, operation_needs_signing +from adcp.signing.crypto import ALG_ED25519, ALG_ES256 +from adcp.types.generated_poc.protocol.get_adcp_capabilities_response import ( + CoversContentDigest, + RequestSigning, +) + +# -- helpers ------------------------------------------------------------ + + +def _cap( + *, + supported: bool = True, + required: list[str] | None = None, + warn: list[str] | None = None, + supported_for: list[str] | None = None, + covers: CoversContentDigest = CoversContentDigest.either, +) -> RequestSigning: + return RequestSigning( + supported=supported, + covers_content_digest=covers, + required_for=required or [], + warn_for=warn or [], + supported_for=supported_for, + ) + + +# -- operation_needs_signing -------------------------------------------- + + +def test_skip_when_capability_absent() -> None: + assert operation_needs_signing(None, "create_media_buy") == "skip" + + +def test_skip_when_capability_unsupported() -> None: + cap = _cap(supported=False, required=["create_media_buy"]) + assert operation_needs_signing(cap, "create_media_buy") == "skip" + + +def test_required_when_op_in_required_for() -> None: + cap = _cap(required=["create_media_buy"]) + assert operation_needs_signing(cap, "create_media_buy") == "required" + + +def test_optional_when_op_in_warn_for_only() -> None: + cap = _cap(warn=["sync_creatives"]) + assert operation_needs_signing(cap, "sync_creatives") == "optional" + + +def test_optional_when_op_in_supported_for_only() -> None: + cap = _cap(supported_for=["get_products"]) + assert operation_needs_signing(cap, "get_products") == "optional" + + +def test_skip_when_op_in_no_list() -> None: + cap = _cap(required=["create_media_buy"]) + assert operation_needs_signing(cap, "get_products") == "skip" + + +def test_required_wins_over_warn() -> None: + # required_for takes precedence even when the op also appears in warn_for. + cap = _cap(required=["create_media_buy"], warn=["create_media_buy"]) + assert operation_needs_signing(cap, "create_media_buy") == "required" + + +def test_required_wins_over_supported() -> None: + cap = _cap( + required=["create_media_buy"], + supported_for=["create_media_buy", "get_products"], + ) + assert operation_needs_signing(cap, "create_media_buy") == "required" + + +def test_warn_and_supported_both_optional() -> None: + # Spec allows supported_for to be a superset of warn_for. + cap = _cap(warn=["sync_creatives"], supported_for=["sync_creatives"]) + assert operation_needs_signing(cap, "sync_creatives") == "optional" + + +def test_empty_lists_and_none_lists_equivalent() -> None: + cap_empty = _cap(required=[], warn=[], supported_for=[]) + cap_none = RequestSigning( + supported=True, + covers_content_digest=CoversContentDigest.either, + required_for=None, + warn_for=None, + supported_for=None, + ) + assert operation_needs_signing(cap_empty, "anything") == "skip" + assert operation_needs_signing(cap_none, "anything") == "skip" + + +# -- SigningConfig ------------------------------------------------------ + + +def test_signing_config_accepts_ed25519_key() -> None: + key = ed25519.Ed25519PrivateKey.generate() + cfg = SigningConfig(private_key=key, key_id="buyer-1") + assert cfg.alg == ALG_ED25519 + assert cfg.key_id == "buyer-1" + assert cfg.private_key is key + + +def test_signing_config_accepts_es256_key() -> None: + key = ec.generate_private_key(ec.SECP256R1()) + cfg = SigningConfig(private_key=key, key_id="buyer-2", alg=ALG_ES256) + assert cfg.alg == ALG_ES256 + assert cfg.key_id == "buyer-2" + + +def test_signing_config_rejects_empty_key_id() -> None: + key = ed25519.Ed25519PrivateKey.generate() + with pytest.raises(ValueError, match="key_id"): + SigningConfig(private_key=key, key_id="") + + +def test_signing_config_rejects_unknown_alg() -> None: + key = ed25519.Ed25519PrivateKey.generate() + with pytest.raises(ValueError, match="alg"): + SigningConfig(private_key=key, key_id="buyer-1", alg="hs256") + + +def test_signing_config_is_frozen() -> None: + key = ed25519.Ed25519PrivateKey.generate() + cfg = SigningConfig(private_key=key, key_id="buyer-1") + with pytest.raises((AttributeError, Exception)): + cfg.key_id = "mutated" # type: ignore[misc] diff --git a/tests/conformance/signing/test_autosign_e2e.py b/tests/conformance/signing/test_autosign_e2e.py new file mode 100644 index 000000000..ec539ebf0 --- /dev/null +++ b/tests/conformance/signing/test_autosign_e2e.py @@ -0,0 +1,242 @@ +"""End-to-end smoke test: auto-sign hook through real httpx to a verifier. + +The unit tests in ``test_autosign_hook.py`` call +``ADCPClient._sign_outgoing_request`` directly against a synthetic +``httpx.Request``. This file pins the layer above: httpx actually invokes +our request event hook before writing bytes, and the bytes it writes are +accepted by a live RFC 9421 verifier behind an ASGI app. If the hook +ever stops firing on the real wire path (e.g., an upstream httpx change +that drops event hooks in a code path we care about), these tests catch +it. + +Uses Starlette directly (not FastAPI) so the test runs without the +optional FastAPI dependency — Starlette is already a transitive dep of +the project's core SDKs. +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from unittest.mock import AsyncMock + +import httpx +import pytest +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import Route + +from adcp.client import ADCPClient +from adcp.signing import ( + SignatureVerificationError, + SigningConfig, + StaticJwksResolver, + VerifierCapability, + VerifyOptions, + private_key_from_jwk, + unauthorized_response_headers, + verify_starlette_request, +) +from adcp.signing.autosign import current_operation +from adcp.types.core import AgentConfig, Protocol +from adcp.types.generated_poc.protocol.get_adcp_capabilities_response import ( + Adcp, + CoversContentDigest, + GetAdcpCapabilitiesResponse, + Idempotency, + MajorVersion, + RequestSigning, + SupportedProtocol, +) + +VECTORS_DIR = Path(__file__).parent.parent / "vectors" / "request-signing" +KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +ED25519_KEY = next(k for k in KEYS if k["kid"] == "test-ed25519-2026") + + +def _build_verifier_app( + *, + covers: str, + required_for: frozenset[str], +) -> Starlette: + jwks_resolver = StaticJwksResolver({"keys": [ED25519_KEY]}) + + async def verify(request: Request) -> JSONResponse: + operation = request.path_params["operation"] + options = VerifyOptions( + now=float(int(time.time())), + capability=VerifierCapability( + covers_content_digest=covers, # type: ignore[arg-type] + required_for=required_for, + ), + operation=operation, + jwks_resolver=jwks_resolver, + ) + try: + signer = await verify_starlette_request(request, options=options) + except SignatureVerificationError as exc: + return JSONResponse( + {"error": exc.code, "message": str(exc)}, + status_code=401, + headers=unauthorized_response_headers(exc), + ) + return JSONResponse({"verified_key_id": signer.key_id, "alg": signer.alg}) + + return Starlette(routes=[Route("/adcp/{operation}", verify, methods=["POST"])]) + + +def _make_caps( + *, + required: list[str] | None = None, + supported_for: list[str] | None = None, + covers: CoversContentDigest = CoversContentDigest.either, +) -> GetAdcpCapabilitiesResponse: + return GetAdcpCapabilitiesResponse( + adcp=Adcp( + major_versions=[MajorVersion(root=3)], + idempotency=Idempotency(replay_ttl_seconds=86400), + ), + supported_protocols=[SupportedProtocol.media_buy], + request_signing=RequestSigning( + supported=True, + covers_content_digest=covers, + required_for=required or [], + warn_for=[], + supported_for=supported_for, + ), + ) + + +@pytest.fixture() +def signing_config() -> SigningConfig: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + return SigningConfig(private_key=private_key, key_id=ED25519_KEY["kid"]) + + +@pytest.fixture() +def signing_client(signing_config: SigningConfig) -> ADCPClient: + agent = AgentConfig( + id="smoke-seller", + agent_uri="http://verifier.test", + protocol=Protocol.A2A, + ) + return ADCPClient(agent, signing=signing_config) + + +@pytest.mark.parametrize( + ("covers", "required", "operation"), + [ + (CoversContentDigest.either, ["create_media_buy"], "create_media_buy"), + (CoversContentDigest.required, ["create_media_buy"], "create_media_buy"), + (CoversContentDigest.forbidden, ["create_media_buy"], "create_media_buy"), + ], +) +async def test_hook_on_real_httpx_round_trip_accepted_by_verifier( + signing_client: ADCPClient, + covers: CoversContentDigest, + required: list[str], + operation: str, +) -> None: + """With the hook installed on a real AsyncClient, the server verifies the request. + + This is the load-bearing contract: httpx runs the hook, the hook + mutates the request headers, those bytes reach the server, and the + server's RFC 9421 verifier accepts them. + """ + signing_client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=required, covers=covers) + ) + verifier_covers = covers.value if covers is not None else "either" + app = _build_verifier_app( + covers=verifier_covers, + required_for=frozenset(required), + ) + transport = httpx.ASGITransport(app=app) + + async with httpx.AsyncClient( + transport=transport, + base_url="http://verifier.test", + event_hooks={"request": [signing_client._sign_outgoing_request]}, + ) as client: + token = current_operation.set(operation) + try: + resp = await client.post( + f"/adcp/{operation}", + content=b'{"plan_id":"p1"}', + headers={"Content-Type": "application/json"}, + ) + finally: + current_operation.reset(token) + + assert resp.status_code == 200, resp.text + assert resp.json()["verified_key_id"] == "test-ed25519-2026" + + +async def test_hook_skips_when_context_var_unset_server_rejects( + signing_client: ADCPClient, +) -> None: + """With ContextVar unset (simulating an out-of-band call like agent-card + fetch), the hook does nothing — the server gets an unsigned request and + rejects with request_signature_required.""" + signing_client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=["create_media_buy"]) + ) + app = _build_verifier_app( + covers="either", required_for=frozenset({"create_media_buy"}) + ) + transport = httpx.ASGITransport(app=app) + + async with httpx.AsyncClient( + transport=transport, + base_url="http://verifier.test", + event_hooks={"request": [signing_client._sign_outgoing_request]}, + ) as client: + # No current_operation.set — the hook will see None and skip. + resp = await client.post( + "/adcp/create_media_buy", + content=b'{"plan_id":"p1"}', + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 401 + assert resp.json()["error"] == "request_signature_required" + + +async def test_hook_skips_get_adcp_capabilities_server_rejects_if_listed( + signing_client: ADCPClient, +) -> None: + """Bootstrap carve-out: get_adcp_capabilities is never signed even if the + seller pathologically listed it in required_for. The server would + reject in that case — which proves the carve-out is operating (the + client is NOT sending a signature, even though it has capabilities + advertising get_adcp_capabilities as required).""" + signing_client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=["get_adcp_capabilities"]) + ) + app = _build_verifier_app( + covers="either", + required_for=frozenset({"get_adcp_capabilities"}), + ) + transport = httpx.ASGITransport(app=app) + + async with httpx.AsyncClient( + transport=transport, + base_url="http://verifier.test", + event_hooks={"request": [signing_client._sign_outgoing_request]}, + ) as client: + token = current_operation.set("get_adcp_capabilities") + try: + resp = await client.post( + "/adcp/get_adcp_capabilities", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + finally: + current_operation.reset(token) + + # Server sees no signature → 401. Test confirms the bootstrap carve-out + # short-circuited before signing (otherwise server would have accepted). + assert resp.status_code == 401 + assert resp.json()["error"] == "request_signature_required" diff --git a/tests/conformance/signing/test_autosign_hook.py b/tests/conformance/signing/test_autosign_hook.py new file mode 100644 index 000000000..b9bc9a3bd --- /dev/null +++ b/tests/conformance/signing/test_autosign_hook.py @@ -0,0 +1,469 @@ +"""Tests for the ADCPClient auto-sign httpx request event hook. + +These exercise ``ADCPClient._sign_outgoing_request`` directly: build a real +``httpx.Request``, call the hook with a controlled capability response and +``current_operation`` ContextVar, then verify the resulting signature with +the RFC 9421 verifier. The goal is to prove the client-side orchestration +produces bytes the server-side verifier accepts — without standing up an +A2A SDK or a full test server. +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock + +import httpx +import pytest + +from adcp.client import ADCPClient +from adcp.signing import ( + SigningConfig, + StaticJwksResolver, + VerifierCapability, + VerifyOptions, + private_key_from_jwk, + verify_request_signature, +) +from adcp.signing.autosign import current_operation +from adcp.types.core import AgentConfig, Protocol +from adcp.types.generated_poc.protocol.get_adcp_capabilities_response import ( + Adcp, + CoversContentDigest, + GetAdcpCapabilitiesResponse, + Idempotency, + MajorVersion, + RequestSigning, + SupportedProtocol, +) + +VECTORS_DIR = Path(__file__).parent.parent / "vectors" / "request-signing" +KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +ED25519_KEY = next(k for k in KEYS if k["kid"] == "test-ed25519-2026") + + +# -- fixtures ------------------------------------------------------------ + + +def _make_client(signing: SigningConfig | None = None) -> ADCPClient: + agent = AgentConfig( + id="test-seller", + agent_uri="https://seller.example.com", + protocol=Protocol.A2A, + ) + return ADCPClient(agent, signing=signing) + + +def _make_caps( + *, + required: list[str] | None = None, + warn: list[str] | None = None, + supported_for: list[str] | None = None, + covers: CoversContentDigest = CoversContentDigest.either, + signing_supported: bool = True, +) -> GetAdcpCapabilitiesResponse: + return GetAdcpCapabilitiesResponse( + adcp=Adcp( + major_versions=[MajorVersion(root=3)], + idempotency=Idempotency(replay_ttl_seconds=86400), + ), + supported_protocols=[SupportedProtocol.media_buy], + request_signing=RequestSigning( + supported=signing_supported, + covers_content_digest=covers, + required_for=required or [], + warn_for=warn or [], + supported_for=supported_for, + ), + ) + + +def _build_request( + url: str = "https://seller.example.com/adcp/create_media_buy", + body: bytes = b'{"plan_id":"p1"}', +) -> httpx.Request: + # httpx.Request constructor populates headers and content eagerly, + # matching the shape the event hook receives during real dispatch. + return httpx.Request( + method="POST", + url=url, + headers={"Content-Type": "application/json"}, + content=body, + ) + + +def _verify( + request: httpx.Request, + body: bytes, + *, + operation: str, + covers_policy: str = "either", + required_for: frozenset[str] = frozenset(), +) -> None: + jwks_resolver = StaticJwksResolver({"keys": [ED25519_KEY]}) + options = VerifyOptions( + now=float(int(time.time())), + capability=VerifierCapability( + covers_content_digest=covers_policy, # type: ignore[arg-type] + required_for=required_for, + ), + operation=operation, + jwks_resolver=jwks_resolver, + ) + verify_request_signature( + method=request.method, + url=str(request.url), + headers=dict(request.headers), + body=body, + options=options, + ) + + +@pytest.fixture() +def signing_config() -> SigningConfig: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + return SigningConfig(private_key=private_key, key_id=ED25519_KEY["kid"]) + + +# -- construction -------------------------------------------------------- + + +def test_no_signing_kwarg_leaves_adapter_hook_unset() -> None: + client = _make_client() + assert client.signing is None + assert client.adapter.signing_request_hook is None + + +def test_signing_kwarg_installs_adapter_hook(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + assert client.signing is signing_config + assert client.adapter.signing_request_hook is not None + + +# -- hook: skip paths ---------------------------------------------------- + + +async def test_hook_skips_when_signing_is_none() -> None: + # No signing config → hook itself isn't installed by __init__, + # but the method is still callable as a bound method for clarity: + # invoking it manually should return without mutating headers. + client = _make_client() + request = _build_request() + before = dict(request.headers) + # current_operation set; signing None → still skip. + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + assert dict(request.headers) == before + + +async def test_hook_skips_when_context_var_unset(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + request = _build_request() + before = dict(request.headers) + # No current_operation set → simulated out-of-band request. + await client._sign_outgoing_request(request) + assert dict(request.headers) == before + + +async def test_hook_skips_for_get_adcp_capabilities(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + request = _build_request() + before = dict(request.headers) + token = current_operation.set("get_adcp_capabilities") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + assert dict(request.headers) == before + + +async def test_hook_skips_when_op_not_in_any_list(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=["create_media_buy"]) + ) + request = _build_request(url="https://seller.example.com/adcp/get_products") + token = current_operation.set("get_products") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + assert "Signature" not in request.headers + assert "Signature-Input" not in request.headers + + +async def test_hook_skips_when_seller_unsupported(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(signing_supported=False, required=["create_media_buy"]) + ) + request = _build_request() + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + assert "Signature" not in request.headers + + +# -- hook: sign paths ---------------------------------------------------- + + +async def test_hook_signs_required_for_op_and_server_verifies( + signing_config: SigningConfig, +) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=["create_media_buy"]) + ) + body = b'{"plan_id":"p1"}' + request = _build_request(body=body) + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + assert "Signature" in request.headers + assert "Signature-Input" in request.headers + # covers_content_digest defaults to "either" → signer's choice → + # default stricter (cover), so Content-Digest must be present. + assert "Content-Digest" in request.headers + _verify( + request, + body, + operation="create_media_buy", + required_for=frozenset({"create_media_buy"}), + ) + + +async def test_hook_signs_supported_for_op(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(supported_for=["sync_creatives"]) + ) + body = b'{"creatives":[]}' + request = _build_request( + url="https://seller.example.com/adcp/sync_creatives", + body=body, + ) + token = current_operation.set("sync_creatives") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + assert "Signature" in request.headers + _verify(request, body, operation="sync_creatives") + + +async def test_hook_signs_warn_for_op(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(warn=["update_media_buy"]) + ) + body = b'{"media_buy_id":"m1"}' + request = _build_request( + url="https://seller.example.com/adcp/update_media_buy", + body=body, + ) + token = current_operation.set("update_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + assert "Signature" in request.headers + _verify(request, body, operation="update_media_buy") + + +# -- hook: covers_content_digest tri-state ------------------------------ + + +async def test_hook_honors_covers_required(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps( + required=["create_media_buy"], + covers=CoversContentDigest.required, + ) + ) + body = b'{"plan_id":"p1"}' + request = _build_request(body=body) + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + assert "Content-Digest" in request.headers + _verify( + request, + body, + operation="create_media_buy", + covers_policy="required", + required_for=frozenset({"create_media_buy"}), + ) + + +async def test_hook_honors_covers_forbidden(signing_config: SigningConfig) -> None: + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps( + required=["create_media_buy"], + covers=CoversContentDigest.forbidden, + ) + ) + body = b'{"plan_id":"p1"}' + request = _build_request(body=body) + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + # covers_content_digest=forbidden → sign WITHOUT binding body. + assert "Signature" in request.headers + assert "Content-Digest" not in request.headers + _verify( + request, + body, + operation="create_media_buy", + covers_policy="forbidden", + required_for=frozenset({"create_media_buy"}), + ) + + +# -- invariants --------------------------------------------------------- + + +async def test_clearing_signing_post_init_silently_skips( + signing_config: SigningConfig, +) -> None: + # ``signing`` is a public attribute; if a caller clears it after + # construction the hook must short-circuit cleanly rather than raise + # or produce a partial signature. The earliest guard in the hook + # (``self.signing is None``) enforces this. + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps(required=["create_media_buy"]) + ) + client.signing = None + request = _build_request() + token = current_operation.set("create_media_buy") + try: + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + assert "Signature" not in request.headers + + +async def test_hook_warns_on_contradictory_seller_policy( + signing_config: SigningConfig, + caplog: pytest.LogCaptureFixture, +) -> None: + # supported=False + non-empty required_for is a seller-config error. + # The classifier correctly skips, but a silent skip hides a misconfig + # that will bite pilots — surface it as a warning. + import logging as _logging + + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps( + signing_supported=False, required=["create_media_buy"] + ) + ) + request = _build_request() + token = current_operation.set("create_media_buy") + try: + with caplog.at_level(_logging.WARNING, logger="adcp.client"): + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + + assert "Signature" not in request.headers + warnings = [r for r in caplog.records if r.levelno == _logging.WARNING] + assert any( + "supported=false" in r.getMessage().lower() for r in warnings + ), [r.getMessage() for r in warnings] + + +# -- concurrency -------------------------------------------------------- + + +async def test_context_var_isolates_concurrent_calls( + signing_config: SigningConfig, +) -> None: + """Two tasks dispatching different operations must see their own names. + + The ``current_operation`` ContextVar's whole purpose (vs a thread-local + or instance attribute) is to isolate per-task values across + ``asyncio.gather``. This test pins that invariant: concurrent hook + invocations with different operations each sign with the correct + operation context. + """ + import asyncio + + client = _make_client(signing=signing_config) + client.fetch_capabilities = AsyncMock( # type: ignore[method-assign] + return_value=_make_caps( + required=["create_media_buy"], + supported_for=["sync_creatives"], + ) + ) + + async def _dispatch(op: str, body: bytes) -> httpx.Request: + request = _build_request( + url=f"https://seller.example.com/adcp/{op}", body=body + ) + token = current_operation.set(op) + try: + # Yield to let the other task interleave between the set and + # the hook body, stressing the isolation invariant. + await asyncio.sleep(0) + await client._sign_outgoing_request(request) + finally: + current_operation.reset(token) + return request + + req_create, req_sync = await asyncio.gather( + _dispatch("create_media_buy", b'{"plan_id":"p1"}'), + _dispatch("sync_creatives", b'{"creatives":[]}'), + ) + + # Each request's signature must validate for its own operation name. + _verify( + req_create, + b'{"plan_id":"p1"}', + operation="create_media_buy", + required_for=frozenset({"create_media_buy"}), + ) + _verify(req_sync, b'{"creatives":[]}', operation="sync_creatives") + + +# -- multi-agent forwarding --------------------------------------------- + + +def test_multi_agent_client_forwards_signing(signing_config: SigningConfig) -> None: + from adcp import ADCPMultiAgentClient + + agents = [ + AgentConfig(id="a", agent_uri="https://a.example", protocol=Protocol.A2A), + AgentConfig(id="b", agent_uri="https://b.example", protocol=Protocol.A2A), + ] + multi = ADCPMultiAgentClient(agents=agents, signing=signing_config) + assert multi.agent("a").signing is signing_config + assert multi.agent("b").signing is signing_config + assert multi.agent("a").adapter.signing_request_hook is not None + assert multi.agent("b").adapter.signing_request_hook is not None + + +# -- suppress fixture warnings for unused params ------------------------ + +_ = Any # silence flake on module-level Any import diff --git a/tests/conformance/signing/test_autosign_mcp.py b/tests/conformance/signing/test_autosign_mcp.py new file mode 100644 index 000000000..5397046f8 --- /dev/null +++ b/tests/conformance/signing/test_autosign_mcp.py @@ -0,0 +1,224 @@ +"""Tests for MCP adapter auto-signing wiring. + +The hook behavior itself (``ADCPClient._sign_outgoing_request``) is covered +by ``test_autosign_hook.py`` — both adapters share it. These tests focus +on MCP-specific plumbing: the custom ``httpx_client_factory`` that +``streamablehttp_client`` receives, the SSE-transport warning path, and +the ``current_operation`` ContextVar scope around ``session.call_tool``. +""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +from adcp.client import ADCPClient +from adcp.protocols.mcp import MCPAdapter, _make_signing_http_factory +from adcp.signing import SigningConfig, private_key_from_jwk +from adcp.signing.autosign import current_operation +from adcp.types.core import AgentConfig, Protocol + +VECTORS_DIR = Path(__file__).parent.parent / "vectors" / "request-signing" +KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +ED25519_KEY = next(k for k in KEYS if k["kid"] == "test-ed25519-2026") + + +@pytest.fixture() +def signing_config() -> SigningConfig: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + return SigningConfig(private_key=private_key, key_id=ED25519_KEY["kid"]) + + +# -- factory ------------------------------------------------------------ + + +async def _dummy_hook(_request: httpx.Request) -> None: + return None + + +def test_factory_disables_follow_redirects() -> None: + factory = _make_signing_http_factory(_dummy_hook) + client = factory() + assert client.follow_redirects is False + + +def test_factory_installs_request_event_hook() -> None: + factory = _make_signing_http_factory(_dummy_hook) + client = factory() + hooks = client.event_hooks.get("request") or [] + assert hooks == [_dummy_hook] + + +def test_factory_forwards_headers_timeout_auth() -> None: + factory = _make_signing_http_factory(_dummy_hook) + timeout = httpx.Timeout(7.5) + client = factory( + headers={"X-Buyer": "b1"}, + timeout=timeout, + auth=None, + ) + assert client.headers["X-Buyer"] == "b1" + # httpx normalizes the timeout to an internal object; reading back the + # connect component is enough to prove it was threaded through. + assert client.timeout.connect == 7.5 + + +def test_factory_accepts_none_args() -> None: + # MCP's default factory is called with all-None sometimes; ours should + # accept that shape without raising. + factory = _make_signing_http_factory(_dummy_hook) + client = factory() + assert isinstance(client, httpx.AsyncClient) + + +# -- SSE transport warning ---------------------------------------------- + + +def _make_mcp_adapter(transport: str) -> MCPAdapter: + agent = AgentConfig( + id="mcp-seller", + agent_uri="https://seller.example.com/mcp", + protocol=Protocol.MCP, + mcp_transport=transport, + ) + return MCPAdapter(agent) + + +async def test_sse_transport_with_signing_logs_warning_and_skips( + signing_config: SigningConfig, + caplog: pytest.LogCaptureFixture, +) -> None: + adapter = _make_mcp_adapter("sse") + adapter.signing_request_hook = AsyncMock() + + # Capture the warning emitted during session setup. We short-circuit + # the actual HTTP attempt by patching sse_client to raise immediately, + # so we only exercise the code path up to the warning. + with patch("adcp.protocols.mcp.sse_client") as mock_sse: + mock_sse.side_effect = RuntimeError("stop here") + with caplog.at_level(logging.WARNING, logger="adcp.protocols.mcp"): + with pytest.raises(Exception): + await adapter._get_session() + + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any( + "RFC 9421 auto-signing is not supported on MCP SSE" in r.getMessage() + for r in warnings + ), [r.getMessage() for r in warnings] + + +async def test_streamable_http_with_signing_wires_factory( + signing_config: SigningConfig, +) -> None: + adapter = _make_mcp_adapter("streamable_http") + adapter.signing_request_hook = AsyncMock() + + # Patch streamablehttp_client so we can assert on the kwargs it + # receives — specifically that httpx_client_factory is present. + captured_kwargs: dict[str, Any] = {} + + def _fake_streamable_http(*args: Any, **kwargs: Any) -> Any: + captured_kwargs.update(kwargs) + raise RuntimeError("stop here") # bail before any real I/O + + with patch("adcp.protocols.mcp.streamablehttp_client", side_effect=_fake_streamable_http): + with pytest.raises(Exception): + await adapter._get_session() + + assert "httpx_client_factory" in captured_kwargs + factory = captured_kwargs["httpx_client_factory"] + # Sanity check: the factory produces an AsyncClient with the signing hook. + client = factory() + assert client.follow_redirects is False + assert client.event_hooks["request"] == [adapter.signing_request_hook] + + +async def test_streamable_http_without_signing_no_factory_kwarg() -> None: + adapter = _make_mcp_adapter("streamable_http") + # No signing_request_hook installed → factory kwarg not added. + + captured_kwargs: dict[str, Any] = {} + + def _fake_streamable_http(*args: Any, **kwargs: Any) -> Any: + captured_kwargs.update(kwargs) + raise RuntimeError("stop here") + + with patch("adcp.protocols.mcp.streamablehttp_client", side_effect=_fake_streamable_http): + with pytest.raises(Exception): + await adapter._get_session() + + assert "httpx_client_factory" not in captured_kwargs + + +# -- ContextVar scope around call_tool ---------------------------------- + + +async def test_current_operation_set_around_call_tool( + signing_config: SigningConfig, +) -> None: + """Verify the ContextVar is set during call_tool and reset on the way out.""" + observed: list[str | None] = [] + + async def _capture(*_args: Any, **_kwargs: Any) -> Any: + observed.append(current_operation.get()) + result = MagicMock() + result.isError = False + result.content = [] + result.structuredContent = None + return result + + adapter = _make_mcp_adapter("streamable_http") + fake_session = MagicMock() + fake_session.call_tool = _capture + adapter._get_session = AsyncMock(return_value=fake_session) # type: ignore[method-assign] + + # Before the call, ContextVar is unset. + assert current_operation.get() is None + await adapter._call_mcp_tool("create_media_buy", {}) + # After the call, ContextVar is back to unset (token reset). + assert current_operation.get() is None + # Inside the call, it was set to the operation name. + assert observed == ["create_media_buy"] + + +async def test_context_var_reset_on_exception() -> None: + """If call_tool raises, the ContextVar still resets.""" + + async def _boom(*_args: Any, **_kwargs: Any) -> Any: + raise RuntimeError("boom") + + adapter = _make_mcp_adapter("streamable_http") + fake_session = MagicMock() + fake_session.call_tool = _boom + adapter._get_session = AsyncMock(return_value=fake_session) # type: ignore[method-assign] + + assert current_operation.get() is None + result = await adapter._call_mcp_tool("create_media_buy", {}) + # _call_mcp_tool catches broad exceptions and returns a failed TaskResult, + # so it shouldn't re-raise here; the point is that ContextVar still reset. + assert current_operation.get() is None + assert result.success is False + + +# -- ADCPClient wires hook into adapter -------------------------------- + + +def test_adcp_client_mcp_adapter_receives_hook(signing_config: SigningConfig) -> None: + agent = AgentConfig( + id="mcp-seller", + agent_uri="https://seller.example.com/mcp", + protocol=Protocol.MCP, + ) + client = ADCPClient(agent, signing=signing_config) + hook = client.adapter.signing_request_hook + assert hook is not None + # Bound methods compare equal even though each access creates a new + # descriptor, so == is the right check here (not `is`). + assert hook == client._sign_outgoing_request + assert hook.__self__ is client # type: ignore[union-attr]