diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index abf6ac49d..da1effaa9 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -64,11 +64,19 @@ ) from adcp.signing.jwks import ( CachingJwksResolver, + JwksResolver, SSRFValidationError, StaticJwksResolver, default_jwks_fetcher, validate_jwks_uri, ) +from adcp.signing.jws import ( + JwsError, + JwsMalformedError, + JwsSignatureInvalidError, + JwsUnknownKeyError, + verify_jws_document, +) from adcp.signing.middleware import ( unauthorized_response_headers, verify_flask_request, @@ -76,12 +84,22 @@ ) from adcp.signing.replay import InMemoryReplayStore, ReplayStore from adcp.signing.revocation import RevocationChecker, RevocationList +from adcp.signing.revocation_fetcher import ( + DEFAULT_GRACE_MULTIPLIER, + REVOCATION_LIST_TYP, + CachingRevocationChecker, + FetchResult, + RevocationListFetcher, + RevocationListFetchError, + RevocationListFreshnessError, + RevocationListParseError, + default_revocation_list_fetcher, +) from adcp.signing.signer import ( SignedHeaders, sign_request, ) from adcp.signing.verifier import ( - JwksResolver, VerifiedSigner, VerifierCapability, VerifyOptions, @@ -93,11 +111,18 @@ "ALG_ES256", "ALLOWED_ALGS", "CachingJwksResolver", + "CachingRevocationChecker", "DEFAULT_EXPIRES_IN_SECONDS", + "DEFAULT_GRACE_MULTIPLIER", "DEFAULT_SKEW_SECONDS", "DEFAULT_TAG", + "FetchResult", "InMemoryReplayStore", "JwksResolver", + "JwsError", + "JwsMalformedError", + "JwsSignatureInvalidError", + "JwsUnknownKeyError", "MAX_WINDOW_SECONDS", "NONCE_BYTES", "REQUEST_SIGNATURE_ALG_NOT_ALLOWED", @@ -118,9 +143,14 @@ "REQUEST_SIGNATURE_REVOCATION_STALE", "REQUEST_SIGNATURE_TAG_INVALID", "REQUEST_SIGNATURE_WINDOW_INVALID", + "REVOCATION_LIST_TYP", "ReplayStore", "RevocationChecker", "RevocationList", + "RevocationListFetchError", + "RevocationListFetcher", + "RevocationListFreshnessError", + "RevocationListParseError", "SIG_LABEL_DEFAULT", "SSRFValidationError", "SignatureInputLabel", @@ -141,6 +171,7 @@ "compute_content_digest_sha256", "content_digest_matches", "default_jwks_fetcher", + "default_revocation_list_fetcher", "extract_signature_bytes", "format_signature_header", "operation_needs_signing", @@ -152,6 +183,7 @@ "unauthorized_response_headers", "validate_jwks_uri", "verify_flask_request", + "verify_jws_document", "verify_request_signature", "verify_signature", "verify_starlette_request", diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index 2b14aa1e6..29113e09b 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -60,6 +60,18 @@ class JwksFetcher(Protocol): def __call__(self, uri: str, *, allow_private: bool = False) -> dict[str, Any]: ... +class JwksResolver(Protocol): + """Resolves a keyid to a JWK, or returns None if unknown. + + The canonical Protocol used by both the RFC 9421 verifier and the JWS + document verifier. Implementations include + :class:`StaticJwksResolver` (in-memory, for tests) and + :class:`CachingJwksResolver` (fetches + caches from a URI). + """ + + def __call__(self, keyid: str) -> dict[str, Any] | None: ... + + def validate_jwks_uri(uri: str, *, allow_private: bool = False) -> None: """Raise SSRFValidationError if `uri` resolves to a blocked IP or has a bad scheme.""" parts = urlsplit(uri) @@ -194,6 +206,7 @@ def __call__(self, keyid: str) -> dict[str, Any] | None: "DEFAULT_JWKS_COOLDOWN_SECONDS", "DEFAULT_JWKS_TIMEOUT_SECONDS", "JwksFetcher", + "JwksResolver", "SSRFValidationError", "StaticJwksResolver", "default_jwks_fetcher", diff --git a/src/adcp/signing/jws.py b/src/adcp/signing/jws.py new file mode 100644 index 000000000..d50ea5de6 --- /dev/null +++ b/src/adcp/signing/jws.py @@ -0,0 +1,275 @@ +"""Minimal JWS parse + verify for AdCP revocation lists. + +The AdCP governance profile uses JSON Web Signature (RFC 7515) to sign +revocation-list documents published at +``{origin}/.well-known/governance-revocations.json``. The list MAY be +serialized in compact form (``header.payload.signature``) or JWS general +JSON serialization (an object with ``payload`` and ``signatures[]``). +Both carry the same three fields after decoding. + +This module intentionally does not pull in ``pyjwt`` / ``authlib``. AdCP +has a narrow allowed-alg set (``EdDSA``, ``ES256``) and we already own +the underlying crypto primitives for RFC 9421, so a ~120-line parser +over the existing ``verify_signature`` is both leaner and auditable. + +The signature base for compact JWS is:: + + ASCII(BASE64URL(protected_header)) || "." || ASCII(BASE64URL(payload)) + +— exactly what ``verify_signature`` expects, given the JWS-to-RFC-9421 +algorithm mapping below. +""" + +from __future__ import annotations + +import binascii +import json +from typing import Any + +from adcp.signing.crypto import ( + ALG_ED25519, + ALG_ES256, + b64url_decode, + public_key_from_jwk, + verify_signature, +) +from adcp.signing.jwks import JwksResolver + +# JWS uses RFC 7518 algorithm names; RFC 9421 uses the IANA HTTP Signature +# names. We convert at the JWS boundary so the rest of the module speaks +# the internal alg vocabulary already used by our crypto primitives. +JWS_ALG_TO_INTERNAL: dict[str, str] = { + "EdDSA": ALG_ED25519, + "ES256": ALG_ES256, +} +ALLOWED_JWS_ALGS: frozenset[str] = frozenset(JWS_ALG_TO_INTERNAL.keys()) + + +class JwsError(Exception): + """Base class for JWS parse/verify failures.""" + + +class JwsMalformedError(JwsError): + """JWS document is syntactically invalid or uses a disallowed shape.""" + + +class JwsUnknownKeyError(JwsError): + """The JWS header ``kid`` is not present in the configured JWKS.""" + + +class JwsSignatureInvalidError(JwsError): + """The JWS signature did not verify against the resolved key.""" + + +def _decode_protected_header(b64_header: str) -> dict[str, Any]: + try: + raw = b64url_decode(b64_header) + except (ValueError, binascii.Error) as exc: + raise JwsMalformedError(f"protected header is not valid base64url: {exc}") from exc + try: + header = json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise JwsMalformedError(f"protected header is not valid JSON: {exc}") from exc + if not isinstance(header, dict): + raise JwsMalformedError("protected header is not a JSON object") + return header + + +def parse_compact_jws(token: str) -> tuple[str, str, bytes]: + """Split a compact JWS into ``(b64_header, b64_payload, signature bytes)``. + + Returns the header and payload as the ORIGINAL base64url substrings — + not decoded and re-encoded — so the verifier uses the exact bytes that + the signer hashed. This matters because ``urlsafe_b64decode`` is + lenient (accepts ``+``/``/`` and padding); round-tripping through it + can produce different bytes than the wire form. + """ + if not isinstance(token, str): + raise JwsMalformedError("compact JWS must be a string") + parts = token.split(".") + if len(parts) != 3: + raise JwsMalformedError( + f"compact JWS must have exactly 3 dot-separated segments, got {len(parts)}" + ) + b64_header, b64_payload, b64_signature = parts + if not b64_header or not b64_payload or not b64_signature: + raise JwsMalformedError("compact JWS has an empty segment") + try: + signature = b64url_decode(b64_signature) + except (ValueError, binascii.Error) as exc: + raise JwsMalformedError(f"compact JWS signature is not valid base64url: {exc}") from exc + return b64_header, b64_payload, signature + + +def parse_general_json_jws(doc: dict[str, Any]) -> tuple[str, str, bytes]: + """Extract the first signature from a JWS general JSON serialization. + + Returns ``(b64_header, b64_payload, signature_bytes)``. AdCP revocation + lists are signed by a single operator key, so we only honor the first + entry in ``signatures[]``. A list with multiple signatures is malformed + for this profile. + """ + if not isinstance(doc, dict): + raise JwsMalformedError("JWS general JSON document must be an object") + if "payload" not in doc or "signatures" not in doc: + raise JwsMalformedError("JWS general JSON document must have 'payload' and 'signatures'") + signatures = doc["signatures"] + if not isinstance(signatures, list) or len(signatures) == 0: + raise JwsMalformedError("JWS general JSON 'signatures' must be a non-empty array") + if len(signatures) > 1: + raise JwsMalformedError( + "JWS general JSON 'signatures' with multiple entries is not supported for " + "this profile" + ) + entry = signatures[0] + if not isinstance(entry, dict): + raise JwsMalformedError("JWS signature entry must be an object") + b64_header = entry.get("protected") + b64_signature = entry.get("signature") + b64_payload = doc["payload"] + if not isinstance(b64_header, str) or not isinstance(b64_signature, str): + raise JwsMalformedError("JWS signature entry missing 'protected' or 'signature'") + if not isinstance(b64_payload, str): + raise JwsMalformedError("JWS 'payload' must be a base64url string") + try: + signature = b64url_decode(b64_signature) + except (ValueError, binascii.Error) as exc: + raise JwsMalformedError(f"JWS signature is not valid base64url: {exc}") from exc + return b64_header, b64_payload, signature + + +def verify_detached_jws( + *, + b64_protected: str, + b64_payload: str, + signature: bytes, + jwks_resolver: JwksResolver, + expected_typ: str, + allowed_algs: frozenset[str] = ALLOWED_JWS_ALGS, +) -> dict[str, Any]: + """Verify a parsed JWS and return the decoded payload JSON. + + Performs, in order: + 1. Decode + parse the protected header. + 2. Reject if ``alg`` is absent, is ``none``, or not in ``allowed_algs``. + 3. Reject if ``typ`` is not exactly ``expected_typ`` (byte equality, no + normalization — matches the AdCP spec). + 4. Resolve ``kid`` via ``jwks_resolver``; reject unknown. + 5. Reconstruct the signature base + (``b64_protected + "." + b64_payload``) using the ORIGINAL + base64url strings and verify the signature bytes with the existing + HTTP-signature crypto. Using the original strings (not decode + + re-encode) defends against lenient-decode mismatches. + 6. Decode the payload as JSON and return it. + + Any failure raises a :class:`JwsError` subclass. The caller maps these + to transport-error codes (e.g. ``request_signature_revocation_stale``). + """ + header = _decode_protected_header(b64_protected) + + alg = header.get("alg") + if not isinstance(alg, str) or alg == "none" or alg not in allowed_algs: + raise JwsMalformedError( + f"JWS alg {alg!r} not allowed; permitted values: {sorted(allowed_algs)}" + ) + internal_alg = JWS_ALG_TO_INTERNAL[alg] + + typ = header.get("typ") + if typ != expected_typ: + raise JwsMalformedError( + f"JWS typ {typ!r} does not match expected {expected_typ!r}" + ) + + # crit handling: the AdCP profile defines no extensions, so any + # unrecognized crit entry means the caller can't safely process it. + # Only allow crit if it's empty / absent. + crit = header.get("crit") + if crit is not None and (not isinstance(crit, list) or len(crit) > 0): + raise JwsMalformedError( + "JWS 'crit' header is not supported for this profile" + ) + + kid = header.get("kid") + if not isinstance(kid, str) or not kid: + raise JwsMalformedError("JWS protected header must include a non-empty 'kid'") + + jwk = jwks_resolver(kid) + if jwk is None: + raise JwsUnknownKeyError(f"no JWK for kid {kid!r}") + + # Reconstruct the detached signature base per RFC 7515 §5.1 step 5: + # ASCII(BASE64URL(protected header)) || "." || ASCII(BASE64URL(payload)). + # Use the ORIGINAL base64url strings — don't decode-then-re-encode, since + # ``urlsafe_b64decode`` is lenient (accepts padding, tolerates some + # alphabet variants) and a round-trip can produce different bytes than + # the wire form. Verifying against the re-encoded bytes would let a + # crafted token verify against bytes the signer never signed. + signing_input = (b64_protected + "." + b64_payload).encode("ascii") + + public_key = public_key_from_jwk(jwk) + if not verify_signature( + alg=internal_alg, + public_key=public_key, + signature_base=signing_input, + signature=signature, + ): + raise JwsSignatureInvalidError(f"signature did not verify for kid {kid!r}") + + # Now that the signature is verified, decode the payload bytes from the + # trusted b64 string and parse as JSON. + try: + payload_bytes = b64url_decode(b64_payload) + except (ValueError, binascii.Error) as exc: + raise JwsMalformedError(f"JWS payload is not valid base64url: {exc}") from exc + try: + decoded_payload = json.loads(payload_bytes.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise JwsMalformedError(f"JWS payload is not valid JSON: {exc}") from exc + if not isinstance(decoded_payload, dict): + raise JwsMalformedError("JWS payload is not a JSON object") + return decoded_payload + + +def verify_jws_document( + doc: str | dict[str, Any], + *, + jwks_resolver: JwksResolver, + expected_typ: str, + allowed_algs: frozenset[str] = ALLOWED_JWS_ALGS, +) -> dict[str, Any]: + """Parse a JWS (compact string or general-JSON dict) and verify in one call. + + Dispatches to the right parser based on the input shape, then calls + :func:`verify_detached_jws`. Returns the verified payload dict. + """ + if isinstance(doc, str): + b64_header, b64_payload, signature = parse_compact_jws(doc) + elif isinstance(doc, dict): + b64_header, b64_payload, signature = parse_general_json_jws(doc) + else: + raise JwsMalformedError( + "JWS document must be a compact string or JSON general-serialization object" + ) + return verify_detached_jws( + b64_protected=b64_header, + b64_payload=b64_payload, + signature=signature, + jwks_resolver=jwks_resolver, + expected_typ=expected_typ, + allowed_algs=allowed_algs, + ) + + +__all__ = [ + "ALLOWED_JWS_ALGS", + "JWS_ALG_TO_INTERNAL", + "JwksResolver", + "JwsError", + "JwsMalformedError", + "JwsSignatureInvalidError", + "JwsUnknownKeyError", + "parse_compact_jws", + "parse_general_json_jws", + "verify_detached_jws", + "verify_jws_document", +] diff --git a/src/adcp/signing/revocation_fetcher.py b/src/adcp/signing/revocation_fetcher.py new file mode 100644 index 000000000..a5e915699 --- /dev/null +++ b/src/adcp/signing/revocation_fetcher.py @@ -0,0 +1,693 @@ +"""Live revocation-list fetcher + caching checker. + +The AdCP governance profile publishes a signed revocation list at +``{issuer-origin}/.well-known/governance-revocations.json``. Verifiers +poll the list, verify its JWS, and reject tokens signed under a revoked +``kid`` (or with a revoked ``jti``). This module ships: + +* :class:`RevocationListFetcher` — Protocol callers can implement for + alternate transports (Redis-backed mirror, local fixture, etc). +* :func:`default_revocation_list_fetcher` — SSRF-validated HTTPS fetch + with ``If-None-Match`` support. +* :class:`CachingRevocationChecker` — implements the existing + :class:`adcp.signing.revocation.RevocationChecker` Protocol. Handles + first fetch, refetch near ``next_update``, 304s, and the spec's + fail-closed rule past ``next_update + grace``. + +The verifier plugs it into :class:`VerifyOptions.revocation_checker`. +""" + +from __future__ import annotations + +import json +import logging +import re +import time +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Protocol + +import httpx + +from adcp.signing.jwks import ( + DEFAULT_JWKS_TIMEOUT_SECONDS, + JwksResolver, + validate_jwks_uri, +) +from adcp.signing.jws import ( + JwsError, + verify_jws_document, +) +from adcp.signing.revocation import RevocationList + +logger = logging.getLogger(__name__) + +REVOCATION_LIST_TYP = "adcp-gov-revocation+jws" + +# Spec-declared polling bounds for execution-phase traffic. +MIN_POLLING_INTERVAL_SECONDS = 60 # spec floor +MAX_POLLING_INTERVAL_SECONDS = 15 * 60 # spec ceiling for execution phase + +# Recommended grace multiplier: if a seller hasn't successfully refreshed +# within `next_update + grace * last_interval`, all subsequent is_revoked +# calls fail closed. Spec recommends 2×. +DEFAULT_GRACE_MULTIPLIER = 2.0 + +# Shape-validate `Last-Modified` before persisting it for the next +# `If-Modified-Since` request. The value comes from a (potentially +# untrusted) origin; echoing it verbatim into an outbound request header +# is header-injection-adjacent. We only accept an RFC 7231 HTTP-date +# shape, length-capped. +_HTTP_DATE_MAX_LEN = 64 +# RFC 7231 §7.1.1.1 defines three formats; IMF-fixdate is the normative +# one. We accept all three shapes loosely via regex on the visible +# characters — no attempt to parse as a real date, since the server +# hands it back to us opaquely. +_HTTP_DATE_PATTERN = re.compile(r"^[A-Za-z0-9 ,:+\-]+$") + + +class RevocationListFetchError(Exception): + """The fetcher cannot retrieve the list (network / HTTP / SSRF). + + Mapped to ``request_signature_revocation_stale`` at the verifier + edge when it occurs past the grace window; within the grace window + the cached list is served and the error is logged at debug level. + """ + + +class RevocationListParseError(Exception): + """The fetched document is malformed, fails schema validation, or + violates the spec's polling-cadence floor. + + Mapped to ``request_signature_revocation_stale`` at the verifier + edge. Parse errors within the grace window fall back to the cached + list; past the grace window they surface as + :class:`RevocationListFreshnessError`. + + :class:`RevocationListSignatureError` is a subclass of this class — + callers that catch this exception catch signature failures too. + """ + + +class RevocationListSignatureError(RevocationListParseError): + """The JWS signature does not verify against the configured JWKS. + + Subclass of :class:`RevocationListParseError` so callers that catch + parse errors catch this too. Mapped to + ``request_signature_revocation_stale`` at the verifier edge. + + Not re-exported from :mod:`adcp.signing` — import from + :mod:`adcp.signing.revocation_fetcher` if you specifically want to + distinguish signature failures from other parse failures. + """ + + +class RevocationListFreshnessError(Exception): + """The cached list is past ``next_update + grace`` and refetch failed. + + Mapped to ``request_signature_revocation_stale`` at the verifier + edge. The spec requires fail-closed behavior here — serving a stale + list lets an attacker DoS the revocation endpoint to extend a + compromised key's fraud window indefinitely. + """ + + +@dataclass(frozen=True) +class FetchResult: + """Output of a successful revocation-list fetch. + + ``body`` is the raw JWS document — compact string or general-JSON + dict. ``etag`` / ``last_modified`` capture server-side freshness + hints for the next conditional request. ``not_modified=True`` + indicates a 304 response; in that case ``body`` is unused and the + caller continues serving the cached list. + """ + + body: str | dict[str, Any] + etag: str | None + last_modified: str | None = None + not_modified: bool = False + + +class RevocationListFetcher(Protocol): + """Fetch a revocation-list JWS document. + + Implementations return a ``FetchResult`` on success. On 304 + (conditional request), ``not_modified=True`` and the caller + continues serving the cached list. Transport-level concerns + (private-network support, TLS settings, etc.) are the fetcher's + construction-time config — not arguments the checker threads through + on every call. + """ + + def __call__( + self, + uri: str, + *, + if_none_match: str | None = None, + if_modified_since: str | None = None, + ) -> FetchResult: ... + + +def default_revocation_list_fetcher( + uri: str, + *, + if_none_match: str | None = None, + if_modified_since: str | None = None, + allow_private: bool = False, + timeout: float = DEFAULT_JWKS_TIMEOUT_SECONDS, +) -> FetchResult: + """HTTPS GET the revocation list, honoring SSRF rules and conditional requests. + + Reuses ``validate_jwks_uri`` — the SSRF controls are identical (same + reserved-range rejection, same cloud-metadata block). ``httpx`` + re-resolves the hostname on connect, which is the TOCTOU window + tracked separately in #190. Sends ``If-None-Match`` when an ETag is + supplied and ``If-Modified-Since`` when a ``Last-Modified`` is + supplied; the spec accepts either (sellers SHOULD use both when + available). + """ + validate_jwks_uri(uri, allow_private=allow_private) + headers = {"Accept": "application/jose+json, application/json, application/jose"} + if if_none_match is not None: + headers["If-None-Match"] = if_none_match + if if_modified_since is not None: + headers["If-Modified-Since"] = if_modified_since + + try: + with httpx.Client(timeout=timeout, follow_redirects=False) as client: + response = client.get(uri, headers=headers) + except httpx.HTTPError as exc: + raise RevocationListFetchError(f"revocation list GET {uri!r} failed: {exc}") from exc + + if response.status_code == 304: + return FetchResult( + body="", + etag=if_none_match, + last_modified=if_modified_since, + not_modified=True, + ) + if response.status_code != 200: + raise RevocationListFetchError( + f"revocation list {uri!r} returned HTTP {response.status_code}" + ) + + etag = response.headers.get("ETag") + last_modified = _sanitize_last_modified(response.headers.get("Last-Modified")) + raw_body = response.text.strip() + + # General JSON serialization starts with `{`; compact form is three + # base64url segments separated by dots. Dispatch by first-byte shape + # rather than trusting Content-Type, which is unreliable in practice. + if not raw_body: + raise RevocationListFetchError(f"revocation list {uri!r} returned empty body") + + body: str | dict[str, Any] + if raw_body.startswith("{"): + try: + body = json.loads(raw_body) + except ValueError as exc: + raise RevocationListFetchError( + f"revocation list {uri!r} body is neither compact JWS nor valid JSON: {exc}" + ) from exc + else: + body = raw_body + + return FetchResult( + body=body, + etag=etag, + last_modified=last_modified, + not_modified=False, + ) + + +def _sanitize_last_modified(raw: str | None) -> str | None: + """Validate a ``Last-Modified`` header value before persisting it. + + Returns the value if it matches RFC 7231 HTTP-date shape (letters, + digits, space, comma, colon, plus, hyphen) and fits in + :data:`_HTTP_DATE_MAX_LEN` bytes. Returns ``None`` otherwise — the + caller then skips conditional-request fallback and relies on ETag + alone. An attacker-controlled origin cannot inject CRLF or arbitrary + bytes into our next outbound request header this way. + """ + if raw is None: + return None + if len(raw) > _HTTP_DATE_MAX_LEN: + return None + if not _HTTP_DATE_PATTERN.match(raw): + return None + return raw + + +def _parse_iso8601(ts: str) -> datetime: + """Accept ``2026-04-18T14:00:00Z`` (the spec format) and other ISO-8601 shapes.""" + raw = ts + if raw.endswith("Z"): + raw = raw[:-1] + "+00:00" + parsed = datetime.fromisoformat(raw) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + + +def _normalize_issuer(issuer: str) -> str: + """Normalize an origin string for byte-equal comparison. + + RFC 6454 origin: ``scheme://host[:port]`` — scheme is case-insensitive, + host is case-insensitive (and IDNA-canonicalized), trailing slash is + not part of the origin. Rejects userinfo (``user:pass@host``) because + there's no legitimate reason for a governance issuer to include it + and accepting it opens impersonation via ``attacker@gov.example.com``. + + Rejects any host that can't be encoded as ASCII via IDNA — blocks + homoglyph attacks like ``https://goᴠ.example.com`` (U+1D20) that + lowercase-fold to something visually but not byte-wise equal to the + configured origin. + """ + from urllib.parse import urlsplit, urlunsplit + + parts = urlsplit(issuer) + if parts.scheme not in ("http", "https"): + raise ValueError(f"issuer must be http or https, got {parts.scheme!r}") + if parts.username or parts.password: + raise ValueError("issuer must not contain userinfo (user:pass@host)") + if not parts.hostname: + raise ValueError(f"issuer has no host: {issuer!r}") + + # IDNA-encode the host to collapse unicode homoglyphs to ASCII + # punycode. ``host.encode("idna")`` raises on characters outside the + # IDNA allowlist — which is the failure mode we want. + try: + host_ascii = parts.hostname.encode("idna").decode("ascii").lower() + except (UnicodeError, UnicodeEncodeError) as exc: + raise ValueError(f"issuer host {parts.hostname!r} is not IDNA-valid: {exc}") from exc + + netloc = f"{host_ascii}:{parts.port}" if parts.port else host_ascii + scheme = parts.scheme.lower() + return urlunsplit((scheme, netloc, "", "", "")) + + +def _build_list_from_payload(payload: dict[str, Any], expected_issuer: str) -> RevocationList: + """Validate the JWS payload schema and assemble a ``RevocationList``. + + Raises :class:`RevocationListParseError` on any schema violation, + including out-of-bounds declared cadence (spec §Revocation says the + floor is 60 s for execution-phase lists). + """ + # Version: accept any positive integer. Unknown versions produce a + # warning-style log via the base class but do not reject, because + # hard-rejecting a future additive schema change would force every + # verifier running an older SDK into fail-closed across ALL traffic + # the moment an issuer rolls forward. + version = payload.get("version") + if not isinstance(version, int) or version < 1: + raise RevocationListParseError( + f"revocation list version {version!r} must be a positive integer" + ) + + issuer = payload.get("issuer") + if not isinstance(issuer, str): + raise RevocationListParseError("revocation list missing string field 'issuer'") + try: + normalized_issuer = _normalize_issuer(issuer) + except ValueError as exc: + raise RevocationListParseError( + f"revocation list issuer {issuer!r} is not a valid origin: {exc}" + ) from exc + if normalized_issuer != expected_issuer: + raise RevocationListParseError( + f"revocation list issuer {issuer!r} does not match expected {expected_issuer!r} " + f"(normalized comparison)" + ) + + for key in ("updated", "next_update"): + if not isinstance(payload.get(key), str): + raise RevocationListParseError(f"revocation list missing string field {key!r}") + + # Enforce the spec's polling-cadence floor on the DECLARED cadence. + # An issuer that declared next_update - updated < 60s is violating + # the spec; we reject rather than silently honoring it, since the + # checker's cooldown would otherwise force a silent downgrade to + # the 60s floor anyway. + updated_dt = _parse_iso8601(payload["updated"]) + next_update_dt = _parse_iso8601(payload["next_update"]) + declared_interval = (next_update_dt - updated_dt).total_seconds() + if declared_interval < MIN_POLLING_INTERVAL_SECONDS: + raise RevocationListParseError( + f"revocation list declared cadence ({declared_interval:.0f}s) is below " + f"spec floor ({MIN_POLLING_INTERVAL_SECONDS}s)" + ) + + return RevocationList.from_dict(payload) + + +class CachingRevocationChecker: + """Live revocation checker with caching, refetch, grace, and fail-closed. + + Implements the ``RevocationChecker`` Protocol — callable as + ``checker(keyid) -> bool``. Fetches and verifies the list on first + call, refetches when ``now >= next_update``, and raises + :class:`RevocationListFreshnessError` (which the verifier maps to + ``request_signature_revocation_stale``) once the cached list is past + ``next_update + grace * last_interval``. + + The checker does not fetch at construction time — the first call is + the trigger. This keeps ``__init__`` side-effect-free and defers + network I/O to the first verification. Call :meth:`prime` at startup + to fail-fast on misconfiguration. + + Thread safety + ------------- + This class is NOT thread-safe. Concurrent ``__call__`` s that each + trigger a refresh can race the ``self._current_list`` assignment — + the later writer wins, which for our replay-rejection check means + only the most recent fetch's list ends up cached (still consistent, + but two HTTP round-trips happen instead of one). Wrap instances in + an external lock if you run the verifier from a thread pool. An + ``asyncio``-driven verifier (single-threaded event loop) is fine + unmodified. + + Parameters + ---------- + revocation_uri: + Full URL, e.g. ``https://gov.example.com/.well-known/governance-revocations.json``. + issuer: + Expected ``iss`` origin (``https://gov.example.com``). Set this + explicitly; do not infer from ``revocation_uri`` because the + .well-known location could theoretically be on a sibling host. + jwks_resolver: + Resolves the ``kid`` on the revocation-list JWS header to a JWK. + Same Protocol as the request-signing JWKS resolver. Typically a + :class:`adcp.signing.CachingJwksResolver` pointed at the + governance agent's JWKS. + fetcher: + Override for the HTTP fetcher (primarily for tests and alternate + transports). Defaults to :func:`default_revocation_list_fetcher`. + Transport-level config (private-network allowance, TLS settings, + alternate timeout) lives on the fetcher — pass a pre-configured + one via ``functools.partial`` if you need to customize. + grace_multiplier: + Grace window beyond ``next_update``, measured in multiples of + the list's declared polling interval + (``next_update - updated``). Defaults to 2× per spec recommendation. + clock: + Monotonic-time source; overridable for tests. Returns seconds. + wall_clock: + Wall-clock source returning a ``datetime`` in UTC. Used to + evaluate ``next_update`` and ``updated`` against the current + moment. Separate from ``clock`` because polling is measured as + a duration but freshness is measured against absolute times. + """ + + def __init__( + self, + *, + revocation_uri: str, + issuer: str, + jwks_resolver: JwksResolver, + fetcher: RevocationListFetcher | None = None, + grace_multiplier: float = DEFAULT_GRACE_MULTIPLIER, + clock: Callable[[], float] = time.monotonic, + wall_clock: Callable[[], datetime] = lambda: datetime.now(timezone.utc), + ) -> None: + # Reject the common footgun of passing time.time (wall-clock + # seconds) as the `clock` kwarg — `_ensure_fresh` assumes + # monotonicity for cooldown math. Requiring `time.monotonic` + # (or a dedicated monotonic test fake) keeps cooldowns correct + # across wall-clock jumps. + if clock is time.time: + raise ValueError( + "clock must be a monotonic time source (use time.monotonic, " + "not time.time); wall-clock jumps would break cooldown math" + ) + # Cast via Any so mypy doesn't complain about the differing + # declared return types (float vs datetime) — we're asking + # whether the caller bound the SAME callable to both slots. + if clock is wall_clock: # type: ignore[comparison-overlap] + raise ValueError( + "clock and wall_clock must be different sources — clock is " + "monotonic seconds (cooldown timing), wall_clock is a UTC " + "datetime source (freshness evaluation)" + ) + + self._revocation_uri = revocation_uri + self._issuer = _normalize_issuer(issuer) + self._jwks_resolver = jwks_resolver + self._fetcher = fetcher or default_revocation_list_fetcher + self._grace_multiplier = grace_multiplier + self._clock = clock + self._wall_clock = wall_clock + + self._current_list: RevocationList | None = None + self._current_etag: str | None = None + self._current_last_modified: str | None = None + self._last_successful_refresh: float | None = None + self._last_polling_interval_seconds: float | None = None + # Cooldown state: when a refresh attempt fails, we don't retry + # until at least MIN_POLLING_INTERVAL_SECONDS of monotonic time + # have elapsed. Stops a high-traffic verifier from hammering a + # dead revocation endpoint. + self._last_refresh_attempt: float | None = None + + @classmethod + def from_issuer_origin( + cls, + origin: str, + *, + jwks_resolver: JwksResolver, + fetcher: RevocationListFetcher | None = None, + grace_multiplier: float = DEFAULT_GRACE_MULTIPLIER, + clock: Callable[[], float] = time.monotonic, + wall_clock: Callable[[], datetime] = lambda: datetime.now(timezone.utc), + ) -> CachingRevocationChecker: + """Build a checker from the issuer origin alone. + + The AdCP spec pins the revocation list at + ``{origin}/.well-known/governance-revocations.json``. This + classmethod fills that path in so callers supply one origin + instead of three coordinated strings. + + >>> checker = CachingRevocationChecker.from_issuer_origin( + ... "https://gov.example.com", + ... jwks_resolver=my_jwks, + ... ) + """ + normalized = _normalize_issuer(origin) + revocation_uri = f"{normalized}/.well-known/governance-revocations.json" + return cls( + revocation_uri=revocation_uri, + issuer=normalized, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + grace_multiplier=grace_multiplier, + clock=clock, + wall_clock=wall_clock, + ) + + def prime(self) -> None: + """Fetch and verify the revocation list synchronously. + + Call this at application startup to fail-fast on configuration + problems (wrong issuer, JWKS unreachable, operator down) rather + than surfacing them at the first user verification. If priming + fails, the exception propagates unchanged — let the startup + handler decide whether to abort the process or retry with + backoff. + + This is optional: the checker works perfectly without priming, + lazily fetching on the first :meth:`__call__`. + """ + self._ensure_fresh() + + def is_jti_revoked(self, jti: str) -> bool: + """Return True iff ``jti`` is in the cached list's ``revoked_jtis``. + + Governance-token verifiers (AdCP security.mdx §Seller + verification checklist step 14) check both ``kid`` and ``jti``. + The plain :meth:`__call__` covers ``kid`` because that's the + request-signing path; governance callers use this method for + per-token revocation. + + Triggers a refresh if the cached list is past ``next_update`` + (same lifecycle as :meth:`__call__`). + """ + self._ensure_fresh() + if self._current_list is None: + raise RevocationListFreshnessError("revocation list not available") + return jti in self._current_list.revoked_jtis + + def __call__(self, keyid: str) -> bool: + """Return True iff ``keyid`` is in the cached list's ``revoked_kids``. + + This is the RFC 9421 request-signing path (verifier checklist + step 9). For governance-token verification (seller-verification + checklist step 14) check both the signing ``kid`` here AND the + token's ``jti`` via :meth:`is_jti_revoked`. + """ + self._ensure_fresh() + if self._current_list is None: + # Unreachable in practice — _ensure_fresh raises on unsuccessful + # first load. Guarded for mypy. + raise RevocationListFreshnessError("revocation list not available") + return self._current_list.is_revoked(keyid) + + def _ensure_fresh(self) -> None: + now_wall = self._wall_clock() + now_mono = self._clock() + + if self._current_list is None: + self._refresh(conditional=False, now_wall=now_wall, now_mono=now_mono) + return + + next_update = _parse_iso8601(self._current_list.next_update) + if now_wall < next_update: + # Cached list is still within its declared freshness window. + return + + # Past next_update: try a refresh, but only if we haven't recently + # attempted one. Without this cooldown a high-QPS verifier would + # hammer a dead endpoint on every verification. + since_last_attempt = ( + now_mono - self._last_refresh_attempt + if self._last_refresh_attempt is not None + else float("inf") + ) + if since_last_attempt >= MIN_POLLING_INTERVAL_SECONDS: + try: + self._refresh(conditional=True, now_wall=now_wall, now_mono=now_mono) + return + except (RevocationListFetchError, RevocationListParseError) as exc: + # Fall through to the grace-window check below. + last_exc: Exception = exc + else: + last_exc = RevocationListFetchError( + f"refresh cooldown not elapsed ({since_last_attempt:.0f}s < " + f"{MIN_POLLING_INTERVAL_SECONDS}s)" + ) + + grace_seconds = self._grace_seconds() + if now_wall.timestamp() >= next_update.timestamp() + grace_seconds: + raise RevocationListFreshnessError( + f"revocation list {self._revocation_uri!r} past next_update " + f"({self._current_list.next_update}) + grace ({grace_seconds:.0f}s); " + f"last refresh error: {last_exc}" + ) from last_exc + # Still within grace — serve the cached list. + + def _refresh(self, *, conditional: bool, now_wall: datetime, now_mono: float) -> None: + self._last_refresh_attempt = now_mono + if_none_match = self._current_etag if conditional else None + if_modified_since = self._current_last_modified if conditional else None + result = self._fetcher( + self._revocation_uri, + if_none_match=if_none_match, + if_modified_since=if_modified_since, + ) + if result.not_modified: + # 304: server confirms the cached list is still current. Advance + # the cached list's `next_update` by the declared polling + # interval so subsequent calls don't re-enter the past- + # next_update branch on every request — without this, the 60s + # cooldown gate would fire once per verification request once + # we cross the original next_update. + self._last_successful_refresh = now_mono + if self._current_list is not None and self._last_polling_interval_seconds: + prior = _parse_iso8601(self._current_list.next_update) + new_next_update = prior + timedelta( + seconds=self._last_polling_interval_seconds + ) + self._current_list = RevocationList( + issuer=self._current_list.issuer, + updated=self._current_list.updated, + next_update=new_next_update.isoformat().replace("+00:00", "Z"), + revoked_kids=self._current_list.revoked_kids, + revoked_jtis=self._current_list.revoked_jtis, + ) + return + + try: + payload = verify_jws_document( + result.body, + jwks_resolver=self._jwks_resolver, + expected_typ=REVOCATION_LIST_TYP, + ) + except JwsError as exc: + raise RevocationListSignatureError( + f"revocation list JWS verification failed: {exc}" + ) from exc + + revocation_list = _build_list_from_payload(payload, expected_issuer=self._issuer) + + updated = _parse_iso8601(revocation_list.updated) + next_update = _parse_iso8601(revocation_list.next_update) + + # Verify `updated` is not in the future beyond clock skew. An issuer + # whose clock is far ahead would otherwise force an immediate stale + # rejection. + if updated > now_wall.replace(microsecond=0): + delta = (updated - now_wall).total_seconds() + if delta > 60: # 60s clock skew tolerance, mirrors JWS exp/iat rules + raise RevocationListParseError( + f"revocation list updated={revocation_list.updated!r} is " + f"{delta:.0f}s in the future" + ) + if next_update <= updated: + raise RevocationListParseError( + f"revocation list next_update {revocation_list.next_update!r} is not " + f"after updated {revocation_list.updated!r}" + ) + + # Reject a freshly-fetched list whose `updated` is older than the + # one we already have cached. Defense-in-depth against: + # - CDN replaying a stale list after a kid has been revoked, + # - operator-key compromise where an attacker serves an older list + # with revocations removed. + # The spec doesn't permit un-revocation, so `updated` MUST be + # monotonically non-decreasing across refreshes. + if self._current_list is not None: + current_updated = _parse_iso8601(self._current_list.updated) + if updated < current_updated: + raise RevocationListParseError( + f"revocation list updated={revocation_list.updated!r} is older " + f"than cached list updated={self._current_list.updated!r} — " + f"refusing to roll back" + ) + + self._current_list = revocation_list + self._current_etag = result.etag + # Sanitize again at write-side: custom fetcher impls may not have + # validated the value before constructing FetchResult. + self._current_last_modified = _sanitize_last_modified(result.last_modified) + self._last_successful_refresh = now_mono + # Declared cadence is already validated >= 60s and bounded above + # at parse time; clamp to the spec ceiling as defense against an + # issuer declaring an out-of-bounds ceiling value. + self._last_polling_interval_seconds = min( + MAX_POLLING_INTERVAL_SECONDS, + (next_update - updated).total_seconds(), + ) + + def _grace_seconds(self) -> float: + interval = self._last_polling_interval_seconds or MAX_POLLING_INTERVAL_SECONDS + return interval * self._grace_multiplier + + +__all__ = [ + "CachingRevocationChecker", + "DEFAULT_GRACE_MULTIPLIER", + "FetchResult", + "REVOCATION_LIST_TYP", + "RevocationListFetchError", + "RevocationListFetcher", + "RevocationListFreshnessError", + "RevocationListParseError", + "default_revocation_list_fetcher", +] + +# ``RevocationListSignatureError`` is intentionally not in __all__ and +# not re-exported from ``adcp.signing``. It remains a subclass of +# ``RevocationListParseError`` so callers who want to catch it can +# import it from this module directly; catching the parent covers the +# common case. diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index ec5601b3e..32096688f 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -11,7 +11,7 @@ from collections.abc import Mapping from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Literal, Protocol +from typing import Any, Literal from adcp.signing.canonical import ( _lookup, @@ -53,6 +53,7 @@ REQUEST_SIGNATURE_WINDOW_INVALID, SignatureVerificationError, ) +from adcp.signing.jwks import JwksResolver from adcp.signing.replay import ReplayStore from adcp.signing.revocation import RevocationChecker, RevocationList @@ -68,12 +69,6 @@ _MAX_PARAM_LEN = 256 -class JwksResolver(Protocol): - """Resolves a keyid to a JWK (or None if unknown).""" - - def __call__(self, keyid: str) -> dict[str, Any] | None: ... - - @dataclass(frozen=True) class VerifiedSigner: """Returned on successful verification. The key_id is the signer's identity.""" diff --git a/tests/conformance/signing/test_jws.py b/tests/conformance/signing/test_jws.py new file mode 100644 index 000000000..087ebe9c7 --- /dev/null +++ b/tests/conformance/signing/test_jws.py @@ -0,0 +1,347 @@ +"""Unit tests for the minimal JWS parse/verify primitive. + +These exercise the compact and general-JSON forms against a freshly-generated +Ed25519 / ES256 key, round-tripping through the existing crypto primitives. +Negative tests cover the invariants the AdCP governance profile relies on: +``alg=none`` rejection, ``typ`` mismatch, unknown ``kid``, tampered payload, +and malformed shapes. +""" + +from __future__ import annotations + +import json + +import pytest +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 + +from adcp.signing.crypto import ( + ALG_ED25519, + ALG_ES256, + b64url_encode, + sign_signature_base, +) +from adcp.signing.jws import ( + JwsMalformedError, + JwsSignatureInvalidError, + JwsUnknownKeyError, + parse_compact_jws, + parse_general_json_jws, + verify_jws_document, +) + +EXPECTED_TYP = "adcp-gov-revocation+jws" +PAYLOAD_JSON = { + "version": 1, + "issuer": "https://gov.example.com", + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_jtis": [], + "revoked_kids": ["gov-2026-03"], +} + + +# -- helpers ------------------------------------------------------------ + + +def _ed25519_jwk_and_key() -> tuple[dict[str, object], ed25519.Ed25519PrivateKey]: + private_key = ed25519.Ed25519PrivateKey.generate() + public_bytes = private_key.public_key().public_bytes_raw() + jwk = { + "kty": "OKP", + "crv": "Ed25519", + "alg": "EdDSA", + "use": "sig", + "key_ops": ["verify"], + "kid": "gov-key-1", + "x": b64url_encode(public_bytes), + } + return jwk, private_key + + +def _es256_jwk_and_key() -> tuple[dict[str, object], ec.EllipticCurvePrivateKey]: + private_key = ec.generate_private_key(ec.SECP256R1()) + numbers = private_key.public_key().public_numbers() + jwk = { + "kty": "EC", + "crv": "P-256", + "alg": "ES256", + "use": "sig", + "key_ops": ["verify"], + "kid": "gov-key-ec", + "x": b64url_encode(numbers.x.to_bytes(32, "big")), + "y": b64url_encode(numbers.y.to_bytes(32, "big")), + } + return jwk, private_key + + +def _sign_compact( + header: dict[str, object], + payload: dict[str, object] | bytes, + *, + jws_alg: str, + private_key: object, +) -> str: + b64_header = b64url_encode(json.dumps(header, separators=(",", ":")).encode("utf-8")) + if isinstance(payload, dict): + payload_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") + else: + payload_bytes = payload + b64_payload = b64url_encode(payload_bytes) + signing_input = (b64_header + "." + b64_payload).encode("ascii") + internal_alg = ALG_ED25519 if jws_alg == "EdDSA" else ALG_ES256 + signature = sign_signature_base( + alg=internal_alg, + private_key=private_key, # type: ignore[arg-type] + signature_base=signing_input, + ) + return b64_header + "." + b64_payload + "." + b64url_encode(signature) + + +def _compact_to_general_json(token: str) -> dict[str, object]: + b64_header, b64_payload, b64_signature = token.split(".") + return { + "payload": b64_payload, + "signatures": [ + {"protected": b64_header, "signature": b64_signature}, + ], + } + + +def _resolver_for(jwk: dict[str, object]) -> object: + def resolve(keyid: str) -> dict[str, object] | None: + return jwk if keyid == jwk["kid"] else None + + return resolve + + +# -- parse_compact_jws -------------------------------------------------- + + +def test_parse_compact_jws_splits_segments() -> None: + # Build a fake compact-shaped string inline so the test literal isn't + # a plausible-looking JWT (high-entropy-secret scanners flag otherwise). + header_b64 = b64url_encode(b"FAKE-HEADER") + payload_b64 = b64url_encode(b"FAKE-PAYLOAD") + sig_b64 = b64url_encode(b"FAKE-SIG") + token = f"{header_b64}.{payload_b64}.{sig_b64}" + + parsed_header, parsed_payload, parsed_sig = parse_compact_jws(token) + # Parser returns the original base64url substrings verbatim (no decode + + # re-encode); only the signature is decoded to bytes. + assert parsed_header == header_b64 + assert parsed_payload == payload_b64 + assert parsed_sig == b"FAKE-SIG" + + +def test_parse_compact_jws_rejects_wrong_segment_count() -> None: + with pytest.raises(JwsMalformedError, match="3 dot-separated"): + parse_compact_jws("only.two") + with pytest.raises(JwsMalformedError, match="3 dot-separated"): + parse_compact_jws("one.two.three.four") + + +def test_parse_compact_jws_rejects_empty_segments() -> None: + with pytest.raises(JwsMalformedError, match="empty segment"): + parse_compact_jws(".p.s") + with pytest.raises(JwsMalformedError, match="empty segment"): + parse_compact_jws("h..s") + + +def test_parse_compact_jws_rejects_non_string() -> None: + with pytest.raises(JwsMalformedError): + parse_compact_jws({"not": "a string"}) # type: ignore[arg-type] + + +# -- parse_general_json_jws --------------------------------------------- + + +def test_parse_general_json_rejects_missing_fields() -> None: + with pytest.raises(JwsMalformedError, match="must have"): + parse_general_json_jws({"payload": "p"}) + with pytest.raises(JwsMalformedError, match="must have"): + parse_general_json_jws({"signatures": []}) + + +def test_parse_general_json_rejects_multiple_signatures() -> None: + doc = { + "payload": "cA", + "signatures": [ + {"protected": "aA", "signature": "cw"}, + {"protected": "aA", "signature": "cw"}, + ], + } + with pytest.raises(JwsMalformedError, match="multiple entries"): + parse_general_json_jws(doc) + + +def test_parse_general_json_rejects_empty_signatures() -> None: + with pytest.raises(JwsMalformedError, match="non-empty array"): + parse_general_json_jws({"payload": "cA", "signatures": []}) + + +# -- verify_detached_jws (round-trip) ----------------------------------- + + +@pytest.mark.parametrize( + ("jws_alg", "factory"), + [ + ("EdDSA", _ed25519_jwk_and_key), + ("ES256", _es256_jwk_and_key), + ], +) +def test_compact_jws_round_trip_verifies(jws_alg: str, factory) -> None: + jwk, key = factory() + header = {"alg": jws_alg, "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg=jws_alg, private_key=key) + + verified = verify_jws_document( + token, + jwks_resolver=_resolver_for(jwk), + expected_typ=EXPECTED_TYP, + ) + assert verified == PAYLOAD_JSON + + +def test_general_json_jws_round_trip_verifies() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + verified = verify_jws_document( + _compact_to_general_json(token), + jwks_resolver=_resolver_for(jwk), + expected_typ=EXPECTED_TYP, + ) + assert verified == PAYLOAD_JSON + + +# -- verify_detached_jws: negative cases -------------------------------- + + +def test_reject_alg_none() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "none", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="alg 'none' not allowed"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_unknown_alg() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "HS256", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="not allowed"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_missing_alg() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="alg"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_wrong_typ() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": "some-other+jws"} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="typ .* does not match expected"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_missing_kid() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="kid"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_unknown_kid() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": "different-key", "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsUnknownKeyError): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_crit_with_entries() -> None: + jwk, key = _ed25519_jwk_and_key() + header = { + "alg": "EdDSA", + "kid": jwk["kid"], + "typ": EXPECTED_TYP, + "crit": ["some-ext"], + } + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="crit"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_tampered_payload() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + + b64_header, _, b64_signature = token.split(".") + # Swap in a different payload while keeping the original signature. + tampered_payload = {**PAYLOAD_JSON, "revoked_kids": ["ATTACKER-CONTROLLED"]} + b64_payload = b64url_encode(json.dumps(tampered_payload, separators=(",", ":")).encode()) + tampered_token = f"{b64_header}.{b64_payload}.{b64_signature}" + + with pytest.raises(JwsSignatureInvalidError): + verify_jws_document( + tampered_token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP + ) + + +def test_reject_bad_signature_bytes() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, PAYLOAD_JSON, jws_alg="EdDSA", private_key=key) + # Flip one byte of the signature. + b64_header, b64_payload, b64_signature = token.split(".") + from adcp.signing.crypto import b64url_decode + + sig_bytes = bytearray(b64url_decode(b64_signature)) + sig_bytes[0] ^= 0xFF + tampered = f"{b64_header}.{b64_payload}.{b64url_encode(bytes(sig_bytes))}" + + with pytest.raises(JwsSignatureInvalidError): + verify_jws_document(tampered, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_non_dict_payload() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": EXPECTED_TYP} + # JSON array payload — spec requires an object. + token = _sign_compact( + header, b'[{"not":"an object"}]', jws_alg="EdDSA", private_key=key + ) + + with pytest.raises(JwsMalformedError, match="not a JSON object"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_reject_non_json_payload() -> None: + jwk, key = _ed25519_jwk_and_key() + header = {"alg": "EdDSA", "kid": jwk["kid"], "typ": EXPECTED_TYP} + token = _sign_compact(header, b"not valid json", jws_alg="EdDSA", private_key=key) + + with pytest.raises(JwsMalformedError, match="not valid JSON"): + verify_jws_document(token, jwks_resolver=_resolver_for(jwk), expected_typ=EXPECTED_TYP) + + +def test_verify_jws_document_rejects_non_string_non_dict() -> None: + with pytest.raises(JwsMalformedError, match="compact string or JSON"): + verify_jws_document( + 123, jwks_resolver=lambda _kid: None, expected_typ=EXPECTED_TYP # type: ignore[arg-type] + ) diff --git a/tests/conformance/signing/test_revocation_e2e.py b/tests/conformance/signing/test_revocation_e2e.py new file mode 100644 index 000000000..878bd7e48 --- /dev/null +++ b/tests/conformance/signing/test_revocation_e2e.py @@ -0,0 +1,433 @@ +"""End-to-end smoke tests for CachingRevocationChecker. + +Spins up a minimal Starlette app that serves a signed revocation-list JWS +at ``/.well-known/governance-revocations.json``. Wires the checker at it +through a custom fetcher that uses ``httpx.ASGITransport`` — no real +network, no monkey-patching. Proves the checker → HTTP → JWS verify → +decide pipeline works on real bytes the server-side verifier would emit, +and that the checker plugs into the existing +:class:`VerifyOptions.revocation_checker` kwarg. +""" + +from __future__ import annotations + +import json +import time +from datetime import datetime, timezone +from typing import Any + +import httpx +import pytest +from cryptography.hazmat.primitives.asymmetric import ed25519 +from starlette.applications import Starlette +from starlette.responses import PlainTextResponse, Response +from starlette.routing import Route + +from adcp.signing import ( + REVOCATION_LIST_TYP, + CachingRevocationChecker, + FetchResult, + SignatureVerificationError, + StaticJwksResolver, + VerifierCapability, + VerifyOptions, + default_revocation_list_fetcher, # noqa: F401 — imported for coverage of __init__ + sign_request, + verify_request_signature, +) +from adcp.signing.crypto import ALG_ED25519, b64url_encode, sign_signature_base + +ISSUER = "https://gov.example.com" +REVOCATION_URI = f"{ISSUER}/.well-known/governance-revocations.json" + + +# -- helpers ------------------------------------------------------------ + + +def _make_operator_key() -> tuple[ed25519.Ed25519PrivateKey, dict[str, Any]]: + """Generate the operator's key pair used to sign the revocation list.""" + private = ed25519.Ed25519PrivateKey.generate() + jwk = { + "kty": "OKP", + "crv": "Ed25519", + "alg": "EdDSA", + "use": "sig", + "key_ops": ["verify"], + "kid": "operator-2026", + "x": b64url_encode(private.public_key().public_bytes_raw()), + } + return private, jwk + + +def _make_signer_key() -> tuple[ed25519.Ed25519PrivateKey, dict[str, Any]]: + """Generate a request-signing key used to sign outgoing AdCP requests.""" + private = ed25519.Ed25519PrivateKey.generate() + jwk = { + "kty": "OKP", + "crv": "Ed25519", + "alg": "EdDSA", + "use": "sig", + "key_ops": ["verify"], + "adcp_use": "request-signing", + "kid": "buyer-key-1", + "x": b64url_encode(private.public_key().public_bytes_raw()), + } + return private, jwk + + +def _sign_revocation_list( + *, + operator_key: ed25519.Ed25519PrivateKey, + operator_kid: str, + payload: dict[str, Any], +) -> str: + header = {"alg": "EdDSA", "kid": operator_kid, "typ": REVOCATION_LIST_TYP} + b64_header = b64url_encode(json.dumps(header, separators=(",", ":")).encode()) + b64_payload = b64url_encode(json.dumps(payload, separators=(",", ":")).encode()) + signing_input = (b64_header + "." + b64_payload).encode("ascii") + signature = sign_signature_base( + alg=ALG_ED25519, private_key=operator_key, signature_base=signing_input + ) + return b64_header + "." + b64_payload + "." + b64url_encode(signature) + + +def _build_revocation_app(*, body: str, etag: str) -> Starlette: + """Starlette app that serves the list with ETag + If-None-Match support.""" + + async def handler(request: Any) -> Response: + if_none_match = request.headers.get("if-none-match") + if if_none_match == etag: + return Response(status_code=304, headers={"ETag": etag}) + return PlainTextResponse( + content=body, + media_type="application/jose", + headers={"ETag": etag}, + ) + + return Starlette( + routes=[ + Route("/.well-known/governance-revocations.json", handler, methods=["GET"]) + ] + ) + + +def _asgi_fetcher(app: Starlette) -> Any: + """Wrap the default fetcher pattern over httpx.ASGITransport. + + ``ASGITransport`` only supports ``httpx.AsyncClient``, but + :class:`RevocationListFetcher` is sync — so the fetcher bridges by + running the async HTTP call inside ``asyncio.run``. + """ + import asyncio + + transport = httpx.ASGITransport(app=app) + + def fetch( + uri: str, + *, + if_none_match: str | None = None, + if_modified_since: str | None = None, + ) -> FetchResult: + headers = {"Accept": "application/jose"} + if if_none_match is not None: + headers["If-None-Match"] = if_none_match + if if_modified_since is not None: + headers["If-Modified-Since"] = if_modified_since + + async def _do_fetch() -> httpx.Response: + async with httpx.AsyncClient(transport=transport, base_url=ISSUER) as client: + return await client.get( + "/.well-known/governance-revocations.json", headers=headers + ) + + response = asyncio.run(_do_fetch()) + + if response.status_code == 304: + return FetchResult( + body="", + etag=if_none_match, + last_modified=if_modified_since, + not_modified=True, + ) + response.raise_for_status() + return FetchResult( + body=response.text, + etag=response.headers.get("ETag"), + last_modified=response.headers.get("Last-Modified"), + not_modified=False, + ) + + return fetch + + +def _jwks_resolver_for(jwk: dict[str, Any]) -> Any: + def resolve(keyid: str) -> dict[str, Any] | None: + return jwk if keyid == jwk["kid"] else None + + return resolve + + +# -- tests -------------------------------------------------------------- + + +def test_checker_fetches_via_asgi_and_verifies_jws() -> None: + operator_priv, operator_jwk = _make_operator_key() + payload = { + "version": 1, + "issuer": ISSUER, + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_kids": ["compromised-buyer-key"], + "revoked_jtis": [], + } + compact_jws = _sign_revocation_list( + operator_key=operator_priv, + operator_kid=operator_jwk["kid"], + payload=payload, + ) + app = _build_revocation_app(body=compact_jws, etag='"rev-1"') + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=_jwks_resolver_for(operator_jwk), + fetcher=_asgi_fetcher(app), + wall_clock=lambda: datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc), + ) + + assert checker("compromised-buyer-key") is True + assert checker("innocent-buyer-key") is False + + +def test_checker_honors_server_etag_via_asgi() -> None: + operator_priv, operator_jwk = _make_operator_key() + payload = { + "version": 1, + "issuer": ISSUER, + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_kids": [], + "revoked_jtis": [], + } + compact_jws = _sign_revocation_list( + operator_key=operator_priv, + operator_kid=operator_jwk["kid"], + payload=payload, + ) + app = _build_revocation_app(body=compact_jws, etag='"rev-42"') + + now = [datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc)] + mono = [0.0] + + def wall_clock() -> datetime: + return now[0] + + def mono_clock() -> float: + return mono[0] + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=_jwks_resolver_for(operator_jwk), + fetcher=_asgi_fetcher(app), + wall_clock=wall_clock, + clock=mono_clock, + ) + + assert checker("any-key") is False # 1st fetch: 200 + # Jump past next_update so the checker issues a conditional refresh, + # and the server responds 304 because our ETag matches. + now[0] = datetime(2026, 4, 18, 14, 16, tzinfo=timezone.utc) + mono[0] = 900.0 # satisfy the 60s refresh cooldown + assert checker("any-key") is False # still cached via 304 path + + +def test_checker_plugs_into_verify_request_signature_pipeline() -> None: + """Full pipeline: sign a request, then verify using the live checker. + + The key signing the outgoing request is also listed in the live + revocation list → verifier must reject at step 9 (revocation check) + before crypto verify. + """ + operator_priv, operator_jwk = _make_operator_key() + buyer_priv, buyer_jwk = _make_signer_key() + + payload = { + "version": 1, + "issuer": ISSUER, + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_kids": [buyer_jwk["kid"]], + "revoked_jtis": [], + } + compact_jws = _sign_revocation_list( + operator_key=operator_priv, + operator_kid=operator_jwk["kid"], + payload=payload, + ) + app = _build_revocation_app(body=compact_jws, etag='"rev-revoked"') + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=_jwks_resolver_for(operator_jwk), + fetcher=_asgi_fetcher(app), + wall_clock=lambda: datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc), + ) + + # Sign a request with the revoked buyer key. + body = b'{"plan_id":"p1"}' + url = "https://seller.example.com/adcp/create_media_buy" + signed = sign_request( + method="POST", + url=url, + headers={"Content-Type": "application/json"}, + body=body, + private_key=buyer_priv, + key_id=buyer_jwk["kid"], + alg=ALG_ED25519, + ) + request_headers = {"Content-Type": "application/json", **signed.as_dict()} + + # Verify. Checker fetches the live list, sees buyer_jwk["kid"] in + # revoked_kids, and the verifier raises request_signature_key_revoked. + options = VerifyOptions( + now=float(int(time.time())), + capability=VerifierCapability( + covers_content_digest="either", + required_for=frozenset({"create_media_buy"}), + ), + operation="create_media_buy", + jwks_resolver=StaticJwksResolver({"keys": [buyer_jwk]}), + revocation_checker=checker, + ) + with pytest.raises(SignatureVerificationError) as exc_info: + verify_request_signature( + method="POST", + url=url, + headers=request_headers, + body=body, + options=options, + ) + assert exc_info.value.code == "request_signature_key_revoked" + + +def test_checker_verifier_accepts_when_kid_not_in_revocation_list() -> None: + """Inverse of the above: clean buyer key → revocation check passes, crypto verifies.""" + operator_priv, operator_jwk = _make_operator_key() + buyer_priv, buyer_jwk = _make_signer_key() + + payload = { + "version": 1, + "issuer": ISSUER, + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_kids": ["some-other-key"], + "revoked_jtis": [], + } + compact_jws = _sign_revocation_list( + operator_key=operator_priv, + operator_kid=operator_jwk["kid"], + payload=payload, + ) + app = _build_revocation_app(body=compact_jws, etag='"rev-clean"') + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=_jwks_resolver_for(operator_jwk), + fetcher=_asgi_fetcher(app), + wall_clock=lambda: datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc), + ) + + body = b'{"plan_id":"p1"}' + url = "https://seller.example.com/adcp/create_media_buy" + signed = sign_request( + method="POST", + url=url, + headers={"Content-Type": "application/json"}, + body=body, + private_key=buyer_priv, + key_id=buyer_jwk["kid"], + alg=ALG_ED25519, + ) + request_headers = {"Content-Type": "application/json", **signed.as_dict()} + + options = VerifyOptions( + now=float(int(time.time())), + capability=VerifierCapability( + covers_content_digest="either", + required_for=frozenset({"create_media_buy"}), + ), + operation="create_media_buy", + jwks_resolver=StaticJwksResolver({"keys": [buyer_jwk]}), + revocation_checker=checker, + ) + verified = verify_request_signature( + method="POST", + url=url, + headers=request_headers, + body=body, + options=options, + ) + assert verified.key_id == buyer_jwk["kid"] + + +def test_stale_list_past_grace_surfaces_revocation_stale() -> None: + """Once the cached list is past next_update + grace and refresh fails, + the checker raises RevocationListFreshnessError — and when wired into + the verifier, the caller sees it at step 9 of the pipeline.""" + from adcp.signing.revocation_fetcher import RevocationListFreshnessError + + operator_priv, operator_jwk = _make_operator_key() + payload = { + "version": 1, + "issuer": ISSUER, + "updated": "2026-04-18T14:00:00Z", + "next_update": "2026-04-18T14:15:00Z", + "revoked_kids": [], + "revoked_jtis": [], + } + compact_jws = _sign_revocation_list( + operator_key=operator_priv, + operator_kid=operator_jwk["kid"], + payload=payload, + ) + + # First fetch succeeds; subsequent fetches fail (simulate operator outage). + call_count = [0] + + def failing_fetcher( + uri: str, + *, + if_none_match: str | None = None, + if_modified_since: str | None = None, + ) -> FetchResult: + call_count[0] += 1 + if call_count[0] == 1: + return FetchResult(body=compact_jws, etag='"initial"', not_modified=False) + from adcp.signing import RevocationListFetchError + + raise RevocationListFetchError("operator unreachable") + + now = [datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc)] + mono = [0.0] + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=_jwks_resolver_for(operator_jwk), + fetcher=failing_fetcher, + wall_clock=lambda: now[0], + clock=lambda: mono[0], + ) + + # First call: initial fetch succeeds. + assert checker("any") is False + + # Past next_update + grace (interval 15min × 2 = 30min grace → 45min past updated). + now[0] = datetime(2026, 4, 18, 14, 46, tzinfo=timezone.utc) + mono[0] = 2700.0 # past the 60s cooldown + + with pytest.raises(RevocationListFreshnessError): + checker("any") diff --git a/tests/conformance/signing/test_revocation_fetcher.py b/tests/conformance/signing/test_revocation_fetcher.py new file mode 100644 index 000000000..aaf512716 --- /dev/null +++ b/tests/conformance/signing/test_revocation_fetcher.py @@ -0,0 +1,889 @@ +"""Tests for the live revocation-list fetcher + CachingRevocationChecker. + +The fetcher tests use ``httpx.MockTransport`` to simulate HTTP responses +without touching the network. The checker tests inject a fake fetcher +(plain callable matching the Protocol) so we can drive refresh paths, +304 handling, signature failures, issuer mismatches, and fail-closed +semantics deterministically using a controllable clock. +""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from datetime import datetime, timedelta, timezone + +import pytest +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from adcp.signing.crypto import ( + ALG_ED25519, + b64url_encode, + sign_signature_base, +) +from adcp.signing.revocation_fetcher import ( + REVOCATION_LIST_TYP, + CachingRevocationChecker, + FetchResult, + RevocationListFetcher, + RevocationListFetchError, + RevocationListFreshnessError, + RevocationListParseError, + RevocationListSignatureError, + default_revocation_list_fetcher, +) + +ISSUER = "https://gov.example.com" +REVOCATION_URI = f"{ISSUER}/.well-known/governance-revocations.json" + + +# -- helpers ------------------------------------------------------------ + + +def _key_and_jwks() -> tuple[ + ed25519.Ed25519PrivateKey, + dict[str, object], + Callable[[str], dict[str, object] | None], +]: + private = ed25519.Ed25519PrivateKey.generate() + jwk = { + "kty": "OKP", + "crv": "Ed25519", + "alg": "EdDSA", + "use": "sig", + "key_ops": ["verify"], + "kid": "gov-key-1", + "x": b64url_encode(private.public_key().public_bytes_raw()), + } + + def resolver(keyid: str) -> dict[str, object] | None: + return jwk if keyid == jwk["kid"] else None + + return private, jwk, resolver + + +def _make_payload( + *, + issuer: str = ISSUER, + updated: str = "2026-04-18T14:00:00Z", + next_update: str = "2026-04-18T14:15:00Z", + revoked_kids: list[str] | None = None, + revoked_jtis: list[str] | None = None, + version: int = 1, +) -> dict[str, object]: + return { + "version": version, + "issuer": issuer, + "updated": updated, + "next_update": next_update, + "revoked_kids": revoked_kids or [], + "revoked_jtis": revoked_jtis or [], + } + + +def _sign_jws_compact( + payload: dict[str, object], + *, + private: ed25519.Ed25519PrivateKey, + kid: str = "gov-key-1", + typ: str = REVOCATION_LIST_TYP, + alg: str = "EdDSA", +) -> str: + header = {"alg": alg, "kid": kid, "typ": typ} + b64_header = b64url_encode(json.dumps(header, separators=(",", ":")).encode()) + b64_payload = b64url_encode(json.dumps(payload, separators=(",", ":")).encode()) + signing_input = (b64_header + "." + b64_payload).encode("ascii") + signature = sign_signature_base( + alg=ALG_ED25519, private_key=private, signature_base=signing_input + ) + return b64_header + "." + b64_payload + "." + b64url_encode(signature) + + +def _sign_jws_general_json( + payload: dict[str, object], *, private: ed25519.Ed25519PrivateKey +) -> dict[str, object]: + compact = _sign_jws_compact(payload, private=private) + b64_header, b64_payload, b64_signature = compact.split(".") + return { + "payload": b64_payload, + "signatures": [{"protected": b64_header, "signature": b64_signature}], + } + + +def _controllable_clock( + start: datetime, +) -> tuple[Callable[[], datetime], Callable[[], float], Callable[[float], None]]: + """Return (wall_clock, monotonic_clock, advance(seconds)).""" + now = [start] + mono = [0.0] + + def wall_clock() -> datetime: + return now[0] + + def monotonic_clock() -> float: + return mono[0] + + def advance_seconds(seconds: float) -> None: + now[0] = now[0] + timedelta(seconds=seconds) + mono[0] = mono[0] + seconds + + return wall_clock, monotonic_clock, advance_seconds + + +class _ScriptedFetcher: + """RevocationListFetcher that returns pre-programmed responses.""" + + def __init__(self) -> None: + self.calls: list[tuple[str, str | None, str | None]] = [] + self._queue: list[FetchResult | Exception] = [] + + def enqueue(self, result: FetchResult | Exception) -> None: + self._queue.append(result) + + def __call__( + self, + uri: str, + *, + if_none_match: str | None = None, + if_modified_since: str | None = None, + ) -> FetchResult: + self.calls.append((uri, if_none_match, if_modified_since)) + if not self._queue: + raise AssertionError("ScriptedFetcher had no response queued") + next_up = self._queue.pop(0) + if isinstance(next_up, Exception): + raise next_up + return next_up + + +# -- default_revocation_list_fetcher (SSRF only) ----------------------- +# Wire-level tests (200 / 304 / 5xx / empty body) are exercised via the +# ASGI e2e suite in test_revocation_fetcher_e2e — they require a real +# transport, which is awkward to wire into httpx.Client in a non-brittle +# way. The SSRF path is shared with the JWKS fetcher and already covered +# there; we just smoke-test it still rejects here. + + +def test_default_fetcher_rejects_non_https() -> None: + from adcp.signing.jwks import SSRFValidationError + + with pytest.raises(SSRFValidationError): + default_revocation_list_fetcher("ftp://example.com/list.json") + + +def test_default_fetcher_rejects_metadata_ip() -> None: + from adcp.signing.jwks import SSRFValidationError + + with pytest.raises(SSRFValidationError): + default_revocation_list_fetcher("https://169.254.169.254/list.json") + + +# -- CachingRevocationChecker: happy path ------------------------------ + + +def test_first_call_fetches_and_decides() -> None: + private, _, jwks_resolver = _key_and_jwks() + payload = _make_payload(revoked_kids=["compromised-key"]) + token = _sign_jws_compact(payload, private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + + wall_clock, mono_clock, _advance = _controllable_clock( + datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + assert checker("compromised-key") is True + assert checker("clean-key") is False + # Only one fetch — both calls come from the same cached list. + assert len(fetcher.calls) == 1 + + +def test_cache_hit_within_next_update_skips_refetch() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + checker("k1") # triggers fetch + advance(60) # still well before next_update (14:15) + checker("k2") + checker("k3") + assert len(fetcher.calls) == 1 + + +def test_past_next_update_triggers_conditional_refresh() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + # Second response: 304 — server confirms list unchanged. + fetcher.enqueue(FetchResult(body="", etag='"v1"', not_modified=True)) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + checker("k1") # first fetch + advance(20 * 60) # jump past next_update (14:15) into 14:21 + checker("k2") # conditional refresh, gets 304 + + assert len(fetcher.calls) == 2 + _, if_none_match, _ = fetcher.calls[1] + assert if_none_match == '"v1"' + + +def test_general_json_serialization_is_accepted() -> None: + private, _, jwks_resolver = _key_and_jwks() + payload = _make_payload(revoked_kids=["rev"]) + doc = _sign_jws_general_json(payload, private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=doc, etag=None, not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + assert checker("rev") is True + + +# -- CachingRevocationChecker: signature + schema failures ------------- + + +def test_tampered_signature_raises_signature_error() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + + # Flip one payload byte while keeping signature → signature should fail. + b64_header, b64_payload, b64_signature = token.split(".") + from adcp.signing.crypto import b64url_decode + + pb = bytearray(b64url_decode(b64_payload)) + pb[-2] ^= 0xFF # tweak a byte in the payload + tampered = b64_header + "." + b64url_encode(bytes(pb)) + "." + b64_signature + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=tampered, etag=None, not_modified=False)) + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListSignatureError): + checker("any-key") + + +def test_wrong_issuer_raises_parse_error() -> None: + private, _, jwks_resolver = _key_and_jwks() + payload = _make_payload(issuer="https://different.example.com") + token = _sign_jws_compact(payload, private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListParseError, match="issuer"): + checker("any-key") + + +def test_accepts_future_version_with_forward_compat() -> None: + # version=2 should NOT hard-reject: additive schema changes shouldn't + # force every old SDK into fail-closed across their entire traffic. + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact( + _make_payload(version=2, revoked_kids=["rev"]), private=private + ) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + assert checker("rev") is True + + +def test_non_positive_version_rejected() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(version=0), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListParseError, match="version"): + checker("any-key") + + +def test_declared_cadence_below_floor_rejected() -> None: + # Spec floor is 60s. An issuer declaring next_update 30s after updated + # is violating the spec; we reject at parse time (fix #5). + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact( + _make_payload(updated="2026-04-18T14:00:00Z", next_update="2026-04-18T14:00:30Z"), + private=private, + ) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListParseError, match="below spec floor"): + checker("any-key") + + +def test_updated_far_in_future_rejected() -> None: + private, _, jwks_resolver = _key_and_jwks() + # updated is 5 minutes ahead of our wall clock — outside 60s skew. + token = _sign_jws_compact( + _make_payload(updated="2026-04-18T14:10:00Z", next_update="2026-04-18T14:25:00Z"), + private=private, + ) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 0, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + with pytest.raises(RevocationListParseError, match="in the future"): + checker("any-key") + + +# -- CachingRevocationChecker: fail-closed ----------------------------- + + +def test_refresh_failure_within_grace_serves_cached() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(revoked_kids=["rev"]), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + # Second call: network fails. We're past next_update but within grace. + fetcher.enqueue(RevocationListFetchError("server unavailable")) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + checker("rev") # fetches initial list (updated=14:00, next_update=14:15) + # Interval = 15min, grace = 2× = 30min. Advance past next_update but + # within grace (14:20 — 5min past next_update). + advance(19 * 60) # now 14:20 + + # Refresh fails, but we're inside grace → cached list still used. + assert checker("rev") is True + assert checker("clean") is False + + +def test_refresh_failure_past_grace_raises_freshness_error() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(revoked_kids=["rev"]), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + # All subsequent fetches fail. + for _ in range(5): + fetcher.enqueue(RevocationListFetchError("server unavailable")) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + checker("rev") # initial fetch, interval 15min → grace 30min + # Advance well past next_update (14:15) + grace (30min) → 14:46. + advance(45 * 60 + 1) # 14:46:01 + + with pytest.raises(RevocationListFreshnessError, match="past next_update"): + checker("rev") + + +def test_304_within_window_refreshes_clock() -> None: + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + fetcher.enqueue(FetchResult(body="", etag='"v1"', not_modified=True)) + # After the 304, we shouldn't try to fetch again until the NEXT window. + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + + checker("k") # 1 fetch + advance(15 * 60 + 30) # past next_update → 14:16:30 + checker("k") # triggers refresh, gets 304 + # Cached list's next_update is still 14:15 — we haven't moved it forward. + # But checker._last_successful_refresh is updated. Advancing again past + # next_update WILL trigger another fetch because the list didn't change. + assert len(fetcher.calls) == 2 + + +# -- RevocationListFetcher Protocol compliance ------------------------- + + +def test_scripted_fetcher_matches_protocol() -> None: + """Structural typing: _ScriptedFetcher satisfies RevocationListFetcher.""" + fetcher: RevocationListFetcher = _ScriptedFetcher() + assert callable(fetcher) + + +# -- reviewer-fix coverage --------------------------------------------- + + +def test_replay_older_list_rejected() -> None: + """Fix #4: refresh MUST reject a list whose ``updated`` is older than cached. + + Defense against a CDN (or a compromised operator key) serving an + earlier list to un-revoke a compromised kid. + """ + private, _, jwks_resolver = _key_and_jwks() + newer = _make_payload( + updated="2026-04-18T14:10:00Z", + next_update="2026-04-18T14:25:00Z", + revoked_kids=["compromised"], + ) + older = _make_payload( + updated="2026-04-18T14:00:00Z", + next_update="2026-04-18T14:15:00Z", + revoked_kids=[], # attacker un-revokes the kid + ) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult( + body=_sign_jws_compact(newer, private=private), etag='"v2"', not_modified=False + )) + fetcher.enqueue(FetchResult( + body=_sign_jws_compact(older, private=private), etag='"v1"', not_modified=False + )) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 15, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + assert checker("compromised") is True + # Advance past next_update. Refetch gets the older list → reject. + # The replay error is caught inside the grace window so the checker + # keeps serving the cached (newer) list — the compromised kid stays + # revoked. + advance(15 * 60) # 14:30 + assert checker("compromised") is True + # Pin the invariant directly: the cached list is still the NEWER one. + # If the replay check had been removed, the older list would have + # overwritten the cache and `_current_list.updated` would now equal + # "2026-04-18T14:00:00Z". + assert checker._current_list is not None + assert checker._current_list.updated == "2026-04-18T14:10:00Z" + + +def test_from_issuer_origin_builds_spec_path() -> None: + """Fix #10: classmethod pins the .well-known path from the origin.""" + _, _, jwks_resolver = _key_and_jwks() + checker = CachingRevocationChecker.from_issuer_origin( + "https://Gov.Example.COM/", + jwks_resolver=jwks_resolver, + ) + # Normalized: lowercased host, scheme preserved, trailing slash stripped. + assert checker._revocation_uri == ( + "https://gov.example.com/.well-known/governance-revocations.json" + ) + assert checker._issuer == "https://gov.example.com" + + +def test_prime_fails_fast_on_bad_config() -> None: + """Fix #11: prime() surfaces fetch / JWS / schema errors at startup.""" + _, _, jwks_resolver = _key_and_jwks() + fetcher = _ScriptedFetcher() + fetcher.enqueue(RevocationListFetchError("operator unreachable")) + + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListFetchError, match="operator unreachable"): + checker.prime() + + +def test_prime_succeeds_and_caches() -> None: + """Priming caches the list so subsequent __call__ serves without fetching.""" + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(revoked_kids=["rev"]), private=private) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + checker.prime() + assert checker("rev") is True + # Only the prime call triggered a fetch. + assert len(fetcher.calls) == 1 + + +def test_is_jti_revoked_surfaces_jti_membership() -> None: + """Fix #7: governance callers can check jti revocation independently of kid.""" + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact( + _make_payload( + revoked_kids=["kid-1"], + revoked_jtis=["jti-abc"], + ), + private=private, + ) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + assert checker.is_jti_revoked("jti-abc") is True + assert checker.is_jti_revoked("jti-other") is False + + +def test_issuer_normalization_accepts_case_variants() -> None: + """Fix #8: issuer comparison is case-insensitive on scheme + host, ignores trailing slash.""" + private, _, jwks_resolver = _key_and_jwks() + # Configured issuer has trailing slash + mixed case. + configured = "HTTPS://Gov.Example.com/" + # Payload uses the canonical form. + token = _sign_jws_compact( + _make_payload(issuer="https://gov.example.com", revoked_kids=["k"]), + private=private, + ) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) + + wall_clock, mono_clock, _ = _controllable_clock( + datetime(2026, 4, 18, 14, 5, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=configured, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + assert checker("k") is True + + +def test_if_modified_since_threaded_to_fetcher() -> None: + """Fix #6: cached Last-Modified is sent on the next refresh.""" + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult( + body=token, + etag='"v1"', + last_modified="Sat, 18 Apr 2026 14:00:00 GMT", + not_modified=False, + )) + fetcher.enqueue(FetchResult(body="", etag='"v1"', not_modified=True)) + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + checker("k") + advance(20 * 60) + checker("k") + + _, _, if_modified_since = fetcher.calls[1] + assert if_modified_since == "Sat, 18 Apr 2026 14:00:00 GMT" + + +def test_compact_jws_with_alternate_encoding_rejected_or_handled_safely() -> None: + """Fix #2: the compact JWS signing input uses the ORIGINAL b64 strings. + + Build a token where the signer used standard base64 (``+``, ``/``, + ``=``) rather than url-safe base64 for the payload — the permissive + decoder would accept it, but verification must work against the + exact wire bytes, not re-encoded bytes. Since the b64url_encode we + use in the signer produces url-safe output, any token with standard + chars in the payload slot will either (a) fail signature verify (if + the signer is us) or (b) verify successfully because the original + bytes are what we hashed. Either outcome is safe; a mismatch + between wire bytes and verified bytes is NOT safe. + + This test signs a normal compact JWS, then flips a char in the + original b64 payload to produce a variant that decodes to different + bytes. Verification must fail. + """ + from adcp.signing.crypto import b64url_decode, b64url_encode + + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(revoked_kids=["k"]), private=private) + b64_header, b64_payload, b64_signature = token.split(".") + + # Transform the payload substring: decode, then re-encode with a + # padding char. The decoded bytes are identical; the wire form is + # different. If the verifier were using decoded-then-reencoded bytes + # as signing input, this attack would succeed; with the fix, it + # fails because the signature was over the original wire form. + raw = b64url_decode(b64_payload) + unpadded = b64url_encode(raw) + padded = unpadded + "=" * (-len(unpadded) % 4) + if padded == b64_payload: + # Skip — the natural encoding happens to have no padding gap. + return + tampered = f"{b64_header}.{padded}.{b64_signature}" + + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=tampered, etag=None, not_modified=False)) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + ) + with pytest.raises(RevocationListSignatureError): + checker("k") + + +# -- round-2 reviewer-fix coverage ------------------------------------- + + +def test_issuer_rejects_userinfo() -> None: + """Round-2: configured issuer with userinfo is rejected at init.""" + from adcp.signing.revocation_fetcher import _normalize_issuer + + with pytest.raises(ValueError, match="userinfo"): + _normalize_issuer("https://attacker@gov.example.com") + + +def test_issuer_homoglyph_normalizes_to_distinct_punycode() -> None: + """Round-2: homoglyph hosts produce a byte-distinct punycode form so + byte-equal comparison with the legitimate origin fails.""" + from adcp.signing.revocation_fetcher import _normalize_issuer + + legit = _normalize_issuer("https://gov.example.com") + # U+1D20 LATIN LETTER SMALL CAPITAL V — visually similar to 'v'. + homoglyph = _normalize_issuer("https://go\u1d20.example.com") + assert legit != homoglyph + # The homoglyph form is a punycode ASCII representation, not the + # visual lookalike — so a seller configured with the legit origin + # will reject a payload with the homoglyph origin. + assert "xn--" in homoglyph + + +def test_issuer_normalizes_idna_host() -> None: + """Round-2: legit IDN hosts encode to punycode and compare stably.""" + from adcp.signing.revocation_fetcher import _normalize_issuer + + # Two equivalent representations of a punycode IDN domain. + form_a = _normalize_issuer("https://bücher.example/") + form_b = _normalize_issuer("https://xn--bcher-kva.example/") + assert form_a == form_b + + +def test_last_modified_header_injection_rejected() -> None: + """Round-2: CRLF-injection-shaped Last-Modified values are dropped at write.""" + from adcp.signing.revocation_fetcher import _sanitize_last_modified + + assert _sanitize_last_modified("Sat, 18 Apr 2026 14:00:00 GMT") == ( + "Sat, 18 Apr 2026 14:00:00 GMT" + ) + # CRLF injection attempt + assert _sanitize_last_modified("Sat, 18 Apr 2026\r\nX-Injected: evil") is None + # Length cap + assert _sanitize_last_modified("A" * 65) is None + # None passes through + assert _sanitize_last_modified(None) is None + + +def test_304_slides_next_update_forward() -> None: + """Round-2: successive 304s advance the cached next_update so the + checker doesn't hit the refresh-cooldown path on every call past the + original next_update.""" + private, _, jwks_resolver = _key_and_jwks() + token = _sign_jws_compact(_make_payload(), private=private) + fetcher = _ScriptedFetcher() + fetcher.enqueue(FetchResult(body=token, etag='"v1"', not_modified=False)) + fetcher.enqueue(FetchResult(body="", etag='"v1"', not_modified=True)) + # If next_update wasn't advanced on 304, the next two calls past 14:30 + # would each try to refetch (subject to the 60s cooldown). We only + # queue ONE more fetcher response, so if the invariant breaks, one of + # the later calls raises AssertionError from the scripted fetcher. + + wall_clock, mono_clock, advance = _controllable_clock( + datetime(2026, 4, 18, 14, 1, tzinfo=timezone.utc) + ) + checker = CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + fetcher=fetcher, + wall_clock=wall_clock, + clock=mono_clock, + ) + checker("k") # 1 fetch → initial + advance(15 * 60 + 30) # 14:16:30, past original next_update (14:15) + checker("k") # 2 fetches → 304, should slide next_update to 14:30 + + # Now at 14:16:30. Cached next_update was 14:15, now should be 14:30. + assert checker._current_list is not None + assert checker._current_list.next_update.startswith("2026-04-18T14:30:00") + + # Additional calls WITHIN the new window should NOT refetch. + advance(60) # 14:17:30 + checker("k") # still no fetch — we're before the new 14:30 next_update + assert len(fetcher.calls) == 2 + + +def test_clock_footgun_rejects_time_time() -> None: + """Round-2: passing time.time as clock is rejected.""" + import time as _time + + _, _, jwks_resolver = _key_and_jwks() + with pytest.raises(ValueError, match="monotonic"): + CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + clock=_time.time, + ) + + +def test_clock_footgun_rejects_identical_sources() -> None: + """Round-2: passing the same callable to clock and wall_clock is rejected.""" + _, _, jwks_resolver = _key_and_jwks() + + def both() -> float: + raise RuntimeError("never called") + + with pytest.raises(ValueError, match="different sources"): + CachingRevocationChecker( + revocation_uri=REVOCATION_URI, + issuer=ISSUER, + jwks_resolver=jwks_resolver, + clock=both, + wall_clock=both, # type: ignore[arg-type] + )