Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/adcp/decisioning/pg/buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import json
import logging
import re
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

Expand All @@ -95,6 +96,7 @@
if TYPE_CHECKING:
from psycopg_pool import ConnectionPool

from adcp.audit_sink import AuditSink
from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -391,6 +393,73 @@ def with_caching(
self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync())
return cache

def with_full_stack(
self,
*,
ttl_seconds: float = 60.0,
max_entries: int = 4096,
hit_callback: Callable[[str], None] | None = None,
rps_per_tenant: float = 100.0,
burst: float | None = None,
audit_sink: AuditSink | None = None,
sink_timeout_seconds: float = 5.0,
time_source: Callable[[], float] = time.monotonic,
) -> CachingBuyerAgentRegistry:
"""Return the canonical production registry wrapper stack.

Builds and returns ``Caching(RateLimited(Auditing(self)))``:

* cache is outermost so cached hits skip rate-limit accounting
and DB work;
* rate limiting applies only to cache misses that need inner
resolution;
* auditing wraps the SQL-backed store so DB ``resolved`` /
``miss`` outcomes are recorded.

``audit_sink`` and ``sink_timeout_seconds`` are threaded through
all three layers, so cache hits/misses, rate-limit rejects, and
terminal DB outcomes can all land in the same audit trail.
``time_source`` is shared by the cache and rate limiter for
deterministic tests.

Mutations through this :class:`PgBuyerAgentRegistry` instance
clear the returned cache via the same observer wiring as
:meth:`with_caching`. Adopters needing a different layer order
should compose :class:`CachingBuyerAgentRegistry`,
:class:`RateLimitedBuyerAgentRegistry`, and
:class:`AuditingBuyerAgentRegistry` manually.
"""
from adcp.decisioning.registry_cache import (
AuditingBuyerAgentRegistry,
CachingBuyerAgentRegistry,
RateLimitedBuyerAgentRegistry,
)

audited = AuditingBuyerAgentRegistry(
self,
audit_sink=audit_sink,
sink_timeout_seconds=sink_timeout_seconds,
)
rate_limited = RateLimitedBuyerAgentRegistry(
audited,
rps_per_tenant=rps_per_tenant,
burst=burst,
audit_sink=audit_sink,
sink_timeout_seconds=sink_timeout_seconds,
time_source=time_source,
)
cache = CachingBuyerAgentRegistry(
rate_limited,
ttl_seconds=ttl_seconds,
max_entries=max_entries,
hit_callback=hit_callback,
audit_sink=audit_sink,
sink_timeout_seconds=sink_timeout_seconds,
time_source=time_source,
)
self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync())
return cache

def _notify_mutation(self, op: str, agent_url: str) -> None:
"""Fire registered observers; log and swallow exceptions."""
for observer in self._mutation_observers:
Expand Down
79 changes: 79 additions & 0 deletions tests/conformance/decisioning/test_pg_buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,34 @@
allow_module_level=True,
)

from adcp.audit_sink import AuditEvent # noqa: E402
from adcp.decisioning import ( # noqa: E402
ApiKeyCredential,
BuyerAgent,
BuyerAgentDefaultTerms,
OAuthCredential,
)
from adcp.decisioning.pg import PgBuyerAgentRegistry # noqa: E402
from adcp.decisioning.types import AdcpError # noqa: E402


class CapturingAuditSink:
def __init__(self) -> None:
self.events: list[AuditEvent] = []

async def record(self, event: AuditEvent) -> None:
self.events.append(event)


class FakeClock:
def __init__(self, start: float = 1000.0) -> None:
self.now = start

def __call__(self) -> float:
return self.now

def advance(self, seconds: float) -> None:
self.now += seconds


@pytest.fixture()
Expand Down Expand Up @@ -388,3 +409,61 @@ def test_with_caching_returns_wired_cache(isolated_pool) -> None:
"Cache served stale 'active' after pg.set_status — with_caching "
"observer did not fire or did not invalidate"
)


def test_with_full_stack_wires_cache_invalidation_and_audit(isolated_pool) -> None:
registry = _registry(isolated_pool)
sink = CapturingAuditSink()
stack = registry.with_full_stack(
ttl_seconds=60.0,
rps_per_tenant=1000.0,
audit_sink=sink,
)

registry.upsert(
BuyerAgent(
agent_url="https://full-stack/",
display_name="Full Stack",
status="active",
)
)

first = asyncio.run(stack.resolve_by_agent_url("https://full-stack/"))
second = asyncio.run(stack.resolve_by_agent_url("https://full-stack/"))
assert first is not None
assert second is not None
assert first.status == "active"
assert second.status == "active"

registry.set_status("https://full-stack/", "suspended")
after_mutation = asyncio.run(stack.resolve_by_agent_url("https://full-stack/"))
assert after_mutation is not None
assert after_mutation.status == "suspended"

outcomes = [event.details["outcome"] for event in sink.events]
assert outcomes.count("resolved") == 2
assert outcomes.count("cached_hit") == 1


def test_with_full_stack_rate_limit_fires_and_audits(isolated_pool) -> None:
registry = _registry(isolated_pool)
sink = CapturingAuditSink()
clock = FakeClock()
stack = registry.with_full_stack(
ttl_seconds=0.1,
rps_per_tenant=1.0,
burst=1.0,
audit_sink=sink,
time_source=clock,
)

assert asyncio.run(stack.resolve_by_agent_url("https://rate-limited/")) is None
clock.advance(0.2)

with pytest.raises(AdcpError) as exc_info:
asyncio.run(stack.resolve_by_agent_url("https://rate-limited/"))

assert exc_info.value.code == "PERMISSION_DENIED"
outcomes = [event.details["outcome"] for event in sink.events]
assert outcomes.count("miss") == 1
assert outcomes.count("rate_limited") == 1
Loading