diff --git a/src/adcp/decisioning/pg/buyer_agent_registry.py b/src/adcp/decisioning/pg/buyer_agent_registry.py index 4d8fa2bae..8826f2f6d 100644 --- a/src/adcp/decisioning/pg/buyer_agent_registry.py +++ b/src/adcp/decisioning/pg/buyer_agent_registry.py @@ -81,6 +81,7 @@ import json import logging import re +import threading from collections.abc import Callable from typing import TYPE_CHECKING, Any @@ -171,6 +172,7 @@ def __init__( self._pool = pool self._table = table_name self._mutation_observers: list[MutationObserver] = [] + self._mutation_observers_lock = threading.Lock() # Pre-format queries so the hot path doesn't f-string per call. # All identifier substitutions are validated at __init__; row @@ -353,8 +355,32 @@ def add_mutation_observer(self, observer: MutationObserver) -> None: mutations propagate to read-side caches without manual :meth:`CachingBuyerAgentRegistry.invalidate` calls. See :meth:`with_caching` for the bundled pre-wired path. + + Observer registration is thread-safe. Mutations notify a + snapshot of the current observer list; observers added or + removed while a notification is in flight apply to the next + mutation. + """ + with self._mutation_observers_lock: + self._mutation_observers.append(observer) + + def remove_mutation_observer(self, observer: MutationObserver) -> bool: + """Unregister a mutation observer. + + Returns ``True`` when ``observer`` was registered and removed, + ``False`` when it was not present. If the same callback was + registered multiple times, one registration is removed per call. + + Removal is thread-safe. Mutations notify a snapshot of the + observer list, so removing an observer while a notification is + already in flight only affects subsequent mutations. """ - self._mutation_observers.append(observer) + with self._mutation_observers_lock: + try: + self._mutation_observers.remove(observer) + except ValueError: + return False + return True def with_caching( self, @@ -393,7 +419,9 @@ def with_caching( def _notify_mutation(self, op: str, agent_url: str) -> None: """Fire registered observers; log and swallow exceptions.""" - for observer in self._mutation_observers: + with self._mutation_observers_lock: + observers = tuple(self._mutation_observers) + for observer in observers: try: observer(op, agent_url) except Exception: # noqa: BLE001 — observers must not break mutations diff --git a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py index 419e5afaf..8b68440bf 100644 --- a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py +++ b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py @@ -339,6 +339,54 @@ def test_add_mutation_observer_fires_on_set_status_and_delete(isolated_pool) -> assert calls == [("set_status", "https://obs/"), ("delete", "https://obs/")] +def test_remove_mutation_observer_unregisters_callback(isolated_pool) -> None: + registry = _registry(isolated_pool) + calls: list[tuple[str, str]] = [] + + def observer(op: str, url: str) -> None: + calls.append((op, url)) + + registry.add_mutation_observer(observer) + assert registry.remove_mutation_observer(observer) is True + assert registry.remove_mutation_observer(observer) is False + + registry.upsert( + BuyerAgent( + agent_url="https://removed-observer/", + display_name="Removed Observer", + status="active", + ) + ) + + assert calls == [] + + +def test_mutation_observer_self_remove_applies_after_current_notification(isolated_pool) -> None: + registry = _registry(isolated_pool) + calls: list[str] = [] + + def self_removing_observer(_op: str, _url: str) -> None: + calls.append("self-removing") + assert registry.remove_mutation_observer(self_removing_observer) is True + + def persistent_observer(_op: str, _url: str) -> None: + calls.append("persistent") + + registry.add_mutation_observer(self_removing_observer) + registry.add_mutation_observer(persistent_observer) + + registry.upsert( + BuyerAgent( + agent_url="https://self-remove-observer/", + display_name="Self Remove Observer", + status="active", + ) + ) + registry.set_status("https://self-remove-observer/", "suspended") + + assert calls == ["self-removing", "persistent", "persistent"] + + def test_observer_exception_does_not_block_mutation(isolated_pool) -> None: """An observer raising must not propagate to the mutation caller.""" registry = _registry(isolated_pool)