Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/adcp/webhook_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
build_async_ip_pinned_transport,
)
from adcp.signing.standard_webhooks import decode_secret as _decode_sw_secret
from adcp.types import GeneratedTaskStatus
from adcp.types import GeneratedTaskStatus, TaskType
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
from adcp.webhook_auth import (
AdcpLegacyHmacStrategy,
Expand All @@ -68,6 +68,7 @@
from adcp.webhooks import (
create_mcp_webhook_payload,
generate_webhook_idempotency_key,
to_wire_dict,
)

# The signer emits a signature valid for 300 seconds; anything beyond that
Expand Down Expand Up @@ -521,7 +522,7 @@ async def send_mcp(
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
task_type: TaskType | str,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
timestamp: datetime | None = None,
operation_id: str | None = None,
Expand Down Expand Up @@ -562,8 +563,8 @@ async def send_mcp(
)
return await self.send_raw(
url=url,
idempotency_key=str(payload["idempotency_key"]),
payload=payload,
idempotency_key=payload.idempotency_key,
payload=to_wire_dict(payload),
extra_headers=extra_headers,
)

Expand Down
19 changes: 13 additions & 6 deletions src/adcp/webhook_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
# isort rule.
UTC = timezone.utc

# Runtime import — used to coerce TaskType enum to its on-wire string at
# the audit-log boundary. Must not be inside TYPE_CHECKING.
from adcp.types import TaskType # noqa: E402

if TYPE_CHECKING:
from adcp.types import GeneratedTaskStatus
from adcp.webhook_sender import WebhookDeliveryResult, WebhookSender
Expand Down Expand Up @@ -210,7 +214,7 @@ async def send_mcp(
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
task_type: TaskType | str,
result: Any = None,
token: str | None = None,
sequence_key: str | None = None,
Expand Down Expand Up @@ -397,7 +401,7 @@ async def send_mcp(
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
task_type: TaskType | str,
result: Any = None,
token: str | None = None,
sequence_key: str | None = None,
Expand Down Expand Up @@ -443,6 +447,9 @@ async def send_mcp(
"""
breaker = self._breaker_for(breaker_key or url)
sequence_number: int | None = None # allocated AFTER breaker check
# Audit log + DeliveryAttempt store the on-wire string; TaskType
# enum is normalized once here so every record sees the same value.
task_type_str: str = task_type.value if isinstance(task_type, TaskType) else task_type

if not breaker.can_attempt():
await self._record(
Expand All @@ -459,7 +466,7 @@ async def send_mcp(
occurred_at=datetime.now(UTC),
will_retry=False,
next_retry_at=None,
task_type=task_type,
task_type=task_type_str,
task_id=task_id,
payload_size_bytes=None,
notification_type=notification_type,
Expand Down Expand Up @@ -521,7 +528,7 @@ async def send_mcp(
occurred_at=attempt_started,
will_retry=False,
next_retry_at=None,
task_type=task_type,
task_type=task_type_str,
task_id=task_id,
payload_size_bytes=len(last_result.sent_body),
notification_type=notification_type,
Expand Down Expand Up @@ -556,7 +563,7 @@ async def send_mcp(
occurred_at=attempt_started,
will_retry=will_retry,
next_retry_at=next_retry_at,
task_type=task_type,
task_type=task_type_str,
task_id=task_id,
payload_size_bytes=len(last_result.sent_body),
notification_type=notification_type,
Expand Down Expand Up @@ -588,7 +595,7 @@ async def send_mcp(
occurred_at=attempt_started,
will_retry=will_retry,
next_retry_at=next_retry_at,
task_type=task_type,
task_type=task_type_str,
task_id=task_id,
payload_size_bytes=None,
notification_type=notification_type,
Expand Down
15 changes: 11 additions & 4 deletions src/adcp/webhook_supervisor_pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
except ImportError:
PG_AVAILABLE = False

from adcp.types import TaskType
from adcp.webhook_supervisor import (
CircuitBreakerPolicy,
DeliveryAttempt,
Expand Down Expand Up @@ -412,7 +413,7 @@ async def send_mcp(
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
task_type: TaskType | str,
result: Any = None,
token: str | None = None,
sequence_key: str | None = None,
Expand Down Expand Up @@ -444,6 +445,12 @@ async def send_mcp(
"is running. Call asyncio.create_task(supervisor.run_worker()) at startup."
)

# The queue column is TEXT; psycopg would otherwise bind a TaskType
# enum via repr (`"TaskType.create_media_buy"`), corrupting the wire
# value. Normalize once here so every persistence + log site sees
# the on-wire string.
task_type_str: str = task_type.value if isinstance(task_type, TaskType) else task_type

bkey = breaker_key or url

# Check circuit state; reject immediately if OPEN within the timeout window.
Expand Down Expand Up @@ -473,7 +480,7 @@ async def send_mcp(
occurred_at=occurred_at,
will_retry=False,
next_retry_at=None,
task_type=task_type,
task_type=task_type_str,
task_id=task_id,
payload_size_bytes=None,
notification_type=notification_type,
Expand All @@ -483,7 +490,7 @@ async def send_mcp(
logger.warning(
"[adcp.webhook_supervisor_pg] circuit OPEN for %s — skipped %s",
bkey,
task_type or "webhook",
task_type_str,
)
return None
# Open timeout elapsed; transition to half_open so next worker probes.
Expand All @@ -500,7 +507,7 @@ async def send_mcp(
bkey,
url,
task_id,
task_type,
task_type_str,
status_str,
result_json,
token,
Expand Down
141 changes: 75 additions & 66 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
WebhookVerifyOptions,
verify_webhook_signature,
)
from adcp.types import GeneratedTaskStatus
from adcp.types import GeneratedTaskStatus, McpWebhookPayload, TaskType
from adcp.types.base import AdCPBaseModel
from adcp.webhook_receiver import (
LegacyHmacFallback,
Expand Down Expand Up @@ -86,72 +86,87 @@ def generate_webhook_idempotency_key() -> str:
def create_mcp_webhook_payload(
task_id: str,
status: GeneratedTaskStatus | str,
task_type: TaskType | str,
*,
result: PydanticBaseModel | dict[str, Any] | None = None,
timestamp: datetime | None = None,
task_type: str | None = None,
operation_id: str | None = None,
message: str | None = None,
context_id: str | None = None,
domain: str | None = None,
idempotency_key: str | None = None,
token: str | None = None,
) -> dict[str, Any]:
) -> McpWebhookPayload:
"""
Create MCP webhook payload dictionary.
Build an :class:`McpWebhookPayload` for a tracked async task.

This function helps agent implementations construct properly formatted
webhook payloads for sending to clients.
Pair with :func:`to_wire_dict` for HTTP transport — Pydantic-typed at
construction so the publisher catches schema drift before it leaves
the process.

``task_type`` is restricted to the closed :class:`TaskType` enum (the
spec's complete set of async/tracked operations). Synchronous-only
operations (e.g. ``get_products``, ``list_creatives``) are not in the
enum because they don't go through the task management system —
passing them would have produced a webhook payload the receiver would
reject as schema-invalid.

Args:
task_id: Unique identifier for the task
status: Current task status
task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
timestamp: When the webhook was generated (defaults to current UTC time)
result: Task-specific payload — any Pydantic model or plain dict
operation_id: Client-generated identifier the buyer embedded in the
webhook URL when registering push-notification config. Publishers
MUST echo this back in the payload so buyers correlate notifications
without parsing URL paths (per ``mcp-webhook-payload.json``).
Senders extracting the value from the URL path on emission populate
this field; callers constructing payloads directly pass it through.
message: Human-readable summary of task state
context_id: Session/conversation identifier
domain: AdCP domain this task belongs to
idempotency_key: Sender-generated key stable across retries of the same
event. Defaults to a freshly-generated UUID v4 — callers retrying
delivery of the same event MUST pass the key from their first
attempt; passing None twice mints two keys and defeats dedup.
task_id: Unique identifier for the task.
status: Current task status.
task_type: Type of AdCP async operation (see :class:`TaskType`).
result: Task-specific payload — any Pydantic model or plain dict.
Plain dicts are validated against
:class:`AdcpAsyncResponseData`'s discriminated union.
timestamp: When the webhook was generated. Defaults to current UTC.
operation_id: Client-generated identifier the buyer embedded in
the webhook URL when registering push-notification config.
Publishers MUST echo this back so buyers correlate
notifications without parsing URL paths.
message: Human-readable summary of task state.
context_id: Session/conversation identifier.
domain: AdCP domain this task belongs to.
idempotency_key: Sender-generated key stable across retries of the
same event. Defaults to a freshly-generated UUID v4 — callers
retrying delivery of the same event MUST pass the key from
their first attempt; passing None twice mints two keys and
defeats dedup.
token: Buyer-supplied token from ``push_notification_config.token``,
echoed back per spec for authenticity validation.

Returns:
Dictionary matching McpWebhookPayload schema, ready to be sent as JSON
:class:`McpWebhookPayload` instance. Use :func:`to_wire_dict` (or
``payload.model_dump(mode="json", exclude_none=True)``) to get the
JSON-ready dict for HTTP transport.

Examples:
Create a completed webhook with results:
>>> from adcp.webhooks import create_mcp_webhook_payload
>>> from adcp.webhooks import create_mcp_webhook_payload, to_wire_dict
>>> from adcp.types import GeneratedTaskStatus
>>>
>>> payload = create_mcp_webhook_payload(
... task_id="task_123",
... task_type="get_products",
... status=GeneratedTaskStatus.completed,
... result={"products": [...]},
... message="Found 5 products"
... task_type="create_media_buy",
... result={"media_buy_id": "mb_1", "buyer_ref": "ref_1"},
... message="Created campaign"
... )
>>> wire = to_wire_dict(payload)

Create a failed webhook with error:
>>> payload = create_mcp_webhook_payload(
... task_id="task_456",
... task_type="create_media_buy",
... status=GeneratedTaskStatus.failed,
... task_type="create_media_buy",
... result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]},
... message="Validation failed"
... )

Create a working status update:
>>> payload = create_mcp_webhook_payload(
... task_id="task_789",
... task_type="sync_creatives",
... status=GeneratedTaskStatus.working,
... task_type="sync_creatives",
... message="Processing 3 of 10 creatives"
... )
"""
Expand All @@ -160,48 +175,42 @@ def create_mcp_webhook_payload(
if idempotency_key is None:
idempotency_key = generate_webhook_idempotency_key()

# Convert status enum to string value
status_value = status.value if hasattr(status, "value") else str(status)

# Build payload matching McpWebhookPayload schema
payload: dict[str, Any] = {
"idempotency_key": idempotency_key,
"task_id": task_id,
"task_type": task_type,
"status": status_value,
"timestamp": timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp,
}

# Add optional fields only if provided
if result is not None:
# Convert Pydantic model to dict if needed for JSON serialization
if hasattr(result, "model_dump"):
payload["result"] = result.model_dump(mode="json")
else:
payload["result"] = result

if operation_id is not None:
payload["operation_id"] = operation_id

if message is not None:
payload["message"] = message

if context_id is not None:
payload["context_id"] = context_id
# Foreign BaseModel subclasses (anything outside AdcpAsyncResponseData)
# don't match the discriminated-union variants by identity — dump to a
# dict so the union picks by shape, matching the dict path.
result_value: PydanticBaseModel | dict[str, Any] | None
if isinstance(result, PydanticBaseModel):
result_value = result.model_dump(mode="json")
else:
result_value = result

# `domain` and `token` aren't in the schema but are accepted via
# `extra='allow'`; they round-trip through `model_dump`.
extras: dict[str, Any] = {}
if domain is not None:
payload["domain"] = domain

extras["domain"] = domain
if token is not None:
# Buyer-supplied token from push_notification_config.token,
# echoed back per push-notification-config.json spec text:
# "Echoed back in webhook payload to validate request authenticity."
# Cross-language wire-parity with the JS implementation
# (``buildTaskWebhookPayload`` in ``from-platform.ts``) — buyers
# validating against the spec read body.token, not headers.
payload["token"] = token
extras["token"] = token

return payload
return McpWebhookPayload.model_validate(
{
"idempotency_key": idempotency_key,
"task_id": task_id,
"task_type": task_type,
"status": status_value,
"timestamp": timestamp,
"operation_id": operation_id,
"message": message,
"context_id": context_id,
"result": result_value,
**extras,
}
)


def get_adcp_signed_headers_for_webhook(
Expand Down Expand Up @@ -245,9 +254,9 @@ def get_adcp_signed_headers_for_webhook(
>>>
>>> payload = create_mcp_webhook_payload(
... task_id="task_123",
... task_type="get_products",
... status="completed",
... result={"products": [...]}
... task_type="create_media_buy",
... result={"media_buy_id": "mb_1"},
... )
>>> headers = {"Content-Type": "application/json"}
>>> signed_headers = get_adcp_signed_headers_for_webhook(
Expand Down
Loading
Loading