Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/adcp/testing/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ async def invoke(
payload: dict[str, Any] | None = None,
*,
timeout_seconds: float = 5.0,
max_events: int = 32,
) -> ToolInvokeResult:
"""Invoke a skill and return the terminal-state result.

Expand All @@ -302,6 +303,8 @@ async def invoke(
timeout_seconds: Per-event dequeue timeout. The harness drains
the event queue waiting for a terminal Task; this caps how
long any single dequeue waits. Default 5s.
max_events: Maximum number of A2A events to drain while waiting
for a terminal Task. Default 32 preserves the previous hard cap.

Returns:
:class:`ToolInvokeResult` — ``ok`` reflects whether the terminal
Expand Down Expand Up @@ -348,9 +351,9 @@ async def invoke(
# Drain the event queue until a terminal Task arrives. Bounded so a
# buggy handler that never publishes a terminal event can't hang the
# test runner — each dequeue carries `timeout_seconds`, and the loop
# is bounded to a small number of intermediate state events.
# is bounded to `max_events` intermediate state events.
terminal_task: Any = None
for _ in range(32):
for _ in range(max_events):
try:
event = await asyncio.wait_for(queue.dequeue_event(), timeout=timeout_seconds)
except asyncio.TimeoutError:
Expand All @@ -366,7 +369,8 @@ async def invoke(
if terminal_task is None:
raise RuntimeError(
f"A2A executor for skill={skill!r} produced no terminal Task "
f"within {timeout_seconds}s — check executor middleware for hangs"
f"within {timeout_seconds}s x {max_events} events — "
"check executor middleware for hangs"
)

# Project the terminal Task's first DataPart artifact to a dict.
Expand Down
61 changes: 60 additions & 1 deletion tests/test_seller_a2a_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from typing import Any

import pytest

from adcp.decisioning import (
DecisioningCapabilities,
DecisioningPlatform,
Expand All @@ -16,7 +18,7 @@
class _SuccessPlatform(DecisioningPlatform):
capabilities = DecisioningCapabilities(
specialisms=["sales-non-guaranteed"],
supported_billing=("operator",),
supported_billing=["operator"],
)
accounts = SingletonAccounts(account_id="test")

Expand Down Expand Up @@ -58,6 +60,33 @@ def get_products(self, req: Any, ctx: Any) -> dict[str, Any]:
)


class _BurstExecutor:
def __init__(self, *, non_terminal_events: int) -> None:
self.non_terminal_events = non_terminal_events

async def execute(self, request_ctx: Any, queue: Any) -> None:
from a2a import types as pb

from adcp.server.a2a_server import _make_task

for _ in range(self.non_terminal_events):
await queue.enqueue_event(
_make_task(
request_ctx,
state=pb.TaskState.TASK_STATE_WORKING,
message="working",
)
)
await queue.enqueue_event(
_make_task(
request_ctx,
state=pb.TaskState.TASK_STATE_COMPLETED,
data={"products": []},
message="done",
)
)


_GET_PRODUCTS_PAYLOAD = {"buying_mode": "brief"}


Expand Down Expand Up @@ -133,6 +162,36 @@ async def test_a2a_invoke_structured_content_populated() -> None:
assert isinstance(result.structured_content, dict)


# ---- event drain budget -------------------------------------------------


async def test_a2a_invoke_default_event_cap_exhausts_before_terminal_task() -> None:
client = SellerA2AClient(_SuccessPlatform())
client._executor = _BurstExecutor(non_terminal_events=33)

with pytest.raises(RuntimeError) as exc_info:
await client.invoke("get_products", _GET_PRODUCTS_PAYLOAD, timeout_seconds=0.01)

message = str(exc_info.value)
assert "produced no terminal Task" in message
assert "within 0.01s x 32 events" in message


async def test_a2a_invoke_custom_max_events_reaches_terminal_task() -> None:
client = SellerA2AClient(_SuccessPlatform())
client._executor = _BurstExecutor(non_terminal_events=33)

result = await client.invoke(
"get_products",
_GET_PRODUCTS_PAYLOAD,
timeout_seconds=0.01,
max_events=128,
)

assert result.ok
assert result.data == {"products": []}


# ---- validation wiring round-trip --------------------------------------


Expand Down
Loading