diff --git a/docs/handler-authoring.md b/docs/handler-authoring.md index 83184bcda..9e4399d0b 100644 --- a/docs/handler-authoring.md +++ b/docs/handler-authoring.md @@ -249,16 +249,93 @@ reset; your persistent store can't:** sweep deleting tasks in `completed` / `canceled` / `failed` states older than your retention policy. +### Durable push-notification config storage + +Clients subscribe to task progress by calling +`tasks/pushNotificationConfig/set`. a2a-sdk's default behavior is +**push-notif disabled** — the endpoint surfaces +`UnsupportedOperationError` until you wire a store. Sellers that accept +push-notif subscriptions pass one: + +```python +from adcp.server import serve +from examples.a2a_db_tasks import ( + SqliteTaskStore, + SqlitePushNotificationConfigStore, +) + +serve( + MyAgent(), + transport="a2a", + task_store=SqliteTaskStore("/var/lib/myagent/tasks.db"), + push_config_store=SqlitePushNotificationConfigStore( + "/var/lib/myagent/push_configs.db" + ), +) +``` + +**Three things a durable push-notification config store MUST do — +beyond the four from the TaskStore section above:** + +1. **Validate the client-supplied `url` against an allowlist before + persisting.** a2a-sdk's push-notif sender POSTs full task JSON to + whatever URL is stored, with no built-in validation. An attacker + registering `url=http://169.254.169.254/…` (cloud metadata) or + `http://localhost:5432/` (internal services) gets SSRF + + exfiltration in one call — the task JSON that lands on the + attacker's server includes `history` and `artifacts`. The + reference impl does NOT validate URLs; the seller's store (or + a pre-persist hook) must. Reject non-https, reject RFC 1918 / + IPv6 link-local, and require the host match an egress allowlist + before `set_info` writes anything. +2. **Treat `PushNotificationConfig.authentication.credentials` and + `PushNotificationConfig.token` as secrets at rest.** Clients pass + bearer tokens / shared secrets so the agent's callbacks can + authenticate. The reference impl serialises them to plaintext JSON + under `chmod 0o600` — safe on a single-user host but that + guarantee doesn't survive backups, Docker bind mounts with wrong + umask, DB-to-Postgres migrations, or shared-volume mounts. + Production stores should envelope-encrypt those fields, or persist + opaque references and keep the secrets in a dedicated backend + (Vault, AWS KMS, GCP Secret Manager). +3. **Scope by principal, not just by tenant.** a2a-sdk's ABC doesn't + pass a `ServerCallContext` to push-config methods, so scoping has + to happen out-of-band. The reference `SqlitePushNotificationConfigStore` + reads a `ContextVar` your auth middleware populates and writes a + `scope` column on every row. Cross-scope isolation works; **within + a scope, multiple principals can still overwrite each other's + configs** (same `(scope, task_id)`, client omits `config_id`, PK + collision). For multi-principal-per-tenant deployments, widen the + scope to include the principal (e.g. `f"{tenant}:{principal}"`) or + require clients to supply an explicit `config_id`. + +**Scoping caveat.** The reference impl's ContextVar approach has a +known gap: a2a-sdk's push-notif sender runs in a background +`asyncio.Task` that inherits the ContextVar snapshot from +task-creation time. If the seller's auth middleware has already reset +the ContextVar before the sender reads it, `get_info` returns empty +and notifications silently drop. Sellers running non-blocking +push-notifs must propagate scope into the sender path explicitly — +either capture the scope at `set_info` time and stash it alongside +the config, or override a2a-sdk's `BasePushNotificationSender` to +re-set the ContextVar before calling `get_info`. Not yet addressed in +the SDK. + +**Operator-facing failure modes.** When `scope_provider` returns +`None`, the reference store falls through to an `__anonymous__` +bucket and emits a one-time `UserWarning`. Silent fall-through would +share one push-notif bucket across every unauthenticated caller. The +warning is the signal your auth middleware isn't populating the +ContextVar — treat it as a P0. + ### Known gaps -- Push-notification config is in-memory only — tracked at - [#225](https://github.com/adcontextprotocol/adcp-client-python/issues/225). - Per-skill middleware hooks for audit logging / activity feeds don't exist yet — tracked at [#226](https://github.com/adcontextprotocol/adcp-client-python/issues/226). -Once #225 and #226 land, A2A adoption reaches parity with MCP for -production agents. +Once #226 lands, A2A adoption reaches parity with MCP for production +agents. ## Testing diff --git a/examples/a2a_db_tasks.py b/examples/a2a_db_tasks.py index 6265b4ce7..54d94484c 100644 --- a/examples/a2a_db_tasks.py +++ b/examples/a2a_db_tasks.py @@ -1,8 +1,10 @@ -"""Example: A2A agent with a durable, scope-isolated SQLite-backed ``TaskStore``. +"""Example: A2A agent with durable, scope-isolated SQLite-backed +``TaskStore`` + ``PushNotificationConfigStore``. -A2A's default ``InMemoryTaskStore`` is single-process and non-durable — -fine for demos but tasks vanish on restart. Production agents need a -durable store so long-running operations survive process restarts and +A2A's defaults (``InMemoryTaskStore`` + ``InMemoryPushNotificationConfigStore``) +are single-process and non-durable — fine for demos but tasks and +push-notif subscriptions vanish on restart. Production agents need +durable stores so long-running operations survive process restarts and can be resumed by whichever worker picks up the request. This example wires up a minimal SQLite-backed store that implements @@ -22,6 +24,29 @@ organization IDs, etc.) should override ``_scope_from_context`` to return *their* scope key — the lookup filter then follows automatically. +**Security model — push-notification config store adds two threats +tenant-scoping alone does NOT address:** + +1. **SSRF via unvalidated webhook URLs.** Clients supply + ``PushNotificationConfig.url`` when subscribing to task progress; + a2a-sdk's push-notif sender POSTs the full task JSON to that URL + with no built-in validation. An attacker can register + ``url=http://169.254.169.254/…`` (cloud metadata), + ``http://localhost:5432/`` (internal services), link-local IPs, + etc. The store persists URLs verbatim — URL validation is the + seller's responsibility. Reject non-https, reject RFC 1918 / IPv6 + link-local, check against an egress allowlist before persisting. +2. **Webhook secrets stored plaintext.** + ``PushNotificationConfig.authentication.credentials`` and + ``PushNotificationConfig.token`` are bearer tokens / shared + secrets clients pass for authenticated callbacks. The reference + impl serialises them to JSON under chmod 0o600 — safe on a + single-user host, but backups, Docker bind mounts with wrong + umask, DB migrations, and shared-volume mounts all lose that + guarantee. Production stores should envelope-encrypt or redact + these fields, or move them to a secrets backend and persist only + opaque references. + **Not production-ready.** Remaining gaps for real deployments: - Postgres/MySQL + async driver (asyncpg / aiomysql). @@ -57,13 +82,20 @@ import json import os import sqlite3 +import uuid +import warnings +from collections.abc import Callable from contextlib import asynccontextmanager +from contextvars import ContextVar from pathlib import Path from typing import Any from a2a.server.context import ServerCallContext +from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, +) from a2a.server.tasks.task_store import TaskStore -from a2a.types import Task +from a2a.types import PushNotificationConfig, Task from adcp.server import ADCPHandler, serve from adcp.server.responses import capabilities_response, products_response @@ -194,6 +226,244 @@ async def delete(self, task_id: str, context: ServerCallContext | None = None) - ) +# ---------------------------------------------------------------------- +# SQLite-backed PushNotificationConfigStore +# ---------------------------------------------------------------------- + +# Three things a durable push-notification config store MUST do in +# production — the reference impl below only enforces isolation. +# Sellers adapting this to their stack need to layer on the other two: +# +# 1. Validate the client-supplied ``url`` against an allowlist before +# persisting. a2a-sdk's push-notif sender POSTs full task JSON to +# whatever URL is stored — no built-in allowlist. An unvalidated +# store is a cloud-metadata SSRF and a task-content exfiltration +# primitive. Reject non-https, reject private/link-local IPs, reject +# hosts outside your egress allowlist. ``ssrf_guard()``-style +# helper below is a starting point. +# 2. Treat ``PushNotificationConfig.authentication.credentials`` and +# ``PushNotificationConfig.token`` as secrets at rest. The reference +# impl serialises them to plaintext JSON under chmod 0600 — safe on +# a single-user host but loses that guarantee across backups, +# Docker bind mounts with wrong umask, and DB migrations. Either +# encrypt those fields or move them to a secrets backend. +# 3. Isolate by principal, not just by scope. Within a single auth +# scope (e.g. "tenant-acme") multiple principals may share access +# to the same task. The reference impl keys on ``(scope, task_id, +# config_id)`` and falls ``config_id`` back to ``task_id`` when +# the client omits it — two principals registering without a +# ``config_id`` overwrite each other silently. Either require an +# explicit ``config_id`` from the client, or widen the scope key to +# include the principal. +_current_push_config_scope: ContextVar[str | None] = ContextVar( + "adcp_push_config_scope", default=None +) +"""Default ContextVar used by ``SqlitePushNotificationConfigStore`` when +no ``scope_provider`` is supplied. HTTP auth middleware sets it per +request; the store reads it on every op. Exposed at module level so +a seller with their own auth middleware can pair it with this +reference impl without subclassing.""" + + +def _default_push_config_scope_provider() -> str | None: + """Read the current push-config scope from the module-level + ``ContextVar``. Returned ``None`` = "unauthenticated request" per + ``_ANONYMOUS_SCOPE`` below.""" + return _current_push_config_scope.get() + + +class SqlitePushNotificationConfigStore(PushNotificationConfigStore): + """Durable A2A ``PushNotificationConfigStore`` backed by a single + SQLite file, scoped by an authenticated principal resolved at + set/get/delete time via a ``scope_provider`` callable. + + a2a-sdk's ``PushNotificationConfigStore`` ABC does **not** pass a + ``ServerCallContext`` to ``set_info`` / ``get_info`` / + ``delete_info`` (unlike the ``TaskStore`` ABC), so scoping has to + happen out-of-band. The canonical pattern is a ``ContextVar`` the + seller's HTTP auth middleware populates per request — the + ``_default_push_config_scope_provider()`` factory below reads the + module-level ``_current_push_config_scope``. Sellers who already + maintain their own ContextVar (or prefer thread-locals, Starlette + ``request.state``, etc.) inject a custom provider. + + Example — wiring the default ContextVar from auth middleware:: + + from contextvars import ContextVar + + from examples.a2a_db_tasks import ( + SqlitePushNotificationConfigStore, + _current_push_config_scope, + ) + + class AuthMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + scope = _resolve_tenant_scope(request) # your auth logic + token = _current_push_config_scope.set(scope) + try: + return await call_next(request) + finally: + _current_push_config_scope.reset(token) + + serve( + agent, + transport="a2a", + push_config_store=SqlitePushNotificationConfigStore( + db_path="a2a_push_configs.db", + ), + ) + + Example — injecting a custom provider (different ContextVar, or + pulling from your own auth layer):: + + my_scope: ContextVar[str | None] = ContextVar("my_scope") + store = SqlitePushNotificationConfigStore( + db_path="a2a_push_configs.db", + scope_provider=lambda: my_scope.get(default=None), + ) + + **Fails closed on anonymous requests.** If the provider returns + ``None``, a ``UserWarning`` is emitted once per store instance and + the store falls through to ``__anonymous__`` — unauthenticated + requests end up sharing one giant scope. Operators should reject + unauthenticated push-notif-config requests at the auth layer + before the store is touched; the warning is the signal they + forgot to. + + **Background-task caveat — sender path.** a2a-sdk's push-notif + sender calls ``get_info()`` from a background ``asyncio.Task`` + spawned by ``DefaultRequestHandler``. That task inherits the + ContextVar snapshot captured at task-creation time; if the + seller's auth middleware has already reset the ContextVar before + the background task reads it, ``get_info()`` will return an empty + list and notifications silently drop. Sellers running non-blocking + push-notifs MUST propagate scope into the sender path explicitly — + either by capturing the scope into a closure at ``set_info()`` time + and stashing it alongside the config, or by overriding + a2a-sdk's ``BasePushNotificationSender`` to re-set the ContextVar + before calling ``get_info``. Not yet addressed by the SDK; + tracked separately. + """ + + def __init__( + self, + db_path: str | Path = "a2a_push_configs.db", + *, + scope_provider: Callable[[], str | None] | None = None, + ) -> None: + self._db_path = str(db_path) + self._scope_provider = scope_provider or _default_push_config_scope_provider + self._init_schema() + self._warned_anonymous = False + + def _init_schema(self) -> None: + path = Path(self._db_path) + first_create = not path.exists() + with sqlite3.connect(self._db_path) as conn: + # One task can have multiple push-notif configs; the config + # id is the secondary key. scope isolates across tenants. + conn.execute( + """ + CREATE TABLE IF NOT EXISTS a2a_push_configs ( + scope TEXT NOT NULL, + task_id TEXT NOT NULL, + config_id TEXT NOT NULL, + config_json TEXT NOT NULL, + updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + PRIMARY KEY (scope, task_id, config_id) + ) + """ + ) + if first_create: + with contextlib.suppress(OSError): + os.chmod(self._db_path, 0o600) + + def _scope(self) -> str: + scope = self._scope_provider() + if not scope: + if not self._warned_anonymous: + self._warned_anonymous = True + warnings.warn( + "SqlitePushNotificationConfigStore: scope_provider " + "returned None — operating in the __anonymous__ scope. " + "All unauthenticated requests will share a single " + "push-notif bucket, which is unsafe in multi-tenant " + "deployments. Wire your auth middleware to populate " + "the ContextVar (or reject unauthenticated push-notif-" + "config requests at the auth layer) before production.", + UserWarning, + stacklevel=3, + ) + return _ANONYMOUS_SCOPE + return scope + + @asynccontextmanager + async def _conn(self): + conn = sqlite3.connect(self._db_path) + try: + yield conn + except Exception: + conn.rollback() + raise + else: + conn.commit() + finally: + conn.close() + + async def set_info(self, task_id: str, notification_config: PushNotificationConfig) -> None: + scope = self._scope() + # PushNotificationConfig.id is optional on the wire; when the + # client didn't supply one we synthesise a UUID so two clients + # registering on the same task without explicit ids don't + # silently overwrite each other. Production stores should + # consider requiring an explicit config_id and rejecting + # None outright — the client loses the ability to delete the + # config they just created unless they round-trip the + # server-assigned id. + config_id = notification_config.id or f"auto-{uuid.uuid4()}" + config_json = notification_config.model_dump_json(exclude_none=True) + async with self._conn() as conn: + conn.execute( + "INSERT OR REPLACE INTO a2a_push_configs " + "(scope, task_id, config_id, config_json, updated_at) " + "VALUES (?, ?, ?, ?, strftime('%s','now'))", + (scope, task_id, config_id, config_json), + ) + + async def get_info(self, task_id: str) -> list[PushNotificationConfig]: + scope = self._scope() + async with self._conn() as conn: + rows = conn.execute( + "SELECT config_json FROM a2a_push_configs " "WHERE scope = ? AND task_id = ?", + (scope, task_id), + ).fetchall() + return [PushNotificationConfig.model_validate(json.loads(r[0])) for r in rows] + + async def delete_info(self, task_id: str, config_id: str | None = None) -> None: + scope = self._scope() + async with self._conn() as conn: + if config_id is None: + # a2a-sdk's ABC semantic: ``delete_info(task_id, None)`` + # removes every config for the task. Within a scope + # with multiple principals, this lets any principal + # wipe every other principal's subscriptions — a + # tenant-local DoS. Production stores that admit + # multi-principal-per-scope should reject + # ``config_id=None`` or authorise the caller against + # each config row. The reference impl honours the ABC + # semantic as-is. + conn.execute( + "DELETE FROM a2a_push_configs WHERE scope = ? AND task_id = ?", + (scope, task_id), + ) + else: + conn.execute( + "DELETE FROM a2a_push_configs " + "WHERE scope = ? AND task_id = ? AND config_id = ?", + (scope, task_id, config_id), + ) + + # ---------------------------------------------------------------------- # Minimal handler so the example runs end-to-end # ---------------------------------------------------------------------- @@ -213,12 +483,14 @@ async def get_products(self, params: Any, context: Any = None) -> dict[str, Any] def main() -> None: - store = SqliteTaskStore(db_path="a2a_tasks.db") + task_store = SqliteTaskStore(db_path="a2a_tasks.db") + push_store = SqlitePushNotificationConfigStore(db_path="a2a_push_configs.db") serve( DemoAgent(), name="a2a-db-tasks-demo", transport="a2a", - task_store=store, + task_store=task_store, + push_config_store=push_store, ) diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 3cca85895..2e58d6a25 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -39,6 +39,9 @@ from adcp.server.base import ADCPHandler, ToolContext if TYPE_CHECKING: + from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, + ) from a2a.server.tasks.task_store import TaskStore from adcp.server.serve import ContextFactory @@ -441,6 +444,7 @@ def create_a2a_server( test_controller: TestControllerStore | None = None, context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, + push_config_store: PushNotificationConfigStore | None = None, ) -> Any: """Create an A2A Starlette application from an ADCP handler. @@ -470,6 +474,24 @@ def create_a2a_server( a reference SQLite-backed implementation and ``docs/handler-authoring.md`` for the persistence caveats on the default store. + push_config_store: Optional a2a-sdk + :class:`~a2a.server.tasks.push_notification_config_store.PushNotificationConfigStore` + instance for persisting push-notification configs that clients + register via ``tasks/pushNotificationConfig/set``. **When + unset, a2a-sdk surfaces push-notif endpoints as + ``UnsupportedOperationError``** — clients cannot register + subscriptions at all. Set this only when your agent is ready + to accept push-notif subscriptions. See + ``examples/a2a_db_tasks.py`` for a reference SQLite-backed + implementation that pairs with the ``SqliteTaskStore`` there. + + Security note: unlike ``TaskStore``, a2a-sdk's + ``PushNotificationConfigStore`` ABC does not pass a + ``ServerCallContext`` to ``set_info`` / ``get_info`` / + ``delete_info``. Scoping by principal has to happen out-of-band + (via a ``ContextVar`` your auth middleware populates) or by + composition with a tenant-scoped ``TaskStore`` — the reference + impl shows the ContextVar pattern. Returns: A Starlette app ready to be run with uvicorn. @@ -494,9 +516,14 @@ def create_a2a_server( if task_store is None: task_store = InMemoryTaskStore() + # DefaultRequestHandler stores push_config_store verbatim and treats + # None as "push-notif endpoints unsupported" (UnsupportedOperationError + # on tasks/pushNotificationConfig/*). Passing None is the correct + # default; sellers opt in by wiring a store. request_handler = DefaultRequestHandler( agent_executor=executor, task_store=task_store, + push_config_store=push_config_store, ) a2a_app = A2AStarletteApplication( diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 49f8e8f74..c49a546dc 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -27,6 +27,9 @@ async def get_adcp_capabilities(self, params, context=None): from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler if TYPE_CHECKING: + from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, + ) from a2a.server.tasks.task_store import TaskStore from adcp.server.test_controller import TestControllerStore @@ -108,6 +111,7 @@ def serve( test_controller: TestControllerStore | None = None, context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, + push_config_store: PushNotificationConfigStore | None = None, ) -> None: """Start an MCP or A2A server from an ADCP handler or server builder. @@ -130,6 +134,12 @@ def serve( persistence (A2A transport only). Defaults to ``InMemoryTaskStore`` — tasks don't survive restart. See ``examples/a2a_db_tasks.py`` for the production pattern. + push_config_store: Optional a2a-sdk ``PushNotificationConfigStore`` + for push-notif subscription persistence (A2A transport only). + When unset, a2a-sdk surfaces the push-notif endpoints as + ``UnsupportedOperationError`` — clients cannot register + subscriptions at all. See ``examples/a2a_db_tasks.py`` for + a durable reference implementation. Security: This function does NOT configure authentication. In production, @@ -176,6 +186,7 @@ async def force_account_status(self, account_id, status): test_controller=test_controller, context_factory=context_factory, task_store=task_store, + push_config_store=push_config_store, ) elif transport in ("streamable-http", "sse", "stdio"): _serve_mcp( @@ -303,6 +314,7 @@ def _serve_a2a( test_controller: TestControllerStore | None, context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, + push_config_store: PushNotificationConfigStore | None = None, ) -> None: """Start an A2A server using uvicorn.""" import uvicorn @@ -318,6 +330,7 @@ def _serve_a2a( test_controller=test_controller, context_factory=context_factory, task_store=task_store, + push_config_store=push_config_store, ) sock = _bind_reusable_socket("0.0.0.0", resolved_port) try: diff --git a/tests/test_a2a_server.py b/tests/test_a2a_server.py index 6160cad65..72ba70ffd 100644 --- a/tests/test_a2a_server.py +++ b/tests/test_a2a_server.py @@ -610,3 +610,259 @@ def _ctx(name: str) -> ServerCallContext: await store.delete("shared-task-id", context=_ctx("tenant-b-principal")) still_a = await store.get("shared-task-id", context=_ctx("tenant-a-principal")) assert still_a is not None, "SqliteTaskStore cross-scope delete removed tenant A's task." + + +# --------------------------------------------------------------------------- +# Pluggable PushNotificationConfigStore (issue #225) +# --------------------------------------------------------------------------- + + +class _RecordingPushConfigStore: + """Duck-typed PushNotificationConfigStore — records every call for + test assertions. Same shape/role as ``_RecordingTaskStore`` above.""" + + def __init__(self) -> None: + self.sets: list[tuple[str, str]] = [] # (task_id, config_id) + self.gets: list[str] = [] + self.deletes: list[tuple[str, str | None]] = [] + self._store: dict[tuple[str, str], Any] = {} + + async def set_info(self, task_id: str, notification_config: Any) -> None: + config_id = getattr(notification_config, "id", None) or task_id + self.sets.append((task_id, config_id)) + self._store[(task_id, config_id)] = notification_config + + async def get_info(self, task_id: str) -> list[Any]: + self.gets.append(task_id) + return [v for (tid, _cid), v in self._store.items() if tid == task_id] + + async def delete_info(self, task_id: str, config_id: str | None = None) -> None: + self.deletes.append((task_id, config_id)) + if config_id is None: + keys = [k for k in self._store if k[0] == task_id] + for k in keys: + del self._store[k] + else: + self._store.pop((task_id, config_id), None) + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="a2a-sdk starlette integration requires Python 3.11+", +) +def test_create_a2a_server_omits_push_config_store_by_default(): + """Omitting ``push_config_store`` preserves a2a-sdk's default: + ``DefaultRequestHandler._push_config_store`` stays ``None`` and + push-notif endpoints surface as ``UnsupportedOperationError``. + Sellers opt-in to the feature by wiring a store.""" + app = create_a2a_server(_TestHandler(), name="test-agent") + handler = _extract_default_request_handler(app) + assert handler._push_config_store is None, ( + "push_config_store should default to None so push-notif endpoints " + "remain unsupported until the seller explicitly opts in. Instead " + f"got {type(handler._push_config_store).__name__}." + ) + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="a2a-sdk starlette integration requires Python 3.11+", +) +def test_create_a2a_server_accepts_custom_push_config_store(): + """Custom store must thread through to + ``DefaultRequestHandler.push_config_store`` — the contract of the + hook.""" + store = _RecordingPushConfigStore() + app = create_a2a_server(_TestHandler(), name="test-agent", push_config_store=store) + handler = _extract_default_request_handler(app) + assert handler._push_config_store is store, ( + "create_a2a_server(push_config_store=...) dropped the custom store; " + f"handler._push_config_store is {type(handler._push_config_store).__name__}." + ) + + +async def test_sqlite_push_config_store_isolates_scopes_by_contextvar(): + """Reference ``SqlitePushNotificationConfigStore`` scopes reads and + writes by the ContextVar the seller's auth middleware populates. + Cross-tenant registration must never surface another tenant's + push-notif callback URL.""" + import importlib.util + import tempfile + from pathlib import Path + + from a2a.types import PushNotificationConfig + + example_path = Path(__file__).parent.parent / "examples" / "a2a_db_tasks.py" + spec = importlib.util.spec_from_file_location("_a2a_db_tasks_example", example_path) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + with tempfile.TemporaryDirectory() as tmp: + db = Path(tmp) / "push.db" + store = mod.SqlitePushNotificationConfigStore(db_path=db) + scope_var = mod._current_push_config_scope + + cfg = PushNotificationConfig(id="cfg-1", url="https://callback.tenant-a.example/webhook") + + # Tenant A sets a config on task-shared. + tok_a = scope_var.set("tenant-a") + try: + await store.set_info("task-shared", cfg) + got_a = await store.get_info("task-shared") + assert len(got_a) == 1 and str(got_a[0].url) == str(cfg.url) + finally: + scope_var.reset(tok_a) + + # Tenant B queries the same task — must see nothing. + tok_b = scope_var.set("tenant-b") + try: + got_b = await store.get_info("task-shared") + assert got_b == [], ( + "SqlitePushNotificationConfigStore returned tenant A's " + "push-notif config to tenant B — the reference impl is " + "leaking callback URLs across principals." + ) + + # And tenant B's delete must not affect tenant A. + await store.delete_info("task-shared") + finally: + scope_var.reset(tok_b) + + tok_a2 = scope_var.set("tenant-a") + try: + still_a = await store.get_info("task-shared") + assert len(still_a) == 1, ( + "SqlitePushNotificationConfigStore cross-scope delete " "removed tenant A's config." + ) + finally: + scope_var.reset(tok_a2) + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="a2a-sdk starlette integration requires Python 3.11+", +) +async def test_custom_push_config_store_receives_sets_from_handler(): + """Behavioral test (parallel to ``test_custom_task_store_receives_saves_from_skill_dispatch``). + If a2a-sdk ever renames or bypasses ``DefaultRequestHandler._push_config_store`` + while leaving the attribute intact, the attribute-identity check in + the earlier test passes while production calls skip our store + entirely. This asserts the handler's + ``on_set_task_push_notification_config`` actually routes set-info + through our store.""" + import contextlib as _ctxlib + + from a2a.types import ( + PushNotificationConfig, + TaskPushNotificationConfig, + ) + from a2a.utils.errors import ServerError + + push_store = _RecordingPushConfigStore() + # Need a populated TaskStore because on_set validates the task exists + # before forwarding to push_config_store.set_info. Pre-seed a task. + task_store = _RecordingTaskStore() + from a2a.types import TaskStatus + + await task_store.save(Task(id="task-1", context_id="ctx-1", status=TaskStatus(state="working"))) + + app = create_a2a_server( + _TestHandler(), + name="behavioral-push", + task_store=task_store, + push_config_store=push_store, + ) + handler = _extract_default_request_handler(app) + + params = TaskPushNotificationConfig( + task_id="task-1", + push_notification_config=PushNotificationConfig( + id="cfg-1", url="https://callback.example/hook" + ), + ) + with _ctxlib.suppress(ServerError): + await handler.on_set_task_push_notification_config(params) + + assert ("task-1", "cfg-1") in push_store.sets, ( + "DefaultRequestHandler.on_set_task_push_notification_config did not " + "route to our custom push_config_store. The kwarg is wired but not " + "exercised at runtime." + ) + + +async def test_sqlite_push_config_store_warns_once_on_anonymous_scope(): + """Reference impl must fail LOUD when the scope_provider returns + None — silent fall-through to the anonymous bucket is the + multi-tenant footgun security review flagged. Warning fires once + per store instance (not per call) so operators notice without + flooding logs.""" + import importlib.util + import tempfile + import warnings as _warnings + from pathlib import Path + + from a2a.types import PushNotificationConfig + + example_path = Path(__file__).parent.parent / "examples" / "a2a_db_tasks.py" + spec = importlib.util.spec_from_file_location("_a2a_db_tasks_ex_warn", example_path) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + with tempfile.TemporaryDirectory() as tmp: + db = Path(tmp) / "anon.db" + # Force the anonymous path by supplying a provider that always + # returns None. + store = mod.SqlitePushNotificationConfigStore(db_path=db, scope_provider=lambda: None) + + cfg = PushNotificationConfig(url="https://x.example/hook") + with _warnings.catch_warnings(record=True) as caught: + _warnings.simplefilter("always") + await store.set_info("task-1", cfg) + await store.set_info("task-2", cfg) + await store.set_info("task-3", cfg) + + anon_warnings = [w for w in caught if "SqlitePushNotificationConfigStore" in str(w.message)] + assert len(anon_warnings) == 1, ( + "Anonymous-scope warning should fire exactly once per store " + f"instance, got {len(anon_warnings)}." + ) + + +async def test_sqlite_push_config_store_synthesises_config_id_when_omitted(): + """Client not supplying ``PushNotificationConfig.id`` must not cause + two registrations on the same task to overwrite each other via + INSERT OR REPLACE collision on the composite PK. Reference impl + synthesises a UUID; two sets should produce two rows.""" + import importlib.util + import tempfile + from pathlib import Path + + from a2a.types import PushNotificationConfig + + example_path = Path(__file__).parent.parent / "examples" / "a2a_db_tasks.py" + spec = importlib.util.spec_from_file_location("_a2a_db_tasks_ex_uuid", example_path) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + with tempfile.TemporaryDirectory() as tmp: + db = Path(tmp) / "uuid.db" + store = mod.SqlitePushNotificationConfigStore(db_path=db, scope_provider=lambda: "tenant-a") + + await store.set_info( + "shared-task", + PushNotificationConfig(url="https://first.example/hook"), + ) + await store.set_info( + "shared-task", + PushNotificationConfig(url="https://second.example/hook"), + ) + + configs = await store.get_info("shared-task") + assert len(configs) == 2, ( + "Two set_info calls with id=None collapsed into one row — the " + "fallback config_id must synthesise a unique value to prevent " + "silent overwrite." + )