Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/decisioning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def create_media_buy(
from adcp.decisioning.accounts import (
AccountStore,
AccountStoreList,
AccountStoreSyncAccounts,
AccountStoreSyncGovernance,
AccountStoreUpsert,
ExplicitAccounts,
Expand Down Expand Up @@ -312,6 +313,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"AccountNotFoundError",
"AccountStore",
"AccountStoreList",
"AccountStoreSyncAccounts",
"AccountStoreSyncGovernance",
"AccountStoreUpsert",
"AdcpError",
Expand Down
83 changes: 76 additions & 7 deletions src/adcp/decisioning/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

if TYPE_CHECKING:
from adcp.decisioning.registry import BuyerAgent
from adcp.types import AccountReference
from adcp.types import AccountReference, SyncAccountsRequest

#: Per-platform metadata generic.
TMeta = TypeVar("TMeta", default=dict[str, Any])
Expand Down Expand Up @@ -260,8 +260,12 @@ def resolve(

@runtime_checkable
class AccountStoreUpsert(Protocol):
"""``sync_accounts`` API surface. Optional adopter-side feature
that complements :class:`AccountStore.resolve`.
"""Legacy ``sync_accounts`` dispatch surface (accounts list only).

Kept for backward compatibility. New implementations should use
:class:`AccountStoreSyncAccounts` instead — it receives the full
:class:`SyncAccountsRequest` and gives the store access to
``push_notification_config``, ``delete_missing``, and ``dry_run``.

Not parameterized over ``TMeta`` — :meth:`upsert` returns
:class:`SyncAccountsResultRow` (a wire-shaped row, no per-platform
Expand All @@ -273,6 +277,10 @@ class AccountStoreUpsert(Protocol):
inheritance) and the framework's dispatch shim picks it up via
:func:`hasattr`.

The dispatch shim checks for :class:`AccountStoreSyncAccounts` first
(``sync_accounts`` method); falls back to this Protocol's
:meth:`upsert` when the newer method is absent.

**Backwards-compatible.** ``ctx`` is optional on the platform
side, so adopter impls written before ctx threading landed (no
``ctx`` parameter) keep working — the framework's
Expand All @@ -285,10 +293,71 @@ def upsert(
refs: list[AccountReference],
ctx: ResolveContext | None = None,
) -> Awaitable[list[SyncAccountsResultRow]] | list[SyncAccountsResultRow]:
"""``sync_accounts`` API surface. Framework normalizes the
wire request; platform upserts and returns per-account result
rows. Raise :class:`adcp.decisioning.AdcpError` for
buyer-facing rejection.
"""``sync_accounts`` dispatch via accounts list only.

Kept for backward compatibility. New implementations should
use :class:`AccountStoreSyncAccounts` (``sync_accounts`` method)
to receive the full request including ``push_notification_config``,
``delete_missing``, and ``dry_run``.

``ctx.auth_info`` carries the caller's authenticated
principal; ``ctx.agent`` carries the resolved
:class:`BuyerAgent` record (when a registry is configured).
Adopters implementing principal-keyed gates (e.g.,
per-buyer-agent ``BILLING_NOT_PERMITTED_FOR_AGENT`` on the
spec's billing surfaces) read the principal here — same
threading as :meth:`AccountStore.resolve`.

**Prefer ``ctx.agent`` over ``ctx.auth_info`` for
commercial-relationship decisions.** ``ctx.agent`` is the
registry-resolved durable identity (status, billing
capabilities, default account terms); ``ctx.auth_info``
carries the raw transport-level credential. For billing gates
the registry-resolved identity is canonical.
"""
...


@runtime_checkable
class AccountStoreSyncAccounts(Protocol):
"""Preferred ``sync_accounts`` dispatch surface (full request).

New implementations should use this Protocol in preference to
:class:`AccountStoreUpsert`. :meth:`sync_accounts` receives the
full :class:`SyncAccountsRequest`, giving the store access to
``push_notification_config`` (the buyer's webhook URL for async
account-status change notifications), ``delete_missing``,
``dry_run``, and all other request-envelope fields.

Adopters implement this on the same object as :class:`AccountStore`
(Protocols are structural — Python doesn't require explicit
inheritance) and the framework's dispatch shim picks it up via
:func:`hasattr`.

The dispatch shim checks for this Protocol's :meth:`sync_accounts`
method first; falls back to :class:`AccountStoreUpsert`'s
:meth:`upsert` when this method is absent — so existing
``upsert`` impls continue to work without any change.

**Backwards-compatible.** ``ctx`` is optional — the framework's
:func:`_call_with_optional_ctx` shim probes via
:func:`inspect.signature` and drops ``ctx`` for impls that
don't declare it.
"""

def sync_accounts(
self,
params: SyncAccountsRequest,
ctx: ResolveContext | None = None,
) -> Awaitable[list[SyncAccountsResultRow]] | list[SyncAccountsResultRow]:
"""Preferred ``sync_accounts`` surface. Receives the full wire
request so the store can persist ``push_notification_config``
(the buyer's webhook URL for async account-status change
notifications), honour ``delete_missing`` and ``dry_run``, and
access any other request-envelope field.

Return per-account result rows. Raise
:class:`adcp.decisioning.AdcpError` for buyer-facing rejection.

``ctx.auth_info`` carries the caller's authenticated
principal; ``ctx.agent`` carries the resolved
Expand Down
52 changes: 37 additions & 15 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ def _project_sync_audiences(result: Any) -> Any:


def _project_sync_accounts(result: Any) -> Any:
"""Project the adopter's ``upsert`` return into the
"""Project the adopter's ``sync_accounts`` or ``upsert`` return into the
``sync_accounts`` wire envelope.

:class:`AccountStoreUpsert` returns ``list[SyncAccountsResultRow]``
Expand Down Expand Up @@ -820,7 +820,7 @@ def _project_sync_accounts(result: Any) -> Any:
# return is an adopter-contract violation that would otherwise
# fail silently.
logger.warning(
"AccountStore.upsert returned an unexpected shape %s; "
"AccountStore sync_accounts/upsert returned an unexpected shape %s; "
"expected list[SyncAccountsResultRow] or a dict envelope. "
"Passing through unchanged — credential scrubber may not "
"reach nested fields.",
Expand Down Expand Up @@ -1052,10 +1052,14 @@ def advertised_tools_for_instance(self) -> frozenset[str]:
# without the polyfill.
accounts = getattr(self._platform, "accounts", None)
if "sync_accounts" in serving and (
accounts is None or not callable(getattr(accounts, "upsert", None))
accounts is None
or (
not callable(getattr(accounts, "sync_accounts", None))
and not callable(getattr(accounts, "upsert", None))
)
):
serving.discard("sync_accounts")
self._log_account_tool_dropped("sync_accounts", "upsert")
self._log_account_tool_dropped("sync_accounts", "sync_accounts")
if "list_accounts" in serving and (
accounts is None or not callable(getattr(accounts, "list", None))
):
Expand Down Expand Up @@ -1083,14 +1087,20 @@ def _log_account_tool_dropped(self, tool_name: str, method_name: str) -> None:
if tool_name in seen:
return
seen.add(tool_name)
if method_name in ("sync_accounts", "upsert"):
protocol_class = "Upsert"
methods_hint = "'sync_accounts' (preferred) or 'upsert'"
else:
protocol_class = "List"
methods_hint = repr(method_name)
logger.info(
"PlatformHandler dropped %r from advertised_tools — "
"platform.accounts does not implement %r. "
"platform.accounts does not implement %s. "
"Implement the optional AccountStore%s Protocol method "
"(see adcp.decisioning.accounts) to surface this tool on the wire.",
tool_name,
method_name,
"Upsert" if method_name == "upsert" else "List",
methods_hint,
protocol_class,
)

def get_advertised_tools(self, *, advertise_all: bool | None = None) -> frozenset[str]:
Expand Down Expand Up @@ -2071,16 +2081,22 @@ async def sync_accounts( # type: ignore[override]
params: SyncAccountsRequest,
context: ToolContext | None = None,
) -> SyncAccountsResponse | NotImplementedResponse:
"""Route ``sync_accounts`` through :meth:`AccountStore.upsert`.
"""Route ``sync_accounts`` through :meth:`AccountStoreUpsert.sync_accounts`
(preferred) or :meth:`AccountStoreUpsert.upsert` (legacy fallback).

``sync_accounts`` lives on the AccountStore Protocol surface,
not on per-specialism platform methods —
:data:`adcp.decisioning.platform_router._ACCOUNT_STORE_METHODS`
already excludes it from per-tenant delegation. Surface
``OPERATION_NOT_SUPPORTED`` (via :meth:`_not_supported`) when
the store doesn't expose the optional :class:`AccountStoreUpsert`
Protocol — distinct from ``AttributeError`` (which is what an
unguarded ``getattr().()`` would produce).
the store exposes neither method.

Dispatch priority: ``sync_accounts(params, ctx)`` is checked
first; it receives the full :class:`SyncAccountsRequest` so the
store can persist ``push_notification_config``, honor
``delete_missing``, ``dry_run``, etc. Falls back to
``upsert(refs, ctx)`` (accounts list only) for stores that
don't implement the newer method.

``ResolveContext`` carries the caller's :class:`AuthInfo` and
resolved :class:`BuyerAgent` so adopter impls implementing
Expand All @@ -2089,8 +2105,9 @@ async def sync_accounts( # type: ignore[override]
canonical context — same threading
:meth:`AccountStore.resolve` already uses.
"""
upsert = getattr(self._platform.accounts, "upsert", None)
if not callable(upsert):
sync_accounts_fn = getattr(self._platform.accounts, "sync_accounts", None)
upsert_fn = getattr(self._platform.accounts, "upsert", None)
if not callable(sync_accounts_fn) and not callable(upsert_fn):
return self._not_supported("sync_accounts")
tool_ctx = context or ToolContext()
# Prime auth-context only — DON'T call ``_resolve_account``.
Expand All @@ -2101,8 +2118,13 @@ async def sync_accounts( # type: ignore[override]
# account exists only after this call succeeds).
await self._prime_auth_context(tool_ctx)
resolve_ctx = self._make_resolve_context(tool_ctx, "sync_accounts")
refs = list(getattr(params, "accounts", []) or [])
result = _call_with_optional_ctx(upsert, refs, ctx=resolve_ctx)
if callable(sync_accounts_fn):
result = _call_with_optional_ctx(sync_accounts_fn, params, ctx=resolve_ctx)
elif callable(upsert_fn):
refs = list(getattr(params, "accounts", []) or [])
result = _call_with_optional_ctx(upsert_fn, refs, ctx=resolve_ctx)
else:
return self._not_supported("sync_accounts")
if inspect.isawaitable(result):
result = await result
return cast("SyncAccountsResponse", _project_sync_accounts(result))
Expand Down
4 changes: 3 additions & 1 deletion src/adcp/decisioning/platform_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@
# These are framework-internal — the router does NOT delegate them
# per-tenant (the AccountStore IS the tenant resolver, so threading
# resolution through itself would loop).
_ACCOUNT_STORE_METHODS: frozenset[str] = frozenset({"resolve", "upsert", "list", "sync_governance"})
_ACCOUNT_STORE_METHODS: frozenset[str] = frozenset(
{"resolve", "sync_accounts", "upsert", "list", "sync_governance"}
)


def _protocol_method_names(proto: type) -> frozenset[str]:
Expand Down
Loading
Loading