diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index c3b69e833..443b5edc9 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -22,7 +22,10 @@ DirectoryEdgeStatus, DirectoryPublisherEntry, DiscoveryMethod, + DivergenceReport, EntryErrorKind, + PublisherDivergence, + detect_publisher_properties_divergence, domain_matches, fetch_adagents, fetch_adagents_with_cache, @@ -832,7 +835,10 @@ def get_adcp_version() -> str: "DirectoryEdgeStatus", "DirectoryPublisherEntry", "DiscoveryMethod", + "DivergenceReport", "EntryErrorKind", + "PublisherDivergence", + "detect_publisher_properties_divergence", "fetch_adagents", "fetch_adagents_with_cache", "fetch_agent_authorizations", diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index ea5517a26..07c4e9ea9 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -11,6 +11,7 @@ import asyncio import ipaddress import json +import logging import re import socket from dataclasses import dataclass, field @@ -25,6 +26,8 @@ from adcp.types.base import AdCPBaseModel from adcp.validation import ValidationError, validate_adagents +logger = logging.getLogger(__name__) + DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"] @@ -1959,6 +1962,16 @@ class DirectoryPublisherEntry(AdCPBaseModel): signing_keys_pinned: bool | None = None status: DirectoryEdgeStatus last_verified_at: datetime + property_ids: list[str] | None = Field( + default=None, + description=( + "Canonical property IDs the agent's selectors resolve to under " + "this publisher. Present iff the request was made with " + "include=['properties'] AND the directory server supports it " + "(per adcp#4894). None signals count-only mode for downstream " + "consumers." + ), + ) class AgentAuthorizationsDirectoryResult(AdCPBaseModel): @@ -1981,12 +1994,18 @@ class AgentAuthorizationsDirectoryResult(AdCPBaseModel): # page is a small envelope; pagination handles bulk responses. MAX_DIRECTORY_PAGE_BYTES = 5 * 1024 * 1024 +# Hard cap on directory pagination iterations. A misbehaving directory that +# never returns an empty next_cursor would otherwise hang the sweep +# indefinitely; this cap fail-closes the loop. +MAX_DIRECTORY_PAGES = 1000 + async def fetch_agent_authorizations_from_directory( agent_url: str, *, directory_url: str, since: str | None = None, + include: list[str] | None = None, timeout: float = 10.0, client: httpx.AsyncClient | None = None, ) -> AgentAuthorizationsDirectoryResult: @@ -2009,6 +2028,15 @@ async def fetch_agent_authorizations_from_directory( since: Optional opaque cursor or RFC 3339 timestamp from a prior ``directory_indexed_at`` — passed through as ``?since=...`` to limit the result to edges that changed since that point. + include: Optional list of expansion keys per the AAO directory + API spec (adcp#4894). Each value is emitted as a separate + ``?include=`` query parameter (repeated-key form, not + comma-joined). Pass ``["properties"]`` against directories + that support it to receive per-publisher ``property_ids[]`` + on each row, enabling full set-diff against the publisher's + own adagents.json. Directories that don't support a given + expansion key simply omit the corresponding fields from the + response; callers should treat absence as count-only mode. timeout: Request timeout in seconds. client: Optional shared ``httpx.AsyncClient`` for connection pooling. Caller owns the client lifecycle. @@ -2045,8 +2073,18 @@ async def fetch_agent_authorizations_from_directory( _validate_redirect_url(f"{base}/v1/agents/_/publishers") request_url = f"{base}/v1/agents/{quote(agent_url, safe='')}/publishers" + query_pairs: list[tuple[str, str]] = [] if since is not None: - request_url = f"{request_url}?since={quote(since, safe='')}" + query_pairs.append(("since", since)) + if include: + # Repeated-key form per docs/aao/directory-api.mdx (style: form, + # explode: true). Comma-joined NOT accepted by spec-conformant + # directories. + for value in include: + query_pairs.append(("include", value)) + if query_pairs: + query_string = "&".join(f"{quote(k, safe='')}={quote(v, safe='')}" for k, v in query_pairs) + request_url = f"{request_url}?{query_string}" parsed = urlparse(request_url) await _dns_validate_host( @@ -2098,3 +2136,210 @@ async def fetch_agent_authorizations_from_directory( raise AdagentsValidationError( f"Agent-publishers directory response failed schema validation: {e}" ) from e + + +class PublisherDivergence(AdCPBaseModel): + """Divergence record for a single publisher domain. + + ``missing_in_inline``: property IDs the federated fetch found in the + publisher's own adagents.json that the directory did not surface + (publisher has properties the directory doesn't know about yet). + + ``missing_in_federated``: property IDs the directory claims the agent + is authorized for but the publisher's own adagents.json does not + include (stale directory entry or publisher revocation). + + Both fields are None in count-only fallback mode (directory did + not return ``property_ids[]``). In count-only mode, count-equality + does NOT guarantee set-equality — same-count substitutions are + undetectable. Use ``?include=properties`` (adcp#4894) on directories + that support it for full set-diff precision. + + ``child_fetch_error`` is non-None when the publisher's adagents.json + could not be fetched or parsed; other fields carry no meaning. + """ + + publisher_domain: str + directory_properties_authorized: int = Field(ge=0) + federated_properties_found: int = Field(ge=0) + missing_in_inline: list[str] | None = None + missing_in_federated: list[str] | None = None + child_fetch_error: str | None = None + + +DivergenceReport = list[PublisherDivergence] + + +async def detect_publisher_properties_divergence( + agent_url: str, + *, + directory_url: str, + sample_size: int | None = 200, + max_concurrency: int = 20, + timeout: float = 30.0, + client: httpx.AsyncClient | None = None, +) -> DivergenceReport: + """Compare directory's inline resolution against per-publisher federated fetches. + + For each publisher the directory lists under ``agent_url``, fetches + that publisher's own ``adagents.json`` and compares the property set + against the directory's claim. Returns only publishers where the two + paths disagree (or where the child fetch failed). + + Always requests ``include=["properties"]`` from the directory so the + full ``(publisher_domain, property_id)`` set-diff lights up on + directories that support adcp#4894. Against older directories that + return only ``properties_authorized`` counts, falls back to count- + comparison; ``missing_in_inline`` / ``missing_in_federated`` are + None in that fallback path. + + Per adcp#4827 §Resolution-paths, the federated result is + authoritative when the two paths disagree. + + Args: + agent_url: agent to check. + directory_url: AAO directory base URL (HTTPS only — same SSRF + gate as :func:`fetch_agent_authorizations_from_directory`). + sample_size: cap the sweep at N publishers (drawn from the first + page of directory results). None opts into a full sweep + across all pages — only do this for small networks. Default + 200 keeps the divergence sweep bounded by default. + max_concurrency: semaphore-capped concurrent federated fetches. + Default 20 — caps the burst against publisher origins. + timeout: per-request timeout (directory + child fetches). + client: optional shared ``httpx.AsyncClient``. + + Returns: + :data:`DivergenceReport` (``list[PublisherDivergence]``). Empty + list = no divergence detected. Note in count-only fallback mode, + an empty list means counts agree but set-equality is not + guaranteed. + """ + own_client = client is None + http = client or httpx.AsyncClient() + try: + collected: list[DirectoryPublisherEntry] = [] + cursor: str | None = None + seen_cursors: set[str] = set() + page_count = 0 + while True: + page = await fetch_agent_authorizations_from_directory( + agent_url, + directory_url=directory_url, + since=cursor, + include=["properties"], + timeout=timeout, + client=http, + ) + page_count += 1 + collected.extend(page.publishers) + if sample_size is not None and len(collected) >= sample_size: + collected = collected[:sample_size] + break + cursor = page.next_cursor + if not cursor: + break + if cursor in seen_cursors: + raise AdagentsValidationError( + f"Directory page cursor {cursor!r} repeated — refusing to loop forever." + ) + seen_cursors.add(cursor) + if page_count >= MAX_DIRECTORY_PAGES: + raise AdagentsValidationError( + f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep." + ) + + # Dedupe by publisher_domain before fan-out: a hostile directory + # returning N rows for the same publisher would otherwise amplify + # into N concurrent fetches against a single victim host. First + # occurrence wins (deterministic) — conflicting property_ids / + # properties_authorized across duplicates are dropped here; the + # directory's behavior is itself a divergence signal for ops. + seen_domains: set[str] = set() + deduped: list[DirectoryPublisherEntry] = [] + for entry in collected: + if entry.publisher_domain in seen_domains: + continue + seen_domains.add(entry.publisher_domain) + deduped.append(entry) + collected = deduped + + # Emit a one-shot warning when the entire sample comes back without + # property_ids[]. In count-only mode, same-count substitutions are + # undetectable — adopters should pin include=["properties"] support + # on directories that offer it. + if collected and all(e.property_ids is None for e in collected): + logger.warning( + "AAO directory %s did not return property_ids[] on any publisher " + "entry — falling back to count-only divergence detection. Same-count " + "substitutions are undetectable in this mode. Upgrade the directory " + "or pin include=['properties'] support.", + directory_url, + ) + + sem = asyncio.Semaphore(max_concurrency) + + async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None: + async with sem: + try: + data = await fetch_adagents( + entry.publisher_domain, timeout=timeout, client=http + ) + federated_props = get_properties_by_agent(data, agent_url) + # Falsy/empty property_id is silently dropped: upstream + # schema requires a non-empty string, so an empty value + # is a structural violation that belongs in + # validate_adagents, not a divergence signal. Federated + # properties with valid IDs only. + federated_ids = { + str(p.get("property_id")) for p in federated_props if p.get("property_id") + } + except ( + AdagentsNotFoundError, + AdagentsValidationError, + AdagentsTimeoutError, + httpx.HTTPError, + OSError, + ValueError, + ) as exc: + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=0, + missing_in_inline=None, + missing_in_federated=None, + child_fetch_error=str(exc), + ) + + if entry.property_ids is not None: + # Full set-diff path (adcp#4894). + dir_ids = set(entry.property_ids) + missing_in_inline = sorted(federated_ids - dir_ids) + missing_in_federated = sorted(dir_ids - federated_ids) + if not missing_in_inline and not missing_in_federated: + return None + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=len(federated_ids), + missing_in_inline=missing_in_inline, + missing_in_federated=missing_in_federated, + ) + + # Count-only fallback (older directories). + if len(federated_ids) == entry.properties_authorized: + return None + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=len(federated_ids), + missing_in_inline=None, + missing_in_federated=None, + ) + + probes = await asyncio.gather(*[_probe(e) for e in collected]) + finally: + if own_client: + await http.aclose() + + return [p for p in probes if p is not None] diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index ebeaacd42..77cbe99da 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -121,6 +121,7 @@ "DirectoryEdgeStatus", "DirectoryPublisherEntry", "DiscoveryMethod", + "DivergenceReport", "DomainLookupResult", "Duration", "EntryErrorKind", @@ -253,6 +254,7 @@ "ProvidePerformanceFeedbackRequest", "ProvidePerformanceFeedbackResponse", "ProvidePerformanceFeedbackSuccessResponse", + "PublisherDivergence", "PublisherProperties", "PublisherPropertiesAll", "PublisherPropertiesById", @@ -351,6 +353,7 @@ "create_mcp_webhook_payload", "create_test_agent", "creative_agent", + "detect_publisher_properties_divergence", "domain_matches", "extract_webhook_result_data", "fetch_adagents", diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 33d404cbc..59e2e0fa2 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -4144,3 +4144,525 @@ def handler(request: httpx.Request) -> httpx.Response: directory_url="https://aao.example.com", client=client, ) + + async def test_include_properties_appears_as_query_param(self): + """include=['properties'] emits ?include=properties (repeated-key form).""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + captured: dict[str, object] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["raw_url"] = str(request.url) + captured["include_list"] = request.url.params.get_list("include") + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": None, + "publishers": [], + }, + ) + + async with self._client(handler) as client: + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + include=["properties"], + client=client, + ) + + assert captured["include_list"] == ["properties"] + assert "include=properties" in captured["raw_url"] # type: ignore[operator] + + async def test_include_multiple_values_repeated_keys(self): + """include=['properties','future'] emits TWO ?include= keys, not comma-joined.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + captured: dict[str, object] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["raw_url"] = str(request.url) + captured["include_list"] = request.url.params.get_list("include") + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": None, + "publishers": [], + }, + ) + + async with self._client(handler) as client: + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + include=["properties", "future"], + client=client, + ) + + assert captured["include_list"] == ["properties", "future"] + # Comma-joined form would not produce two list items; assert wire form. + raw = captured["raw_url"] + assert isinstance(raw, str) + assert raw.count("include=") == 2 + assert "include=properties%2Cfuture" not in raw + + async def test_property_ids_parsed_from_publisher_entry(self): + """Directory row with property_ids round-trips into the Pydantic model.""" + from adcp.adagents import ( + DirectoryPublisherEntry, + fetch_agent_authorizations_from_directory, + ) + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": "2026-05-20T12:00:00Z", + "publishers": [ + { + "publisher_domain": "nytimes.example", + "discovery_method": "direct", + "properties_authorized": 2, + "properties_total": 2, + "status": "authorized", + "last_verified_at": "2026-05-20T11:50:00Z", + "property_ids": ["p-1", "p-2"], + } + ], + }, + ) + + async with self._client(handler) as client: + result = await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + include=["properties"], + client=client, + ) + + entry = result.publishers[0] + assert isinstance(entry, DirectoryPublisherEntry) + assert entry.property_ids == ["p-1", "p-2"] + + async def test_property_ids_absent_is_none(self): + """Directory row without property_ids parses with property_ids=None.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": "2026-05-20T12:00:00Z", + "publishers": [ + { + "publisher_domain": "nytimes.example", + "discovery_method": "direct", + "properties_authorized": 3, + "properties_total": 3, + "status": "authorized", + "last_verified_at": "2026-05-20T11:50:00Z", + } + ], + }, + ) + + async with self._client(handler) as client: + result = await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert result.publishers[0].property_ids is None + + async def test_include_combines_with_since(self): + """`since` and `include` both round-trip together in the URL.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + captured: dict[str, object] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["since"] = request.url.params.get("since") + captured["include_list"] = request.url.params.get_list("include") + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": None, + "publishers": [], + }, + ) + + async with self._client(handler) as client: + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + since="opaque-cursor-1", + include=["properties"], + client=client, + ) + + assert captured["since"] == "opaque-cursor-1" + assert captured["include_list"] == ["properties"] + + +class TestDetectPublisherPropertiesDivergence: + """detect_publisher_properties_divergence: directory vs federated set-diff.""" + + @staticmethod + def _directory_handler(publishers, *, next_cursor=None): + """Build a MockTransport handler that returns a fixed directory page.""" + + def handler(request: httpx.Request) -> httpx.Response: + body: dict[str, object] = { + "agent_url": "https://agent.example.com/", + "directory_indexed_at": "2026-05-20T12:00:00Z", + "publishers": publishers, + } + if next_cursor is not None: + body["next_cursor"] = next_cursor + return httpx.Response(200, json=body) + + return handler + + @staticmethod + def _entry( + publisher_domain, + *, + properties_authorized=1, + property_ids=None, + ): + entry: dict[str, object] = { + "publisher_domain": publisher_domain, + "discovery_method": "direct", + "properties_authorized": properties_authorized, + "properties_total": properties_authorized, + "status": "authorized", + "last_verified_at": "2026-05-20T11:50:00Z", + } + if property_ids is not None: + entry["property_ids"] = property_ids + return entry + + async def test_full_set_diff_when_property_ids_present(self, monkeypatch): + """Directory says {p1,p2}; federated returns {p2,p3} → set-diff reported.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + publishers = [ + self._entry("nytimes.example", properties_authorized=2, property_ids=["p1", "p2"]) + ] + handler = self._directory_handler(publishers) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + return {"_": "ignored — get_properties_by_agent is patched"} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p2"}, {"property_id": "p3"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + report = await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert len(report) == 1 + d = report[0] + assert d.publisher_domain == "nytimes.example" + assert d.missing_in_inline == ["p3"] + assert d.missing_in_federated == ["p1"] + assert d.federated_properties_found == 2 + assert d.directory_properties_authorized == 2 + assert d.child_fetch_error is None + + async def test_no_report_when_sets_match(self, monkeypatch): + """Directory and federated agree on the ID set → empty report.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + publishers = [ + self._entry("nytimes.example", properties_authorized=2, property_ids=["p1", "p2"]) + ] + handler = self._directory_handler(publishers) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + return {} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p1"}, {"property_id": "p2"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + report = await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert report == [] + + async def test_count_fallback_when_property_ids_absent(self, monkeypatch): + """No property_ids on the row → count mismatch yields divergence with None fields.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + publishers = [self._entry("nytimes.example", properties_authorized=5)] + handler = self._directory_handler(publishers) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + return {} + + def fake_get_properties_by_agent(data, agent_url): + # Only 3 IDs — directory said 5 → count mismatch. + return [{"property_id": "a"}, {"property_id": "b"}, {"property_id": "c"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + report = await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert len(report) == 1 + d = report[0] + assert d.directory_properties_authorized == 5 + assert d.federated_properties_found == 3 + assert d.missing_in_inline is None + assert d.missing_in_federated is None + + async def test_child_fetch_error_recorded(self, monkeypatch): + """fetch_adagents raises → divergence record carries child_fetch_error.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + from adcp.exceptions import AdagentsNotFoundError + + publishers = [ + self._entry("nytimes.example", properties_authorized=2, property_ids=["p1", "p2"]) + ] + handler = self._directory_handler(publishers) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + raise AdagentsNotFoundError(domain) + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + report = await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert len(report) == 1 + d = report[0] + assert d.child_fetch_error is not None + assert d.federated_properties_found == 0 + assert d.missing_in_inline is None + assert d.missing_in_federated is None + + async def test_sample_size_caps_probes(self, monkeypatch): + """sample_size=3 against a 10-row page → only 3 fetch_adagents calls.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + publishers = [ + self._entry(f"pub{i}.example", properties_authorized=1, property_ids=["p1"]) + for i in range(10) + ] + handler = self._directory_handler(publishers) + + call_count = 0 + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + nonlocal call_count + call_count += 1 + return {} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p1"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + sample_size=3, + client=client, + ) + + assert call_count == 3 + + async def test_max_concurrency_respected(self, monkeypatch): + """Peak in-flight fetch_adagents calls never exceed max_concurrency.""" + import asyncio + + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + publishers = [ + self._entry(f"pub{i}.example", properties_authorized=1, property_ids=["p1"]) + for i in range(20) + ] + handler = self._directory_handler(publishers) + + in_flight = 0 + peak = 0 + lock = asyncio.Lock() + release_gate = asyncio.Event() + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + nonlocal in_flight, peak + async with lock: + in_flight += 1 + peak = max(peak, in_flight) + try: + # Hold the slot long enough for the semaphore to actually + # gate concurrent entrants; gate is set immediately by + # the test event below so we don't slow the suite down. + await release_gate.wait() + finally: + async with lock: + in_flight -= 1 + return {} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p1"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async def releaser(): + # Yield enough to let the semaphore admit its first batch + # of waiters, observe peak, then release everyone. + for _ in range(50): + await asyncio.sleep(0) + release_gate.set() + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + await asyncio.gather( + detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + sample_size=20, + max_concurrency=4, + client=client, + ), + releaser(), + ) + + assert peak <= 4 + assert peak >= 1 # sanity: probes actually ran + + async def test_divergence_dedupes_collected_by_publisher_domain(self, monkeypatch): + """Hostile directory: 5 rows all publisher_domain=victim → 1 fetch.""" + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + # 5 entries all pointing at the same victim host. A naive + # implementation would fan out 5 concurrent fetches against + # victim.example; the dedupe path must collapse to a single probe. + publishers = [ + self._entry("victim.example", properties_authorized=1, property_ids=["p1"]) + for _ in range(5) + ] + handler = self._directory_handler(publishers) + + call_count = 0 + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + nonlocal call_count + call_count += 1 + return {} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p1"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert call_count == 1 + + async def test_divergence_aborts_on_repeated_cursor(self, monkeypatch): + """Misbehaving directory returns the same next_cursor forever → raise.""" + from adcp.adagents import detect_publisher_properties_divergence + + # Each response includes a next_cursor that never advances. The + # page-walk loop must detect the repeat and raise rather than + # spin until OOM. + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": "2026-05-20T12:00:00Z", + "publishers": [], + "next_cursor": "stuck", + }, + ) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with pytest.raises(AdagentsValidationError, match="cursor 'stuck' repeated"): + await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + sample_size=None, # full sweep, so we actually walk pages + client=client, + ) + + async def test_divergence_warns_on_count_only_mode(self, monkeypatch, caplog): + """No entry has property_ids → one-shot warning fires.""" + import logging as _logging + + from adcp import adagents as adagents_mod + from adcp.adagents import detect_publisher_properties_divergence + + # No property_ids on any row → directory is in count-only mode. + publishers = [ + self._entry("a.example", properties_authorized=1), + self._entry("b.example", properties_authorized=2), + ] + handler = self._directory_handler(publishers) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + return {} + + def fake_get_properties_by_agent(data, agent_url): + return [{"property_id": "p1"}] + + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent) + + with caplog.at_level(_logging.WARNING, logger="adcp.adagents"): + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + count_only_warnings = [ + r for r in caplog.records if "count-only divergence detection" in r.getMessage() + ] + assert len(count_only_warnings) == 1 + assert "https://aao.example.com" in count_only_warnings[0].getMessage()