diff --git a/src/adcp/decisioning/pg/buyer_agent_registry.py b/src/adcp/decisioning/pg/buyer_agent_registry.py index 4d8fa2bae..ffe26133d 100644 --- a/src/adcp/decisioning/pg/buyer_agent_registry.py +++ b/src/adcp/decisioning/pg/buyer_agent_registry.py @@ -81,6 +81,7 @@ import json import logging import re +import time from collections.abc import Callable from typing import TYPE_CHECKING, Any @@ -95,6 +96,7 @@ if TYPE_CHECKING: from psycopg_pool import ConnectionPool + from adcp.audit_sink import AuditSink from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry logger = logging.getLogger(__name__) @@ -391,6 +393,73 @@ def with_caching( self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync()) return cache + def with_full_stack( + self, + *, + ttl_seconds: float = 60.0, + max_entries: int = 4096, + hit_callback: Callable[[str], None] | None = None, + rps_per_tenant: float = 100.0, + burst: float | None = None, + audit_sink: AuditSink | None = None, + sink_timeout_seconds: float = 5.0, + time_source: Callable[[], float] = time.monotonic, + ) -> CachingBuyerAgentRegistry: + """Return the canonical production registry wrapper stack. + + Builds and returns ``Caching(RateLimited(Auditing(self)))``: + + * cache is outermost so cached hits skip rate-limit accounting + and DB work; + * rate limiting applies only to cache misses that need inner + resolution; + * auditing wraps the SQL-backed store so DB ``resolved`` / + ``miss`` outcomes are recorded. + + ``audit_sink`` and ``sink_timeout_seconds`` are threaded through + all three layers, so cache hits/misses, rate-limit rejects, and + terminal DB outcomes can all land in the same audit trail. + ``time_source`` is shared by the cache and rate limiter for + deterministic tests. + + Mutations through this :class:`PgBuyerAgentRegistry` instance + clear the returned cache via the same observer wiring as + :meth:`with_caching`. Adopters needing a different layer order + should compose :class:`CachingBuyerAgentRegistry`, + :class:`RateLimitedBuyerAgentRegistry`, and + :class:`AuditingBuyerAgentRegistry` manually. + """ + from adcp.decisioning.registry_cache import ( + AuditingBuyerAgentRegistry, + CachingBuyerAgentRegistry, + RateLimitedBuyerAgentRegistry, + ) + + audited = AuditingBuyerAgentRegistry( + self, + audit_sink=audit_sink, + sink_timeout_seconds=sink_timeout_seconds, + ) + rate_limited = RateLimitedBuyerAgentRegistry( + audited, + rps_per_tenant=rps_per_tenant, + burst=burst, + audit_sink=audit_sink, + sink_timeout_seconds=sink_timeout_seconds, + time_source=time_source, + ) + cache = CachingBuyerAgentRegistry( + rate_limited, + ttl_seconds=ttl_seconds, + max_entries=max_entries, + hit_callback=hit_callback, + audit_sink=audit_sink, + sink_timeout_seconds=sink_timeout_seconds, + time_source=time_source, + ) + self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync()) + return cache + def _notify_mutation(self, op: str, agent_url: str) -> None: """Fire registered observers; log and swallow exceptions.""" for observer in self._mutation_observers: diff --git a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py index 419e5afaf..cf2a56ab6 100644 --- a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py +++ b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py @@ -33,6 +33,7 @@ allow_module_level=True, ) +from adcp.audit_sink import AuditEvent # noqa: E402 from adcp.decisioning import ( # noqa: E402 ApiKeyCredential, BuyerAgent, @@ -40,6 +41,26 @@ OAuthCredential, ) from adcp.decisioning.pg import PgBuyerAgentRegistry # noqa: E402 +from adcp.decisioning.types import AdcpError # noqa: E402 + + +class CapturingAuditSink: + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + async def record(self, event: AuditEvent) -> None: + self.events.append(event) + + +class FakeClock: + def __init__(self, start: float = 1000.0) -> None: + self.now = start + + def __call__(self) -> float: + return self.now + + def advance(self, seconds: float) -> None: + self.now += seconds @pytest.fixture() @@ -388,3 +409,61 @@ def test_with_caching_returns_wired_cache(isolated_pool) -> None: "Cache served stale 'active' after pg.set_status — with_caching " "observer did not fire or did not invalidate" ) + + +def test_with_full_stack_wires_cache_invalidation_and_audit(isolated_pool) -> None: + registry = _registry(isolated_pool) + sink = CapturingAuditSink() + stack = registry.with_full_stack( + ttl_seconds=60.0, + rps_per_tenant=1000.0, + audit_sink=sink, + ) + + registry.upsert( + BuyerAgent( + agent_url="https://full-stack/", + display_name="Full Stack", + status="active", + ) + ) + + first = asyncio.run(stack.resolve_by_agent_url("https://full-stack/")) + second = asyncio.run(stack.resolve_by_agent_url("https://full-stack/")) + assert first is not None + assert second is not None + assert first.status == "active" + assert second.status == "active" + + registry.set_status("https://full-stack/", "suspended") + after_mutation = asyncio.run(stack.resolve_by_agent_url("https://full-stack/")) + assert after_mutation is not None + assert after_mutation.status == "suspended" + + outcomes = [event.details["outcome"] for event in sink.events] + assert outcomes.count("resolved") == 2 + assert outcomes.count("cached_hit") == 1 + + +def test_with_full_stack_rate_limit_fires_and_audits(isolated_pool) -> None: + registry = _registry(isolated_pool) + sink = CapturingAuditSink() + clock = FakeClock() + stack = registry.with_full_stack( + ttl_seconds=0.1, + rps_per_tenant=1.0, + burst=1.0, + audit_sink=sink, + time_source=clock, + ) + + assert asyncio.run(stack.resolve_by_agent_url("https://rate-limited/")) is None + clock.advance(0.2) + + with pytest.raises(AdcpError) as exc_info: + asyncio.run(stack.resolve_by_agent_url("https://rate-limited/")) + + assert exc_info.value.code == "PERMISSION_DENIED" + outcomes = [event.details["outcome"] for event in sink.events] + assert outcomes.count("miss") == 1 + assert outcomes.count("rate_limited") == 1