diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index 552e849ba..289016140 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -66,6 +66,7 @@ def create_media_buy( from adcp.decisioning.accounts import ( AccountStore, AccountStoreList, + AccountStoreSyncAccounts, AccountStoreSyncGovernance, AccountStoreUpsert, ExplicitAccounts, @@ -312,6 +313,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "AccountNotFoundError", "AccountStore", "AccountStoreList", + "AccountStoreSyncAccounts", "AccountStoreSyncGovernance", "AccountStoreUpsert", "AdcpError", diff --git a/src/adcp/decisioning/accounts.py b/src/adcp/decisioning/accounts.py index 95386c490..b9bcf64aa 100644 --- a/src/adcp/decisioning/accounts.py +++ b/src/adcp/decisioning/accounts.py @@ -63,7 +63,7 @@ if TYPE_CHECKING: from adcp.decisioning.registry import BuyerAgent - from adcp.types import AccountReference + from adcp.types import AccountReference, SyncAccountsRequest #: Per-platform metadata generic. TMeta = TypeVar("TMeta", default=dict[str, Any]) @@ -260,8 +260,12 @@ def resolve( @runtime_checkable class AccountStoreUpsert(Protocol): - """``sync_accounts`` API surface. Optional adopter-side feature - that complements :class:`AccountStore.resolve`. + """Legacy ``sync_accounts`` dispatch surface (accounts list only). + + Kept for backward compatibility. New implementations should use + :class:`AccountStoreSyncAccounts` instead — it receives the full + :class:`SyncAccountsRequest` and gives the store access to + ``push_notification_config``, ``delete_missing``, and ``dry_run``. Not parameterized over ``TMeta`` — :meth:`upsert` returns :class:`SyncAccountsResultRow` (a wire-shaped row, no per-platform @@ -273,6 +277,10 @@ class AccountStoreUpsert(Protocol): inheritance) and the framework's dispatch shim picks it up via :func:`hasattr`. + The dispatch shim checks for :class:`AccountStoreSyncAccounts` first + (``sync_accounts`` method); falls back to this Protocol's + :meth:`upsert` when the newer method is absent. + **Backwards-compatible.** ``ctx`` is optional on the platform side, so adopter impls written before ctx threading landed (no ``ctx`` parameter) keep working — the framework's @@ -285,10 +293,71 @@ def upsert( refs: list[AccountReference], ctx: ResolveContext | None = None, ) -> Awaitable[list[SyncAccountsResultRow]] | list[SyncAccountsResultRow]: - """``sync_accounts`` API surface. Framework normalizes the - wire request; platform upserts and returns per-account result - rows. Raise :class:`adcp.decisioning.AdcpError` for - buyer-facing rejection. + """``sync_accounts`` dispatch via accounts list only. + + Kept for backward compatibility. New implementations should + use :class:`AccountStoreSyncAccounts` (``sync_accounts`` method) + to receive the full request including ``push_notification_config``, + ``delete_missing``, and ``dry_run``. + + ``ctx.auth_info`` carries the caller's authenticated + principal; ``ctx.agent`` carries the resolved + :class:`BuyerAgent` record (when a registry is configured). + Adopters implementing principal-keyed gates (e.g., + per-buyer-agent ``BILLING_NOT_PERMITTED_FOR_AGENT`` on the + spec's billing surfaces) read the principal here — same + threading as :meth:`AccountStore.resolve`. + + **Prefer ``ctx.agent`` over ``ctx.auth_info`` for + commercial-relationship decisions.** ``ctx.agent`` is the + registry-resolved durable identity (status, billing + capabilities, default account terms); ``ctx.auth_info`` + carries the raw transport-level credential. For billing gates + the registry-resolved identity is canonical. + """ + ... + + +@runtime_checkable +class AccountStoreSyncAccounts(Protocol): + """Preferred ``sync_accounts`` dispatch surface (full request). + + New implementations should use this Protocol in preference to + :class:`AccountStoreUpsert`. :meth:`sync_accounts` receives the + full :class:`SyncAccountsRequest`, giving the store access to + ``push_notification_config`` (the buyer's webhook URL for async + account-status change notifications), ``delete_missing``, + ``dry_run``, and all other request-envelope fields. + + Adopters implement this on the same object as :class:`AccountStore` + (Protocols are structural — Python doesn't require explicit + inheritance) and the framework's dispatch shim picks it up via + :func:`hasattr`. + + The dispatch shim checks for this Protocol's :meth:`sync_accounts` + method first; falls back to :class:`AccountStoreUpsert`'s + :meth:`upsert` when this method is absent — so existing + ``upsert`` impls continue to work without any change. + + **Backwards-compatible.** ``ctx`` is optional — the framework's + :func:`_call_with_optional_ctx` shim probes via + :func:`inspect.signature` and drops ``ctx`` for impls that + don't declare it. + """ + + def sync_accounts( + self, + params: SyncAccountsRequest, + ctx: ResolveContext | None = None, + ) -> Awaitable[list[SyncAccountsResultRow]] | list[SyncAccountsResultRow]: + """Preferred ``sync_accounts`` surface. Receives the full wire + request so the store can persist ``push_notification_config`` + (the buyer's webhook URL for async account-status change + notifications), honour ``delete_missing`` and ``dry_run``, and + access any other request-envelope field. + + Return per-account result rows. Raise + :class:`adcp.decisioning.AdcpError` for buyer-facing rejection. ``ctx.auth_info`` carries the caller's authenticated principal; ``ctx.agent`` carries the resolved diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 21e74b609..007b79b0e 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -773,7 +773,7 @@ def _project_sync_audiences(result: Any) -> Any: def _project_sync_accounts(result: Any) -> Any: - """Project the adopter's ``upsert`` return into the + """Project the adopter's ``sync_accounts`` or ``upsert`` return into the ``sync_accounts`` wire envelope. :class:`AccountStoreUpsert` returns ``list[SyncAccountsResultRow]`` @@ -820,7 +820,7 @@ def _project_sync_accounts(result: Any) -> Any: # return is an adopter-contract violation that would otherwise # fail silently. logger.warning( - "AccountStore.upsert returned an unexpected shape %s; " + "AccountStore sync_accounts/upsert returned an unexpected shape %s; " "expected list[SyncAccountsResultRow] or a dict envelope. " "Passing through unchanged — credential scrubber may not " "reach nested fields.", @@ -1052,10 +1052,14 @@ def advertised_tools_for_instance(self) -> frozenset[str]: # without the polyfill. accounts = getattr(self._platform, "accounts", None) if "sync_accounts" in serving and ( - accounts is None or not callable(getattr(accounts, "upsert", None)) + accounts is None + or ( + not callable(getattr(accounts, "sync_accounts", None)) + and not callable(getattr(accounts, "upsert", None)) + ) ): serving.discard("sync_accounts") - self._log_account_tool_dropped("sync_accounts", "upsert") + self._log_account_tool_dropped("sync_accounts", "sync_accounts") if "list_accounts" in serving and ( accounts is None or not callable(getattr(accounts, "list", None)) ): @@ -1083,14 +1087,20 @@ def _log_account_tool_dropped(self, tool_name: str, method_name: str) -> None: if tool_name in seen: return seen.add(tool_name) + if method_name in ("sync_accounts", "upsert"): + protocol_class = "Upsert" + methods_hint = "'sync_accounts' (preferred) or 'upsert'" + else: + protocol_class = "List" + methods_hint = repr(method_name) logger.info( "PlatformHandler dropped %r from advertised_tools — " - "platform.accounts does not implement %r. " + "platform.accounts does not implement %s. " "Implement the optional AccountStore%s Protocol method " "(see adcp.decisioning.accounts) to surface this tool on the wire.", tool_name, - method_name, - "Upsert" if method_name == "upsert" else "List", + methods_hint, + protocol_class, ) def get_advertised_tools(self, *, advertise_all: bool | None = None) -> frozenset[str]: @@ -2071,16 +2081,22 @@ async def sync_accounts( # type: ignore[override] params: SyncAccountsRequest, context: ToolContext | None = None, ) -> SyncAccountsResponse | NotImplementedResponse: - """Route ``sync_accounts`` through :meth:`AccountStore.upsert`. + """Route ``sync_accounts`` through :meth:`AccountStoreUpsert.sync_accounts` + (preferred) or :meth:`AccountStoreUpsert.upsert` (legacy fallback). ``sync_accounts`` lives on the AccountStore Protocol surface, not on per-specialism platform methods — :data:`adcp.decisioning.platform_router._ACCOUNT_STORE_METHODS` already excludes it from per-tenant delegation. Surface ``OPERATION_NOT_SUPPORTED`` (via :meth:`_not_supported`) when - the store doesn't expose the optional :class:`AccountStoreUpsert` - Protocol — distinct from ``AttributeError`` (which is what an - unguarded ``getattr().()`` would produce). + the store exposes neither method. + + Dispatch priority: ``sync_accounts(params, ctx)`` is checked + first; it receives the full :class:`SyncAccountsRequest` so the + store can persist ``push_notification_config``, honor + ``delete_missing``, ``dry_run``, etc. Falls back to + ``upsert(refs, ctx)`` (accounts list only) for stores that + don't implement the newer method. ``ResolveContext`` carries the caller's :class:`AuthInfo` and resolved :class:`BuyerAgent` so adopter impls implementing @@ -2089,8 +2105,9 @@ async def sync_accounts( # type: ignore[override] canonical context — same threading :meth:`AccountStore.resolve` already uses. """ - upsert = getattr(self._platform.accounts, "upsert", None) - if not callable(upsert): + sync_accounts_fn = getattr(self._platform.accounts, "sync_accounts", None) + upsert_fn = getattr(self._platform.accounts, "upsert", None) + if not callable(sync_accounts_fn) and not callable(upsert_fn): return self._not_supported("sync_accounts") tool_ctx = context or ToolContext() # Prime auth-context only — DON'T call ``_resolve_account``. @@ -2101,8 +2118,13 @@ async def sync_accounts( # type: ignore[override] # account exists only after this call succeeds). await self._prime_auth_context(tool_ctx) resolve_ctx = self._make_resolve_context(tool_ctx, "sync_accounts") - refs = list(getattr(params, "accounts", []) or []) - result = _call_with_optional_ctx(upsert, refs, ctx=resolve_ctx) + if callable(sync_accounts_fn): + result = _call_with_optional_ctx(sync_accounts_fn, params, ctx=resolve_ctx) + elif callable(upsert_fn): + refs = list(getattr(params, "accounts", []) or []) + result = _call_with_optional_ctx(upsert_fn, refs, ctx=resolve_ctx) + else: + return self._not_supported("sync_accounts") if inspect.isawaitable(result): result = await result return cast("SyncAccountsResponse", _project_sync_accounts(result)) diff --git a/src/adcp/decisioning/platform_router.py b/src/adcp/decisioning/platform_router.py index 7b2396373..88c7ad88c 100644 --- a/src/adcp/decisioning/platform_router.py +++ b/src/adcp/decisioning/platform_router.py @@ -153,7 +153,9 @@ # These are framework-internal — the router does NOT delegate them # per-tenant (the AccountStore IS the tenant resolver, so threading # resolution through itself would loop). -_ACCOUNT_STORE_METHODS: frozenset[str] = frozenset({"resolve", "upsert", "list", "sync_governance"}) +_ACCOUNT_STORE_METHODS: frozenset[str] = frozenset( + {"resolve", "sync_accounts", "upsert", "list", "sync_governance"} +) def _protocol_method_names(proto: type) -> frozenset[str]: diff --git a/tests/test_decisioning_handler_shims.py b/tests/test_decisioning_handler_shims.py index d8af5fe0b..5faa357f9 100644 --- a/tests/test_decisioning_handler_shims.py +++ b/tests/test_decisioning_handler_shims.py @@ -1300,6 +1300,166 @@ class _Seller(DecisioningPlatform): assert result["dry_run"] is False +@pytest.mark.asyncio +async def test_sync_accounts_full_request_routes_to_sync_accounts_method(executor) -> None: + """When the store implements ``sync_accounts(params, ctx)``, the shim + passes the full :class:`SyncAccountsRequest` — including + ``push_notification_config`` — so the store can persist the webhook + registration. Issue #794: previously only ``params.accounts`` was + forwarded and the webhook URL was silently dropped.""" + received: list[dict] = [] + + class _StoreWithFullRequest: + resolution = "derived" + + def resolve(self, ref, auth_info=None): + from adcp.decisioning.types import Account + + return Account(id="acct_1", name="acct_1", status="active", metadata={}) + + def sync_accounts(self, params, ctx=None): + from adcp.decisioning.types import SyncAccountsResultRow + + received.append({"params": params, "ctx": ctx}) + return [ + SyncAccountsResultRow( + brand={"domain": "acme.com"}, + operator="acme.com", + action="created", + status="active", + account_id="acct_acme", + ) + ] + + class _Seller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + + seller = _Seller() + seller.accounts = _StoreWithFullRequest() + + handler = PlatformHandler(seller, executor=executor, registry=InMemoryTaskRegistry()) + from adcp.types import SyncAccountsRequest + + req = SyncAccountsRequest.model_construct( + idempotency_key="abcdef0123456789", + accounts=[ + {"brand": {"domain": "acme.com"}, "operator": "acme.com", "billing": "advertiser"} + ], + push_notification_config={"url": "https://buyer.example.com/webhooks", "token": "tok_abc"}, + ) + result = await handler.sync_accounts(req, ToolContext()) + + assert len(received) == 1 + call = received[0] + # Full SyncAccountsRequest is passed — not just the accounts list. + assert call["params"] is req + # push_notification_config survives to the store. + assert call["params"].push_notification_config is not None + assert call["params"].push_notification_config["url"] == "https://buyer.example.com/webhooks" + # ResolveContext carries the tool name. + assert call["ctx"].tool_name == "sync_accounts" + # Wire envelope shape is still correct. + assert "accounts" in result + assert result["accounts"][0]["account_id"] == "acct_acme" + + +@pytest.mark.asyncio +async def test_sync_accounts_prefers_sync_accounts_over_upsert(executor) -> None: + """When the store implements BOTH ``sync_accounts`` and ``upsert``, + the shim calls ``sync_accounts`` and ignores ``upsert``.""" + sync_accounts_calls: list[dict] = [] + upsert_calls: list[dict] = [] + + class _StoreWithBoth: + resolution = "derived" + + def resolve(self, ref, auth_info=None): + from adcp.decisioning.types import Account + + return Account(id="acct_1", name="acct_1", status="active", metadata={}) + + def sync_accounts(self, params, ctx=None): + from adcp.decisioning.types import SyncAccountsResultRow + + sync_accounts_calls.append({"params": params}) + return [ + SyncAccountsResultRow( + brand={"domain": "acme.com"}, + operator="acme.com", + action="created", + status="active", + account_id="acct_acme", + ) + ] + + def upsert(self, refs, ctx=None): + from adcp.decisioning.types import SyncAccountsResultRow + + upsert_calls.append({"refs": refs}) + return [ + SyncAccountsResultRow( + brand={"domain": "acme.com"}, + operator="acme.com", + action="updated", + status="active", + account_id="acct_acme_upsert", + ) + ] + + class _Seller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + + seller = _Seller() + seller.accounts = _StoreWithBoth() + + handler = PlatformHandler(seller, executor=executor, registry=InMemoryTaskRegistry()) + from adcp.types import SyncAccountsRequest + + req = SyncAccountsRequest.model_construct( + idempotency_key="abcdef0123456789", + accounts=[ + {"brand": {"domain": "acme.com"}, "operator": "acme.com", "billing": "advertiser"} + ], + ) + await handler.sync_accounts(req, ToolContext()) + + assert len(sync_accounts_calls) == 1 + assert len(upsert_calls) == 0 + + +def test_advertised_tools_for_instance_includes_sync_accounts_with_sync_accounts_only_store() -> None: + """A store implementing only ``sync_accounts`` (no ``upsert``) + must still advertise ``sync_accounts`` on the wire. Regression guard + for the capability-advertisement gate: the gate used to check only + for ``upsert``, so a ``sync_accounts``-only store would silently + drop the tool from ``tools/list``.""" + + class _SyncAccountsOnlyStore: + resolution = "derived" + + def resolve(self, ref, auth_info=None): + from adcp.decisioning.types import Account + + return Account(id="acct_1", name="acct_1", status="active", metadata={}) + + def sync_accounts(self, params, ctx=None): + return [] + + class _Seller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + + seller = _Seller() + seller.accounts = _SyncAccountsOnlyStore() + + pool = ThreadPoolExecutor(max_workers=1) + try: + handler = PlatformHandler(seller, executor=pool, registry=InMemoryTaskRegistry()) + advertised = handler.advertised_tools_for_instance() + assert "sync_accounts" in advertised + finally: + pool.shutdown(wait=True) + + def test_advertised_tools_for_instance_drops_account_tools_without_store_methods() -> None: """Per-instance filter drops ``sync_accounts`` / ``list_accounts`` when the platform's :class:`AccountStore` doesn't expose the diff --git a/tests/test_platform_router.py b/tests/test_platform_router.py index 757404f03..f03679b25 100644 --- a/tests/test_platform_router.py +++ b/tests/test_platform_router.py @@ -493,6 +493,7 @@ def test_account_store_methods_denylist_matches_protocols() -> None: """ from adcp.decisioning.accounts import ( AccountStore, + AccountStoreSyncAccounts, AccountStoreList, AccountStoreSyncGovernance, AccountStoreUpsert, @@ -505,6 +506,7 @@ def test_account_store_methods_denylist_matches_protocols() -> None: _protocol_method_names(AccountStore) | _protocol_method_names(AccountStoreList) | _protocol_method_names(AccountStoreUpsert) + | _protocol_method_names(AccountStoreSyncAccounts) | _protocol_method_names(AccountStoreSyncGovernance) ) drift = expected ^ _ACCOUNT_STORE_METHODS