diff --git a/README.md b/README.md index 4ac0911bf..d637f8eb6 100644 --- a/README.md +++ b/README.md @@ -562,6 +562,49 @@ client = ADCPClient(agent, strict_idempotency=True) # default: False **Security note on logs.** The SDK redacts `idempotency_key` in its own debug captures, but the underlying `httpx`/`httpcore` loggers log full request bodies at `DEBUG`. If you enable `logging.basicConfig(level=logging.DEBUG)` in production, raise those two loggers back to `INFO` — otherwise full keys end up in logs during the seller's replay TTL window and become a retry-pattern oracle for anyone who can read them. +### Building a seller: idempotency middleware + +If you're building an AdCP seller, the companion middleware handles the `(principal, key, canonical-hash)` bookkeeping so you don't hand-roll it per tool handler. Drop `@idempotency.wrap` above each mutating handler and declare your replay window in capabilities: + +```python +from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, serve +from adcp.server.responses import capabilities_response + +idempotency = IdempotencyStore( + backend=MemoryBackend(), # PgBackend with transactional commit is a follow-up + ttl_seconds=86400, # 24h, spec-recommended floor +) + +class MySeller(ADCPHandler): + async def get_adcp_capabilities(self, params, context=None): + return capabilities_response( + ["media_buy"], + idempotency=idempotency.capability(), + ) + + @idempotency.wrap + async def create_media_buy(self, params, context=None): + # Same key + canonical-equivalent payload → this body is skipped, + # the cached response is returned. Same key + different payload → + # IdempotencyConflictError raised before this runs, which the + # framework translates to IDEMPOTENCY_CONFLICT on the wire. + return my_business_logic(params) + +serve(MySeller(), name="my-seller") +``` + +**What the middleware does for you:** + +- Extracts `idempotency_key` from `params`, scopes lookups by `context.caller_identity` (per-principal — a security requirement from AdCP §2315) +- Hashes the payload with RFC 8785 JCS + SHA-256, stripping the spec's closed exclusion list (`idempotency_key`, `context`, `governance_context`, `push_notification_config.authentication.credentials`) +- On cache hit with matching hash: returns the cached response verbatim, skips your handler (deep-copied so caller mutation can't poison future replays) +- On cache hit with different hash: raises `IdempotencyConflictError`, which the framework surfaces as `IDEMPOTENCY_CONFLICT` on both MCP (`is_error=true` + text) and A2A (failed task with `adcp_error` DataPart) +- On cache miss: runs your handler, then commits the response + +**Backends:** `MemoryBackend` ships now (tests, single-process agents). `PgBackend` is scaffolded — it raises `NotImplementedError` with a pointer to the follow-up issue. For production use across multiple workers, implement your own `IdempotencyBackend` subclass against Redis, Postgres, etc. + +**Atomicity caveat:** `MemoryBackend` commits the cache entry AFTER your handler returns, so a crash between `handler success` and `cache commit` causes the retry to re-execute. `PgBackend` (follow-up) will commit the cache row in the same transaction as your business writes. Read the module docstring at `adcp.server.idempotency` before shipping this against a production database. + ## Available Tools All AdCP tools with full type safety: diff --git a/pyproject.toml b/pyproject.toml index 776a68083..c2c7dd231 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,10 @@ dependencies = [ "mcp>=1.23.2", "email-validator>=2.0.0", "cryptography>=41.0.0", + # RFC 8785 JSON Canonicalization Scheme — used by the server-side + # idempotency middleware to compute the spec-mandated payload hash for + # replay detection. Tiny pure-Python dep, no transitive weight. + "rfc8785>=0.1.4", ] [project.scripts] diff --git a/src/adcp/server/__init__.py b/src/adcp/server/__init__.py index 6bb09391b..c5e934e4d 100644 --- a/src/adcp/server/__init__.py +++ b/src/adcp/server/__init__.py @@ -79,6 +79,7 @@ async def get_products(params, context=None): resolve_account, valid_actions_for_status, ) +from adcp.server.idempotency import IdempotencyStore, MemoryBackend from adcp.server.mcp_tools import MCPToolSet, create_mcp_tools, get_tools_for_handler from adcp.server.proposal import ProposalBuilder, ProposalNotSupported from adcp.server.responses import ( @@ -138,6 +139,9 @@ async def get_products(params, context=None): # A2A integration "ADCPAgentExecutor", "create_a2a_server", + # Idempotency middleware (AdCP #2315 seller side) + "IdempotencyStore", + "MemoryBackend", # Test controller "TestControllerStore", "TestControllerError", diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 0d490cc3b..614a0466f 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -35,7 +35,9 @@ TextPart, ) +from adcp.exceptions import ADCPError, ADCPTaskError from adcp.server.base import ADCPHandler +from adcp.server.helpers import STANDARD_ERROR_CODES from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler from adcp.server.test_controller import TestControllerStore, _handle_test_controller @@ -104,6 +106,16 @@ async def execute( try: result = await self._tool_callers[skill_name](params) await self._send_result(event_queue, context, skill_name, result) + except ADCPError as exc: + # Application-layer AdCP error (IdempotencyConflictError etc.). + # Emit a failed task with the adcp_error in a DataPart per + # transport-errors.mdx §A2A Binding, plus a human-readable text + # part. The JSON-RPC channel is reserved for transport-level + # errors (auth rejected, rate-limited pre-dispatch). + logger.info( + "AdCP application error for skill %s: %s", skill_name, exc + ) + await self._send_adcp_error(event_queue, context, exc) except Exception: logger.exception("Error executing skill %s", skill_name) await self._send_error( @@ -212,6 +224,44 @@ async def _send_error( ) await event_queue.enqueue_event(task) + async def _send_adcp_error( + self, + event_queue: EventQueue, + context: RequestContext, + exc: ADCPError, + ) -> None: + """Publish a failed task carrying an AdCP ``adcp_error`` payload. + + Follows transport-errors.mdx §A2A Binding: failed task with artifact + containing a ``DataPart`` keyed under ``adcp_error`` plus a terse + ``TextPart`` for human/LLM consumption. + """ + # Derive the spec error code. ADCPTaskError carries a list of codes + # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back + # to a generic INTERNAL_ERROR when the exception doesn't supply one. + code = "INTERNAL_ERROR" + if isinstance(exc, ADCPTaskError) and exc.error_codes: + code = str(exc.error_codes[0]) + + adcp_error: dict[str, Any] = { + "code": code, + "message": exc.message, + } + recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery") + if recovery: + adcp_error["recovery"] = recovery + suggestion = getattr(exc, "suggestion", None) + if suggestion: + adcp_error["suggestion"] = suggestion + + task = _make_task( + context, + state=TaskState.failed, + data={"adcp_error": adcp_error}, + message=exc.message, + ) + await event_queue.enqueue_event(task) + # ------------------------------------------------------------------ # Task factory diff --git a/src/adcp/server/helpers.py b/src/adcp/server/helpers.py index 6d9ec0ed6..ce0465fc8 100644 --- a/src/adcp/server/helpers.py +++ b/src/adcp/server/helpers.py @@ -55,6 +55,17 @@ "ACCOUNT_NOT_FOUND": {"recovery": "terminal", "message": "Account not found"}, "ACCOUNT_SUSPENDED": {"recovery": "terminal", "message": "Account suspended"}, "UNSUPPORTED_FEATURE": {"recovery": "terminal", "message": "Feature not supported"}, + # Idempotency (AdCP #2315). Both are "terminal" from a retry-behavior + # standpoint — the caller MUST take a specific action (mint a fresh key or + # reconcile state) rather than blindly retry. + "IDEMPOTENCY_CONFLICT": { + "recovery": "terminal", + "message": "idempotency_key reused with a different payload", + }, + "IDEMPOTENCY_EXPIRED": { + "recovery": "terminal", + "message": "Idempotency replay window has expired", + }, # --- SDK extensions (not in spec enum) --- "NOT_SUPPORTED": {"recovery": "terminal", "message": "Operation not supported"}, } diff --git a/src/adcp/server/idempotency/__init__.py b/src/adcp/server/idempotency/__init__.py new file mode 100644 index 000000000..e3c444343 --- /dev/null +++ b/src/adcp/server/idempotency/__init__.py @@ -0,0 +1,77 @@ +"""Server-side idempotency middleware for AdCP mutating tool handlers. + +Implements the seller side of AdCP #2315: extract ``idempotency_key``, look up +cached responses scoped by ``(authenticated_principal, idempotency_key)``, and +replay the cached response verbatim when a subsequent request carries the same +key + canonicalized-equivalent payload. Reject key reuse with a different +payload as ``IDEMPOTENCY_CONFLICT``. + +The spec contract lives at +``adcontextprotocol/adcp/docs/building/implementation/security.mdx#idempotency``. + +Typical usage:: + + from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, ToolContext + from adcp.server.responses import capabilities_response + + idempotency = IdempotencyStore( + backend=MemoryBackend(), + ttl_seconds=86400, # 24 hours, matches spec minimum + ) + + class MySeller(ADCPHandler): + @idempotency.wrap + async def create_media_buy(self, params, context=None): + # ``params`` carries ``idempotency_key``; ``context.caller_identity`` + # identifies the principal. Without either, the middleware falls + # through to this handler with no dedup (schema validation above + # us rejects missing keys per AdCP #2315). + return my_create_logic(params) + + async def get_adcp_capabilities(self, params, context=None): + return capabilities_response( + ["media_buy"], + idempotency=idempotency.capability(), + ) + +Callers who invoke the handler directly (tests, non-HTTP code paths) must +pass a :class:`adcp.server.base.ToolContext` with ``caller_identity`` set — +it's how the middleware scopes the cache namespace per-principal:: + + ctx = ToolContext(caller_identity="buyer-acme") + result = await seller.create_media_buy( + {"idempotency_key": key, ...}, ctx + ) + +Backends: + +- :class:`MemoryBackend` — in-process dict with TTL; use for tests and + single-process reference implementations. +- :class:`PgBackend` — scaffold for a SQLAlchemy/asyncpg-backed store that can + commit cache writes atomically with business writes. Implementation arrives + in a follow-up PR. +""" + +from adcp.server.idempotency.backends import ( + CachedResponse, + IdempotencyBackend, + MemoryBackend, + PgBackend, +) +from adcp.server.idempotency.canonicalize import ( + EXCLUDED_FIELDS, + canonical_json_sha256, + strip_excluded_fields, +) +from adcp.server.idempotency.store import IdempotencyStore + +__all__ = [ + "CachedResponse", + "EXCLUDED_FIELDS", + "IdempotencyBackend", + "IdempotencyStore", + "MemoryBackend", + "PgBackend", + "canonical_json_sha256", + "strip_excluded_fields", +] diff --git a/src/adcp/server/idempotency/backends.py b/src/adcp/server/idempotency/backends.py new file mode 100644 index 000000000..d51a00ebf --- /dev/null +++ b/src/adcp/server/idempotency/backends.py @@ -0,0 +1,221 @@ +"""Storage backends for :class:`~adcp.server.idempotency.IdempotencyStore`. + +A backend owns two responsibilities: + +1. Retrieve a cached response by ``(principal_id, idempotency_key)``, honoring + the seller's replay TTL. +2. Atomically commit ``(payload_hash, response)`` on a fresh key. Atomicity + with the handler's business writes is the backend's choice — :class:`MemoryBackend` + makes no such guarantee; :class:`PgBackend` (follow-up) will when the handler + uses the same engine. + +Backends expose async methods. The in-process :class:`MemoryBackend` is +synchronous under the hood but wrapped in ``async`` signatures so the store +can remain backend-agnostic. +""" + +from __future__ import annotations + +import asyncio +import time +from abc import ABC, abstractmethod +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class CachedResponse: + """A single cached handler response keyed by ``(principal_id, key)``. + + :param payload_hash: Canonical JSON SHA-256 of the *original* request. On + replay we compare the new request's hash to this value; mismatch is + ``IDEMPOTENCY_CONFLICT``. + :param response: The response dict the handler returned. Returned verbatim + on replay — the seller injects ``replayed: true`` at the envelope + level before sending. + :param expires_at_epoch: Unix timestamp (seconds) when this entry becomes + eligible for eviction. Reads after this time return None. + """ + + payload_hash: str + response: dict[str, Any] + expires_at_epoch: float + + +class IdempotencyBackend(ABC): + """Abstract storage backend contract. + + All methods are async. Implementations MUST be safe to call concurrently + from multiple asyncio tasks — :class:`IdempotencyStore` does not serialize + access on the caller's behalf. + """ + + @abstractmethod + async def get( + self, principal_id: str, key: str + ) -> CachedResponse | None: + """Return the cached entry, or None if missing or expired.""" + + @abstractmethod + async def put( + self, + principal_id: str, + key: str, + entry: CachedResponse, + ) -> None: + """Store ``entry`` under ``(principal_id, key)``. Overwrites any prior + entry — the store only calls ``put`` after verifying the slot is empty + or expired, so an overwrite in that window is a legitimate retry of + the write itself.""" + + @abstractmethod + async def delete_expired(self, now_epoch: float | None = None) -> int: + """Best-effort sweep of expired entries. Returns the count removed. + + Sweeping is optional — :meth:`get` MUST self-filter expired entries. + Backends that have natural TTL primitives (Redis ``EXPIRE``, Postgres + partial indexes) may implement this as a no-op.""" + + +class MemoryBackend(IdempotencyBackend): + """In-process dict-backed store. + + Suitable for tests, single-process reference implementations, and local + development. **Not suitable for multi-process deployments** — each worker + has its own cache, so a retry that lands on a different worker is treated + as a fresh request. + + Thread safety: the backend uses an :class:`asyncio.Lock` to serialize + mutations of the shared dict. Reads go through the lock too; for a pure + in-process backend this is cheap and prevents torn reads across concurrent + ``get``/``put`` interleaving. + + :param clock: Callable returning the current epoch seconds. Override for + tests that need to advance time deterministically without monkeypatching + :mod:`time`. Defaults to :func:`time.time`. + """ + + def __init__(self, *, clock: Callable[[], float] = time.time) -> None: + self._store: dict[tuple[str, str], CachedResponse] = {} + self._lock = asyncio.Lock() + self._clock = clock + + async def get( + self, principal_id: str, key: str + ) -> CachedResponse | None: + async with self._lock: + entry = self._store.get((principal_id, key)) + if entry is None: + return None + if entry.expires_at_epoch <= self._clock(): + # Lazy expiry — drop the stale entry so the next request + # treats the slot as fresh and races to repopulate. + del self._store[(principal_id, key)] + return None + return entry + + async def put( + self, + principal_id: str, + key: str, + entry: CachedResponse, + ) -> None: + async with self._lock: + self._store[(principal_id, key)] = entry + + async def delete_expired(self, now_epoch: float | None = None) -> int: + cutoff = now_epoch if now_epoch is not None else self._clock() + async with self._lock: + stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff] + for k in stale: + del self._store[k] + return len(stale) + + async def clear(self) -> None: + """Remove all cached entries. + + Test-suite hook — handy for resetting state between fixtures when a + single :class:`MemoryBackend` is shared across multiple tests. + """ + async with self._lock: + self._store.clear() + + async def _size(self) -> int: + """Test-only: return the current entry count.""" + async with self._lock: + return len(self._store) + + +class PgBackend(IdempotencyBackend): + """PostgreSQL-backed store — **scaffold, not yet implemented**. + + .. warning:: + Calling ``PgBackend(...)`` raises ``NotImplementedError`` today. Use + :class:`MemoryBackend` for tests, or implement your own + :class:`IdempotencyBackend` subclass against the database of your + choice until this implementation lands. Tracked at + https://github.com/adcontextprotocol/adcp-client-python/issues/182. + + **Design intent.** Share a transaction with the handler's business + writes so the cache entry commits atomically with side effects. Without + that, a crash between ``handler success`` and ``cache commit`` causes + the retry to re-execute the handler, duplicating side effects. + + **Schema sketch for the implementer.** + + .. code-block:: sql + + CREATE TABLE adcp_idempotency ( + principal_id TEXT COLLATE "C" NOT NULL, + key TEXT COLLATE "C" NOT NULL, + payload_hash TEXT NOT NULL, + response JSONB NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (principal_id, key) + ); + + Notes: + + * ``COLLATE "C"`` (or ``CITEXT`` with a deliberate case policy) — avoid + the default locale collation on the identifier columns. On some + locales ``Principal-A`` and ``principal-a`` compare equal, which + would collapse distinct tenants into the same cache slot. + * Queries MUST filter on ``principal_id`` in the ``WHERE`` clause even + with the composite PK — row-level security (RLS) enforced via a + policy like ``USING (principal_id = current_setting('adcp.principal_id')::text)`` + gives belt-and-suspenders protection against accidental cross-tenant + reads in future handlers. + * ``get`` uses ``SELECT ... WHERE expires_at > now()``. + * ``put`` uses ``INSERT ... ON CONFLICT (principal_id, key) DO UPDATE``. + * Accept a SQLAlchemy/asyncpg session factory so the caller can thread + the handler's transaction through for atomic commit — the atomicity + guarantee is the whole reason to use a SQL backend. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError( + "PgBackend is scaffolded but not yet implemented. Use MemoryBackend " + "for tests, or implement your own IdempotencyBackend subclass " + "against your database of choice until the PgBackend implementation " + "lands. Tracking: " + "https://github.com/adcontextprotocol/adcp-client-python/issues/182." + ) + + async def get( + self, principal_id: str, key: str + ) -> CachedResponse | None: # pragma: no cover + raise NotImplementedError + + async def put( + self, + principal_id: str, + key: str, + entry: CachedResponse, + ) -> None: # pragma: no cover + raise NotImplementedError + + async def delete_expired( + self, now_epoch: float | None = None + ) -> int: # pragma: no cover + raise NotImplementedError diff --git a/src/adcp/server/idempotency/canonicalize.py b/src/adcp/server/idempotency/canonicalize.py new file mode 100644 index 000000000..fda2e248e --- /dev/null +++ b/src/adcp/server/idempotency/canonicalize.py @@ -0,0 +1,88 @@ +"""Canonical payload hashing for AdCP idempotency replay detection. + +The spec (AdCP #2315) defines payload equivalence as RFC 8785 JSON +Canonicalization Scheme over the request with a closed exclusion list. Two +requests with the same ``idempotency_key`` and the same canonical hash are +replays of each other; same key with different hash is +``IDEMPOTENCY_CONFLICT``. + +The exclusion list is **closed** — every other field participates in the hash, +including ``ext``. See +``adcontextprotocol/adcp/docs/building/implementation/security.mdx#idempotency``. +""" + +from __future__ import annotations + +import copy +import hashlib +from typing import Any + +import rfc8785 + +# Fields the spec excludes from the canonical hash. These are either +# protocol-level metadata (idempotency_key itself is the identifier) or +# routing/auth fields whose presence mustn't force a new resource on replay. +# +# ``push_notification_config.authentication.credentials`` lives under a nested +# path; ``_NESTED_EXCLUSIONS`` captures those. +EXCLUDED_FIELDS: frozenset[str] = frozenset( + { + "idempotency_key", + "context", + "governance_context", + } +) + +# Nested paths, as dotted strings. Stripped in order. +_NESTED_EXCLUSIONS: tuple[tuple[str, ...], ...] = ( + ("push_notification_config", "authentication", "credentials"), +) + + +def strip_excluded_fields(payload: dict[str, Any]) -> dict[str, Any]: + """Return a deep copy of ``payload`` with the spec's exclusion list removed. + + Top-level keys in :data:`EXCLUDED_FIELDS` are dropped. Nested paths in + ``_NESTED_EXCLUSIONS`` are traversed; the final leaf key is removed if the + traversal reaches it. Missing intermediate keys are a no-op — the caller's + payload is free to omit the push_notification_config entirely. + + The input dict is never mutated. + """ + out: dict[str, Any] = copy.deepcopy(payload) + for key in EXCLUDED_FIELDS: + out.pop(key, None) + for path in _NESTED_EXCLUSIONS: + _drop_nested(out, path) + return out + + +def _drop_nested(obj: dict[str, Any], path: tuple[str, ...]) -> None: + """Remove the leaf key of ``path`` from ``obj``, walking nested dicts.""" + if not path: + return + cursor: Any = obj + for key in path[:-1]: + if not isinstance(cursor, dict) or key not in cursor: + return + cursor = cursor[key] + if isinstance(cursor, dict): + cursor.pop(path[-1], None) + + +def canonical_json_sha256(payload: dict[str, Any]) -> str: + """Compute the spec's canonical payload fingerprint. + + 1. Strip the spec's exclusion list (see :func:`strip_excluded_fields`). + 2. RFC 8785 JCS canonicalize (stable key order, compact, UTF-8, spec- + compliant number serialization). + 3. SHA-256 over the canonical bytes; return hex digest. + + The result is stable across all conforming JCS implementations. Two + payloads whose hashes match are equivalent under AdCP replay semantics; + two with different hashes are distinct and MUST be treated as a conflict + when the caller supplies the same ``idempotency_key``. + """ + stripped = strip_excluded_fields(payload) + canonical = rfc8785.dumps(stripped) + return hashlib.sha256(canonical).hexdigest() diff --git a/src/adcp/server/idempotency/store.py b/src/adcp/server/idempotency/store.py new file mode 100644 index 000000000..855df6a31 --- /dev/null +++ b/src/adcp/server/idempotency/store.py @@ -0,0 +1,261 @@ +"""The :class:`IdempotencyStore` coordinator: canonical hashing + backend + decorator. + +Responsibilities: + +1. Extract ``idempotency_key`` from the incoming request. +2. Scope lookups by ``(principal_id, key)`` via the backend. +3. On cache hit with matching canonical payload hash: return the cached response + and mark ``replayed=True`` on the envelope. +4. On cache hit with a different hash: raise + :class:`adcp.exceptions.IdempotencyConflictError`. +5. On miss: run the wrapped handler, then commit ``(hash, response)`` to the + backend. + +Per-principal scoping is a hard security requirement (AdCP #2315): a key from +principal A on seller S has no meaning for principal B. The store pulls the +principal id from :class:`adcp.server.base.ToolContext.caller_identity`. If no +context / no caller_identity is supplied, the store refuses to proceed — +fail-closed rather than collapse every buyer into a shared namespace. +""" + +from __future__ import annotations + +import copy +import logging +import time +from collections.abc import Awaitable, Callable +from functools import wraps +from typing import Any + +from pydantic import BaseModel + +from adcp.exceptions import IdempotencyConflictError +from adcp.server.idempotency.backends import CachedResponse, IdempotencyBackend +from adcp.server.idempotency.canonicalize import canonical_json_sha256 + +logger = logging.getLogger(__name__) + +# Spec bounds from capabilities.idempotency.replay_ttl_seconds (1h-7d). +_MIN_TTL_SECONDS = 3600 +_MAX_TTL_SECONDS = 604800 + +HandlerFn = Callable[..., Awaitable[Any]] + + +class IdempotencyStore: + """Coordinator that binds canonical hashing to a storage backend. + + :param backend: A concrete :class:`IdempotencyBackend`. + :param ttl_seconds: How long cached responses remain replayable. Must be + within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is + the recommended floor and matches the compliance storyboard. + :param hash_fn: Optional override for the canonical hash function. Defaults + to :func:`canonical_json_sha256`. Exposed for tests and for anyone who + wants to experiment with alternative equivalence rules — though note + the spec mandates RFC 8785 JCS for interop. + """ + + def __init__( + self, + backend: IdempotencyBackend, + ttl_seconds: int = 86400, + hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256, + *, + clock: Callable[[], float] = time.time, + ) -> None: + if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS: + raise ValueError( + f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] " + f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), " + f"got {ttl_seconds}" + ) + self.backend = backend + self.ttl_seconds = ttl_seconds + self._hash_fn = hash_fn + self._clock = clock + + def capability(self) -> dict[str, Any]: + """Return the capabilities fragment declaring this store's replay window. + + Embed under ``capabilities.adcp.idempotency`` on the seller's + ``get_adcp_capabilities`` response. Buyers read this to reason about + retry-safe windows (AdCP #2315):: + + caps.adcp.idempotency = idempotency.capability() + # → {"replay_ttl_seconds": 86400} + """ + return {"replay_ttl_seconds": self.ttl_seconds} + + def wrap(self, handler: HandlerFn) -> HandlerFn: + """Decorator that adds idempotency semantics to an AdCP handler method. + + The wrapped handler is called as ``handler(self, params, context)``. + ``params`` may be a dict or a Pydantic model — both are normalized to + a dict before hashing. The return value is coerced to a dict for + caching (via ``model_dump`` if Pydantic). + + The decorator always returns the handler's original object on a cache + miss and a best-effort Pydantic re-validation on a hit (when the + handler's declared return type exposes ``model_validate``). Callers + that return raw dicts get dicts back. + """ + + @wraps(handler) + async def _wrapped( + handler_self: Any, + params: Any, + context: Any = None, + *args: Any, + **kwargs: Any, + ) -> Any: + principal_id, idempotency_key, params_dict = self._prepare(params, context) + if principal_id is None or idempotency_key is None: + # No key → spec says the server MUST reject with INVALID_REQUEST. + # We let the handler run so validation layers above us (Pydantic, + # FastAPI, etc.) can reject with a typed error; the middleware's + # job is only to dedup when a key IS present. + return await handler(handler_self, params, context, *args, **kwargs) + + payload_hash = self._hash_fn(params_dict) + + cached = await self.backend.get(principal_id, idempotency_key) + if cached is not None: + if cached.payload_hash == payload_hash: + logger.debug( + "idempotency replay: principal=%s key_prefix=%s", + principal_id, + idempotency_key[:8], + ) + return _clone_response(cached.response) + # Same key, different payload — spec-defined conflict. + raise IdempotencyConflictError( + operation=getattr(handler, "__name__", "handler"), + errors=[ + { + "code": "IDEMPOTENCY_CONFLICT", + "message": ( + "idempotency_key reused with a different payload " + "(canonical hash mismatch)" + ), + } + ], + ) + + response = await handler(handler_self, params, context, *args, **kwargs) + # Deep-copy when caching so post-return mutation of the caller's + # copy can't poison future replays. `_clone_response` also deep- + # copies on the hit path, giving independent objects per replay. + response_dict = copy.deepcopy(_to_dict(response)) + entry = CachedResponse( + payload_hash=payload_hash, + response=response_dict, + expires_at_epoch=self._clock() + self.ttl_seconds, + ) + # Commit cache AFTER handler returns. Atomicity with the handler's + # side effects depends on the backend: MemoryBackend is best-effort + # (no transactional relationship to external resources); PgBackend + # (follow-up) will commit in the same transaction when the handler + # uses the same engine. On put failure we log loudly and return + # the handler's response — swallowing the exception would be wrong + # (operators need the signal that caching is broken), and raising + # would look to the caller like the handler failed, triggering a + # retry that re-executes side effects. Best compromise: warn + # operators, return the result, and accept that the next retry + # with this key will re-execute. + try: + await self.backend.put(principal_id, idempotency_key, entry) + except Exception: + logger.warning( + "Idempotency cache put failed for principal=%s key_prefix=%s — " + "handler completed but a subsequent retry with this key will " + "re-execute rather than replay. This indicates an operational " + "issue with the idempotency backend.", + principal_id, + idempotency_key[:8], + exc_info=True, + ) + return response + + return _wrapped + + def _prepare( + self, params: Any, context: Any + ) -> tuple[str | None, str | None, dict[str, Any]]: + """Normalize inputs and extract the (principal, key, params_dict) tuple. + + Returns ``(None, None, params_dict)`` when idempotency doesn't apply + (no caller identity or no key supplied). The caller falls through to + the plain handler in that case — validation of missing-key lives in + the request schema, not here. + """ + params_dict = _to_dict(params) + idempotency_key = params_dict.get("idempotency_key") + if not isinstance(idempotency_key, str) or not idempotency_key: + return None, None, params_dict + principal_id = _extract_principal_id(context) + if principal_id is None: + # No caller identity: we can't safely scope the key. Spec requires + # per-principal scope; anything else is a cross-principal replay + # attack surface. Fall through to the handler (which will process + # the request normally — no dedup, but no security regression). + return None, None, params_dict + return principal_id, idempotency_key, params_dict + + +def _to_dict(value: Any) -> dict[str, Any]: + """Coerce a request/response to a plain dict for hashing and caching.""" + if isinstance(value, dict): + return value + if isinstance(value, BaseModel): + return value.model_dump(mode="json", exclude_none=True) + if hasattr(value, "model_dump") and callable(value.model_dump): + return value.model_dump(mode="json", exclude_none=True) # type: ignore[no-any-return] + if hasattr(value, "__dict__"): + return dict(value.__dict__) + raise TypeError( + f"Cannot coerce {type(value).__name__} to dict for idempotency caching" + ) + + +def _extract_principal_id(context: Any) -> str | None: + """Pull the principal id from a ToolContext or equivalent shape. + + Accepts: + + * :class:`adcp.server.base.ToolContext` with ``caller_identity`` + * Any object exposing ``caller_identity`` / ``principal_id`` / ``principal.id`` + * A dict with any of the above keys + """ + if context is None: + return None + for attr in ("caller_identity", "principal_id"): + val = getattr(context, attr, None) + if isinstance(val, str) and val: + return val + principal = getattr(context, "principal", None) + if principal is not None: + val = getattr(principal, "id", None) + if isinstance(val, str) and val: + return val + if isinstance(context, dict): + for key in ("caller_identity", "principal_id"): + val = context.get(key) + if isinstance(val, str) and val: + return val + principal = context.get("principal") + if isinstance(principal, dict): + val = principal.get("id") + if isinstance(val, str) and val: + return val + return None + + +def _clone_response(response: dict[str, Any]) -> dict[str, Any]: + """Return a deep copy of the cached response. + + The whole point of a cached replay is "identical response, every time." + A shallow copy would let a caller mutate nested lists/dicts on first + replay and poison every subsequent one. Deep copy the whole tree so each + caller gets an independent object. + """ + return copy.deepcopy(response) diff --git a/src/adcp/server/responses.py b/src/adcp/server/responses.py index ee5871937..ae39c3d97 100644 --- a/src/adcp/server/responses.py +++ b/src/adcp/server/responses.py @@ -47,6 +47,7 @@ def capabilities_response( major_versions: list[int] | None = None, sandbox: bool = True, features: dict[str, Any] | None = None, + idempotency: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build a get_adcp_capabilities response. @@ -56,9 +57,27 @@ def capabilities_response( major_versions: AdCP major versions. Defaults to [3]. sandbox: Whether this is a sandbox agent. Defaults to True. features: Additional feature flags. + idempotency: Optional idempotency declaration, nested under + ``adcp.idempotency`` per AdCP #2315. Pass the output of + :meth:`adcp.server.idempotency.IdempotencyStore.capability` here + to declare the seller's ``replay_ttl_seconds``. + + Example:: + + from adcp.server.responses import capabilities_response + from adcp.server.idempotency import IdempotencyStore, MemoryBackend + + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + return capabilities_response( + ["media_buy"], + idempotency=store.capability(), + ) """ + adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]} + if idempotency: + adcp_info["idempotency"] = idempotency resp: dict[str, Any] = { - "adcp": {"major_versions": major_versions or [3]}, + "adcp": adcp_info, "supported_protocols": supported_protocols, "sandbox": sandbox, } diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 72fe26006..3d1fce027 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -211,8 +211,20 @@ def _register_tool( from mcp.server.fastmcp.utilities.func_metadata import ArgModelBase, FuncMetadata from pydantic import ConfigDict + from adcp.exceptions import ADCPError + from adcp.server.translate import translate_error + async def fn(**kwargs: Any) -> dict[str, Any]: - result = await caller(kwargs) + try: + result = await caller(kwargs) + except ADCPError as exc: + # Translate AdCP-typed exceptions (IdempotencyConflictError, + # ADCPTaskError with a spec code, etc.) into a ToolError so FastMCP + # surfaces ``is_error=true`` with the spec error code in the + # message text. Clients per AdCP §transport-errors will extract + # the code via either structuredContent.adcp_error (if populated) + # or the text-fallback path. + raise translate_error(exc, protocol="mcp") from exc if hasattr(result, "model_dump"): return result.model_dump(mode="json", exclude_none=True) # type: ignore[no-any-return] if isinstance(result, dict): diff --git a/tests/test_server_idempotency.py b/tests/test_server_idempotency.py new file mode 100644 index 000000000..73f72b95d --- /dev/null +++ b/tests/test_server_idempotency.py @@ -0,0 +1,597 @@ +"""Tests for the server-side idempotency middleware (AdCP #2315 seller side).""" + +from __future__ import annotations + +import asyncio +import time +import uuid +from typing import Any + +import pytest +from pydantic import BaseModel + +from adcp.exceptions import IdempotencyConflictError +from adcp.server.base import ToolContext +from adcp.server.idempotency import ( + EXCLUDED_FIELDS, + CachedResponse, + IdempotencyStore, + MemoryBackend, + PgBackend, + canonical_json_sha256, + strip_excluded_fields, +) + + +class TestCanonicalize: + """Hashing determinism + exclusion list behavior.""" + + def test_same_payload_same_hash(self) -> None: + a = {"brand": "acme", "budget": 100} + b = dict(a) + assert canonical_json_sha256(a) == canonical_json_sha256(b) + + def test_key_order_irrelevant(self) -> None: + a = canonical_json_sha256({"a": 1, "b": 2, "c": {"x": True, "y": False}}) + b = canonical_json_sha256({"c": {"y": False, "x": True}, "b": 2, "a": 1}) + assert a == b + + def test_different_payload_different_hash(self) -> None: + a = canonical_json_sha256({"brand": "acme", "budget": 100}) + b = canonical_json_sha256({"brand": "acme", "budget": 101}) + assert a != b + + def test_strip_idempotency_key(self) -> None: + stripped = strip_excluded_fields( + {"idempotency_key": "abc123def456ghi7", "brand": "acme"} + ) + assert stripped == {"brand": "acme"} + + def test_strip_context(self) -> None: + assert strip_excluded_fields({"context": "opaque", "x": 1}) == {"x": 1} + + def test_strip_governance_context(self) -> None: + assert strip_excluded_fields({"governance_context": {}, "x": 1}) == {"x": 1} + + def test_strip_nested_push_notification_credentials(self) -> None: + stripped = strip_excluded_fields( + { + "push_notification_config": { + "authentication": { + "credentials": "secret-token", + "scheme": "bearer", + }, + "url": "https://callback.example", + }, + "brand": "acme", + } + ) + # credentials removed; siblings preserved. + assert stripped == { + "push_notification_config": { + "authentication": {"scheme": "bearer"}, + "url": "https://callback.example", + }, + "brand": "acme", + } + + def test_strip_nested_missing_path_noop(self) -> None: + # No push_notification_config → no crash. + assert strip_excluded_fields({"brand": "acme"}) == {"brand": "acme"} + + def test_strip_preserves_ext(self) -> None: + # Spec: 'ext' is explicitly IN the hash. Don't strip it. + payload = {"ext": {"custom": "field"}, "brand": "acme"} + assert strip_excluded_fields(payload) == payload + + def test_strip_does_not_mutate_input(self) -> None: + original = {"idempotency_key": "abc123def456ghi7", "brand": "acme"} + strip_excluded_fields(original) + assert "idempotency_key" in original + + def test_hash_ignores_idempotency_key(self) -> None: + # Changing idempotency_key must NOT change the hash — that's the whole + # point: the spec defines equivalence over the payload minus the key. + a = canonical_json_sha256({"idempotency_key": "key-one" * 3, "brand": "acme"}) + b = canonical_json_sha256({"idempotency_key": "key-two" * 3, "brand": "acme"}) + assert a == b + + def test_exclusion_set_is_closed(self) -> None: + # Regression guard — if a maintainer adds fields to EXCLUDED_FIELDS + # without updating the spec, the test surfaces it. This test locks the + # closed set to what's actually in the spec. + assert EXCLUDED_FIELDS == frozenset( + {"idempotency_key", "context", "governance_context"} + ) + + +class TestMemoryBackend: + @pytest.mark.asyncio + async def test_put_then_get(self) -> None: + backend = MemoryBackend() + entry = CachedResponse( + payload_hash="abc", + response={"media_buy_id": "mb_1"}, + expires_at_epoch=time.time() + 60, + ) + await backend.put("principal-a", "key-1", entry) + got = await backend.get("principal-a", "key-1") + assert got is not None + assert got.payload_hash == "abc" + assert got.response == {"media_buy_id": "mb_1"} + + @pytest.mark.asyncio + async def test_miss(self) -> None: + backend = MemoryBackend() + got = await backend.get("principal-a", "unknown-key") + assert got is None + + @pytest.mark.asyncio + async def test_expired_entry_returns_none_and_evicts(self) -> None: + backend = MemoryBackend() + entry = CachedResponse( + payload_hash="abc", response={}, expires_at_epoch=time.time() - 1 + ) + await backend.put("principal-a", "key-1", entry) + assert await backend.get("principal-a", "key-1") is None + # Lazy eviction should have removed it. + assert await backend._size() == 0 + + @pytest.mark.asyncio + async def test_per_principal_scope(self) -> None: + # Same key across different principals must not collide. + backend = MemoryBackend() + await backend.put( + "principal-a", + "shared-key", + CachedResponse("h_a", {"who": "a"}, time.time() + 60), + ) + await backend.put( + "principal-b", + "shared-key", + CachedResponse("h_b", {"who": "b"}, time.time() + 60), + ) + a = await backend.get("principal-a", "shared-key") + b = await backend.get("principal-b", "shared-key") + assert a is not None and a.response == {"who": "a"} + assert b is not None and b.response == {"who": "b"} + + @pytest.mark.asyncio + async def test_delete_expired_sweeps(self) -> None: + backend = MemoryBackend() + now = time.time() + await backend.put( + "principal-a", "fresh", CachedResponse("h", {}, now + 60) + ) + await backend.put( + "principal-a", "stale", CachedResponse("h", {}, now - 1) + ) + removed = await backend.delete_expired(now) + assert removed == 1 + assert await backend._size() == 1 + + @pytest.mark.asyncio + async def test_concurrent_put_get(self) -> None: + # Under gather, mutations shouldn't interleave dangerously. + backend = MemoryBackend() + + async def writer(i: int) -> None: + await backend.put( + "principal", + f"key-{i}", + CachedResponse(f"h-{i}", {"i": i}, time.time() + 60), + ) + + await asyncio.gather(*[writer(i) for i in range(50)]) + hits = await asyncio.gather( + *[backend.get("principal", f"key-{i}") for i in range(50)] + ) + assert all(h is not None for h in hits) + assert all(h.response["i"] == i for i, h in enumerate(hits)) # type: ignore[union-attr] + + +class TestPgBackendScaffold: + def test_construction_raises(self) -> None: + # Scaffold shouldn't be usable yet; surface the follow-up issue clearly. + with pytest.raises(NotImplementedError, match="PgBackend"): + PgBackend() + + +class _FakeHandler: + """Minimal ADCPHandler-shaped object for tests.""" + + def __init__(self) -> None: + self.call_count = 0 + self.last_params: Any = None + + async def create_media_buy( + self, params: dict[str, Any], context: ToolContext | None = None + ) -> dict[str, Any]: + self.call_count += 1 + self.last_params = params + # Return a response that looks like CreateMediaBuyResponse minimally. + return { + "media_buy_id": f"mb_{self.call_count}", + "status": "completed", + } + + +class TestIdempotencyStoreWrap: + """End-to-end: decorator + backend + context scoping.""" + + def _make_store(self, ttl_seconds: int = 86400) -> IdempotencyStore: + return IdempotencyStore(backend=MemoryBackend(), ttl_seconds=ttl_seconds) + + @pytest.mark.asyncio + async def test_cache_miss_runs_handler_and_caches(self) -> None: + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + params = { + "idempotency_key": str(uuid.uuid4()), + "brand": {"domain": "acme.example"}, + } + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped(handler, params, ctx) + assert handler.call_count == 1 + assert r1["media_buy_id"] == "mb_1" + + @pytest.mark.asyncio + async def test_cache_hit_replays_without_handler_call(self) -> None: + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + params = { + "idempotency_key": str(uuid.uuid4()), + "brand": {"domain": "acme.example"}, + } + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped(handler, params, ctx) + r2 = await wrapped(handler, params, ctx) + assert handler.call_count == 1 # second call served from cache + assert r1 == r2 + + @pytest.mark.asyncio + async def test_cache_hit_different_payload_raises_conflict(self) -> None: + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + await wrapped(handler, {"idempotency_key": key, "brand": "A"}, ctx) + with pytest.raises(IdempotencyConflictError): + await wrapped(handler, {"idempotency_key": key, "brand": "B"}, ctx) + assert handler.call_count == 1 # conflict path does NOT run handler again + + @pytest.mark.asyncio + async def test_excluded_field_change_is_not_conflict(self) -> None: + # Changing context/governance_context between retries must not trigger + # CONFLICT per spec — those are excluded from the canonical hash. + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped( + handler, {"idempotency_key": key, "brand": "A", "context": "ctx1"}, ctx + ) + r2 = await wrapped( + handler, {"idempotency_key": key, "brand": "A", "context": "ctx2"}, ctx + ) + assert r1 == r2 + assert handler.call_count == 1 + + @pytest.mark.asyncio + async def test_fresh_key_creates_new_resource(self) -> None: + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped(handler, {"idempotency_key": str(uuid.uuid4()), "b": 1}, ctx) + r2 = await wrapped(handler, {"idempotency_key": str(uuid.uuid4()), "b": 1}, ctx) + assert r1["media_buy_id"] != r2["media_buy_id"] + assert handler.call_count == 2 + + @pytest.mark.asyncio + async def test_per_principal_scope_enforced(self) -> None: + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + key = str(uuid.uuid4()) + # Same key on two different principals must not share the slot. + r_a = await wrapped( + handler, + {"idempotency_key": key, "b": 1}, + ToolContext(caller_identity="principal-a"), + ) + r_b = await wrapped( + handler, + {"idempotency_key": key, "b": 1}, + ToolContext(caller_identity="principal-b"), + ) + assert r_a["media_buy_id"] != r_b["media_buy_id"] + assert handler.call_count == 2 + + @pytest.mark.asyncio + async def test_no_idempotency_key_falls_through(self) -> None: + # Middleware doesn't reject; server-side schema validation handles that. + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped(handler, {"brand": "acme"}, ctx) + r2 = await wrapped(handler, {"brand": "acme"}, ctx) + # Both ran — no dedup without a key. + assert handler.call_count == 2 + assert r1 != r2 + + @pytest.mark.asyncio + async def test_no_caller_identity_falls_through(self) -> None: + # Fail-closed: without a principal we can't safely scope the key, + # so skip dedup rather than collapse every buyer into one namespace. + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + params = {"idempotency_key": str(uuid.uuid4()), "brand": "A"} + r1 = await wrapped(handler, params, None) + r2 = await wrapped(handler, params, None) + assert handler.call_count == 2 + assert r1 != r2 + + @pytest.mark.asyncio + async def test_context_as_dict(self) -> None: + # Convenience: accept a dict-shaped context. + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + key = str(uuid.uuid4()) + r1 = await wrapped(handler, {"idempotency_key": key}, {"caller_identity": "principal-a"}) + r2 = await wrapped(handler, {"idempotency_key": key}, {"caller_identity": "principal-a"}) + assert r1 == r2 + assert handler.call_count == 1 + + @pytest.mark.asyncio + async def test_pydantic_params_accepted(self) -> None: + class Req(BaseModel): + idempotency_key: str + brand: str + + store = self._make_store() + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + req = Req(idempotency_key="x" * 20, brand="acme") + ctx = ToolContext(caller_identity="principal-a") + r1 = await wrapped(handler, req, ctx) + r2 = await wrapped(handler, req, ctx) + assert r1 == r2 + assert handler.call_count == 1 + + +class TestInstanceMethodDecorator: + """Exercise the canonical `@idempotency.wrap` shape on an instance method.""" + + @pytest.mark.asyncio + async def test_wrap_as_instance_method_decorator(self) -> None: + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + + class SellerHandler: + def __init__(self) -> None: + self.calls = 0 + + @store.wrap + async def create_media_buy( + self, params: dict[str, Any], context: ToolContext | None = None + ) -> dict[str, Any]: + self.calls += 1 + return {"media_buy_id": f"mb_{self.calls}"} + + seller = SellerHandler() + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + r1 = await seller.create_media_buy({"idempotency_key": key, "b": 1}, ctx) + r2 = await seller.create_media_buy({"idempotency_key": key, "b": 1}, ctx) + assert r1 == r2 + assert seller.calls == 1 + + +class TestCachedResponseImmutability: + @pytest.mark.asyncio + async def test_mutating_replay_does_not_poison_cache(self) -> None: + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + + class H: + @store.wrap + async def create_media_buy(self, params: Any, context: Any = None) -> Any: + # Return a response with a nested mutable object the caller + # could mutate on replay. + return { + "media_buy_id": "mb_1", + "packages": [{"id": "pkg_a", "status": "pending"}], + } + + h = H() + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + r1 = await h.create_media_buy({"idempotency_key": key}, ctx) + # Caller mutates the returned response. + r1["packages"][0]["status"] = "tampered" + r1["packages"].append({"id": "pkg_injected", "status": "evil"}) + # Second replay must NOT see the mutations. + r2 = await h.create_media_buy({"idempotency_key": key}, ctx) + assert r2["packages"] == [{"id": "pkg_a", "status": "pending"}] + + +class TestBackendPutFailure: + @pytest.mark.asyncio + async def test_put_failure_logs_warning_and_returns_handler_result( + self, caplog: Any + ) -> None: + import logging as _logging + + class BrokenBackend(MemoryBackend): + async def put(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("simulated backend outage") + + store = IdempotencyStore(backend=BrokenBackend(), ttl_seconds=86400) + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + ctx = ToolContext(caller_identity="principal-a") + with caplog.at_level(_logging.WARNING, logger="adcp.server.idempotency.store"): + result = await wrapped( + handler, {"idempotency_key": str(uuid.uuid4()), "b": 1}, ctx + ) + assert result["media_buy_id"] == "mb_1" # handler ran, result returned + assert any("cache put failed" in rec.message for rec in caplog.records) + + +class TestWireTranslation: + """IdempotencyConflictError raised from a wrapped handler must surface on + the wire as IDEMPOTENCY_CONFLICT — not a generic 500 — on both MCP and A2A. + """ + + @pytest.mark.asyncio + async def test_mcp_conflict_translates_to_tool_error(self) -> None: + # MCP path: serve.py's _register_tool wraps caller in try/except ADCPError + # → translate_error → ToolError. Verify the translation chain directly. + from mcp.server.fastmcp.exceptions import ToolError + + from adcp.exceptions import ADCPError + from adcp.server.translate import translate_error + + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + + class H(_FakeHandler): + @store.wrap + async def create_media_buy( # type: ignore[override] + self, params: dict[str, Any], context: Any = None + ) -> dict[str, Any]: + return await super().create_media_buy(params, context) + + h = H() + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + await h.create_media_buy({"idempotency_key": key, "brand": "A"}, ctx) + + # Mirror the serve._register_tool wrapping shape that runs in + # production. The conflict surfaces as an ADCPError which the wrapper + # translates to a ToolError. + async def serve_fn(params: dict[str, Any]) -> dict[str, Any]: + try: + return await h.create_media_buy(params, ctx) + except ADCPError as exc: + raise translate_error(exc, protocol="mcp") from exc + + with pytest.raises(ToolError) as exc_info: + await serve_fn({"idempotency_key": key, "brand": "B"}) + assert "IDEMPOTENCY_CONFLICT" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_a2a_conflict_emits_failed_task_with_adcp_error(self) -> None: + # A2A path: ADCPAgentExecutor._send_adcp_error emits a TaskState.failed + # with a DataPart carrying {"adcp_error": {"code":..., "recovery":...}} + # per transport-errors.mdx §A2A Binding. + from a2a.types import DataPart, TaskState + + from adcp.exceptions import IdempotencyConflictError + from adcp.server.a2a_server import ADCPAgentExecutor + from adcp.server.base import ADCPHandler + + # Use a bare ADCPHandler subclass — executor setup walks tool defs. + class NoopSeller(ADCPHandler): + pass + + executor = ADCPAgentExecutor(NoopSeller()) + captured: list[Any] = [] + + class FakeQueue: + async def enqueue_event(self, event: Any) -> None: + captured.append(event) + + err = IdempotencyConflictError( + "create_media_buy", + [{"code": "IDEMPOTENCY_CONFLICT", "message": "drift"}], + ) + await executor._send_adcp_error(FakeQueue(), _make_context_shim(), err) + assert captured, "executor produced no event" + task = captured[0] + assert task.status.state == TaskState.failed + assert task.artifacts, "failed task missing artifacts" + data_parts = [ + p.root for p in task.artifacts[0].parts if isinstance(p.root, DataPart) + ] + assert data_parts, "failed task missing DataPart" + adcp_error = data_parts[0].data.get("adcp_error") + assert adcp_error is not None + assert adcp_error["code"] == "IDEMPOTENCY_CONFLICT" + assert adcp_error["recovery"] == "terminal" + + +def _make_context_shim() -> Any: + """Minimal RequestContext stub with only the attributes _make_task reads.""" + from types import SimpleNamespace + + return SimpleNamespace(task_id=None, context_id=None, message=None) + + +class TestCapability: + def test_capability_fragment(self) -> None: + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + assert store.capability() == {"replay_ttl_seconds": 86400} + + def test_capabilities_response_accepts_idempotency(self) -> None: + from adcp.server.responses import capabilities_response + + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) + resp = capabilities_response(["media_buy"], idempotency=store.capability()) + assert resp["adcp"]["idempotency"] == {"replay_ttl_seconds": 86400} + + def test_capabilities_response_idempotency_omitted_when_none(self) -> None: + from adcp.server.responses import capabilities_response + + resp = capabilities_response(["media_buy"]) + assert "idempotency" not in resp["adcp"] + + def test_server_reexports(self) -> None: + from adcp.server import IdempotencyStore as Store + from adcp.server import MemoryBackend as Backend + + assert Store is IdempotencyStore + assert Backend is MemoryBackend + + def test_ttl_bounds_enforced_low(self) -> None: + with pytest.raises(ValueError, match="3600"): + IdempotencyStore(backend=MemoryBackend(), ttl_seconds=1800) + + def test_ttl_bounds_enforced_high(self) -> None: + with pytest.raises(ValueError, match="604800"): + IdempotencyStore(backend=MemoryBackend(), ttl_seconds=1_000_000) + + def test_ttl_minimum_accepted(self) -> None: + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=3600) + assert store.capability() == {"replay_ttl_seconds": 3600} + + def test_ttl_maximum_accepted(self) -> None: + store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=604800) + assert store.capability() == {"replay_ttl_seconds": 604800} + + +class TestTTLExpiry: + @pytest.mark.asyncio + async def test_cached_response_expires_after_ttl(self) -> None: + # Inject a fake clock so the test doesn't have to monkeypatch time. + current = [1_000_000.0] + + def fake_clock() -> float: + return current[0] + + backend = MemoryBackend(clock=fake_clock) + store = IdempotencyStore(backend=backend, ttl_seconds=3600, clock=fake_clock) + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + key = str(uuid.uuid4()) + ctx = ToolContext(caller_identity="principal-a") + await wrapped(handler, {"idempotency_key": key, "b": 1}, ctx) + # Advance past the TTL. + current[0] += 7200 + await wrapped(handler, {"idempotency_key": key, "b": 1}, ctx) + assert handler.call_count == 2