feat(a2a): auto-retain contextId + taskId across multi-turn calls#251
Conversation
Multi-turn A2A buyer flows (get_products → create_media_buy on the same brief; HITL approvals on a single task) require the client to echo the server-assigned contextId and, for non-terminal states, the taskId. Prior to this change the client minted a fresh contextId on every send and dropped taskId entirely, so servers had to invent out-of-band correlation and HITL resumes orphaned pending tasks. Public API on ADCPClient (A2A only): - `ADCPClient(context_id=...)` to seed (resume across restart, or self-name the session with a buyer-side correlation key) - `client.context_id` read-only property - `client.pending_task_id` read-only property - `client.reset_context(context_id=None)` to start a new conversation Non-A2A callers: reads return None (safe in generic code); writes raise ValueError. Covered end-to-end by new HTTP integration tests that spin up a real a2a-sdk Starlette app under uvicorn and prove session continuity at the wire level, including the HITL task_id resume path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
scr-oath
left a comment
There was a problem hiding this comment.
Post-merge audit review.
Bottom line: core logic is sound. TaskState(str, Enum) makes the _NONTERMINAL_TASK_STATES string-frozenset membership check safe, and the "retain task_id only on non-terminal, clear on terminal" rule matches A2A's resume-same-task contract. No FunctionResponse handling is needed from the client side — that is an ADK-internal concept; at the A2A protocol layer the client just sends Message and the ADK server converts internally.
Risk level: medium. Mostly additive; one real correctness concern (state mutated before a post-processing step that can raise) and two test-coverage gaps worth closing in a follow-up.
Specialists consulted: code-reviewer, python-pro, api-designer, test-automator.
Counts: 1 blocking issue + 2 test-coverage issues, 8 suggestions, several praises. Details inline.
Notes on the enum-vs-string question (result.status.state in _NONTERMINAL_TASK_STATES): verified against a2a-sdk 0.3.24/25 — class TaskState(str, Enum) with values exactly 'submitted' | 'working' | 'input-required' | 'auth-required' | ..., so enum members compare equal to their string values in set membership. Pydantic coerces string inputs to enum members in A2ATaskStatus(state=...), so unit-test mocks exercise the real enum semantics, not a string-only happy path. Initial concern confirmed safe.
ADCP metadata question: the ADCP context echo field rides inside tool-call arguments (DataPart payload), not at the A2A Message envelope — so the adapter correctly doesn't touch it. Message-envelope fields metadata, reference_task_ids, extensions are not passed today; probably fine for the current scope, worth a tracking issue if buyers ever need message-level trace headers.
| # Retain the server-assigned context_id so subsequent | ||
| # turns continue the same A2A conversation. Task.context_id | ||
| # is required by a2a-sdk, so no None-guard needed. | ||
| self._context_id = result.context_id |
There was a problem hiding this comment.
issue (blocking): state is mutated before a call that can raise, leaving the adapter in a state that reflects a response the caller never received.
self._context_id and self._pending_task_id are updated at lines 345–353 before self._process_task_response(result, debug_info) at line 354. If _process_task_response raises (malformed artifact, missing required field, etc.), the caller sees the exception but the adapter has already advanced its state. A retry would then send the wrong task_id — possibly None after a clear, or a new server-assigned id that the prior-turn server doesn't know about — potentially orphaning the legitimate in-flight task.
Fix: capture locals first, commit only after the processing step succeeds.
new_context_id = result.context_id
new_pending_task_id = (
result.id if result.status.state in self._NONTERMINAL_TASK_STATES else None
)
task_result = self._process_task_response(result, debug_info) # may raise
_idempotency.raise_for_idempotency_error(tool_name, task_result.data, self.agent_config.id) # may raise
# commit after all failure points
self._context_id = new_context_id
self._pending_task_id = new_pending_task_id
return _idempotency.annotate_result(task_result, idempotency_key)| if result.status.state in self._NONTERMINAL_TASK_STATES: | ||
| self._pending_task_id = result.id | ||
| else: | ||
| self._pending_task_id = None |
There was a problem hiding this comment.
issue: the state-retention logic only runs inside if isinstance(result, Task):. The else branch (when send_message returns a bare Message, a few lines below) leaves both _context_id and _pending_task_id unchanged, with no test locking that decision down.
If a server returns a bare Message while the adapter holds a _pending_task_id from a prior input-required, the next outbound call will still echo that stale task_id — either correct (task still in-flight) or a bug (server ended via Message). In practice this branch may be unreachable against ADCP-conformant servers (per a2a-response-format.mdx, Task is always returned), but please either (a) defensively clear _pending_task_id in the Message branch with a rationale comment, or (b) add an explicit test that constructs SendMessageSuccessResponse(result=<Message>) and asserts the chosen behavior so future refactors don't silently change it.
Also worth confirming: does a2a.types.Message carry context_id? The PR title references "dropped Message.context_id" — if that field exists on Message, the fix is incomplete (capture from the Message result too). A one-line clarifying comment would help future readers either way.
| assert adapter.pending_task_id is None | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_task_id_retained_on_working_state(self, a2a_config): |
There was a problem hiding this comment.
issue: _NONTERMINAL_TASK_STATES has four members — submitted, working, input-required, auth-required — but only input-required and working have unit tests asserting pending_task_id retention. submitted and auth-required have no equivalent. If a future refactor accidentally drops either from the frozenset, no unit test catches the regression (the integration tests only exercise input-required via _HitlExecutor).
Add mirror tests:
async def test_task_id_retained_on_submitted_state(self, a2a_config): ...
async def test_task_id_retained_on_auth_required_state(self, a2a_config): ...The PR description claims "task_id retention in each of the non-terminal states" — currently only half the states are covered.
| assert second_call[0][0].params.message.context_id == "ctx-session" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_task_id_cleared_on_failed_state(self, a2a_config): |
There was a problem hiding this comment.
suggestion (non-blocking): completed and failed are tested for pending_task_id clearing, but canceled and rejected (both valid TaskState members with similar terminal semantics) are not. The logic is "anything not in the frozenset clears," so one test for either would lock the contract without ambiguity.
| # a new one. Terminal states (completed/failed/canceled/rejected) | ||
| # clear the retained task_id — subsequent calls in the conversation | ||
| # are new tasks. | ||
| _NONTERMINAL_TASK_STATES = frozenset( |
There was a problem hiding this comment.
suggestion: TaskState has a 9th member — unknown = 'unknown' — that is implicitly treated as terminal here (clears _pending_task_id). Arguably correct (don't cling to an undefined-state task), but the class-level comment only enumerates completed/failed/canceled/rejected as terminal; unknown is silently lumped in. Worth adding either a one-line note to the comment or a logger.warning when encountered so operators notice if a server starts emitting it.
| return self.adapter.pending_task_id | ||
| return None | ||
|
|
||
| def reset_context(self, context_id: str | None = None) -> None: |
There was a problem hiding this comment.
suggestion: the docstring on the context_id property advertises the persist-and-resume story ("persist this value (e.g., Redis keyed by your brief id) to resume across process restarts"), but full resume state is two fields — context_id AND pending_task_id. A caller who only persists context_id and restores after an input-required response will reconnect to the right conversation but with pending_task_id=None, orphaning the in-flight task server-side.
Consider a first-class checkpoint/restore surface:
def checkpoint(self) -> dict[str, str | None]:
return {"context_id": self.context_id, "pending_task_id": self.pending_task_id}
@classmethod
def from_checkpoint(cls, agent_config, state, **kwargs):
client = cls(agent_config, context_id=state.get("context_id"), **kwargs)
if state.get("pending_task_id") and isinstance(client.adapter, A2AAdapter):
client.adapter._pending_task_id = state["pending_task_id"]
return clientWithout this, the advertised Redis-resume pattern is subtly broken for HITL workflows — which is the primary use case the PR is motivated by.
| assert adapter.pending_task_id == "task-in-progress" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_set_context_id_clears_pending_task(self, a2a_config): |
There was a problem hiding this comment.
nitpick: this test and test_clearing_context_id_starts_fresh_conversation write directly to adapter._context_id / adapter._pending_task_id for setup, bypassing the public set_context_id() path. Using the public API for setup exercises the full write path and decouples tests from attribute names. If the direct write is intentional (isolating a specific code path), a comment noting that would help. test_pending_task_id_property_exposes_adapter_state has the same pattern.
| # Retain the server-assigned context_id so subsequent | ||
| # turns continue the same A2A conversation. Task.context_id | ||
| # is required by a2a-sdk, so no None-guard needed. | ||
| self._context_id = result.context_id |
There was a problem hiding this comment.
praise: the _NONTERMINAL_TASK_STATES frozenset + the "retain only on non-terminal" rule is the right model for A2A's resume-same-task contract. The inline comment at lines 346–349 justifying why task_id must be cleared on terminal states is exactly the kind of breadcrumb that prevents future regressions from "just retain it always, simpler that way" refactors. Confirmed safe against a2a-sdk 0.3.24/25 where TaskState is class TaskState(str, Enum).
| always starts a fresh task under the new context. | ||
| """ | ||
| self._context_id = context_id | ||
| self._pending_task_id = None |
There was a problem hiding this comment.
praise: set_context_id atomically clearing _pending_task_id on every context change is the correct invariant. A "reset context but keep stale task_id" state would be a hard-to-diagnose bug class, and this method makes it unreachable by construction.
| return int(s.getsockname()[1]) | ||
|
|
||
|
|
||
| @asynccontextmanager |
There was a problem hiding this comment.
praise: the module docstring honestly explains what server-side observation can and cannot prove (server always mints an id, so "client sent None" and "client sent X" look the same server-side on the first turn), and the tests then check the right observable: session continuity across turns. _pick_free_port via bind(("127.0.0.1", 0)) is the right pattern for parallel-safe tests.
suggestion (non-blocking): small TOCTOU window between s.close() and uvicorn binding to that port. Under high-concurrency CI, another process can claim it. uvicorn.Server.serve(sockets=[s]) accepts a pre-bound socket directly, avoiding the close-then-rebind gap. Very low priority.
Pulled latest schemas via `make regenerate-schemas`. Spec drift includes: - protocol.get_adcp_capabilities_response.idempotency is now a discriminated oneOf (IdempotencySupported / IdempotencyUnsupported) with a new `account_id_is_opaque` field — lets code generators that can't lower draft-07 if/then constructs produce correct named types with the replay_ttl_seconds invariant at the type level. - Other files: refreshed descriptions, enum reorderings that reshuffle numbered discriminated-union variants, minor field tightening. 296 generated files changed, ~24K lines in / ~20K lines out. validate-generated passes. mypy clean across 678 source files. Full suite: 2079 tests passing (no regressions — PR adcontextprotocol#251's A2A contextId integration suite lands cleanly against the refreshed types). No source changes outside generated_poc/ + _generated.py + _ergonomic.py + schemas/cache/. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Upstream spec added a cluster of breaking changes — most Asset classes
went from 'discriminator is implicit' to 'asset_type is required'.
Pydantic honors 'required' literally, breaking ergonomic construction:
# Before upstream change: TextContent(content='hello')
# After upstream change, pre-fix:
TextContent(asset_type='text', content='hello') # must repeat tag
Fix: inject_literal_discriminator_defaults() in post_generate_fixes.py.
Pattern-matches AnnAssign nodes whose annotation is Literal['x'] (bare
or Annotated-wrapped) with no default, appends ` = 'x'` on the same
line. Touched 687 fields across 615 classes in 105 files — Asset
types, pricing options, webhook types, catalog types, all spec
discriminator fields.
Wire consumption unchanged. Literal type still rejects other values,
so validation strength preserved. Wire dicts with the tag present
validate as before; dicts without the tag now also work (fall through
to default) — useful for minimal clients.
Pattern-based, not value-specific: robust to spec-value churn as long
as the single-value-Literal discriminator shape holds.
Also pulls upstream PRs adcontextprotocol#250 (design doc) and adcontextprotocol#251 (A2A contextId,
+22 integration tests). All green.
+16 new tests in tests/test_literal_discriminator_defaults.py. Covers
Asset defaults, VAST/DAAST dual-discriminator defaults, validation-
strength preservation, wire-format compatibility, injector meta-tests
(skips multi-value literals, extracts single-value, skips non-Literal).
2102 passing (was 2086), mypy clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Message.context_identirely (v2.12.0 minted a fresh UUID per send; v3 sent nothing). Both broke multi-turn buyer flows that need server-side session continuity.task_idretention for non-terminal states (input-required,working,submitted,auth-required) so HITL / approval flows can resume the same server-side task instead of orphaning it.client.context_idand rehydrate a newADCPClientwithcontext_id=<saved>to land on the same session.Public API (A2A only)
Non-A2A callers: reads return
None(safe in generic code); writes raiseValueError— symmetric writes, lenient reads, explicitly documented in the property docstring.Design notes
ADK/<app>/<user>/<session>format and returns the rewritten id. The adapter auto-adopts whatever the server returns, so buyer-seeded ids survive rewriting transparently.Test coverage
tests/test_protocols.pycovering auto-retain, inject, reset, server rebinding, task_id retention in each of the non-terminal states, task_id clearing on terminal states, and non-A2A rejection.tests/integration/test_a2a_context_id.pythat spin up a real a2a-sdk Starlette app under uvicorn and prove the contract end-to-end, including the HITLinput-required→ resume path.Test plan
ruff checkcleanmypy src/adcp/cleanpytest tests/test_protocols.py tests/integration/test_a2a_context_id.py— 53 passedpytest tests/— 2022 passed (excluding two pre-broken files missingasgi_lifespan, unrelated)test-agent.adcontextprotocol.org(blocked: that agent's/.well-known/agent-card.jsonreturns 404, also breaks the existing reference-agent A2A tests — separate issue)🤖 Generated with Claude Code