Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ADCPError,
ADCPFeatureUnsupportedError,
ADCPProtocolError,
ADCPSigningRequiredError,
ADCPTaskError,
ADCPTimeoutError,
ADCPToolNotFoundError,
Expand Down Expand Up @@ -700,6 +701,7 @@ def get_adcp_version() -> str:
"ADCPTimeoutError",
"ADCPProtocolError",
"ADCPToolNotFoundError",
"ADCPSigningRequiredError",
"ADCPWebhookError",
"ADCPWebhookSignatureError",
"AdagentsValidationError",
Expand Down
116 changes: 115 additions & 1 deletion src/adcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@
import time
from collections.abc import Callable, Iterator
from datetime import datetime, timezone
from typing import Any
from typing import TYPE_CHECKING, Any

from a2a.types import Task, TaskStatusUpdateEvent
from pydantic import BaseModel

if TYPE_CHECKING:
import httpx

from adcp.capabilities import TASK_FEATURE_MAP, FeatureResolver
from adcp.exceptions import ADCPError, ADCPWebhookSignatureError
from adcp.protocols.a2a import A2AAdapter
from adcp.protocols.base import ProtocolAdapter
from adcp.protocols.mcp import MCPAdapter
from adcp.signing.autosign import (
SigningConfig,
operation_needs_signing,
)
from adcp.signing.autosign import (
current_operation as _signing_current_operation,
)
from adcp.signing.signer import sign_request
from adcp.types import (
ActivateSignalRequest,
ActivateSignalResponse,
Expand Down Expand Up @@ -293,6 +304,7 @@ def __init__(
capabilities_ttl: float = 3600.0,
validate_features: bool = False,
strict_idempotency: bool = False,
signing: SigningConfig | None = None,
):
"""
Initialize ADCP client for a single agent.
Expand All @@ -316,6 +328,17 @@ def __init__(
``IdempotencyUnsupportedError`` if the declaration is missing —
sellers that don't declare it provide no retry-safety guarantee
per AdCP #2315. Defaults to False for backward compatibility.
signing: Optional RFC 9421 request-signing config. When provided,
the client automatically attaches ``Signature`` /
``Signature-Input`` / ``Content-Digest`` headers to operations
the seller's ``request_signing`` capability lists in
``required_for``, ``warn_for``, or ``supported_for``. The
seller's ``covers_content_digest`` policy determines whether
the body is bound to the signature. Generate a key with
``adcp-keygen`` and publish the public JWK at your
``jwks_uri``. Supported on both A2A and MCP
(``mcp_transport="streamable_http"``); SSE-transport MCP
logs a warning and falls through unsigned.
"""
self.agent_config = agent_config
self.webhook_url_template = webhook_url_template
Expand All @@ -325,6 +348,7 @@ def __init__(
self.capabilities_ttl = capabilities_ttl
self.validate_features = validate_features
self.strict_idempotency = strict_idempotency
self.signing = signing

# Capabilities cache
self._capabilities: GetAdcpCapabilitiesResponse | None = None
Expand All @@ -349,6 +373,8 @@ def __init__(
self.adapter.idempotency_client_token = self._idempotency_client_token
if strict_idempotency:
self.adapter.idempotency_capability_check = self._ensure_idempotency_capability
if signing is not None:
self.adapter.signing_request_hook = self._sign_outgoing_request

# Initialize simple API accessor (lazy import to avoid circular dependency)
from adcp.simple import SimpleAPI
Expand Down Expand Up @@ -392,6 +418,89 @@ async def _ensure_idempotency_capability(self) -> None:
self._idempotency_capability_verified = False
raise

async def _sign_outgoing_request(self, request: httpx.Request) -> None:
"""httpx request event hook that attaches RFC 9421 signature headers.

Installed on the protocol adapter's httpx client when a
``SigningConfig`` was passed to ``ADCPClient``. Consults the
seller's advertised ``request_signing`` capability and signs only
the operations the seller listed in ``required_for``, ``warn_for``,
or ``supported_for`` — other requests (including the agent-card
fetch and ``get_adcp_capabilities`` itself) pass through unsigned.
The ``covers_content_digest`` tri-state determines whether the
body is bound to the signature.
"""
if self.signing is None:
return
operation = _signing_current_operation.get()
# Unset ContextVar → out-of-band call (agent-card fetch, session
# initialize, etc). Skip without fetching capabilities.
#
# get_adcp_capabilities → bootstrap carve-out: signing it would
# require capabilities we don't have yet, and if a pathological
# seller listed this op in its own required_for we'd recurse.
# Keep this check narrow — only operations strictly required to
# *obtain* capabilities belong here. Today that's just
# get_adcp_capabilities. A future adapter that adds another
# capabilities-precondition op MUST extend this guard.
if operation is None or operation == "get_adcp_capabilities":
return

caps = await self.fetch_capabilities()
req_signing = getattr(caps, "request_signing", None)

# Detect and surface a malformed seller config: supported=False is
# "signatures are ignored", but populating required_for alongside
# it is contradictory. The classifier correctly skips (matches
# verifier behavior) but the silent downgrade hides a config bug
# that will bite pilots.
if (
req_signing is not None
and not req_signing.supported
and (req_signing.required_for or req_signing.warn_for)
):
logger.warning(
"Seller %s advertises request_signing.supported=false but "
"populates required_for/warn_for — treating as unsupported "
"per spec. Verify the seller's capability advertisement.",
self.agent_config.id,
)

decision = operation_needs_signing(req_signing, operation)
if decision == "skip":
return

covers_policy: str | None = None
if req_signing is not None and req_signing.covers_content_digest is not None:
covers_policy = req_signing.covers_content_digest.value
if covers_policy == "forbidden":
cover_digest = False
elif covers_policy == "required":
cover_digest = True
else:
# "either" or absent — signer's choice; default stricter.
cover_digest = True

body = request.content
signed = sign_request(
method=request.method,
url=str(request.url),
headers=dict(request.headers),
body=body,
private_key=self.signing.private_key,
key_id=self.signing.key_id,
alg=self.signing.alg,
cover_content_digest=cover_digest,
tag=self.signing.tag,
)
# pop-then-set ensures our signed values are authoritative even if
# another hook or earlier layer added a same-named header. httpx
# headers are a case-insensitive MultiDict, so a naive assignment
# could leave a duplicate value in a different case.
for header_name, header_value in signed.as_dict().items():
request.headers.pop(header_name, None)
request.headers[header_name] = header_value

def get_webhook_url(self, task_type: str, operation_id: str) -> str:
"""Generate webhook URL for a task."""
if not self.webhook_url_template:
Expand Down Expand Up @@ -3623,6 +3732,7 @@ def __init__(
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None,
signing: SigningConfig | None = None,
):
"""
Initialize multi-agent client.
Expand All @@ -3633,13 +3743,17 @@ def __init__(
webhook_secret: Secret for webhook verification
on_activity: Callback for activity events
handlers: Task completion handlers
signing: Optional RFC 9421 signing config forwarded to every
per-agent ADCPClient. The same identity signs traffic to
all agents. See ADCPClient.__init__ for details.
"""
self.agents = {
agent.id: ADCPClient(
agent,
webhook_url_template=webhook_url_template,
webhook_secret=webhook_secret,
on_activity=on_activity,
signing=signing,
)
for agent in agents
}
Expand Down
28 changes: 28 additions & 0 deletions src/adcp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,34 @@ def __init__(
super().__init__(message, agent_id, agent_uri, suggestion)


class ADCPSigningRequiredError(ADCPError):
"""Raised when an operation in the seller's ``request_signing.required_for``
is called without a ``SigningConfig`` on the client.

Signing a ``required_for`` operation is mandatory — sending it unsigned
would produce a ``request_signature_required`` rejection from the seller.
Raising locally before the wire call saves a round-trip and gives the
caller a clear, actionable error.
"""

def __init__(
self,
operation: str,
agent_id: str | None = None,
agent_uri: str | None = None,
):
self.operation = operation
message = (
f"Operation {operation!r} is in the seller's request_signing.required_for "
f"list; signing is mandatory but no SigningConfig was provided"
)
suggestion = (
"Pass signing=SigningConfig(private_key=..., key_id=...) when "
"constructing ADCPClient. See adcp-keygen for key generation."
)
super().__init__(message, agent_id, agent_uri, suggestion)


class AdagentsValidationError(ADCPError):
"""Base error for adagents.json validation issues."""

Expand Down
41 changes: 36 additions & 5 deletions src/adcp/protocols/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
IdempotencyExpiredError,
)
from adcp.protocols.base import ProtocolAdapter
from adcp.signing.autosign import current_operation as _signing_operation
from adcp.types.core import AgentConfig, DebugInfo, TaskResult, TaskStatus

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,11 +60,33 @@ async def _get_httpx_client(self) -> httpx.AsyncClient:
else:
headers[self.agent_config.auth_header] = self.agent_config.auth_token

self._httpx_client = httpx.AsyncClient(
limits=limits,
headers=headers,
timeout=self.agent_config.timeout,
)
# When ADCPClient installed a signing_request_hook, register it as
# an httpx request event hook so RFC 9421 signature headers are
# attached transparently to every outgoing request. The hook is
# a bound method, so each call reads the owning client's live
# state (signing config, cached capabilities) — it is *not* a
# snapshot. Out-of-band calls (e.g. agent-card fetch) no-op
# inside the hook because the autosign ContextVar isn't set.
#
# follow_redirects is forced off whenever signing is active: RFC
# 9421 binds the signature to the original `@authority`, so a 302
# would forward stale signature bytes to a new host. httpx's
# current default is False already, but pinning it matches the
# MCP factory's invariant and protects against future upstream
# changes or a2a-sdk overrides.
event_hooks: dict[str, list[Any]] = {}
client_kwargs: dict[str, Any] = {
"limits": limits,
"headers": headers,
"timeout": self.agent_config.timeout,
}
if self.signing_request_hook is not None:
event_hooks["request"] = [self.signing_request_hook]
client_kwargs["follow_redirects"] = False
if event_hooks:
client_kwargs["event_hooks"] = event_hooks

self._httpx_client = httpx.AsyncClient(**client_kwargs)
logger.debug(
f"Created HTTP client with connection pooling for agent {self.agent_config.id}"
)
Expand Down Expand Up @@ -196,6 +219,12 @@ async def _call_a2a_tool(
"params": _idempotency.redact_params(params),
}

# Stamp the AdCP operation name so the httpx request event hook
# installed by ADCPClient for RFC 9421 auto-signing can look up the
# right signing policy. Set only around send_message so out-of-band
# httpx calls (the agent-card fetch above, or unrelated work on
# sibling tasks) stay outside the signing scope.
signing_token = _signing_operation.set(tool_name)
try:
# Use official A2A client
sdk_response = await a2a_client.send_message(request)
Expand Down Expand Up @@ -320,6 +349,8 @@ async def _call_a2a_tool(
debug_info=debug_info,
idempotency_key=idempotency_key,
)
finally:
_signing_operation.reset(signing_token)

def _process_task_response(self, task: Task, debug_info: DebugInfo | None) -> TaskResult[Any]:
"""Process a Task response from A2A into our TaskResult format."""
Expand Down
12 changes: 11 additions & 1 deletion src/adcp/protocols/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar
from typing import TYPE_CHECKING, Any, TypeVar

from pydantic import BaseModel

from adcp.types.core import AgentConfig, TaskResult, TaskStatus
from adcp.utils.response_parser import parse_json_or_text, parse_mcp_content

if TYPE_CHECKING:
import httpx

T = TypeVar("T", bound=BaseModel)


Expand All @@ -34,6 +37,13 @@ def __init__(self, agent_config: AgentConfig):
# ``use_idempotency_key`` so a key pinned on one client does not bleed
# to sibling clients (cross-seller correlation risk per AdCP #2315).
self.idempotency_client_token: str | None = None
# Optional httpx request event hook. ADCPClient installs one when a
# SigningConfig is present; the hook attaches RFC 9421 Signature-Input
# / Signature / Content-Digest headers to outgoing requests that the
# seller's capability policy says should be signed. A2A consumes this
# via its httpx client's event_hooks; MCP consumes it via a custom
# httpx_client_factory passed to streamablehttp_client.
self.signing_request_hook: Callable[[httpx.Request], Awaitable[None]] | None = None

# ========================================================================
# Helper methods for response parsing
Expand Down
Loading
Loading