Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/adcp/decisioning/pg/buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@

import asyncio
import json
import logging
import re
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from adcp.decisioning.registry import (
Expand All @@ -93,6 +95,17 @@
if TYPE_CHECKING:
from psycopg_pool import ConnectionPool

from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry

logger = logging.getLogger(__name__)

MutationObserver = Callable[[str, str], None]
"""Callback fired after a successful :class:`PgBuyerAgentRegistry`
mutation. Receives ``(operation, agent_url)`` where ``operation`` is
``"upsert"`` / ``"set_status"`` / ``"delete"``. Runs synchronously on
the caller's thread after the DB commit; raised exceptions are logged
but do not propagate to the mutation caller."""

try:
import psycopg # noqa: F401
import psycopg_pool # noqa: F401
Expand Down Expand Up @@ -157,6 +170,7 @@ def __init__(
)
self._pool = pool
self._table = table_name
self._mutation_observers: list[MutationObserver] = []

# Pre-format queries so the hot path doesn't f-string per call.
# All identifier substitutions are validated at __init__; row
Expand Down Expand Up @@ -301,6 +315,7 @@ def upsert(self, agent: BuyerAgent, *, api_key_id: str | None = None) -> None:
)
with self._pool.connection() as conn, conn.cursor() as cur:
cur.execute(self._sql_upsert, params)
self._notify_mutation("upsert", agent.agent_url)

def set_status(self, agent_url: str, status: BuyerAgentStatus) -> None:
"""Update an agent's lifecycle status. Use to suspend / block
Expand All @@ -309,6 +324,7 @@ def set_status(self, agent_url: str, status: BuyerAgentStatus) -> None:
raise ValueError(f"status must be one of {sorted(_VALID_STATUSES)!r}, got {status!r}")
with self._pool.connection() as conn, conn.cursor() as cur:
cur.execute(self._sql_set_status, (status, agent_url))
self._notify_mutation("set_status", agent_url)

def delete(self, agent_url: str) -> None:
"""Remove an agent from the registry.
Expand All @@ -318,6 +334,76 @@ def delete(self, agent_url: str) -> None:
"""
with self._pool.connection() as conn, conn.cursor() as cur:
cur.execute(self._sql_delete, (agent_url,))
self._notify_mutation("delete", agent_url)

# ----- mutation observability -------------------------------------

def add_mutation_observer(self, observer: MutationObserver) -> None:
"""Register a callback fired after every successful mutation.

Observers receive ``(operation, agent_url)`` where
``operation`` is one of ``"upsert"`` / ``"set_status"`` /
``"delete"``. They run synchronously on the calling thread
AFTER the DB commit, so a failed commit does not invoke
observers. Exceptions raised by an observer are logged and
swallowed — they never block the mutation from succeeding or
prevent later observers from running.

Typical use is wiring a cache-invalidation hook so admin
mutations propagate to read-side caches without manual
:meth:`CachingBuyerAgentRegistry.invalidate` calls. See
:meth:`with_caching` for the bundled pre-wired path.
"""
self._mutation_observers.append(observer)

def with_caching(
self,
**cache_kwargs: Any,
) -> CachingBuyerAgentRegistry:
"""Return a :class:`CachingBuyerAgentRegistry` wrapping this
registry, pre-wired so mutations through this instance
automatically invalidate the cache.

Forwards ``**cache_kwargs`` to
:class:`CachingBuyerAgentRegistry` (``ttl_seconds``,
``max_entries``, ``hit_callback``, ``audit_sink``,
``sink_timeout_seconds``, ``time_source``).

Example::

pg = PgBuyerAgentRegistry(pool=pool)
registry = pg.with_caching(ttl_seconds=60, audit_sink=sink)
serve(buyer_agent_registry=registry, ...)

# Admin mutations go through `pg` and invalidate the cache:
pg.upsert(BuyerAgent(agent_url=..., status="suspended"))
# Next resolve() through `registry` hits DB, sees suspended.

Adopters with external admin paths (a separate process
writing to the same DB) still need :meth:`CachingBuyerAgentRegistry.invalidate`
or :meth:`clear_sync` — the observer hook fires on
mutations through *this* :class:`PgBuyerAgentRegistry`
instance only.
"""
from adcp.decisioning.registry_cache import CachingBuyerAgentRegistry

cache = CachingBuyerAgentRegistry(self, **cache_kwargs)
self.add_mutation_observer(lambda _op, _agent_url: cache.clear_sync())
return cache

def _notify_mutation(self, op: str, agent_url: str) -> None:
"""Fire registered observers; log and swallow exceptions."""
for observer in self._mutation_observers:
try:
observer(op, agent_url)
except Exception: # noqa: BLE001 — observers must not break mutations
logger.warning(
"[adcp.buyer_agent_registry] mutation observer raised for "
"op=%s agent_url=%s",
op,
agent_url,
exc_info=True,
)

# ----- sync helpers (called via asyncio.to_thread) ----------------

Expand Down
24 changes: 24 additions & 0 deletions src/adcp/decisioning/registry_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,30 @@ async def invalidate(self, *, tenant_id: str | None, lookup_key: str) -> None:
async with self._lock:
self._cache.pop((tenant_id, lookup_key), None)

def clear_sync(self) -> None:
"""Drop every cached entry from a sync context.

Safe to call from any thread or coroutine without an event
loop. Atomic via the GIL on :meth:`OrderedDict.clear` — no
lock acquired, so a concurrent async ``_lookup`` / ``_store``
may observe either the pre-clear or post-clear dict. The
worst case is one extra round-trip to the inner registry on
the next resolve, which is exactly what an invalidation is
supposed to cause.

Use cases: mutation-observer hooks wired by
:meth:`PgBuyerAgentRegistry.with_caching`, post-config-reload
flushes from sync admin code.

Full-clear (rather than per-key drop) trades a small amount
of over-invalidation for simplicity: mutations are admin-rare
and the next traffic burst rebuilds the working set within
TTL. Adopters needing finer-grained invalidation still have
:meth:`invalidate` for explicit ``(tenant_id, lookup_key)``
drops.
"""
self._cache.clear()

async def clear(self) -> None:
"""Drop every cached entry. For tests + post-config-reload.

Expand Down
87 changes: 87 additions & 0 deletions tests/conformance/decisioning/test_pg_buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,90 @@ def test_round_trip_with_no_optional_fields(isolated_pool) -> None:
assert result.default_account_terms is None
assert result.allowed_brands is None
assert result.ext == {}


# ----- mutation observers + with_caching factory -------------------------


def test_add_mutation_observer_fires_on_upsert(isolated_pool) -> None:
registry = _registry(isolated_pool)
calls: list[tuple[str, str]] = []
registry.add_mutation_observer(lambda op, url: calls.append((op, url)))

registry.upsert(
BuyerAgent(
agent_url="https://obs/",
display_name="Observed",
status="active",
)
)
assert calls == [("upsert", "https://obs/")]


def test_add_mutation_observer_fires_on_set_status_and_delete(isolated_pool) -> None:
registry = _registry(isolated_pool)
registry.upsert(
BuyerAgent(
agent_url="https://obs/",
display_name="Observed",
status="active",
)
)
calls: list[tuple[str, str]] = []
registry.add_mutation_observer(lambda op, url: calls.append((op, url)))

registry.set_status("https://obs/", "suspended")
registry.delete("https://obs/")

assert calls == [("set_status", "https://obs/"), ("delete", "https://obs/")]


def test_observer_exception_does_not_block_mutation(isolated_pool) -> None:
"""An observer raising must not propagate to the mutation caller."""
registry = _registry(isolated_pool)

def boom(_op: str, _url: str) -> None:
raise RuntimeError("observer raised")

registry.add_mutation_observer(boom)
# Mutation succeeds despite the observer raising.
registry.upsert(
BuyerAgent(
agent_url="https://resilient/",
display_name="Resilient",
status="active",
)
)
# And the row landed in the DB.
result = asyncio.run(registry.resolve_by_agent_url("https://resilient/"))
assert result is not None
assert result.display_name == "Resilient"


def test_with_caching_returns_wired_cache(isolated_pool) -> None:
"""`pg.with_caching()` returns a cache that auto-invalidates on
mutations through the same `pg` instance."""
registry = _registry(isolated_pool)
cache = registry.with_caching(ttl_seconds=60.0)

registry.upsert(
BuyerAgent(
agent_url="https://wired/",
display_name="Wired",
status="active",
)
)
# Warm cache via resolve.
first = asyncio.run(cache.resolve_by_agent_url("https://wired/"))
assert first is not None
assert first.status == "active"

# Mutate through pg — cache MUST auto-invalidate.
registry.set_status("https://wired/", "suspended")

second = asyncio.run(cache.resolve_by_agent_url("https://wired/"))
assert second is not None
assert second.status == "suspended", (
"Cache served stale 'active' after pg.set_status — with_caching "
"observer did not fire or did not invalidate"
)
29 changes: 29 additions & 0 deletions tests/test_buyer_agent_registry_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,35 @@ def test_rate_limit_rejects_zero_rps() -> None:
RateLimitedBuyerAgentRegistry(FakeRegistry(), rps_per_tenant=0.0)


# ----- clear_sync: mutation-observer entry point ---------------------


async def test_clear_sync_drops_all_entries() -> None:
agent = BuyerAgent(agent_url="https://a.example/", display_name="A", status="active")
inner = FakeRegistry(agents={agent.agent_url: agent})
cache = CachingBuyerAgentRegistry(inner, ttl_seconds=60.0)

# Seed positive + negative cache entries.
assert await cache.resolve_by_agent_url(agent.agent_url) is agent
assert await cache.resolve_by_agent_url("https://miss.example/") is None
assert inner.agent_url_calls == 2

cache.clear_sync()

# Both cached entries dropped — next reads hit the inner registry.
assert await cache.resolve_by_agent_url(agent.agent_url) is agent
assert await cache.resolve_by_agent_url("https://miss.example/") is None
assert inner.agent_url_calls == 4


def test_clear_sync_is_callable_from_sync_context() -> None:
# Critical: must not require a running event loop. Mutation
# observers fire from PgBuyerAgentRegistry's sync mutation path.
cache = CachingBuyerAgentRegistry(FakeRegistry(), ttl_seconds=60.0)
# Synchronous invocation — no asyncio.run() wrapper.
cache.clear_sync() # would raise RuntimeError if it touched asyncio.Lock


# ----- Suppress unused import warning for clarity in ide -----------


Expand Down
Loading