Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,49 @@ client = ADCPClient(agent, strict_idempotency=True) # default: False

**Security note on logs.** The SDK redacts `idempotency_key` in its own debug captures, but the underlying `httpx`/`httpcore` loggers log full request bodies at `DEBUG`. If you enable `logging.basicConfig(level=logging.DEBUG)` in production, raise those two loggers back to `INFO` — otherwise full keys end up in logs during the seller's replay TTL window and become a retry-pattern oracle for anyone who can read them.

### Building a seller: idempotency middleware

If you're building an AdCP seller, the companion middleware handles the `(principal, key, canonical-hash)` bookkeeping so you don't hand-roll it per tool handler. Drop `@idempotency.wrap` above each mutating handler and declare your replay window in capabilities:

```python
from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, serve
from adcp.server.responses import capabilities_response

idempotency = IdempotencyStore(
backend=MemoryBackend(), # PgBackend with transactional commit is a follow-up
ttl_seconds=86400, # 24h, spec-recommended floor
)

class MySeller(ADCPHandler):
async def get_adcp_capabilities(self, params, context=None):
return capabilities_response(
["media_buy"],
idempotency=idempotency.capability(),
)

@idempotency.wrap
async def create_media_buy(self, params, context=None):
# Same key + canonical-equivalent payload → this body is skipped,
# the cached response is returned. Same key + different payload →
# IdempotencyConflictError raised before this runs, which the
# framework translates to IDEMPOTENCY_CONFLICT on the wire.
return my_business_logic(params)

serve(MySeller(), name="my-seller")
```

**What the middleware does for you:**

- Extracts `idempotency_key` from `params`, scopes lookups by `context.caller_identity` (per-principal — a security requirement from AdCP §2315)
- Hashes the payload with RFC 8785 JCS + SHA-256, stripping the spec's closed exclusion list (`idempotency_key`, `context`, `governance_context`, `push_notification_config.authentication.credentials`)
- On cache hit with matching hash: returns the cached response verbatim, skips your handler (deep-copied so caller mutation can't poison future replays)
- On cache hit with different hash: raises `IdempotencyConflictError`, which the framework surfaces as `IDEMPOTENCY_CONFLICT` on both MCP (`is_error=true` + text) and A2A (failed task with `adcp_error` DataPart)
- On cache miss: runs your handler, then commits the response

**Backends:** `MemoryBackend` ships now (tests, single-process agents). `PgBackend` is scaffolded — it raises `NotImplementedError` with a pointer to the follow-up issue. For production use across multiple workers, implement your own `IdempotencyBackend` subclass against Redis, Postgres, etc.

**Atomicity caveat:** `MemoryBackend` commits the cache entry AFTER your handler returns, so a crash between `handler success` and `cache commit` causes the retry to re-execute. `PgBackend` (follow-up) will commit the cache row in the same transaction as your business writes. Read the module docstring at `adcp.server.idempotency` before shipping this against a production database.

## Available Tools

All AdCP tools with full type safety:
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ dependencies = [
"mcp>=1.23.2",
"email-validator>=2.0.0",
"cryptography>=41.0.0",
# RFC 8785 JSON Canonicalization Scheme — used by the server-side
# idempotency middleware to compute the spec-mandated payload hash for
# replay detection. Tiny pure-Python dep, no transitive weight.
"rfc8785>=0.1.4",
]

[project.scripts]
Expand Down
4 changes: 4 additions & 0 deletions src/adcp/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async def get_products(params, context=None):
resolve_account,
valid_actions_for_status,
)
from adcp.server.idempotency import IdempotencyStore, MemoryBackend
from adcp.server.mcp_tools import MCPToolSet, create_mcp_tools, get_tools_for_handler
from adcp.server.proposal import ProposalBuilder, ProposalNotSupported
from adcp.server.responses import (
Expand Down Expand Up @@ -138,6 +139,9 @@ async def get_products(params, context=None):
# A2A integration
"ADCPAgentExecutor",
"create_a2a_server",
# Idempotency middleware (AdCP #2315 seller side)
"IdempotencyStore",
"MemoryBackend",
# Test controller
"TestControllerStore",
"TestControllerError",
Expand Down
50 changes: 50 additions & 0 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
TextPart,
)

from adcp.exceptions import ADCPError, ADCPTaskError
from adcp.server.base import ADCPHandler
from adcp.server.helpers import STANDARD_ERROR_CODES
from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
from adcp.server.test_controller import TestControllerStore, _handle_test_controller

Expand Down Expand Up @@ -104,6 +106,16 @@ async def execute(
try:
result = await self._tool_callers[skill_name](params)
await self._send_result(event_queue, context, skill_name, result)
except ADCPError as exc:
# Application-layer AdCP error (IdempotencyConflictError etc.).
# Emit a failed task with the adcp_error in a DataPart per
# transport-errors.mdx §A2A Binding, plus a human-readable text
# part. The JSON-RPC channel is reserved for transport-level
# errors (auth rejected, rate-limited pre-dispatch).
logger.info(
"AdCP application error for skill %s: %s", skill_name, exc
)
await self._send_adcp_error(event_queue, context, exc)
except Exception:
logger.exception("Error executing skill %s", skill_name)
await self._send_error(
Expand Down Expand Up @@ -212,6 +224,44 @@ async def _send_error(
)
await event_queue.enqueue_event(task)

async def _send_adcp_error(
self,
event_queue: EventQueue,
context: RequestContext,
exc: ADCPError,
) -> None:
"""Publish a failed task carrying an AdCP ``adcp_error`` payload.

Follows transport-errors.mdx §A2A Binding: failed task with artifact
containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
``TextPart`` for human/LLM consumption.
"""
# Derive the spec error code. ADCPTaskError carries a list of codes
# (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
# to a generic INTERNAL_ERROR when the exception doesn't supply one.
code = "INTERNAL_ERROR"
if isinstance(exc, ADCPTaskError) and exc.error_codes:
code = str(exc.error_codes[0])

adcp_error: dict[str, Any] = {
"code": code,
"message": exc.message,
}
recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
if recovery:
adcp_error["recovery"] = recovery
suggestion = getattr(exc, "suggestion", None)
if suggestion:
adcp_error["suggestion"] = suggestion

task = _make_task(
context,
state=TaskState.failed,
data={"adcp_error": adcp_error},
message=exc.message,
)
await event_queue.enqueue_event(task)


# ------------------------------------------------------------------
# Task factory
Expand Down
11 changes: 11 additions & 0 deletions src/adcp/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
"ACCOUNT_NOT_FOUND": {"recovery": "terminal", "message": "Account not found"},
"ACCOUNT_SUSPENDED": {"recovery": "terminal", "message": "Account suspended"},
"UNSUPPORTED_FEATURE": {"recovery": "terminal", "message": "Feature not supported"},
# Idempotency (AdCP #2315). Both are "terminal" from a retry-behavior
# standpoint — the caller MUST take a specific action (mint a fresh key or
# reconcile state) rather than blindly retry.
"IDEMPOTENCY_CONFLICT": {
"recovery": "terminal",
"message": "idempotency_key reused with a different payload",
},
"IDEMPOTENCY_EXPIRED": {
"recovery": "terminal",
"message": "Idempotency replay window has expired",
},
# --- SDK extensions (not in spec enum) ---
"NOT_SUPPORTED": {"recovery": "terminal", "message": "Operation not supported"},
}
Expand Down
77 changes: 77 additions & 0 deletions src/adcp/server/idempotency/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Server-side idempotency middleware for AdCP mutating tool handlers.

Implements the seller side of AdCP #2315: extract ``idempotency_key``, look up
cached responses scoped by ``(authenticated_principal, idempotency_key)``, and
replay the cached response verbatim when a subsequent request carries the same
key + canonicalized-equivalent payload. Reject key reuse with a different
payload as ``IDEMPOTENCY_CONFLICT``.

The spec contract lives at
``adcontextprotocol/adcp/docs/building/implementation/security.mdx#idempotency``.

Typical usage::

from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, ToolContext
from adcp.server.responses import capabilities_response

idempotency = IdempotencyStore(
backend=MemoryBackend(),
ttl_seconds=86400, # 24 hours, matches spec minimum
)

class MySeller(ADCPHandler):
@idempotency.wrap
async def create_media_buy(self, params, context=None):
# ``params`` carries ``idempotency_key``; ``context.caller_identity``
# identifies the principal. Without either, the middleware falls
# through to this handler with no dedup (schema validation above
# us rejects missing keys per AdCP #2315).
return my_create_logic(params)

async def get_adcp_capabilities(self, params, context=None):
return capabilities_response(
["media_buy"],
idempotency=idempotency.capability(),
)

Callers who invoke the handler directly (tests, non-HTTP code paths) must
pass a :class:`adcp.server.base.ToolContext` with ``caller_identity`` set —
it's how the middleware scopes the cache namespace per-principal::

ctx = ToolContext(caller_identity="buyer-acme")
result = await seller.create_media_buy(
{"idempotency_key": key, ...}, ctx
)

Backends:

- :class:`MemoryBackend` — in-process dict with TTL; use for tests and
single-process reference implementations.
- :class:`PgBackend` — scaffold for a SQLAlchemy/asyncpg-backed store that can
commit cache writes atomically with business writes. Implementation arrives
in a follow-up PR.
"""

from adcp.server.idempotency.backends import (
CachedResponse,
IdempotencyBackend,
MemoryBackend,
PgBackend,
)
from adcp.server.idempotency.canonicalize import (
EXCLUDED_FIELDS,
canonical_json_sha256,
strip_excluded_fields,
)
from adcp.server.idempotency.store import IdempotencyStore

__all__ = [
"CachedResponse",
"EXCLUDED_FIELDS",
"IdempotencyBackend",
"IdempotencyStore",
"MemoryBackend",
"PgBackend",
"canonical_json_sha256",
"strip_excluded_fields",
]
Loading
Loading