diff --git a/src/adcp/decisioning/pg/buyer_agent_registry.py b/src/adcp/decisioning/pg/buyer_agent_registry.py index 3db56e5af..4d8fa2bae 100644 --- a/src/adcp/decisioning/pg/buyer_agent_registry.py +++ b/src/adcp/decisioning/pg/buyer_agent_registry.py @@ -79,7 +79,9 @@ import asyncio import json +import logging import re +from collections.abc import Callable from typing import TYPE_CHECKING, Any from adcp.decisioning.registry import ( @@ -93,6 +95,17 @@ if TYPE_CHECKING: from psycopg_pool import ConnectionPool + from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry + +logger = logging.getLogger(__name__) + +MutationObserver = Callable[[str, str], None] +"""Callback fired after a successful :class:`PgBuyerAgentRegistry` +mutation. Receives ``(operation, agent_url)`` where ``operation`` is +``"upsert"`` / ``"set_status"`` / ``"delete"``. Runs synchronously on +the caller's thread after the DB commit; raised exceptions are logged +but do not propagate to the mutation caller.""" + try: import psycopg # noqa: F401 import psycopg_pool # noqa: F401 @@ -157,6 +170,7 @@ def __init__( ) self._pool = pool self._table = table_name + self._mutation_observers: list[MutationObserver] = [] # Pre-format queries so the hot path doesn't f-string per call. # All identifier substitutions are validated at __init__; row @@ -301,6 +315,7 @@ def upsert(self, agent: BuyerAgent, *, api_key_id: str | None = None) -> None: ) with self._pool.connection() as conn, conn.cursor() as cur: cur.execute(self._sql_upsert, params) + self._notify_mutation("upsert", agent.agent_url) def set_status(self, agent_url: str, status: BuyerAgentStatus) -> None: """Update an agent's lifecycle status. Use to suspend / block @@ -309,6 +324,7 @@ def set_status(self, agent_url: str, status: BuyerAgentStatus) -> None: raise ValueError(f"status must be one of {sorted(_VALID_STATUSES)!r}, got {status!r}") with self._pool.connection() as conn, conn.cursor() as cur: cur.execute(self._sql_set_status, (status, agent_url)) + self._notify_mutation("set_status", agent_url) def delete(self, agent_url: str) -> None: """Remove an agent from the registry. @@ -318,6 +334,76 @@ def delete(self, agent_url: str) -> None: """ with self._pool.connection() as conn, conn.cursor() as cur: cur.execute(self._sql_delete, (agent_url,)) + self._notify_mutation("delete", agent_url) + + # ----- mutation observability ------------------------------------- + + def add_mutation_observer(self, observer: MutationObserver) -> None: + """Register a callback fired after every successful mutation. + + Observers receive ``(operation, agent_url)`` where + ``operation`` is one of ``"upsert"`` / ``"set_status"`` / + ``"delete"``. They run synchronously on the calling thread + AFTER the DB commit, so a failed commit does not invoke + observers. Exceptions raised by an observer are logged and + swallowed — they never block the mutation from succeeding or + prevent later observers from running. + + Typical use is wiring a cache-invalidation hook so admin + mutations propagate to read-side caches without manual + :meth:`CachingBuyerAgentRegistry.invalidate` calls. See + :meth:`with_caching` for the bundled pre-wired path. + """ + self._mutation_observers.append(observer) + + def with_caching( + self, + **cache_kwargs: Any, + ) -> CachingBuyerAgentRegistry: + """Return a :class:`CachingBuyerAgentRegistry` wrapping this + registry, pre-wired so mutations through this instance + automatically invalidate the cache. + + Forwards ``**cache_kwargs`` to + :class:`CachingBuyerAgentRegistry` (``ttl_seconds``, + ``max_entries``, ``hit_callback``, ``audit_sink``, + ``sink_timeout_seconds``, ``time_source``). + + Example:: + + pg = PgBuyerAgentRegistry(pool=pool) + registry = pg.with_caching(ttl_seconds=60, audit_sink=sink) + serve(buyer_agent_registry=registry, ...) + + # Admin mutations go through `pg` and invalidate the cache: + pg.upsert(BuyerAgent(agent_url=..., status="suspended")) + # Next resolve() through `registry` hits DB, sees suspended. + + Adopters with external admin paths (a separate process + writing to the same DB) still need :meth:`CachingBuyerAgentRegistry.invalidate` + or :meth:`clear_sync` — the observer hook fires on + mutations through *this* :class:`PgBuyerAgentRegistry` + instance only. + """ + from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry + + cache = CachingBuyerAgentRegistry(self, **cache_kwargs) + self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync()) + return cache + + def _notify_mutation(self, op: str, agent_url: str) -> None: + """Fire registered observers; log and swallow exceptions.""" + for observer in self._mutation_observers: + try: + observer(op, agent_url) + except Exception: # noqa: BLE001 — observers must not break mutations + logger.warning( + "[adcp.buyer_agent_registry] mutation observer raised for " + "op=%s agent_url=%s", + op, + agent_url, + exc_info=True, + ) # ----- sync helpers (called via asyncio.to_thread) ---------------- diff --git a/src/adcp/decisioning/registry_cache.py b/src/adcp/decisioning/registry_cache.py index 40d0b8163..1eb87b1bd 100644 --- a/src/adcp/decisioning/registry_cache.py +++ b/src/adcp/decisioning/registry_cache.py @@ -414,6 +414,30 @@ async def invalidate(self, *, tenant_id: str | None, lookup_key: str) -> None: async with self._lock: self._cache.pop((tenant_id, lookup_key), None) + def clear_sync(self) -> None: + """Drop every cached entry from a sync context. + + Safe to call from any thread or coroutine without an event + loop. Atomic via the GIL on :meth:`OrderedDict.clear` — no + lock acquired, so a concurrent async ``_lookup`` / ``_store`` + may observe either the pre-clear or post-clear dict. The + worst case is one extra round-trip to the inner registry on + the next resolve, which is exactly what an invalidation is + supposed to cause. + + Use cases: mutation-observer hooks wired by + :meth:`PgBuyerAgentRegistry.with_caching`, post-config-reload + flushes from sync admin code. + + Full-clear (rather than per-key drop) trades a small amount + of over-invalidation for simplicity: mutations are admin-rare + and the next traffic burst rebuilds the working set within + TTL. Adopters needing finer-grained invalidation still have + :meth:`invalidate` for explicit ``(tenant_id, lookup_key)`` + drops. + """ + self._cache.clear() + async def clear(self) -> None: """Drop every cached entry. For tests + post-config-reload. diff --git a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py index 5219df2b2..419e5afaf 100644 --- a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py +++ b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py @@ -301,3 +301,90 @@ def test_round_trip_with_no_optional_fields(isolated_pool) -> None: assert result.default_account_terms is None assert result.allowed_brands is None assert result.ext == {} + + +# ----- mutation observers + with_caching factory ------------------------- + + +def test_add_mutation_observer_fires_on_upsert(isolated_pool) -> None: + registry = _registry(isolated_pool) + calls: list[tuple[str, str]] = [] + registry.add_mutation_observer(lambda op, url: calls.append((op, url))) + + registry.upsert( + BuyerAgent( + agent_url="https://obs/", + display_name="Observed", + status="active", + ) + ) + assert calls == [("upsert", "https://obs/")] + + +def test_add_mutation_observer_fires_on_set_status_and_delete(isolated_pool) -> None: + registry = _registry(isolated_pool) + registry.upsert( + BuyerAgent( + agent_url="https://obs/", + display_name="Observed", + status="active", + ) + ) + calls: list[tuple[str, str]] = [] + registry.add_mutation_observer(lambda op, url: calls.append((op, url))) + + registry.set_status("https://obs/", "suspended") + registry.delete("https://obs/") + + assert calls == [("set_status", "https://obs/"), ("delete", "https://obs/")] + + +def test_observer_exception_does_not_block_mutation(isolated_pool) -> None: + """An observer raising must not propagate to the mutation caller.""" + registry = _registry(isolated_pool) + + def boom(_op: str, _url: str) -> None: + raise RuntimeError("observer raised") + + registry.add_mutation_observer(boom) + # Mutation succeeds despite the observer raising. + registry.upsert( + BuyerAgent( + agent_url="https://resilient/", + display_name="Resilient", + status="active", + ) + ) + # And the row landed in the DB. + result = asyncio.run(registry.resolve_by_agent_url("https://resilient/")) + assert result is not None + assert result.display_name == "Resilient" + + +def test_with_caching_returns_wired_cache(isolated_pool) -> None: + """`pg.with_caching()` returns a cache that auto-invalidates on + mutations through the same `pg` instance.""" + registry = _registry(isolated_pool) + cache = registry.with_caching(ttl_seconds=60.0) + + registry.upsert( + BuyerAgent( + agent_url="https://wired/", + display_name="Wired", + status="active", + ) + ) + # Warm cache via resolve. + first = asyncio.run(cache.resolve_by_agent_url("https://wired/")) + assert first is not None + assert first.status == "active" + + # Mutate through pg — cache MUST auto-invalidate. + registry.set_status("https://wired/", "suspended") + + second = asyncio.run(cache.resolve_by_agent_url("https://wired/")) + assert second is not None + assert second.status == "suspended", ( + "Cache served stale 'active' after pg.set_status — with_caching " + "observer did not fire or did not invalidate" + ) diff --git a/tests/test_buyer_agent_registry_cache.py b/tests/test_buyer_agent_registry_cache.py index a00439a00..793880e91 100644 --- a/tests/test_buyer_agent_registry_cache.py +++ b/tests/test_buyer_agent_registry_cache.py @@ -612,6 +612,35 @@ def test_rate_limit_rejects_zero_rps() -> None: RateLimitedBuyerAgentRegistry(FakeRegistry(), rps_per_tenant=0.0) +# ----- clear_sync: mutation-observer entry point --------------------- + + +async def test_clear_sync_drops_all_entries() -> None: + agent = BuyerAgent(agent_url="https://a.example/", display_name="A", status="active") + inner = FakeRegistry(agents={agent.agent_url: agent}) + cache = CachingBuyerAgentRegistry(inner, ttl_seconds=60.0) + + # Seed positive + negative cache entries. + assert await cache.resolve_by_agent_url(agent.agent_url) is agent + assert await cache.resolve_by_agent_url("https://miss.example/") is None + assert inner.agent_url_calls == 2 + + cache.clear_sync() + + # Both cached entries dropped — next reads hit the inner registry. + assert await cache.resolve_by_agent_url(agent.agent_url) is agent + assert await cache.resolve_by_agent_url("https://miss.example/") is None + assert inner.agent_url_calls == 4 + + +def test_clear_sync_is_callable_from_sync_context() -> None: + # Critical: must not require a running event loop. Mutation + # observers fire from PgBuyerAgentRegistry's sync mutation path. + cache = CachingBuyerAgentRegistry(FakeRegistry(), ttl_seconds=60.0) + # Synchronous invocation — no asyncio.run() wrapper. + cache.clear_sync() # would raise RuntimeError if it touched asyncio.Lock + + # ----- Suppress unused import warning for clarity in ide -----------