From b1e5629f41018c6ffcf09c2bcf585b26bf35828a Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 23 May 2026 21:16:47 -0400 Subject: [PATCH 1/3] feat: add webhook proof-of-control helper --- src/adcp/__init__.py | 12 + src/adcp/webhook_auth.py | 5 +- src/adcp/webhook_sender.py | 36 +++ src/adcp/webhooks.py | 412 ++++++++++++++++++++++++ tests/fixtures/public_api_snapshot.json | 6 + tests/test_webhook_challenge.py | 324 +++++++++++++++++++ 6 files changed, 793 insertions(+), 2 deletions(-) create mode 100644 tests/test_webhook_challenge.py diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index dd317755c..4218e5c36 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -487,18 +487,24 @@ from adcp.webhooks import ( LegacyHmacFallback, MemoryBackend, + WebhookChallengeError, + WebhookChallengeResult, WebhookDedupStore, WebhookReceiver, WebhookReceiverConfig, WebhookVerifyOptions, + challenge_webhook_destination, create_a2a_webhook_payload, create_mcp_webhook_payload, + create_webhook_challenge_payload, extract_webhook_result_data, + generate_webhook_challenge_value, generate_webhook_idempotency_key, get_adcp_signed_headers_for_webhook, sign_legacy_webhook, sign_webhook, to_wire_dict, + validate_webhook_challenge_response, ) try: @@ -670,12 +676,18 @@ def get_adcp_version() -> str: # Webhook utilities "create_mcp_webhook_payload", "create_a2a_webhook_payload", + "create_webhook_challenge_payload", + "challenge_webhook_destination", + "validate_webhook_challenge_response", "get_adcp_signed_headers_for_webhook", "extract_webhook_result_data", + "generate_webhook_challenge_value", "generate_webhook_idempotency_key", "sign_legacy_webhook", "sign_webhook", "to_wire_dict", + "WebhookChallengeError", + "WebhookChallengeResult", "WebhookReceiver", "WebhookReceiverConfig", "WebhookVerifyOptions", diff --git a/src/adcp/webhook_auth.py b/src/adcp/webhook_auth.py index a904ec462..3e4e44319 100644 --- a/src/adcp/webhook_auth.py +++ b/src/adcp/webhook_auth.py @@ -49,7 +49,7 @@ # the strategy emits cannot be overridden by an extra_headers payload, # nor can Content-Type (changing it would break body framing on the # receiver). -_BASE_RESERVED: frozenset[str] = frozenset({"content-type"}) +_BASE_RESERVED: frozenset[str] = frozenset({"content-length", "content-type", "host"}) class WebhookAuthStrategy(Protocol): @@ -203,7 +203,8 @@ def merge_extra_headers( if not extra: return merged for key in extra: - if str(key).lower() in reserved: + normalized = str(key).lower() + if normalized in reserved or normalized.startswith(":"): raise ValueError( f"extra_headers may not override auth-binding or content-type " f"header {key!r}" ) diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index ecd384cf5..74e4c94bb 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -74,6 +74,7 @@ ) from adcp.webhooks import ( create_mcp_webhook_payload, + create_webhook_challenge_payload, generate_webhook_idempotency_key, to_wire_dict, ) @@ -836,6 +837,41 @@ async def send_wholesale_feed_to_subscription( extra_headers=extra_headers, ) + async def send_webhook_challenge( + self, + *, + url: str, + account_id: str, + subscriber_id: str, + challenge: str | None = None, + extra_headers: Mapping[str, str] | None = None, + ) -> WebhookDeliveryResult: + """POST a signed durable-subscription proof-of-control challenge. + + The body matches the durable ``notification_configs[]`` challenge + shape and intentionally does not inject ``idempotency_key``: + + ``{"type":"webhook.challenge","challenge":"...", ...}`` + + Pair this low-level sender method with + :func:`adcp.webhooks.challenge_webhook_destination` when you also + want URL validation and response echo checking in one call. + """ + + payload = create_webhook_challenge_payload( + account_id=account_id, + subscriber_id=subscriber_id, + challenge=challenge, + ) + challenge_value = str(payload["challenge"]) + body = json.dumps(payload, separators=(",", ":")).encode("utf-8") + return await self._send_bytes( + url=url, + body=body, + idempotency_key=challenge_value, + extra_headers=extra_headers, + ) + async def send_raw( self, *, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index 36a52d788..0135ed861 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -24,6 +24,7 @@ import hashlib import hmac import json +import secrets import time import uuid import warnings @@ -901,6 +902,132 @@ def to_error(self) -> dict[str, str]: return error +@dataclass(frozen=True) +class WebhookChallengeResult: + """Successful durable webhook proof-of-control challenge.""" + + challenge: str + echoed_field: str + destination: WebhookDestinationValidation + status_code: int + response_headers: Mapping[str, str] + response_body: bytes + + @property + def ok(self) -> bool: + return 200 <= self.status_code < 300 + + +class WebhookChallengeError(ValueError): + """Typed proof-of-control failure suitable for ``sync_accounts`` errors.""" + + code = "INVALID_REQUEST" + + def __init__( + self, + message: str, + *, + reason: str, + field: str | None = None, + url: str | None = None, + status_code: int | None = None, + suggestion: str | None = None, + ) -> None: + super().__init__(message) + self.reason = reason + self.field = field + self.url = url + self.status_code = status_code + self.suggestion = suggestion + + def to_error(self) -> dict[str, str]: + """Return a small ``errors[]``-compatible dict for seller handlers.""" + + error = {"code": self.code, "message": str(self)} + if self.field is not None: + error["field"] = self.field + if self.suggestion is not None: + error["suggestion"] = self.suggestion + return error + + +def generate_webhook_challenge_value() -> str: + """Generate an opaque random value for a proof-of-control challenge.""" + + return f"wch_{secrets.token_urlsafe(32)}" + + +def create_webhook_challenge_payload( + *, + account_id: str, + subscriber_id: str, + challenge: str | None = None, +) -> dict[str, str]: + """Build the durable ``notification_configs[]`` challenge payload.""" + + if not isinstance(account_id, str) or not account_id: + raise ValueError("account_id must be a non-empty string") + if not isinstance(subscriber_id, str) or not subscriber_id: + raise ValueError("subscriber_id must be a non-empty string") + challenge_value = challenge or generate_webhook_challenge_value() + if not isinstance(challenge_value, str) or not challenge_value: + raise ValueError("challenge must be a non-empty string") + return { + "type": "webhook.challenge", + "challenge": challenge_value, + "account_id": account_id, + "subscriber_id": subscriber_id, + } + + +def validate_webhook_challenge_response( + response: bytes | Mapping[str, Any], + *, + challenge: str, + field: str | None = None, + url: str | None = None, +) -> str: + """Validate that a receiver echoed the challenge value. + + Receivers may respond with either ``{"challenge": ""}`` or + ``{"token": ""}``. The return value is the field that matched. + """ + + try: + if isinstance(response, bytes): + decoded = json.loads(response.decode("utf-8")) + else: + decoded = dict(response) + except (UnicodeDecodeError, json.JSONDecodeError, TypeError, ValueError) as exc: + raise WebhookChallengeError( + "webhook challenge response must be a JSON object", + reason="invalid_json", + field=field, + url=url, + ) from exc + + if not isinstance(decoded, Mapping): + raise WebhookChallengeError( + "webhook challenge response must be a JSON object", + reason="invalid_json", + field=field, + url=url, + ) + + for key in ("challenge", "token"): + value = decoded.get(key) + if value == challenge: + return key + + if "challenge" in decoded or "token" in decoded: + reason = "challenge_mismatch" + message = "webhook challenge response did not echo the expected value" + else: + reason = "missing_echo" + message = "webhook challenge response must include 'challenge' or 'token'" + raise WebhookChallengeError(message, reason=reason, field=field, url=url) + + def _raise_webhook_destination_error( message: str, *, @@ -1066,6 +1193,285 @@ def validate_webhook_destination_url( ) +def _authentication_to_config(authentication: AdCPBaseModel | Mapping[str, Any]) -> dict[str, Any]: + if hasattr(authentication, "model_dump"): + return cast(AdCPBaseModel, authentication).model_dump(mode="json", exclude_none=True) + return dict(authentication) + + +async def _send_legacy_webhook_challenge( + *, + url: str, + authentication: Mapping[str, Any], + payload: dict[str, str], + client: httpx.AsyncClient | None, + timeout_seconds: float | None, + policy: WebhookDestinationPolicy, + extra_headers: Mapping[str, str] | None, +) -> httpx.Response: + schemes_raw = authentication.get("schemes") + if schemes_raw is not None and not isinstance(schemes_raw, (list, tuple)): + raise ValueError( + "authentication.schemes must be a list, got " f"{type(schemes_raw).__name__}" + ) + schemes = list(schemes_raw or []) + if len(schemes) != 1: + raise ValueError("authentication.schemes must contain exactly one scheme") + auth_scheme = str(getattr(schemes[0], "value", schemes[0])) + credentials = authentication.get("credentials") + + if auth_scheme not in ("Bearer", "HMAC-SHA256"): + raise ValueError( + f"unknown authentication scheme {auth_scheme!r}; " + "supported legacy schemes are 'Bearer' and 'HMAC-SHA256'." + ) + if not isinstance(credentials, str) or not credentials: + raise ValueError(f"authentication.schemes={[auth_scheme]!r} requires credentials") + _validate_header_value("authentication.credentials", credentials) + _warn_auth_deprecation_once() + + transport: Any = None + if client is None: + from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport + + transport = build_async_ip_pinned_transport( + url, + allow_private=policy.allow_private_destinations, + allowed_ports=policy.allowed_destination_ports, + ) + + body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") + if len(body_bytes) > _MAX_BODY_BYTES: + raise ValueError( + f"serialized webhook challenge body is {len(body_bytes):,} bytes, " + f"over the {_MAX_BODY_BYTES:,}-byte cap." + ) + + headers: dict[str, str] = {"Content-Type": "application/json"} + if auth_scheme == "Bearer": + headers["Authorization"] = f"Bearer {credentials}" + else: + get_adcp_signed_headers_for_webhook( + headers, + secret=credentials, + timestamp=str(int(time.time())), + payload=payload, + ) + + if extra_headers: + if len(extra_headers) > _MAX_EXTRA_HEADERS: + raise ValueError( + f"extra_headers has {len(extra_headers)} entries; " + f"helper caps at {_MAX_EXTRA_HEADERS}." + ) + for key in extra_headers: + normalized = str(key).lower() + if normalized in _RESERVED_HEADERS or normalized.startswith(":"): + raise ValueError(_reserved_header_message(normalized, key)) + for key, value in extra_headers.items(): + _validate_header_value(f"extra_headers[{key!r}]", value) + headers[key] = value + + effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS + if client is None: + async with httpx.AsyncClient( + transport=transport, + timeout=effective_timeout, + follow_redirects=False, + trust_env=False, + ) as http_client: + return await http_client.post(url, content=body_bytes, headers=headers) + return await client.post(url, content=body_bytes, headers=headers) + + +async def challenge_webhook_destination( + *, + url: str, + account_id: str, + subscriber_id: str, + sender: WebhookSender | None = None, + authentication: AdCPBaseModel | Mapping[str, Any] | None = None, + challenge: str | None = None, + client: httpx.AsyncClient | None = None, + timeout_seconds: float | None = _DEFAULT_TIMEOUT_SECONDS, + policy: WebhookDestinationPolicy | None = None, + field: str | None = None, + extra_headers: Mapping[str, str] | None = None, +) -> WebhookChallengeResult: + """Validate and prove control of a durable webhook destination. + + Use before activating a new or changed active + ``sync_accounts.accounts[].notification_configs[]`` entry. Inactive + configs can be persisted without calling this helper. + + ``authentication`` follows the durable config's legacy auth selector: + when present, the challenge is sent with Bearer or HMAC-SHA256. When + omitted, pass a :class:`WebhookSender` so the challenge uses the + default RFC 9421 webhook profile. + """ + + if sender is not None and authentication is not None: + raise WebhookChallengeError( + "pass either sender= for RFC 9421 or authentication= for legacy auth, not both", + reason="ambiguous_auth_mode", + field=field, + url=url, + ) + if sender is not None and client is not None: + raise WebhookChallengeError( + "client= cannot be combined with sender=; configure the client on WebhookSender", + reason="ambiguous_client", + field=field, + url=url, + ) + if client is not None: + raise WebhookChallengeError( + "proof-of-control must use the SDK-managed pinned transport", + reason="unsafe_client", + field=field, + url=url, + ) + sender_owns_client = bool(getattr(cast(Any, sender), "_owns_client", False)) + sender_transport_hooks = tuple(getattr(cast(Any, sender), "_transport_hooks", ())) + if sender is not None and not sender_owns_client: + raise WebhookChallengeError( + "proof-of-control requires a WebhookSender constructed without client=", + reason="unsafe_sender_client", + field=field, + url=url, + ) + if sender is not None and sender_transport_hooks: + raise WebhookChallengeError( + "proof-of-control does not support sender transport_hooks", + reason="unsupported_sender_hooks", + field=field, + url=url, + ) + if sender is None and authentication is None: + raise WebhookChallengeError( + "webhook challenge requires sender= when authentication is omitted", + reason="sender_required", + field=field, + url=url, + suggestion=( + "Pass the seller's WebhookSender, or pass config.authentication " "for legacy auth." + ), + ) + if sender is not None and timeout_seconds not in (None, _DEFAULT_TIMEOUT_SECONDS): + raise WebhookChallengeError( + "timeout_seconds cannot be set when sender= is provided; configure WebhookSender", + reason="ambiguous_timeout", + field=field, + url=url, + ) + if client is not None and timeout_seconds not in (None, _DEFAULT_TIMEOUT_SECONDS): + raise WebhookChallengeError( + "timeout_seconds cannot be set when client= is provided; " + "configure the AsyncClient timeout", + reason="ambiguous_timeout", + field=field, + url=url, + ) + + try: + destination = validate_webhook_destination_url(url, policy=policy, field=field) + payload = create_webhook_challenge_payload( + account_id=account_id, + subscriber_id=subscriber_id, + challenge=challenge, + ) + except WebhookDestinationValidationError as exc: + raise WebhookChallengeError( + str(exc), + reason=exc.reason, + field=exc.field, + url=exc.url, + suggestion=exc.suggestion, + ) from exc + except ValueError as exc: + raise WebhookChallengeError( + f"webhook challenge configuration is invalid: {exc}", + reason="invalid_configuration", + field=field, + url=url, + ) from exc + challenge_value = payload["challenge"] + + try: + if sender is not None: + delivery = await sender.send_webhook_challenge( + url=destination.effective_url, + account_id=account_id, + subscriber_id=subscriber_id, + challenge=challenge_value, + extra_headers=extra_headers, + ) + status_code = delivery.status_code + response_headers = dict(delivery.response_headers) + response_body = delivery.response_body + else: + if authentication is None: + raise RuntimeError("authentication unexpectedly missing") + auth_config = _authentication_to_config(authentication) + response = await _send_legacy_webhook_challenge( + url=destination.effective_url, + authentication=auth_config, + payload=payload, + client=client, + extra_headers=extra_headers, + timeout_seconds=timeout_seconds, + policy=destination.policy, + ) + status_code = response.status_code + response_headers = dict(response.headers) + response_body = response.content + except httpx.TimeoutException as exc: + raise WebhookChallengeError( + "webhook challenge timed out", + reason="timeout", + field=field, + url=destination.original_url, + ) from exc + except httpx.HTTPError as exc: + raise WebhookChallengeError( + f"webhook challenge request failed: {exc}", + reason="request_failed", + field=field, + url=destination.original_url, + ) from exc + except ValueError as exc: + raise WebhookChallengeError( + f"webhook challenge configuration is invalid: {exc}", + reason="invalid_configuration", + field=field, + url=destination.original_url, + ) from exc + + if not 200 <= status_code < 300: + raise WebhookChallengeError( + f"webhook challenge failed with HTTP {status_code}", + reason="http_status", + field=field, + url=destination.original_url, + status_code=status_code, + ) + + echoed_field = validate_webhook_challenge_response( + response_body, + challenge=challenge_value, + field=field, + url=destination.original_url, + ) + return WebhookChallengeResult( + challenge=challenge_value, + echoed_field=echoed_field, + destination=destination, + status_code=status_code, + response_headers=response_headers, + response_body=response_body, + ) + + def _warn_auth_deprecation_once() -> None: global _AUTH_DEPRECATION_WARNED if _AUTH_DEPRECATION_WARNED: @@ -1569,6 +1975,9 @@ def _validate_header_value(name: str, value: Any) -> None: "create_a2a_webhook_payload", "create_mcp_webhook_payload", "generate_webhook_idempotency_key", + "generate_webhook_challenge_value", + "create_webhook_challenge_payload", + "validate_webhook_challenge_response", "get_adcp_signed_headers_for_webhook", "sign_legacy_webhook", "to_wire_dict", @@ -1581,6 +1990,9 @@ def _validate_header_value(name: str, value: Any) -> None: "WebhookDestinationPolicy", "WebhookDestinationValidation", "WebhookDestinationValidationError", + "WebhookChallengeError", + "WebhookChallengeResult", + "challenge_webhook_destination", "validate_webhook_destination_url", # Sender — transport hooks (URL rewrite before SSRF) "DockerLocalhostRewrite", diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index 3fb0dc117..95237cdb7 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -377,6 +377,8 @@ "VerifyBrandClaimsRequestBulk", "VerifyBrandClaimsResponseBulk", "WcagLevel", + "WebhookChallengeError", + "WebhookChallengeResult", "WebhookDedupStore", "WebhookMetadata", "WebhookReceiver", @@ -385,9 +387,11 @@ "WholesaleFeedEvent", "WholesaleFeedWebhook", "aliases", + "challenge_webhook_destination", "create_a2a_webhook_payload", "create_mcp_webhook_payload", "create_test_agent", + "create_webhook_challenge_payload", "creative_agent", "detect_publisher_properties_divergence", "domain_matches", @@ -397,6 +401,7 @@ "fetch_agent_authorizations", "fetch_agent_authorizations_from_directory", "filter_revoked_selectors", + "generate_webhook_challenge_value", "generate_webhook_idempotency_key", "generated", "get_adcp_signed_headers_for_webhook", @@ -429,6 +434,7 @@ "validate_capabilities", "validate_product", "validate_publisher_properties_item", + "validate_webhook_challenge_response", "verify_agent_authorization", "verify_agent_for_property" ], diff --git a/tests/test_webhook_challenge.py b/tests/test_webhook_challenge.py new file mode 100644 index 000000000..dfecbbce2 --- /dev/null +++ b/tests/test_webhook_challenge.py @@ -0,0 +1,324 @@ +from __future__ import annotations + +import hashlib +import hmac +import json +import socket +from typing import Any +from unittest.mock import patch + +import httpx +import pytest + +from adcp.webhooks import ( + WebhookChallengeError, + WebhookDestinationPolicy, + WebhookSender, + challenge_webhook_destination, + create_webhook_challenge_payload, + validate_webhook_challenge_response, +) + +pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") + + +def _public_dns(monkeypatch: pytest.MonkeyPatch, host: str = "buyer.example") -> None: + def fake_getaddrinfo(query_host: str, port: object) -> list[tuple[object, ...]]: + assert query_host == host + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("93.184.216.34", 443))] + + monkeypatch.setattr("adcp.signing.jwks.socket.getaddrinfo", fake_getaddrinfo) + + +def _challenge_transport(captured: list[httpx.Request]) -> httpx.MockTransport: + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + body = json.loads(request.content) + return httpx.Response(200, json={"challenge": body["challenge"]}) + + return httpx.MockTransport(handler) + + +def test_create_webhook_challenge_payload_shape() -> None: + payload = create_webhook_challenge_payload( + account_id="acct_1", + subscriber_id="buyer-primary", + challenge="opaque-random-value", + ) + + assert payload == { + "type": "webhook.challenge", + "challenge": "opaque-random-value", + "account_id": "acct_1", + "subscriber_id": "buyer-primary", + } + + +@pytest.mark.parametrize("body", [b'{"challenge":"abc"}', {"token": "abc"}]) +def test_validate_webhook_challenge_response_accepts_challenge_or_token( + body: bytes | dict[str, str], +) -> None: + assert validate_webhook_challenge_response(body, challenge="abc") in {"challenge", "token"} + + +def test_validate_webhook_challenge_response_rejects_mismatch() -> None: + with pytest.raises(WebhookChallengeError) as exc: + validate_webhook_challenge_response(b'{"challenge":"wrong"}', challenge="expected") + + assert exc.value.reason == "challenge_mismatch" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_with_sender( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + captured: list[httpx.Request] = [] + sender = WebhookSender.from_bearer_token("sender-token") + + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + return_value=_challenge_transport(captured), + ): + result = await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + challenge="challenge_123", + field="accounts[0].notification_configs[0].url", + ) + + assert result.ok + assert result.echoed_field == "challenge" + assert result.challenge == "challenge_123" + assert result.destination.effective_url == "https://buyer.example/webhooks/catalog" + assert captured[0].headers["authorization"] == "Bearer sender-token" + sent = json.loads(captured[0].content) + assert sent == { + "type": "webhook.challenge", + "challenge": "challenge_123", + "account_id": "acct_1", + "subscriber_id": "buyer-primary", + } + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_with_legacy_authentication( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + captured: list[httpx.Request] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + body = json.loads(request.content) + return httpx.Response(202, json={"token": body["challenge"]}) + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=httpx.MockTransport(handler), + ): + result = await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="audit-bus", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + challenge="legacy_challenge_123", + ) + + assert result.echoed_field == "token" + assert captured[0].headers["authorization"] == f"Bearer {'b' * 40}" + assert json.loads(captured[0].content)["type"] == "webhook.challenge" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_with_legacy_hmac( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + captured: list[httpx.Request] = [] + secret = "s" * 40 + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + body = json.loads(request.content) + return httpx.Response(200, json={"challenge": body["challenge"]}) + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=httpx.MockTransport(handler), + ): + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="audit-bus", + authentication={"schemes": ["HMAC-SHA256"], "credentials": secret}, + challenge="hmac_challenge_123", + ) + + sent = captured[0] + timestamp = sent.headers["x-adcp-timestamp"] + expected = hmac.new( + secret.encode("utf-8"), + f"{timestamp}.".encode() + sent.content, + hashlib.sha256, + ).hexdigest() + assert sent.headers["x-adcp-signature"] == f"sha256={expected}" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_non_2xx( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(403, json={"error": "nope"}) + + sender = WebhookSender.from_bearer_token("sender-token") + + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + return_value=httpx.MockTransport(handler), + ): + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + challenge="challenge_123", + ) + + assert exc.value.reason == "http_status" + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_local_development_policy() -> None: + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + body = json.loads(request.content) + return httpx.Response(200, json={"challenge": body["challenge"]}) + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=httpx.MockTransport(handler), + ): + result = await challenge_webhook_destination( + url="http://localhost/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + challenge="local_challenge_123", + policy=WebhookDestinationPolicy.local_development(), + ) + + assert result.ok + assert captured["url"] == "http://localhost/webhooks/catalog" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_custom_client() -> None: + transport = httpx.MockTransport(lambda _req: httpx.Response(200)) + async with httpx.AsyncClient(transport=transport) as client: + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + client=client, + ) + + assert exc.value.reason == "unsafe_client" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_sender_with_custom_client() -> None: + transport = httpx.MockTransport(lambda _req: httpx.Response(200)) + async with httpx.AsyncClient(transport=transport) as client: + sender = WebhookSender.from_bearer_token("sender-token", client=client) + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + ) + + assert exc.value.reason == "unsafe_sender_client" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_sender_transport_hooks() -> None: + class RewriteHook: + def rewrite_url(self, url: str) -> str: + return url.replace("buyer.example", "other.example") + + sender = WebhookSender.from_bearer_token( + "sender-token", + transport_hooks=(RewriteHook(),), + ) + + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + ) + + assert exc.value.reason == "unsupported_sender_hooks" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_sender_host_override( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + sender = WebhookSender.from_bearer_token("sender-token") + + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + return_value=_challenge_transport([]), + ): + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + extra_headers={"Host": "evil.internal"}, + ) + + assert exc.value.reason == "invalid_configuration" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_wraps_preflight_errors( + monkeypatch: pytest.MonkeyPatch, +) -> None: + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="http://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + ) + + assert exc.value.reason == "https_required" + + _public_dns(monkeypatch) + with pytest.raises(WebhookChallengeError) as empty_id: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="", + subscriber_id="buyer-primary", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + ) + + assert empty_id.value.reason == "invalid_configuration" From 10e45810937aa3e228ff7cba7ec0bc19e39fd074 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 23 May 2026 21:51:09 -0400 Subject: [PATCH 2/3] fix: tighten webhook challenge auth mode --- README.md | 43 +++-- docs/handler-authoring.md | 51 +++++- src/adcp/__init__.py | 4 + src/adcp/webhooks.py | 205 ++++++++++++++---------- tests/fixtures/public_api_snapshot.json | 2 + tests/test_webhook_challenge.py | 182 ++++++++++++++++++--- 6 files changed, 362 insertions(+), 125 deletions(-) diff --git a/README.md b/README.md index 078194440..0bdd8eff3 100644 --- a/README.md +++ b/README.md @@ -528,22 +528,45 @@ kind (`send_revocation_notification`, `send_artifact_webhook`, Validate buyer-provided webhook URLs before storing durable subscriptions: ```python -from adcp.webhooks import WebhookDestinationPolicy, validate_webhook_destination_url - -validate_webhook_destination_url( - request.push_notification_config.url, - field="push_notification_config.url", - policy=WebhookDestinationPolicy.production(), +from adcp import ( + WebhookChallengeError, + WebhookDestinationPolicy, + WebhookSender, + challenge_webhook_destination, ) + +sender = WebhookSender.from_jwk(webhook_signing_jwk_with_private_d) + +try: + # Call only for new or changed active durable subscriptions. Persist + # active=False configs without challenging; challenge before activation. + auth_kwargs = ( + {"authentication": config.authentication} + if config.authentication + else {"sender": sender} + ) + await challenge_webhook_destination( + url=config.url, + account_id=account_id, + subscriber_id=config.subscriber_id, + **auth_kwargs, + field="accounts[0].notification_configs[0].url", + policy=WebhookDestinationPolicy.production(), + ) +except WebhookChallengeError as exc: + return {"errors": [exc.to_error()]} ``` Use `WebhookDestinationPolicy.local_development()` only for local tests that need `http://localhost` or private-network destinations. Production validation requires HTTPS and rejects loopback, private, link-local, reserved, and cloud -metadata destinations using the same SSRF classifier as `WebhookSender`. The -validation result includes both `original_url` and `effective_url`; sellers -should normally persist the buyer's original URL and reapply the same -policy/hooks at send time, rather than storing a Docker or test rewrite. +metadata destinations. The challenge helper uses the SDK-managed pinned +transport and fails unless the receiver echoes the challenge in a JSON +`challenge` or `token` field. For omitted `authentication`, pass an RFC 9421 +`WebhookSender.from_jwk(...)` without `client=`. Bearer/HMAC durable configs +must pass `authentication=config.authentication`; custom egress clients, +proxies, and mTLS transports are for normal delivery paths, not registration +challenges. Wholesale feed notifications use stable types from `adcp` / `adcp.types`: diff --git a/docs/handler-authoring.md b/docs/handler-authoring.md index 48b602e9a..1b29a2a87 100644 --- a/docs/handler-authoring.md +++ b/docs/handler-authoring.md @@ -1220,15 +1220,46 @@ Validate durable buyer endpoints before persisting `push_notification_config.url or `accounts[].notification_configs[].url` from `sync_accounts`: ```python -from adcp.webhooks import ( +from adcp import ( + WebhookChallengeError, WebhookDestinationPolicy, - WebhookDestinationValidationError, - validate_webhook_destination_url, + WebhookSender, + challenge_webhook_destination, ) +sender = WebhookSender.from_jwk(webhook_signing_jwk_with_private_d) + +try: + if config.active is not False: + # Challenge new or changed active durable subscriptions before + # activation. Inactive configs are stored without a network challenge. + auth_kwargs = ( + {"authentication": config.authentication} + if config.authentication + else {"sender": sender} + ) + await challenge_webhook_destination( + url=str(config.url), + account_id=account_id, + subscriber_id=config.subscriber_id, + **auth_kwargs, + policy=WebhookDestinationPolicy.production(), + field="accounts[0].notification_configs[0].url", + ) +except WebhookChallengeError as exc: + return {"errors": [exc.to_error()]} +``` + +If you only need to preflight a URL before deciding whether the subscription +changed, use `validate_webhook_destination_url` directly: + +```python +from adcp import WebhookDestinationPolicy +from adcp.webhooks import WebhookDestinationValidationError, validate_webhook_destination_url + try: - validate_webhook_destination_url( - config.url, + validation = validate_webhook_destination_url( + str(config.url), field="accounts[0].notification_configs[0].url", policy=WebhookDestinationPolicy.production(), ) @@ -1241,8 +1272,14 @@ reserved, and cloud metadata destinations. Use `WebhookDestinationPolicy.local_development()` only for local fixtures that need `http://localhost` or private-network endpoints. The helper returns both `original_url` and `effective_url`; persist the buyer's original URL in durable -subscription state, and reapply the same policy/hooks when sending. Do not -persist a Docker or test rewrite as the buyer's registered endpoint. +subscription state. The proof-of-control helper uses the SDK-managed pinned +transport, posts a `webhook.challenge`, and requires the receiver to echo the +challenge value in a JSON `challenge` or `token` field. For omitted +`authentication`, pass an RFC 9421 `WebhookSender.from_jwk(...)` without +`client=`. Bearer/HMAC durable configs must pass +`authentication=config.authentication`; custom egress clients, proxies, and +mTLS transports are reserved for normal delivery paths, not registration +proof-of-control. ### Wholesale feed notifications diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 4218e5c36..a83aefbf8 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -490,8 +490,10 @@ WebhookChallengeError, WebhookChallengeResult, WebhookDedupStore, + WebhookDestinationPolicy, WebhookReceiver, WebhookReceiverConfig, + WebhookSender, WebhookVerifyOptions, challenge_webhook_destination, create_a2a_webhook_payload, @@ -688,8 +690,10 @@ def get_adcp_version() -> str: "to_wire_dict", "WebhookChallengeError", "WebhookChallengeResult", + "WebhookDestinationPolicy", "WebhookReceiver", "WebhookReceiverConfig", + "WebhookSender", "WebhookVerifyOptions", "WebhookDedupStore", "MemoryBackend", diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index 0135ed861..dd1453801 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -42,6 +42,7 @@ ) from google.protobuf.json_format import MessageToDict, ParseDict from google.protobuf.struct_pb2 import Value +from pydantic import AnyUrl from pydantic import BaseModel as PydanticBaseModel from adcp.server.idempotency.backends import MemoryBackend as MemoryBackend @@ -969,7 +970,7 @@ def create_webhook_challenge_payload( raise ValueError("account_id must be a non-empty string") if not isinstance(subscriber_id, str) or not subscriber_id: raise ValueError("subscriber_id must be a non-empty string") - challenge_value = challenge or generate_webhook_challenge_value() + challenge_value = generate_webhook_challenge_value() if challenge is None else challenge if not isinstance(challenge_value, str) or not challenge_value: raise ValueError("challenge must be a non-empty string") return { @@ -1057,7 +1058,7 @@ def _validate_policy_hooks(policy: WebhookDestinationPolicy) -> None: def validate_webhook_destination_url( - url: str, + url: str | AnyUrl, *, policy: WebhookDestinationPolicy | None = None, field: str | None = None, @@ -1080,33 +1081,40 @@ def validate_webhook_destination_url( active_policy = policy or WebhookDestinationPolicy.production() _validate_policy_hooks(active_policy) - if not isinstance(url, str) or not url: + if isinstance(url, str): + url_text = url + elif isinstance(url, AnyUrl): + url_text = str(url) + else: + url_text = None + + if not isinstance(url_text, str) or not url_text: _raise_webhook_destination_error( "webhook destination URL must be a non-empty string", reason="missing_url", field=field, - url=None if not isinstance(url, str) else url, + url=url_text, effective_url=None, policy=active_policy, ) - if any(c in url for c in _HEADER_FORBIDDEN_CHARS): + if any(c in url_text for c in _HEADER_FORBIDDEN_CHARS): _raise_webhook_destination_error( "webhook destination URL contains control characters", reason="control_characters", field=field, - url=url, + url=url_text, effective_url=None, policy=active_policy, ) try: - effective_url = apply_hooks(url, active_policy.transport_hooks) + effective_url = apply_hooks(url_text, active_policy.transport_hooks) except ValueError as exc: _raise_webhook_destination_error( f"webhook destination URL failed transport hook policy: {exc}", reason="transport_hook_rejected", field=field, - url=url, + url=url_text, effective_url=None, policy=active_policy, ) @@ -1115,7 +1123,7 @@ def validate_webhook_destination_url( "webhook destination URL contains control characters after transport hooks", reason="control_characters", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, ) @@ -1126,7 +1134,7 @@ def validate_webhook_destination_url( "webhook destination URL must not embed userinfo (user:pass@host)", reason="userinfo_not_allowed", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, suggestion="Pass credentials in webhook authentication settings instead of the URL.", @@ -1136,7 +1144,7 @@ def validate_webhook_destination_url( "webhook destination URL must not include a fragment", reason="fragment_not_allowed", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, suggestion=( @@ -1149,7 +1157,7 @@ def validate_webhook_destination_url( f"webhook destination URL must use http:// or https:// (got {parsed.scheme!r})", reason="invalid_scheme", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, ) @@ -1158,7 +1166,7 @@ def validate_webhook_destination_url( f"webhook destination URL must use https:// under {active_policy.name} policy", reason="https_required", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, suggestion=( @@ -1178,13 +1186,13 @@ def validate_webhook_destination_url( f"webhook destination URL failed SSRF validation: {exc}", reason="ssrf_rejected", field=field, - url=url, + url=url_text, effective_url=effective_url, policy=active_policy, ) return WebhookDestinationValidation( - original_url=url, + original_url=url_text, effective_url=effective_url, hostname=hostname, resolved_ip=resolved_ip, @@ -1204,7 +1212,6 @@ async def _send_legacy_webhook_challenge( url: str, authentication: Mapping[str, Any], payload: dict[str, str], - client: httpx.AsyncClient | None, timeout_seconds: float | None, policy: WebhookDestinationPolicy, extra_headers: Mapping[str, str] | None, @@ -1230,16 +1237,6 @@ async def _send_legacy_webhook_challenge( _validate_header_value("authentication.credentials", credentials) _warn_auth_deprecation_once() - transport: Any = None - if client is None: - from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport - - transport = build_async_ip_pinned_transport( - url, - allow_private=policy.allow_private_destinations, - allowed_ports=policy.allowed_destination_ports, - ) - body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") if len(body_bytes) > _MAX_BODY_BYTES: raise ValueError( @@ -1272,28 +1269,83 @@ async def _send_legacy_webhook_challenge( _validate_header_value(f"extra_headers[{key!r}]", value) headers[key] = value + return await _post_managed_webhook_challenge( + url=url, + body=body_bytes, + headers=headers, + timeout_seconds=timeout_seconds, + policy=policy, + ) + + +async def _send_sender_webhook_challenge( + *, + url: str, + sender: WebhookSender, + payload: dict[str, str], + timeout_seconds: float | None, + policy: WebhookDestinationPolicy, + extra_headers: Mapping[str, str] | None, +) -> httpx.Response: + body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") + if len(body_bytes) > _MAX_BODY_BYTES: + raise ValueError( + f"serialized webhook challenge body is {len(body_bytes):,} bytes, " + f"over the {_MAX_BODY_BYTES:,}-byte cap." + ) + + auth = getattr(cast(Any, sender), "_auth") + auth_headers = auth.build_auth_headers(method="POST", url=url, body=body_bytes) + from adcp.webhook_auth import merge_extra_headers + + headers = merge_extra_headers( + base={"Content-Type": "application/json", **auth_headers}, + extra=extra_headers, + reserved=auth.reserved_headers(), + ) + return await _post_managed_webhook_challenge( + url=url, + body=body_bytes, + headers=headers, + timeout_seconds=timeout_seconds, + policy=policy, + ) + + +async def _post_managed_webhook_challenge( + *, + url: str, + body: bytes, + headers: Mapping[str, str], + timeout_seconds: float | None, + policy: WebhookDestinationPolicy, +) -> httpx.Response: + from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport + + transport = build_async_ip_pinned_transport( + url, + allow_private=policy.allow_private_destinations, + allowed_ports=policy.allowed_destination_ports, + ) effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS - if client is None: - async with httpx.AsyncClient( - transport=transport, - timeout=effective_timeout, - follow_redirects=False, - trust_env=False, - ) as http_client: - return await http_client.post(url, content=body_bytes, headers=headers) - return await client.post(url, content=body_bytes, headers=headers) + async with httpx.AsyncClient( + transport=transport, + timeout=effective_timeout, + follow_redirects=False, + trust_env=False, + ) as http_client: + return await http_client.post(url, content=body, headers=headers) async def challenge_webhook_destination( *, - url: str, + url: str | AnyUrl, account_id: str, subscriber_id: str, sender: WebhookSender | None = None, authentication: AdCPBaseModel | Mapping[str, Any] | None = None, challenge: str | None = None, - client: httpx.AsyncClient | None = None, - timeout_seconds: float | None = _DEFAULT_TIMEOUT_SECONDS, + timeout_seconds: float | None = None, policy: WebhookDestinationPolicy | None = None, field: str | None = None, extra_headers: Mapping[str, str] | None = None, @@ -1306,30 +1358,17 @@ async def challenge_webhook_destination( ``authentication`` follows the durable config's legacy auth selector: when present, the challenge is sent with Bearer or HMAC-SHA256. When - omitted, pass a :class:`WebhookSender` so the challenge uses the - default RFC 9421 webhook profile. + omitted, pass an RFC 9421 :class:`WebhookSender`; the helper uses that + sender's webhook-signing key and the SDK-managed pinned transport. """ + error_url = str(url) if isinstance(url, (str, AnyUrl)) else None if sender is not None and authentication is not None: raise WebhookChallengeError( "pass either sender= for RFC 9421 or authentication= for legacy auth, not both", reason="ambiguous_auth_mode", field=field, - url=url, - ) - if sender is not None and client is not None: - raise WebhookChallengeError( - "client= cannot be combined with sender=; configure the client on WebhookSender", - reason="ambiguous_client", - field=field, - url=url, - ) - if client is not None: - raise WebhookChallengeError( - "proof-of-control must use the SDK-managed pinned transport", - reason="unsafe_client", - field=field, - url=url, + url=error_url, ) sender_owns_client = bool(getattr(cast(Any, sender), "_owns_client", False)) sender_transport_hooks = tuple(getattr(cast(Any, sender), "_transport_hooks", ())) @@ -1338,41 +1377,36 @@ async def challenge_webhook_destination( "proof-of-control requires a WebhookSender constructed without client=", reason="unsafe_sender_client", field=field, - url=url, + url=error_url, ) if sender is not None and sender_transport_hooks: raise WebhookChallengeError( "proof-of-control does not support sender transport_hooks", reason="unsupported_sender_hooks", field=field, - url=url, + url=error_url, + ) + if sender is not None and not sender.signs_with_rfc9421: + raise WebhookChallengeError( + "proof-of-control requires an RFC 9421 WebhookSender when authentication is omitted", + reason="sender_auth_mode_mismatch", + field=field, + url=error_url, + suggestion=( + "Use WebhookSender.from_jwk(...) for default durable configs, " + "or pass config.authentication for legacy Bearer/HMAC configs." + ), ) if sender is None and authentication is None: raise WebhookChallengeError( "webhook challenge requires sender= when authentication is omitted", reason="sender_required", field=field, - url=url, + url=error_url, suggestion=( "Pass the seller's WebhookSender, or pass config.authentication " "for legacy auth." ), ) - if sender is not None and timeout_seconds not in (None, _DEFAULT_TIMEOUT_SECONDS): - raise WebhookChallengeError( - "timeout_seconds cannot be set when sender= is provided; configure WebhookSender", - reason="ambiguous_timeout", - field=field, - url=url, - ) - if client is not None and timeout_seconds not in (None, _DEFAULT_TIMEOUT_SECONDS): - raise WebhookChallengeError( - "timeout_seconds cannot be set when client= is provided; " - "configure the AsyncClient timeout", - reason="ambiguous_timeout", - field=field, - url=url, - ) - try: destination = validate_webhook_destination_url(url, policy=policy, field=field) payload = create_webhook_challenge_payload( @@ -1393,22 +1427,28 @@ async def challenge_webhook_destination( f"webhook challenge configuration is invalid: {exc}", reason="invalid_configuration", field=field, - url=url, + url=error_url, ) from exc challenge_value = payload["challenge"] try: if sender is not None: - delivery = await sender.send_webhook_challenge( + effective_timeout = ( + timeout_seconds + if timeout_seconds is not None + else float(getattr(cast(Any, sender), "_timeout", _DEFAULT_TIMEOUT_SECONDS)) + ) + response = await _send_sender_webhook_challenge( url=destination.effective_url, - account_id=account_id, - subscriber_id=subscriber_id, - challenge=challenge_value, + sender=sender, + payload=payload, + timeout_seconds=effective_timeout, + policy=destination.policy, extra_headers=extra_headers, ) - status_code = delivery.status_code - response_headers = dict(delivery.response_headers) - response_body = delivery.response_body + status_code = response.status_code + response_headers = dict(response.headers) + response_body = response.content else: if authentication is None: raise RuntimeError("authentication unexpectedly missing") @@ -1417,7 +1457,6 @@ async def challenge_webhook_destination( url=destination.effective_url, authentication=auth_config, payload=payload, - client=client, extra_headers=extra_headers, timeout_seconds=timeout_seconds, policy=destination.policy, diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index 95237cdb7..3056ba287 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -380,9 +380,11 @@ "WebhookChallengeError", "WebhookChallengeResult", "WebhookDedupStore", + "WebhookDestinationPolicy", "WebhookMetadata", "WebhookReceiver", "WebhookReceiverConfig", + "WebhookSender", "WebhookVerifyOptions", "WholesaleFeedEvent", "WholesaleFeedWebhook", diff --git a/tests/test_webhook_challenge.py b/tests/test_webhook_challenge.py index dfecbbce2..92e6c7c39 100644 --- a/tests/test_webhook_challenge.py +++ b/tests/test_webhook_challenge.py @@ -1,15 +1,18 @@ from __future__ import annotations +import copy import hashlib import hmac import json import socket +from pathlib import Path from typing import Any from unittest.mock import patch import httpx import pytest +from adcp import NotificationConfig from adcp.webhooks import ( WebhookChallengeError, WebhookDestinationPolicy, @@ -21,6 +24,22 @@ pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") +VECTORS_DIR = Path(__file__).parent / "conformance" / "vectors" / "request-signing" +_KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +_REQUEST_ED25519 = next(k for k in _KEYS if k["kid"] == "test-ed25519-2026") +_WEBHOOK_JWK = { + **copy.deepcopy(_REQUEST_ED25519), + "kid": "test-webhook-ed25519-2026", + "adcp_use": "webhook-signing", +} + + +def _rfc9421_sender(*, timeout_seconds: float = 10.0) -> WebhookSender: + return WebhookSender.from_jwk( + {**_WEBHOOK_JWK, "d": _WEBHOOK_JWK["_private_d_for_test_only"]}, + timeout_seconds=timeout_seconds, + ) + def _public_dns(monkeypatch: pytest.MonkeyPatch, host: str = "buyer.example") -> None: def fake_getaddrinfo(query_host: str, port: object) -> list[tuple[object, ...]]: @@ -54,6 +73,15 @@ def test_create_webhook_challenge_payload_shape() -> None: } +def test_create_webhook_challenge_payload_rejects_empty_challenge() -> None: + with pytest.raises(ValueError, match="challenge"): + create_webhook_challenge_payload( + account_id="acct_1", + subscriber_id="buyer-primary", + challenge="", + ) + + @pytest.mark.parametrize("body", [b'{"challenge":"abc"}', {"token": "abc"}]) def test_validate_webhook_challenge_response_accepts_challenge_or_token( body: bytes | dict[str, str], @@ -74,10 +102,10 @@ async def test_challenge_webhook_destination_with_sender( ) -> None: _public_dns(monkeypatch) captured: list[httpx.Request] = [] - sender = WebhookSender.from_bearer_token("sender-token") + sender = _rfc9421_sender() with patch( - "adcp.webhook_sender.build_async_ip_pinned_transport", + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", return_value=_challenge_transport(captured), ): result = await challenge_webhook_destination( @@ -93,7 +121,10 @@ async def test_challenge_webhook_destination_with_sender( assert result.echoed_field == "challenge" assert result.challenge == "challenge_123" assert result.destination.effective_url == "https://buyer.example/webhooks/catalog" - assert captured[0].headers["authorization"] == "Bearer sender-token" + assert "signature" in captured[0].headers + assert "signature-input" in captured[0].headers + assert "content-digest" in captured[0].headers + assert "authorization" not in captured[0].headers sent = json.loads(captured[0].content) assert sent == { "type": "webhook.challenge", @@ -132,6 +163,37 @@ def handler(request: httpx.Request) -> httpx.Response: assert json.loads(captured[0].content)["type"] == "webhook.challenge" +@pytest.mark.asyncio +async def test_challenge_webhook_destination_accepts_notification_config_url( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + captured: list[httpx.Request] = [] + config = NotificationConfig.model_validate( + { + "subscriber_id": "buyer-primary", + "url": "https://buyer.example/webhooks/catalog", + "event_types": ["product.updated"], + "authentication": {"schemes": ["Bearer"], "credentials": "b" * 40}, + } + ) + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=_challenge_transport(captured), + ): + result = await challenge_webhook_destination( + url=config.url, + account_id="acct_1", + subscriber_id=config.subscriber_id, + authentication=config.authentication, + challenge="typed_url_challenge_123", + ) + + assert result.ok + assert str(captured[0].url) == "https://buyer.example/webhooks/catalog" + + @pytest.mark.asyncio async def test_challenge_webhook_destination_with_legacy_hmac( monkeypatch: pytest.MonkeyPatch, @@ -176,10 +238,10 @@ async def test_challenge_webhook_destination_rejects_non_2xx( def handler(_request: httpx.Request) -> httpx.Response: return httpx.Response(403, json={"error": "nope"}) - sender = WebhookSender.from_bearer_token("sender-token") + sender = _rfc9421_sender() with patch( - "adcp.webhook_sender.build_async_ip_pinned_transport", + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", return_value=httpx.MockTransport(handler), ): with pytest.raises(WebhookChallengeError) as exc: @@ -221,22 +283,6 @@ def handler(request: httpx.Request) -> httpx.Response: assert captured["url"] == "http://localhost/webhooks/catalog" -@pytest.mark.asyncio -async def test_challenge_webhook_destination_rejects_custom_client() -> None: - transport = httpx.MockTransport(lambda _req: httpx.Response(200)) - async with httpx.AsyncClient(transport=transport) as client: - with pytest.raises(WebhookChallengeError) as exc: - await challenge_webhook_destination( - url="https://buyer.example/webhooks/catalog", - account_id="acct_1", - subscriber_id="buyer-primary", - authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, - client=client, - ) - - assert exc.value.reason == "unsafe_client" - - @pytest.mark.asyncio async def test_challenge_webhook_destination_rejects_sender_with_custom_client() -> None: transport = httpx.MockTransport(lambda _req: httpx.Response(200)) @@ -253,14 +299,29 @@ async def test_challenge_webhook_destination_rejects_sender_with_custom_client() assert exc.value.reason == "unsafe_sender_client" +@pytest.mark.asyncio +async def test_challenge_webhook_destination_rejects_non_rfc9421_sender() -> None: + sender = WebhookSender.from_bearer_token("sender-token") + + with pytest.raises(WebhookChallengeError) as exc: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + ) + + assert exc.value.reason == "sender_auth_mode_mismatch" + + @pytest.mark.asyncio async def test_challenge_webhook_destination_rejects_sender_transport_hooks() -> None: class RewriteHook: def rewrite_url(self, url: str) -> str: return url.replace("buyer.example", "other.example") - sender = WebhookSender.from_bearer_token( - "sender-token", + sender = WebhookSender.from_jwk( + {**_WEBHOOK_JWK, "d": _WEBHOOK_JWK["_private_d_for_test_only"]}, transport_hooks=(RewriteHook(),), ) @@ -280,10 +341,10 @@ async def test_challenge_webhook_destination_rejects_sender_host_override( monkeypatch: pytest.MonkeyPatch, ) -> None: _public_dns(monkeypatch) - sender = WebhookSender.from_bearer_token("sender-token") + sender = _rfc9421_sender() with patch( - "adcp.webhook_sender.build_async_ip_pinned_transport", + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", return_value=_challenge_transport([]), ): with pytest.raises(WebhookChallengeError) as exc: @@ -322,3 +383,74 @@ async def test_challenge_webhook_destination_wraps_preflight_errors( ) assert empty_id.value.reason == "invalid_configuration" + + with pytest.raises(WebhookChallengeError) as empty_challenge: + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + authentication={"schemes": ["Bearer"], "credentials": "b" * 40}, + challenge="", + ) + + assert empty_challenge.value.reason == "invalid_configuration" + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_sender_uses_policy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + captured: list[httpx.Request] = [] + + def fake_getaddrinfo(host: str, port: object) -> list[tuple[object, ...]]: + assert host == "localhost" + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.1", 80))] + + monkeypatch.setattr("adcp.signing.jwks.socket.getaddrinfo", fake_getaddrinfo) + sender = _rfc9421_sender() + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=_challenge_transport(captured), + ) as build_transport: + result = await challenge_webhook_destination( + url="http://localhost/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + challenge="local_sender_challenge_123", + policy=WebhookDestinationPolicy.local_development(), + ) + + assert result.ok + assert str(captured[0].url) == "http://localhost/webhooks/catalog" + assert build_transport.call_args.kwargs["allow_private"] is True + + +@pytest.mark.asyncio +async def test_challenge_webhook_destination_sender_uses_sender_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _public_dns(monkeypatch) + sender = _rfc9421_sender(timeout_seconds=42.0) + + with ( + patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + return_value=_challenge_transport([]), + ), + patch("adcp.webhooks.httpx.AsyncClient") as async_client, + ): + async_client.return_value.__aenter__.return_value.post.return_value = httpx.Response( + 200, + json={"challenge": "timeout_challenge_123"}, + ) + await challenge_webhook_destination( + url="https://buyer.example/webhooks/catalog", + account_id="acct_1", + subscriber_id="buyer-primary", + sender=sender, + challenge="timeout_challenge_123", + ) + + assert async_client.call_args.kwargs["timeout"] == 42.0 From 5242f7cb1f68a76a98b47284030569714cb683f6 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 24 May 2026 07:01:39 -0400 Subject: [PATCH 3/3] fix: remove unreachable webhook challenge guard --- src/adcp/webhooks.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index dd1453801..9f1ecfbda 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -1450,9 +1450,7 @@ async def challenge_webhook_destination( response_headers = dict(response.headers) response_body = response.content else: - if authentication is None: - raise RuntimeError("authentication unexpectedly missing") - auth_config = _authentication_to_config(authentication) + auth_config = _authentication_to_config(cast(Any, authentication)) response = await _send_legacy_webhook_challenge( url=destination.effective_url, authentication=auth_config,