Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/adcp/decisioning/pg/buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import json
import logging
import re
import threading
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -171,6 +172,7 @@ def __init__(
self._pool = pool
self._table = table_name
self._mutation_observers: list[MutationObserver] = []
self._mutation_observers_lock = threading.Lock()

# Pre-format queries so the hot path doesn't f-string per call.
# All identifier substitutions are validated at __init__; row
Expand Down Expand Up @@ -353,8 +355,32 @@ def add_mutation_observer(self, observer: MutationObserver) -> None:
mutations propagate to read-side caches without manual
:meth:`CachingBuyerAgentRegistry.invalidate` calls. See
:meth:`with_caching` for the bundled pre-wired path.

Observer registration is thread-safe. Mutations notify a
snapshot of the current observer list; observers added or
removed while a notification is in flight apply to the next
mutation.
"""
with self._mutation_observers_lock:
self._mutation_observers.append(observer)

def remove_mutation_observer(self, observer: MutationObserver) -> bool:
"""Unregister a mutation observer.

Returns ``True`` when ``observer`` was registered and removed,
``False`` when it was not present. If the same callback was
registered multiple times, one registration is removed per call.

Removal is thread-safe. Mutations notify a snapshot of the
observer list, so removing an observer while a notification is
already in flight only affects subsequent mutations.
"""
self._mutation_observers.append(observer)
with self._mutation_observers_lock:
try:
self._mutation_observers.remove(observer)
except ValueError:
return False
return True

def with_caching(
self,
Expand Down Expand Up @@ -393,7 +419,9 @@ def with_caching(

def _notify_mutation(self, op: str, agent_url: str) -> None:
"""Fire registered observers; log and swallow exceptions."""
for observer in self._mutation_observers:
with self._mutation_observers_lock:
observers = tuple(self._mutation_observers)
for observer in observers:
try:
observer(op, agent_url)
except Exception: # noqa: BLE001 — observers must not break mutations
Expand Down
48 changes: 48 additions & 0 deletions tests/conformance/decisioning/test_pg_buyer_agent_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,54 @@ def test_add_mutation_observer_fires_on_set_status_and_delete(isolated_pool) ->
assert calls == [("set_status", "https://obs/"), ("delete", "https://obs/")]


def test_remove_mutation_observer_unregisters_callback(isolated_pool) -> None:
registry = _registry(isolated_pool)
calls: list[tuple[str, str]] = []

def observer(op: str, url: str) -> None:
calls.append((op, url))

registry.add_mutation_observer(observer)
assert registry.remove_mutation_observer(observer) is True
assert registry.remove_mutation_observer(observer) is False

registry.upsert(
BuyerAgent(
agent_url="https://removed-observer/",
display_name="Removed Observer",
status="active",
)
)

assert calls == []


def test_mutation_observer_self_remove_applies_after_current_notification(isolated_pool) -> None:
registry = _registry(isolated_pool)
calls: list[str] = []

def self_removing_observer(_op: str, _url: str) -> None:
calls.append("self-removing")
assert registry.remove_mutation_observer(self_removing_observer) is True

def persistent_observer(_op: str, _url: str) -> None:
calls.append("persistent")

registry.add_mutation_observer(self_removing_observer)
registry.add_mutation_observer(persistent_observer)

registry.upsert(
BuyerAgent(
agent_url="https://self-remove-observer/",
display_name="Self Remove Observer",
status="active",
)
)
registry.set_status("https://self-remove-observer/", "suspended")

assert calls == ["self-removing", "persistent", "persistent"]


def test_observer_exception_does_not_block_mutation(isolated_pool) -> None:
"""An observer raising must not propagate to the mutation caller."""
registry = _registry(isolated_pool)
Expand Down
Loading