Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/signing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
AsyncCachingJwksResolver,
AsyncJwksFetcher,
AsyncJwksResolver,
BrandSourcedJwksResolver,
CachingJwksResolver,
JwksResolver,
SSRFValidationError,
Expand Down Expand Up @@ -320,6 +321,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"BrandAuthorizationResult",
"BrandJsonAuthorizationResolver",
"BrandJsonJwksResolver",
"BrandSourcedJwksResolver",
"BrandJsonResolverError",
"BrandJsonResolverErrorCode",
"CAPABILITY_OP",
Expand Down
80 changes: 80 additions & 0 deletions src/adcp/signing/_idna_canonicalize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""IDNA-2008 host canonicalization with IP-literal short-circuit.

Shared by the four signing-side callsites that canonicalize host
strings for comparison: :mod:`adcp.signing.jwks` (JWKS URI host
pinning), :mod:`adcp.signing.ip_pinned_transport` (per-connect pin
normalization), :mod:`adcp.signing.revocation_fetcher` (revocation-
issuer canonicalization), :mod:`adcp.signing.key_origins` (ADCP #3690
step 7 ``identity.key_origins`` consistency check).

**Why IP literals need a short-circuit.** ``idna.encode("192.0.2.1",
uts46=True)`` raises because IDNA-2008 rejects purely-numeric labels
(``a label which consists of digits only``). Stdlib's
``host.encode("idna")`` was lenient and returned the ASCII as-is.
Adopters running on ``allow_private=True`` dev setups with IP-literal
JWKS URIs would see ``SSRFValidationError: URI host '...' is not
IDNA-valid`` after the IDNA-2008 migration in PR #789 — a regression
on the dev-loop path without a security justification (IP literals
are not IDN candidates by definition).

Gating on :func:`ipaddress.ip_address` lets IP inputs bypass the IDNA
encoder entirely; they are returned in the canonical
``str(ipaddress.ip_address(...))`` form instead. Both v4 (``192.0.2.1``)
and v6 (``2001:db8::1`` or bracketed ``[2001:db8::1]``) are handled.

**Why ``transitional=False`` is explicit.** The default in
``idna>=3.x`` is already ``False`` (Eszett-preserving — what UTS#46
calls *non-transitional processing*), but pinning it at the callsite
documents intent and locks the canonicalization regardless of any
future upstream default flip. The package's existing eszett-regression
test (``tests/test_key_origins.py``) covers the load-bearing
behavior; the kwarg here is belt-and-suspenders. (Note: the ``idna``
package spells the kwarg ``transitional``, not the UTS#46-document
spelling ``transitional_processing``.)
"""

from __future__ import annotations

import ipaddress

import idna

__all__ = ["canonicalize_host"]


def canonicalize_host(host: str) -> str:
    """Return the canonical A-label form of ``host`` for byte-equal
    host comparisons.

    Steps:

    1. Strip surrounding whitespace, then a single trailing FQDN-root
       dot.
    2. Lowercase via ``str.lower`` so the comparison bytes are
       case-normalized before any parsing or encoding.
    3. **Short-circuit IP literals** — both v4 and v6 (with or without
       surrounding brackets) are returned as
       ``str(ipaddress.ip_address(host))``, skipping IDNA entirely.
       IDNA-2008 rejects purely-numeric labels.
    4. Otherwise call ``idna.encode(host, uts46=True, transitional=False)``
       on the already-lowercased host and return the decoded ASCII.

    Args:
        host: Hostname or IP literal. A URL-style bracketed IPv6
            literal (``[2001:db8::1]``) is accepted.

    Returns:
        The canonical host string: the ``ipaddress`` string form for IP
        literals (brackets removed), otherwise the IDNA-encoded ASCII
        form.

    Raises:
        idna.IDNAError: (or its parent ``UnicodeError``) on a label the
            encoder cannot process. Callers decide whether to
            fail-closed (let the exception propagate) or fall back to a
            permissive comparison (catch and use the raw input).
    """
    host = host.strip()
    if host.endswith("."):
        host = host[:-1]
    host = host.lower()

    # IP-literal short-circuit. ``[2001:db8::1]`` form (URL-bracketed)
    # comes in from some callsites; strip brackets before the parse.
    candidate = host
    if candidate.startswith("[") and candidate.endswith("]"):
        candidate = candidate[1:-1]

    try:
        ip = ipaddress.ip_address(candidate)
    except ValueError:
        # Not an IP literal — handled below, outside this handler.
        ip = None
    if ip is not None:
        # Compressed canonical form via str(IPv6Address) etc.
        return str(ip)

    # Encode outside the ``except`` block: an encoder failure raised
    # inside the handler would carry the unrelated ipaddress ValueError
    # as its implicit ``__context__`` and clutter callers' tracebacks.
    return idna.encode(host, uts46=True, transitional=False).decode("ascii")
91 changes: 57 additions & 34 deletions src/adcp/signing/agent_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,14 @@ def _extract_brand_json_url(capabilities: dict[str, Any]) -> str:
return brand_json_url


#: Per-entry size clamp on ``identity.key_origins`` values. DNS hostname
#: limit is 253 octets (RFC 1035); origin strings carry scheme+host so
#: the practical cap is a bit higher, but 512 is well above any
#: legitimate value while still bounding the surface against a
#: pathologically-large entry from a 64 KiB capabilities body.
_MAX_KEY_ORIGIN_VALUE_BYTES = 512


def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None:
"""Pluck ``identity.key_origins`` from the capabilities body.

Expand All @@ -339,6 +347,14 @@ def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None:
purpose is actually exercised). Filters values to strings — a
malformed entry is skipped rather than poisoning the whole map.

**Per-entry length cap (``_MAX_KEY_ORIGIN_VALUE_BYTES``).** Each
origin value is bounded to 512 bytes — well above any legitimate
``scheme + host + port`` shape but tight enough that a pathological
multi-kilobyte value from the 64 KiB capabilities body doesn't
propagate through downstream comparisons. Entries exceeding the cap
are skipped (the verifier then surfaces the purpose as missing on
the consistency check).

Forward-compat with operators on 3.0 schemas: the map travels under
``additionalProperties: true`` and the SDK reads it as a plain dict
rather than via the typed Pydantic surface (which won't carry the
Expand All @@ -352,8 +368,13 @@ def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None:
return None
out: dict[str, str] = {}
for purpose, origin in raw.items():
if isinstance(purpose, str) and isinstance(origin, str) and origin:
out[purpose] = origin
if not (isinstance(purpose, str) and isinstance(origin, str) and origin):
continue
if len(origin.encode("utf-8")) > _MAX_KEY_ORIGIN_VALUE_BYTES:
# Length-capped entry — skip rather than truncate (a
# truncated host would silently match the wrong domain).
continue
out[purpose] = origin
return out or None


Expand Down Expand Up @@ -557,6 +578,40 @@ def resolve_agent(
# ---- verify factory ----


class _BrandJsonStaticJwksResolver(StaticJwksResolver):
    """A :class:`StaticJwksResolver` carrying the ``"brand_json"``
    source discriminant AND the resolved ``jwks_uri``.

    Conforms to :class:`adcp.signing.BrandSourcedJwksResolver` — the
    verifier's ``_maybe_check_key_origin`` engages the spec's
    consistency check on every signed request routed through
    :func:`verify_from_agent_url`. Adopters wiring custom resolvers
    declare the same conformance by setting ``jwks_source = "brand_json"``
    (class attribute) and exposing ``jwks_uri`` (instance attribute);
    they MAY also import :class:`BrandSourcedJwksResolver` to type-check
    the contract at static analysis time.

    The brand.json walk in :func:`async_resolve_agent` resolved this
    JWKS — that's exactly the source the spec's key-origin consistency
    check (ADCP #3690 step 7) defends. The verifier reads
    ``getattr(resolver, "jwks_uri", None)`` to look up the resolved
    host for the comparison. :class:`StaticJwksResolver` does not
    carry a ``jwks_uri`` (it's a static keyset), so this subclass
    stores the brand.json-resolved URI on the instance. Without it
    the check would mismatch every legitimate signer with
    ``actual_origin=""``.

    Defined inside the module rather than as a public type because the
    helper composition is internal to the buyer-side verify factory.
    """

    # Class-level discriminant; fixed to "brand_json" for every instance.
    jwks_source: ClassVar[Literal["brand_json"]] = "brand_json"

    def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None:
        """Build the resolver from a JWK set and remember where it came from.

        Args:
            jwks: JWK set, forwarded unchanged to
                :class:`StaticJwksResolver`'s constructor.
            jwks_uri: Keyword-only. Stored as-is on the instance as
                ``self.jwks_uri``; no validation or normalization is
                applied here.
        """
        super().__init__(jwks)
        self.jwks_uri = jwks_uri


async def verify_from_agent_url(
request: Any,
agent_url: str,
Expand Down Expand Up @@ -674,38 +729,6 @@ async def verify_from_agent_url(
return await verify_starlette_request(request, options=options)


class _BrandJsonStaticJwksResolver(StaticJwksResolver):
"""A :class:`StaticJwksResolver` carrying the ``"brand_json"``
source discriminant AND the resolved ``jwks_uri``.

The brand.json walk in :func:`async_resolve_agent` resolved this
JWKS — that's exactly the source the spec's key-origin consistency
check (ADCP #3690 step 7) defends. The verifier's
``_maybe_check_key_origin`` step skips when ``jwks_source`` is
absent (treating absence as publisher-pin-equivalent); marking the
static resolver here engages the check on every signed request
routed through :func:`verify_from_agent_url`.

The verifier reads ``getattr(resolver, "jwks_uri", None)`` to look
up the resolved host for the consistency comparison.
:class:`StaticJwksResolver` does not carry a ``jwks_uri`` (it's a
static keyset), so this subclass stores the brand.json-resolved
URI on the instance. Without it the check would mismatch every
legitimate signer with ``actual_origin=""``.

Defined inside the module rather than as a public type because the
discriminant is internal — adopters wiring custom resolvers set
their own ``jwks_source = "brand_json"`` class attribute and
``jwks_uri`` instance attribute directly.
"""

jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json"

def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None:
super().__init__(jwks)
self.jwks_uri = jwks_uri


# ---- helpers ----


Expand Down
2 changes: 1 addition & 1 deletion src/adcp/signing/brand_jwks.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class BrandJsonJwksResolver:

#: Discriminant for the verifier-side key_origin consistency
#: check (see class docstring).
jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json"
jwks_source: ClassVar[Literal["brand_json"]] = "brand_json"

def __init__(
self,
Expand Down
31 changes: 14 additions & 17 deletions src/adcp/signing/ip_pinned_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from httpcore._backends.anyio import AnyIOBackend as _AnyIOBackend
from httpcore._backends.sync import SyncBackend as _SyncBackend

from adcp.signing._idna_canonicalize import canonicalize_host
from adcp.signing.jwks import resolve_and_validate_host

if TYPE_CHECKING:
Expand Down Expand Up @@ -100,26 +101,22 @@ def _build_ssl_context() -> ssl.SSLContext:
def _normalize_pin_host(host: str) -> str:
"""Normalize a hostname for byte-equal comparison.

Lowercases, strips a single trailing dot, and IDNA-encodes so
Unicode hostnames compare equal to the punycode form httpx
passes to httpcore.

IDNA-2008 (UTS#46) via the PyPI ``idna`` package — the
package-wide canonicalization convention, matching the JWKS
fetcher's ``resolve_and_validate_host`` so a pin set on
``straße.de`` collapses to the same A-label httpx will pass to
httpcore at connect time.
Delegates to :func:`canonicalize_host` — strips a single trailing
dot, ASCII-lowercases, short-circuits IP literals (v4 and v6,
bracketed or not) before IDNA, and otherwise encodes via
IDNA-2008 (UTS#46 with ``transitional=False``).
Matches the JWKS fetcher's ``resolve_and_validate_host`` so a pin
set on ``straße.de`` collapses to the same A-label httpx will
pass to httpcore at connect time.

Falls back to the raw input on IDNA encode failure so the
comparison just fails cleanly instead of raising inside
connect_tcp.
"""
host = host.lower()
if host.endswith("."):
host = host[:-1]
try:
return idna.encode(host, uts46=True).decode("ascii")
return canonicalize_host(host)
except (idna.IDNAError, UnicodeError, UnicodeEncodeError):
# Caller already stored the normalized form; fall through
# with the lowercased input so the comparison just fails
# cleanly instead of raising inside connect_tcp.
return host
return host.lower().rstrip(".")


class _IpPinnedSyncBackend(_SyncBackend):
Expand Down
70 changes: 48 additions & 22 deletions src/adcp/signing/jwks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import socket
import time
from collections.abc import Callable
from typing import Any, Protocol
from typing import Any, ClassVar, Literal, Protocol, runtime_checkable
from urllib.parse import urlsplit

import httpx
import idna

from adcp.signing._idna_canonicalize import canonicalize_host
from adcp.signing.errors import (
REQUEST_SIGNATURE_JWKS_UNAVAILABLE,
REQUEST_SIGNATURE_JWKS_UNTRUSTED,
Expand Down Expand Up @@ -116,6 +117,43 @@ class AsyncJwksResolver(Protocol):
async def __call__(self, keyid: str) -> dict[str, Any] | None: ...


@runtime_checkable
class BrandSourcedJwksResolver(Protocol):
    """A :class:`JwksResolver` whose keys were resolved via a
    brand.json walk (operator-attested key source per ADCP #3690).

    The verifier's ``identity.key_origins`` consistency check engages
    only on resolvers advertising ``jwks_source == "brand_json"``;
    publisher-pinned tuples (``jwks_source == "publisher_pin"``) skip
    the check, and legacy adopter resolvers without the attribute
    default to skip (treated as publisher-pin-equivalent for
    back-compat).

    Surfacing the contract as a runtime-checkable Protocol means
    ``isinstance(resolver, BrandSourcedJwksResolver)`` at the verifier
    layer is a typed predicate — not just a duck-typed
    ``hasattr(resolver, "jwks_source")``. Adopters wiring custom
    brand.json-walking resolvers declare conformance by setting
    ``jwks_source = "brand_json"`` (class attribute) and exposing
    ``jwks_uri`` (instance attribute); :func:`isinstance` will then
    return True even without inheriting from this Protocol.

    **Caveat on ``isinstance``.** A ``runtime_checkable`` Protocol
    check only verifies that the members declared here (``jwks_source``,
    ``jwks_uri``, ``__call__``) are present on the object. It does not
    compare ``jwks_source`` against ``"brand_json"`` and does not
    validate the type of ``jwks_uri`` or the call signature. Code that
    depends on the discriminant's value must still read
    ``jwks_source`` explicitly.

    Implementations in this package:

    * :class:`adcp.signing.brand_jwks.BrandJsonJwksResolver` —
      production resolver walking brand.json on every cache miss.
    * ``adcp.signing.agent_resolver._BrandJsonStaticJwksResolver`` —
      one-shot static resolver constructed by
      :func:`verify_from_agent_url` from a frozen JWK set.
    """

    # Source discriminant. Declared without a value here, so each
    # conforming class supplies its own ``"brand_json"`` assignment.
    jwks_source: ClassVar[Literal["brand_json"]]
    # JWKS URI exposed by the resolver (an instance attribute on
    # conforming classes).
    jwks_uri: str

    def __call__(self, keyid: str) -> dict[str, Any] | None:
        """Resolve a JWK by keyid. Same shape as :meth:`JwksResolver.__call__`."""


def validate_jwks_uri(
uri: str,
*,
Expand Down Expand Up @@ -186,28 +224,16 @@ def resolve_and_validate_host(
host = parts.hostname
if host is None or host == "":
raise SSRFValidationError(f"URI has no host: {uri!r}")
# Strip a single trailing dot (FQDN form) so the pin matches what
# httpx / httpcore pass on subsequent requests. Without this, a
# caller who constructs with ``https://host./`` and then requests
# ``https://host/`` (or vice versa) sees the backend's
# hostname-match fail and falls through to unpinned resolution.
if host.endswith("."):
host = host[:-1]
# IDNA-encode so Unicode hostnames match the ASCII form httpx
# produces before calling into httpcore. urlsplit preserves the
# raw Unicode; httpx encodes it. A mismatch here breaks the
# hostname-match in the backend override and silently reopens
# the TOCTOU for IDN hosts.
#
# IDNA-2008 (UTS#46, transitional_processing=False) via the PyPI
# ``idna`` package — stdlib ``encodings.idna`` is IDNA-2003 and
# mismaps Eszett (``ß`` → ``ss``) and final-sigma. The
# package-wide IDNA convention is IDNA-2008; all four callsites
# (here, ``ip_pinned_transport``, ``revocation_fetcher``,
# ``key_origins``) share this encoding so canonicalization
# results compare byte-equal across the verifier pipeline.
# Canonicalize so Unicode hostnames match the ASCII form httpx
# produces before calling into httpcore (preserving the
# hostname-match in the backend override; a mismatch silently
# reopens the TOCTOU for IDN hosts), AND so IP literals don't
# trip IDNA-2008's reject-purely-numeric-label rule.
# See :mod:`adcp.signing._idna_canonicalize` for the
# package-wide IDNA convention (UTS#46, transitional_processing
# explicitly False, IP-literal short-circuit).
try:
host = idna.encode(host, uts46=True).decode("ascii").lower()
host = canonicalize_host(host)
except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc:
raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc
port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80)
Expand Down
Loading
Loading