diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index dd7be7253..e9ea83287 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -208,6 +208,7 @@ AsyncCachingJwksResolver, AsyncJwksFetcher, AsyncJwksResolver, + BrandSourcedJwksResolver, CachingJwksResolver, JwksResolver, SSRFValidationError, @@ -320,6 +321,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "BrandAuthorizationResult", "BrandJsonAuthorizationResolver", "BrandJsonJwksResolver", + "BrandSourcedJwksResolver", "BrandJsonResolverError", "BrandJsonResolverErrorCode", "CAPABILITY_OP", diff --git a/src/adcp/signing/_idna_canonicalize.py b/src/adcp/signing/_idna_canonicalize.py new file mode 100644 index 000000000..af56748ea --- /dev/null +++ b/src/adcp/signing/_idna_canonicalize.py @@ -0,0 +1,80 @@ +"""IDNA-2008 host canonicalization with IP-literal short-circuit. + +Shared by the four signing-side callsites that canonicalize host +strings for comparison: :mod:`adcp.signing.jwks` (JWKS URI host +pinning), :mod:`adcp.signing.ip_pinned_transport` (per-connect pin +normalization), :mod:`adcp.signing.revocation_fetcher` (revocation- +issuer canonicalization), :mod:`adcp.signing.key_origins` (ADCP #3690 +step 7 ``identity.key_origins`` consistency check). + +**Why IP literals need a short-circuit.** ``idna.encode("192.0.2.1", +uts46=True)`` raises because IDNA-2008 rejects purely-numeric labels +(``a label which consists of digits only``). Stdlib's +``host.encode("idna")`` was lenient and returned the ASCII as-is. +Adopters running on ``allow_private=True`` dev setups with IP-literal +JWKS URIs would see ``SSRFValidationError: URI host '...' is not +IDNA-valid`` after the IDNA-2008 migration in PR #789 — a regression +on the dev-loop path without a security justification (IP literals +are not IDN candidates by definition). + +Gating with :func:`ipaddress.ip_address` short-circuits IP inputs +through the encoder untouched. Both v4 (``192.0.2.1``) and v6 +(``2001:db8::1`` or bracketed ``[2001:db8::1]``) are handled. + +**Why ``transitional=False`` is explicit.** The default in +``idna>=3.x`` is already ``False`` (Eszett-preserving — what UTS#46 +calls *non-transitional processing*), but pinning it at the callsite +documents intent and locks the canonicalization regardless of any +future upstream default flip. The package's existing eszett-regression +test (``tests/test_key_origins.py``) covers the load-bearing +behavior; the kwarg here is belt-and-suspenders. (Note: the ``idna`` +package spells the kwarg ``transitional``, not the UTS#46-document +spelling ``transitional_processing``.) +""" + +from __future__ import annotations + +import ipaddress + +import idna + +__all__ = ["canonicalize_host"] + + +def canonicalize_host(host: str) -> str: + """Return the canonical A-label form of ``host`` for byte-equal + host comparisons. + + Steps: + + 1. Strip a single trailing FQDN-root dot. + 2. ASCII-lowercase (IDNA encoding is case-insensitive on the + wire but we want comparison-friendly bytes). + 3. **Short-circuit IP literals** — both v4 and v6 (with or without + surrounding brackets) are returned as ``str(ipaddress.ip_address(host))``, + skipping IDNA entirely. IDNA-2008 rejects purely-numeric labels. + 4. Otherwise call ``idna.encode(host, uts46=True, transitional=False)`` + and return the decoded ASCII (lowercased to match the other + branches). + + Raises ``idna.IDNAError`` (or its parent ``UnicodeError``) on a + label the encoder cannot process. Callers decide whether to + fail-closed (let the exception propagate) or fall back to a + permissive comparison (catch and use the raw input). + """ + host = host.strip() + if host.endswith("."): + host = host[:-1] + host = host.lower() + # IP-literal short-circuit. ``[2001:db8::1]`` form (URL-bracketed) + # comes in from some callsites; strip brackets before the parse. + candidate = host + if candidate.startswith("[") and candidate.endswith("]"): + candidate = candidate[1:-1] + try: + ip = ipaddress.ip_address(candidate) + except ValueError: + # Not an IP literal — fall through to IDNA encoding. + return idna.encode(host, uts46=True, transitional=False).decode("ascii") + # Compressed canonical form via str(IPv6Address) etc. + return str(ip) diff --git a/src/adcp/signing/agent_resolver.py b/src/adcp/signing/agent_resolver.py index bc041ef6f..41edf0053 100644 --- a/src/adcp/signing/agent_resolver.py +++ b/src/adcp/signing/agent_resolver.py @@ -329,6 +329,14 @@ def _extract_brand_json_url(capabilities: dict[str, Any]) -> str: return brand_json_url +#: Per-entry size clamp on ``identity.key_origins`` values. DNS hostname +#: limit is 253 octets (RFC 1035); origin strings carry scheme+host so +#: the practical cap is a bit higher, but 512 is well above any +#: legitimate value while still bounding the surface against a +#: pathologically-large entry from a 64 KiB capabilities body. +_MAX_KEY_ORIGIN_VALUE_BYTES = 512 + + def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None: """Pluck ``identity.key_origins`` from the capabilities body. @@ -339,6 +347,14 @@ def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None: purpose is actually exercised). Filters values to strings — a malformed entry is skipped rather than poisoning the whole map. + **Per-entry length cap (``_MAX_KEY_ORIGIN_VALUE_BYTES``).** Each + origin value is bounded to 512 bytes — well above any legitimate + ``scheme + host + port`` shape but tight enough that a pathological + multi-kilobyte value from the 64 KiB capabilities body doesn't + propagate through downstream comparisons. Entries exceeding the cap + are skipped (the verifier then surfaces the purpose as missing on + the consistency check). + Forward-compat with operators on 3.0 schemas: the map travels under ``additionalProperties: true`` and the SDK reads it as a plain dict rather than via the typed Pydantic surface (which won't carry the @@ -352,8 +368,13 @@ def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None: return None out: dict[str, str] = {} for purpose, origin in raw.items(): - if isinstance(purpose, str) and isinstance(origin, str) and origin: - out[purpose] = origin + if not (isinstance(purpose, str) and isinstance(origin, str) and origin): + continue + if len(origin.encode("utf-8")) > _MAX_KEY_ORIGIN_VALUE_BYTES: + # Length-capped entry — skip rather than truncate (a + # truncated host would silently match the wrong domain). + continue + out[purpose] = origin return out or None @@ -557,6 +578,40 @@ def resolve_agent( # ---- verify factory ---- +class _BrandJsonStaticJwksResolver(StaticJwksResolver): + """A :class:`StaticJwksResolver` carrying the ``"brand_json"`` + source discriminant AND the resolved ``jwks_uri``. + + Conforms to :class:`adcp.signing.BrandSourcedJwksResolver` — the + verifier's ``_maybe_check_key_origin`` engages the spec's + consistency check on every signed request routed through + :func:`verify_from_agent_url`. Adopters wiring custom resolvers + declare the same conformance by setting ``jwks_source = "brand_json"`` + (class attribute) and exposing ``jwks_uri`` (instance attribute); + they MAY also import :class:`BrandSourcedJwksResolver` to type-check + the contract at static analysis time. + + The brand.json walk in :func:`async_resolve_agent` resolved this + JWKS — that's exactly the source the spec's key-origin consistency + check (ADCP #3690 step 7) defends. The verifier reads + ``getattr(resolver, "jwks_uri", None)`` to look up the resolved + host for the comparison. :class:`StaticJwksResolver` does not + carry a ``jwks_uri`` (it's a static keyset), so this subclass + stores the brand.json-resolved URI on the instance. Without it + the check would mismatch every legitimate signer with + ``actual_origin=""``. + + Defined inside the module rather than as a public type because the + helper composition is internal to the buyer-side verify factory. + """ + + jwks_source: ClassVar[Literal["brand_json"]] = "brand_json" + + def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None: + super().__init__(jwks) + self.jwks_uri = jwks_uri + + async def verify_from_agent_url( request: Any, agent_url: str, @@ -674,38 +729,6 @@ async def verify_from_agent_url( return await verify_starlette_request(request, options=options) -class _BrandJsonStaticJwksResolver(StaticJwksResolver): - """A :class:`StaticJwksResolver` carrying the ``"brand_json"`` - source discriminant AND the resolved ``jwks_uri``. - - The brand.json walk in :func:`async_resolve_agent` resolved this - JWKS — that's exactly the source the spec's key-origin consistency - check (ADCP #3690 step 7) defends. The verifier's - ``_maybe_check_key_origin`` step skips when ``jwks_source`` is - absent (treating absence as publisher-pin-equivalent); marking the - static resolver here engages the check on every signed request - routed through :func:`verify_from_agent_url`. - - The verifier reads ``getattr(resolver, "jwks_uri", None)`` to look - up the resolved host for the consistency comparison. - :class:`StaticJwksResolver` does not carry a ``jwks_uri`` (it's a - static keyset), so this subclass stores the brand.json-resolved - URI on the instance. Without it the check would mismatch every - legitimate signer with ``actual_origin=""``. - - Defined inside the module rather than as a public type because the - discriminant is internal — adopters wiring custom resolvers set - their own ``jwks_source = "brand_json"`` class attribute and - ``jwks_uri`` instance attribute directly. - """ - - jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json" - - def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None: - super().__init__(jwks) - self.jwks_uri = jwks_uri - - # ---- helpers ---- diff --git a/src/adcp/signing/brand_jwks.py b/src/adcp/signing/brand_jwks.py index ed34e0569..58ed84ff4 100644 --- a/src/adcp/signing/brand_jwks.py +++ b/src/adcp/signing/brand_jwks.py @@ -347,7 +347,7 @@ class BrandJsonJwksResolver: #: Discriminant for the verifier-side key_origin consistency #: check (see class docstring). - jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json" + jwks_source: ClassVar[Literal["brand_json"]] = "brand_json" def __init__( self, diff --git a/src/adcp/signing/ip_pinned_transport.py b/src/adcp/signing/ip_pinned_transport.py index fe2a968aa..17fa4e792 100644 --- a/src/adcp/signing/ip_pinned_transport.py +++ b/src/adcp/signing/ip_pinned_transport.py @@ -73,6 +73,7 @@ from httpcore._backends.anyio import AnyIOBackend as _AnyIOBackend from httpcore._backends.sync import SyncBackend as _SyncBackend +from adcp.signing._idna_canonicalize import canonicalize_host from adcp.signing.jwks import resolve_and_validate_host if TYPE_CHECKING: @@ -100,26 +101,22 @@ def _build_ssl_context() -> ssl.SSLContext: def _normalize_pin_host(host: str) -> str: """Normalize a hostname for byte-equal comparison. - Lowercases, strips a single trailing dot, and IDNA-encodes so - Unicode hostnames compare equal to the punycode form httpx - passes to httpcore. - - IDNA-2008 (UTS#46) via the PyPI ``idna`` package — the - package-wide canonicalization convention, matching the JWKS - fetcher's ``resolve_and_validate_host`` so a pin set on - ``straße.de`` collapses to the same A-label httpx will pass to - httpcore at connect time. + Delegates to :func:`canonicalize_host` — strips a single trailing + dot, ASCII-lowercases, short-circuits IP literals (v4 and v6, + bracketed or not) before IDNA, and otherwise encodes via + IDNA-2008 (UTS#46 with ``transitional_processing=False``). + Matches the JWKS fetcher's ``resolve_and_validate_host`` so a pin + set on ``straße.de`` collapses to the same A-label httpx will + pass to httpcore at connect time. + + Falls back to the raw input on IDNA encode failure so the + comparison just fails cleanly instead of raising inside + connect_tcp. """ - host = host.lower() - if host.endswith("."): - host = host[:-1] try: - return idna.encode(host, uts46=True).decode("ascii") + return canonicalize_host(host) except (idna.IDNAError, UnicodeError, UnicodeEncodeError): - # Caller already stored the normalized form; fall through - # with the lowercased input so the comparison just fails - # cleanly instead of raising inside connect_tcp. - return host + return host.lower().rstrip(".") class _IpPinnedSyncBackend(_SyncBackend): diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index cb4952dc5..92399807e 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -29,12 +29,13 @@ import socket import time from collections.abc import Callable -from typing import Any, Protocol +from typing import Any, ClassVar, Literal, Protocol, runtime_checkable from urllib.parse import urlsplit import httpx import idna +from adcp.signing._idna_canonicalize import canonicalize_host from adcp.signing.errors import ( REQUEST_SIGNATURE_JWKS_UNAVAILABLE, REQUEST_SIGNATURE_JWKS_UNTRUSTED, @@ -116,6 +117,43 @@ class AsyncJwksResolver(Protocol): async def __call__(self, keyid: str) -> dict[str, Any] | None: ... +@runtime_checkable +class BrandSourcedJwksResolver(Protocol): + """A :class:`JwksResolver` whose keys were resolved via a + brand.json walk (operator-attested key source per ADCP #3690). + + The verifier's ``identity.key_origins`` consistency check engages + only on resolvers advertising ``jwks_source == "brand_json"``; + publisher-pinned tuples (``jwks_source == "publisher_pin"``) skip + the check, and legacy adopter resolvers without the attribute + default to skip (treated as publisher-pin-equivalent for + back-compat). + + Surfacing the contract as a runtime-checkable Protocol means + ``isinstance(resolver, BrandSourcedJwksResolver)`` at the verifier + layer is a typed predicate — not just a duck-typed + ``hasattr(resolver, "jwks_source")``. Adopters wiring custom + brand.json-walking resolvers declare conformance by setting + ``jwks_source = "brand_json"`` (class attribute) and exposing + ``jwks_uri`` (instance attribute); :func:`isinstance` will then + return True even without inheriting from this Protocol. + + Implementations in this package: + + * :class:`adcp.signing.brand_jwks.BrandJsonJwksResolver` — + production resolver walking brand.json on every cache miss. + * ``adcp.signing.agent_resolver._BrandJsonStaticJwksResolver`` — + one-shot static resolver constructed by + :func:`verify_from_agent_url` from a frozen JWK set. + """ + + jwks_source: ClassVar[Literal["brand_json"]] + jwks_uri: str + + def __call__(self, keyid: str) -> dict[str, Any] | None: + """Resolve a JWK by keyid. Same shape as :meth:`JwksResolver.__call__`.""" + + def validate_jwks_uri( uri: str, *, @@ -186,28 +224,16 @@ def resolve_and_validate_host( host = parts.hostname if host is None or host == "": raise SSRFValidationError(f"URI has no host: {uri!r}") - # Strip a single trailing dot (FQDN form) so the pin matches what - # httpx / httpcore pass on subsequent requests. Without this, a - # caller who constructs with ``https://host./`` and then requests - # ``https://host/`` (or vice versa) sees the backend's - # hostname-match fail and falls through to unpinned resolution. - if host.endswith("."): - host = host[:-1] - # IDNA-encode so Unicode hostnames match the ASCII form httpx - # produces before calling into httpcore. urlsplit preserves the - # raw Unicode; httpx encodes it. A mismatch here breaks the - # hostname-match in the backend override and silently reopens - # the TOCTOU for IDN hosts. - # - # IDNA-2008 (UTS#46, transitional_processing=False) via the PyPI - # ``idna`` package — stdlib ``encodings.idna`` is IDNA-2003 and - # mismaps Eszett (``ß`` → ``ss``) and final-sigma. The - # package-wide IDNA convention is IDNA-2008; all four callsites - # (here, ``ip_pinned_transport``, ``revocation_fetcher``, - # ``key_origins``) share this encoding so canonicalization - # results compare byte-equal across the verifier pipeline. + # Canonicalize so Unicode hostnames match the ASCII form httpx + # produces before calling into httpcore (preserving the + # hostname-match in the backend override; a mismatch silently + # reopens the TOCTOU for IDN hosts), AND so IP literals don't + # trip IDNA-2008's reject-purely-numeric-label rule. + # See :mod:`adcp.signing._idna_canonicalize` for the + # package-wide IDNA convention (UTS#46, transitional_processing + # explicitly False, IP-literal short-circuit). try: - host = idna.encode(host, uts46=True).decode("ascii").lower() + host = canonicalize_host(host) except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc: raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80) diff --git a/src/adcp/signing/key_origins.py b/src/adcp/signing/key_origins.py index 5b240453c..ce15a474b 100644 --- a/src/adcp/signing/key_origins.py +++ b/src/adcp/signing/key_origins.py @@ -35,12 +35,14 @@ from __future__ import annotations +import ipaddress from collections.abc import Mapping from typing import Literal from urllib.parse import urlsplit import idna +from adcp.signing._idna_canonicalize import canonicalize_host from adcp.signing.errors import ( REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH, REQUEST_SIGNATURE_KEY_ORIGIN_MISSING, @@ -149,28 +151,47 @@ def check_key_origin_consistency( detail={ "purpose": purpose, # Use the canonicalized values when available; fall back - # to the raw inputs for diagnostic accuracy when one - # side failed to canonicalize. Spec wording is + # to a best-effort host extraction (NOT the raw URL — the + # field name promises a host, and surfacing a full URL on + # canonicalization failure was inconsistent with the + # success path's host-only shape). Spec wording is # ``expected_origin`` / ``actual_origin`` verbatim. - "expected_origin": declared_host if declared_host is not None else declared, - "actual_origin": actual_host if actual_host is not None else jwks_uri, + "expected_origin": _diagnostic_host(declared_host, declared), + "actual_origin": _diagnostic_host(actual_host, jwks_uri), }, ) +def _diagnostic_host(canonical: str | None, raw: str) -> str: + """Return ``canonical`` if present, else a best-effort host from + ``raw``, else the empty string. + + Used to keep ``expected_origin`` / ``actual_origin`` host-shaped + in the mismatch detail payload even when canonicalization failed. + Falls through to ``_extract_host`` (the same URL/bare-host parser + the canonicalization step uses) for the best-effort path, so the + diagnostic value still reflects "the host the operator/verifier + pointed at" rather than the full URL surface. + """ + if canonical is not None: + return canonical + host = _extract_host(raw) + return host or "" + + def _origin_host(value: str) -> str | None: """Return the host portion of a URL or bare origin, canonicalized for byte-equality comparison. - Canonicalization mirrors the package-wide IDNA-2008 (UTS#46) - convention used by ``jwks.py``, ``ip_pinned_transport.py``, and - ``revocation_fetcher.py``: ASCII-lowercase, then - ``idna.encode(host, uts46=True).decode("ascii")`` to convert IDN - U-labels to their A-label (Punycode) form. IDNA-2008 (vs the - stdlib's IDNA-2003) preserves Eszett (``ß``) and final-sigma per - spec rather than mapping them away, which is the canonicalization - the request-signing spec mandates for cross-implementation - byte-equality. + Delegates to :func:`canonicalize_host` for the package-wide + IDNA-2008 (UTS#46) convention shared with ``jwks.py``, + ``ip_pinned_transport.py``, and ``revocation_fetcher.py``. + IDNA-2008 preserves Eszett (``ß``) and final-sigma rather than + mapping them away (which IDNA-2003 does), matching the + canonicalization the request-signing spec mandates for + cross-implementation byte-equality. IP literals short-circuit + through ``ipaddress.ip_address`` so they're not rejected by + IDNA-2008's reject-purely-numeric-label rule. **Bare-host and URL forms are normalized symmetrically.** A bare host like ``"keys.brand.com"`` is processed through the same @@ -186,8 +207,9 @@ def _origin_host(value: str) -> str | None: **Trailing-dot equality.** ``host.example.`` and ``host.example`` are the same FQDN at the protocol layer (the dot denotes the root zone). A counterparty serving the dot form while the capability - declares the no-dot form (or vice versa) must not mismatch. We - strip a single trailing dot before IDNA encoding. + declares the no-dot form (or vice versa) must not mismatch. + :func:`canonicalize_host` strips a single trailing dot before + encoding. Returns ``None`` when the input is structurally invalid (no resolvable host, or it parses but contains characters that don't @@ -196,11 +218,10 @@ def _origin_host(value: str) -> str | None: host = _extract_host(value) if host is None: return None - host = host.rstrip(".").lower() if not host: return None try: - return idna.encode(host, uts46=True).decode("ascii").lower() + return canonicalize_host(host) except (idna.IDNAError, UnicodeError, UnicodeEncodeError): return None @@ -215,6 +236,12 @@ def _extract_host(value: str) -> str | None: re-parse so port / userinfo / query / fragment all strip the same way they would for an explicit URL — closing the bare-host vs URL asymmetry that the bare-host fallback used to have. + + **Bare IPv6 needs bracket synthesis.** ``urlsplit("https://2001:db8::1")`` + interprets the first ``:`` as the port separator and produces + ``hostname="2001"``, which then fails canonicalization downstream. + Detect bare IPv6 (multiple ``:`` and no scheme, not already + bracketed) and add brackets before re-parsing. """ parts = urlsplit(value) if parts.hostname: @@ -226,6 +253,18 @@ def _extract_host(value: str) -> str | None: stripped = value.strip() if not stripped: return None + # Bare IPv6 needs brackets to survive urlsplit's port-separator + # interpretation of ``:``. ``ipaddress.ip_address`` rejects + # bracketed and dotted-quad-with-port forms; using it as the + # IPv6 detector is precise. + if not stripped.startswith("["): + try: + ip = ipaddress.ip_address(stripped) + except ValueError: + pass + else: + if ip.version == 6: + stripped = f"[{stripped}]" parts = urlsplit(f"https://{stripped}") return parts.hostname or None diff --git a/src/adcp/signing/revocation_fetcher.py b/src/adcp/signing/revocation_fetcher.py index b883f3ed5..1112ecd14 100644 --- a/src/adcp/signing/revocation_fetcher.py +++ b/src/adcp/signing/revocation_fetcher.py @@ -44,6 +44,7 @@ import httpx import idna +from adcp.signing._idna_canonicalize import canonicalize_host from adcp.signing.jwks import ( DEFAULT_JWKS_TIMEOUT_SECONDS, AsyncJwksResolver, @@ -377,14 +378,16 @@ def _normalize_issuer(issuer: str) -> str: if not parts.hostname: raise ValueError(f"issuer has no host: {issuer!r}") - # IDNA-encode the host to collapse unicode homoglyphs to ASCII - # punycode. ``idna.encode(host, uts46=True)`` raises on characters - # outside the IDNA-2008 allowlist — which is the failure mode we - # want. IDNA-2008 (UTS#46) matches the package-wide convention so - # an issuer canonicalized here compares byte-equal to the - # ``jwks_uri`` host the verifier pins via ``resolve_and_validate_host``. + # Canonicalize the host (IDNA-2008 A-label or IP-literal pass-through) + # so an issuer canonicalized here compares byte-equal to the + # ``jwks_uri`` host the verifier pins via + # ``resolve_and_validate_host``. See + # :mod:`adcp.signing._idna_canonicalize` for the package-wide + # convention (UTS#46, transitional_processing explicitly False, + # IP-literal short-circuit so revocation issuers on IP literals + # don't trip IDNA-2008's reject-purely-numeric-label rule). try: - host_ascii = idna.encode(parts.hostname, uts46=True).decode("ascii").lower() + host_ascii = canonicalize_host(parts.hostname) except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc: raise ValueError(f"issuer host {parts.hostname!r} is not IDNA-valid: {exc}") from exc diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index 575c7a76f..e813896e0 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -8,6 +8,7 @@ from __future__ import annotations +import warnings from collections.abc import Mapping from dataclasses import dataclass, field from datetime import datetime, timezone @@ -42,6 +43,7 @@ REQUEST_SIGNATURE_DIGEST_MISMATCH, REQUEST_SIGNATURE_HEADER_MALFORMED, REQUEST_SIGNATURE_INVALID, + REQUEST_SIGNATURE_JWKS_UNAVAILABLE, REQUEST_SIGNATURE_KEY_PURPOSE_INVALID, REQUEST_SIGNATURE_KEY_REVOKED, REQUEST_SIGNATURE_KEY_UNKNOWN, @@ -514,13 +516,13 @@ def _maybe_check_key_origin( """Run the ADCP #3690 step 7 ``identity.key_origins`` check when the resolver sourced its keys from brand.json. - Resolver contract (duck-typed): + Resolver contract (duck-typed; conformance is also surfaced by + :class:`adcp.signing.BrandSourcedJwksResolver`): - * ``jwks_source``: one of ``"brand_json"`` / ``"publisher_pin"`` / - absent. Only ``"brand_json"`` engages the check; ``"publisher_pin"`` - and absence skip it. Absence is treated as "skip" so legacy - :class:`JwksResolver` implementations that predate this attribute - keep working without behavior change. + * ``jwks_source``: ``"brand_json"`` engages the check; any other + value (or absence) skips it. Absence is treated as "skip" so + legacy :class:`JwksResolver` implementations that predate this + attribute keep working without behavior change. * ``jwks_uri``: the resolved JWKS URI whose host is canonicalized and compared against the declared origin. Required when the resolver advertises ``jwks_source == "brand_json"``; a brand-json @@ -528,24 +530,74 @@ def _maybe_check_key_origin( we fail closed via the mismatch path (``actual_origin`` becomes ``None``). - Returns silently when: - - * ``expected_key_origins`` is ``None`` (adopter hasn't plumbed - capabilities through), OR - * the resolver isn't brand-json-sourced. + Skip + warn cases (both fire :func:`warnings.warn` so the + one-time message in the operator's log surfaces the misconfig): + + * ``jwks_source == "brand_json"`` + ``expected_key_origins is None``: + the resolver IS brand-json-sourced but the caller didn't surface + the operator's declared ``identity.key_origins`` map, so the + spec-mandated check silently no-ops. ``UserWarning`` — the + adopter needs to thread ``expected_key_origins`` through + ``VerifyOptions``. + * ``expected_key_origins`` set + resolver has no ``jwks_source``: + adopter upgraded the SDK but their custom resolver predates the + discriminant. ``DeprecationWarning`` — set + ``jwks_source = "brand_json"`` on the resolver class (or + conform to :class:`BrandSourcedJwksResolver`) to engage the + spec defense; without it the SDK silently downgrades to no-check. """ + source = getattr(resolver, "jwks_source", None) if expected_key_origins is None: + if source == "brand_json": + warnings.warn( + "Resolver advertises jwks_source='brand_json' but VerifyOptions " + "did not supply expected_key_origins — the spec-mandated " + "identity.key_origins consistency check (ADCP #3690 step 7) " + "is silently skipped. Thread the operator's " + "identity.key_origins map through VerifyOptions(expected_key_origins=...) " + "to engage the check; pass an empty dict if the operator " + "advertises no map and you want the missing-declaration " + "rejection (request_signature_key_origin_missing) to fire.", + UserWarning, + stacklevel=2, + ) return - source = getattr(resolver, "jwks_source", None) if source != "brand_json": + if source is None: + warnings.warn( + "VerifyOptions supplied expected_key_origins but the JWKS " + "resolver has no jwks_source attribute. The " + "identity.key_origins consistency check is silently skipped " + "on this path (back-compat for pre-#776 resolvers). Set " + "jwks_source='brand_json' on the resolver class (or conform " + "to adcp.signing.BrandSourcedJwksResolver) to engage the " + "ADCP #3690 step 7 defense.", + DeprecationWarning, + stacklevel=2, + ) return jwks_uri = getattr(resolver, "jwks_uri", None) - # ``jwks_uri`` may be ``None`` if the brand-json resolver hasn't - # populated it yet (cold cache + failed refresh). The consistency - # check fails closed on a missing actual host — same posture as - # ``_origin_host`` returning ``None``. + if not jwks_uri: + # A brand-json resolver that hasn't populated ``jwks_uri`` (cold + # cache + failed refresh, or a misconfigured custom resolver) is + # a resolver-side I/O failure, not a key-origin mismatch — the + # verifier has no resolved host to compare. Surface as + # ``REQUEST_SIGNATURE_JWKS_UNAVAILABLE`` so dashboards aggregate + # this cold-cache shape with other resolver-fetch failures + # rather than with adversarial origin-mismatch traffic. + raise SignatureVerificationError( + REQUEST_SIGNATURE_JWKS_UNAVAILABLE, + step=7, + message=( + "brand-json resolver did not populate jwks_uri (cold cache " + "or misconfigured resolver); key_origins consistency check " + "cannot proceed without a resolved host to compare against " + f"identity.key_origins.{signing_purpose}" + ), + detail={"purpose": signing_purpose}, + ) check_key_origin_consistency( - jwks_uri=jwks_uri or "", + jwks_uri=jwks_uri, key_origins=expected_key_origins, purpose=signing_purpose, posture=posture, diff --git a/tests/test_key_origins.py b/tests/test_key_origins.py index 86814aed5..cc4565f95 100644 --- a/tests/test_key_origins.py +++ b/tests/test_key_origins.py @@ -328,3 +328,45 @@ def test_consistency_unparseable_declared_origin_fails_closed() -> None: purpose="request_signing", ) assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + + +# ---- IP-literal short-circuit (Argus follow-up #1) ---- + + +def test_consistency_ipv4_literal_short_circuits_idna_canonicalization() -> None: + """``idna.encode("192.0.2.1", uts46=True)`` raises in IDNA-2008 + mode because the spec rejects purely-numeric labels. The helper + must short-circuit IP literals through ``ipaddress.ip_address`` + before IDNA so adopters on ``allow_private=True`` dev setups with + IP-literal JWKS URIs don't regress after the IDNA-2008 migration + (PR #789). Argus second-pass review follow-up.""" + # Both sides as IPv4 literals — should match. + check_key_origin_consistency( + jwks_uri="https://192.0.2.1/jwks.json", + key_origins={"request_signing": "https://192.0.2.1"}, + purpose="request_signing", + ) + + +def test_consistency_ipv6_literal_compressed_and_bracketed_canonicalize_equal() -> None: + """IPv6 literals come in bracketed (``[2001:db8::1]``) on URL + form and bare (``2001:db8::1``) on capability declarations. Both + must canonicalize to the same byte form so they compare equal.""" + check_key_origin_consistency( + jwks_uri="https://[2001:db8::1]/jwks.json", + key_origins={"request_signing": "2001:db8::1"}, + purpose="request_signing", + ) + + +def test_consistency_ipv4_vs_idn_host_fails_closed() -> None: + """IP literal on one side, IDN host on the other → mismatch (a + purely-numeric label cannot equal a domain). Pin the fail-closed + direction so a future refactor can't silently let them collide.""" + with pytest.raises(SignatureVerificationError) as exc_info: + check_key_origin_consistency( + jwks_uri="https://192.0.2.1/jwks.json", + key_origins={"request_signing": "https://example.com"}, + purpose="request_signing", + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH diff --git a/tests/test_verify_from_agent_url.py b/tests/test_verify_from_agent_url.py index 3ee7fe400..3d5674230 100644 --- a/tests/test_verify_from_agent_url.py +++ b/tests/test_verify_from_agent_url.py @@ -499,3 +499,213 @@ async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped # Production resolver MUST carry the URI as an instance attribute, # not just the source-marker class attribute. assert resolver.jwks_uri == "https://example.com/.well-known/jwks.json" + + +# ---- BrandSourcedJwksResolver Protocol (Argus follow-up) ---- + + +def test_brand_json_static_resolver_satisfies_brand_sourced_protocol() -> None: + """``_BrandJsonStaticJwksResolver`` MUST satisfy + :class:`BrandSourcedJwksResolver` at runtime. The Protocol surfaces + the duck-typed ``jwks_source`` + ``jwks_uri`` contract as a typed + predicate so verifier-side ``isinstance`` checks work and adopters + declaring custom brand.json-walking resolvers can opt in by + setting the two attributes (no inheritance required).""" + from adcp.signing import BrandSourcedJwksResolver + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + assert isinstance(resolver, BrandSourcedJwksResolver) + + +def test_static_jwks_resolver_does_not_satisfy_brand_sourced_protocol() -> None: + """A bare :class:`StaticJwksResolver` MUST NOT satisfy the + BrandSourcedJwksResolver Protocol — it carries neither + ``jwks_source`` nor ``jwks_uri``. The check skip on absence is + the back-compat path for adopter resolvers that predate the + discriminant.""" + from adcp.signing import BrandSourcedJwksResolver, StaticJwksResolver + + resolver = StaticJwksResolver({"keys": []}) + assert not isinstance(resolver, BrandSourcedJwksResolver) + + +# ---- Misconfig warnings (Argus first-pass follow-ups) ---- + + +def test_brand_json_source_without_expected_origins_emits_user_warning() -> None: + """A resolver advertising ``jwks_source='brand_json'`` paired with + ``expected_key_origins=None`` is an observable misconfig — the + spec's identity.key_origins consistency check (ADCP #3690 step 7) + silently no-ops. Surface as a :class:`UserWarning` so adopters + catch it in operator logs and thread the origins map through + ``VerifyOptions``.""" + import warnings as _w + + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + from adcp.signing.verifier import _maybe_check_key_origin + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/jwks.json", + ) + with _w.catch_warnings(record=True) as caught: + _w.simplefilter("always") + _maybe_check_key_origin( + resolver=resolver, + expected_key_origins=None, + signing_purpose="request_signing", + posture=None, + ) + user_warnings = [w for w in caught if issubclass(w.category, UserWarning)] + assert len(user_warnings) == 1 + assert "jwks_source='brand_json'" in str(user_warnings[0].message) + + +def test_legacy_resolver_with_expected_origins_emits_deprecation_warning() -> None: + """A resolver without a ``jwks_source`` attribute (a pre-#776 + adopter resolver) paired with ``expected_key_origins`` set is + misconfig in the other direction — the SDK silently downgrades + to no-check. Surface as :class:`DeprecationWarning` so the adopter + sees the upgrade signal on next deploy.""" + import warnings as _w + + from adcp.signing import StaticJwksResolver + from adcp.signing.verifier import _maybe_check_key_origin + + # StaticJwksResolver carries no jwks_source — legacy adopter shape. + resolver = StaticJwksResolver({"keys": []}) + with _w.catch_warnings(record=True) as caught: + _w.simplefilter("always") + _maybe_check_key_origin( + resolver=resolver, + expected_key_origins={"request_signing": "https://example.com"}, + signing_purpose="request_signing", + posture=None, + ) + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert len(deprecations) == 1 + assert "jwks_source" in str(deprecations[0].message) + + +def test_no_warning_when_both_attributes_align() -> None: + """The happy path — brand_json resolver + supplied origins — must + not emit either warning. Pin the no-noise direction so a future + refactor can't accidentally fire warnings on legitimate verifier + invocations.""" + import warnings as _w + + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + from adcp.signing.verifier import _maybe_check_key_origin + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/jwks.json", + ) + with _w.catch_warnings(record=True) as caught: + _w.simplefilter("always") + _maybe_check_key_origin( + resolver=resolver, + expected_key_origins={"request_signing": "https://keys.brand.example"}, + signing_purpose="request_signing", + posture=None, + ) + misconfig = [ + w + for w in caught + if issubclass(w.category, (UserWarning, DeprecationWarning)) + and "jwks_source" in str(w.message) + ] + assert misconfig == [] + + +# ---- _extract_key_origins length cap (Argus follow-up nit #1) ---- + + +def test_extract_key_origins_caps_oversized_entries() -> None: + """Each origin value must be clamped at 512 bytes — well above any + legitimate ``scheme+host+port`` shape but tight enough that a + pathological multi-kilobyte value from the 64 KiB capabilities body + doesn't propagate through downstream comparisons. Oversized entries + are SKIPPED (not truncated — a truncated host would silently match + the wrong domain).""" + from adcp.signing.agent_resolver import _extract_key_origins + + huge = "https://" + "x" * 1024 + ".com" + legit = "https://keys.brand.com" + result = _extract_key_origins( + { + "identity": { + "key_origins": { + "request_signing": legit, + "webhook_signing": huge, # skipped + } + } + } + ) + assert result == {"request_signing": legit} + + +# ---- Diagnostic host-only fallback (Argus follow-up nit #2) ---- + + +def test_mismatch_detail_uses_host_only_fallback_on_canonicalization_failure() -> None: + """When ``_origin_host`` can't canonicalize one side (e.g. spaces + in the host), the mismatch ``expected_origin`` / ``actual_origin`` + detail values must still be HOST-SHAPED — not the full raw URL. + Previous behavior leaked the full URL into the host-labeled field, + inconsistent with the success path. Now the diagnostic uses a + best-effort host extraction via ``_extract_host``.""" + from adcp.signing.errors import REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + from adcp.signing.key_origins import check_key_origin_consistency + + with pytest.raises(SignatureVerificationError) as exc_info: + check_key_origin_consistency( + jwks_uri="https://keys.brand.example/jwks.json", + key_origins={"request_signing": "not a host with spaces"}, + purpose="request_signing", + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + detail = exc_info.value.detail + assert detail is not None + # actual_origin canonicalizes cleanly to the host (no URL form). + assert detail["actual_origin"] == "keys.brand.example" + # expected_origin failed to canonicalize but still doesn't leak the + # full raw string with quoting artifacts — empty string at worst, + # never the full URL. + assert "/" not in detail["expected_origin"] + + +# ---- jwks_uri=None routes to JWKS_UNAVAILABLE (Argus follow-up #4) ---- + + +def test_maybe_check_key_origin_jwks_uri_none_routes_to_jwks_unavailable() -> None: + """A brand-json resolver that hasn't populated ``jwks_uri`` (cold + cache + failed refresh, or a misconfigured custom resolver) is a + resolver-side I/O failure, not an origin mismatch. The verifier + must surface ``REQUEST_SIGNATURE_JWKS_UNAVAILABLE`` so dashboards + aggregate this cold-cache shape with other resolver-fetch + failures rather than with adversarial origin-mismatch traffic.""" + from adcp.signing.errors import REQUEST_SIGNATURE_JWKS_UNAVAILABLE + from adcp.signing.verifier import _maybe_check_key_origin + + class _BrandJsonResolverWithNoJwksUri: + jwks_source = "brand_json" + # ``jwks_uri`` deliberately absent / None — cold cache shape. + jwks_uri = None + + def __call__(self, keyid: str) -> dict | None: # type: ignore[type-arg] + return None + + with pytest.raises(SignatureVerificationError) as exc_info: + _maybe_check_key_origin( + resolver=_BrandJsonResolverWithNoJwksUri(), # type: ignore[arg-type] + expected_key_origins={"request_signing": "https://keys.brand.example"}, + signing_purpose="request_signing", + posture=None, + ) + assert exc_info.value.code == REQUEST_SIGNATURE_JWKS_UNAVAILABLE + assert exc_info.value.detail == {"purpose": "request_signing"}