Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,6 @@ IMPLEMENTATION_PLAN.md
IMPLEMENTATION_SUMMARY.md
TESTING_STATUS.md
IMPLEMENTATION_PLAN.md

# Claude Code harness scratch — scheduled-task lock, session state, etc.
.claude/
162 changes: 143 additions & 19 deletions src/adcp/protocols/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
HTTPX_AVAILABLE = False
HTTPStatusError = None # type: ignore[assignment, misc]

import json

from adcp import _idempotency
from adcp.exceptions import (
ADCPConnectionError,
Expand All @@ -51,6 +53,114 @@
from adcp.protocols.base import ProtocolAdapter
from adcp.types.core import DebugInfo, TaskResult, TaskStatus

# Spec-defined limits from docs/building/implementation/mcp-response-extraction.mdx
# and docs/building/implementation/transport-errors.mdx.
_MAX_TEXT_SIZE_BYTES = 1_048_576 # 1MB cap on text items before JSON.parse
_MAX_ERROR_SIZE_BYTES = 4096 # total adcp_error JSON-serialized size
_MAX_ERROR_CODE_LEN = 64


def _text_of(item: Any) -> str | None:
"""Return the text payload of an MCP content item, or None if not a text item."""
if isinstance(item, dict):
if item.get("type") != "text":
return None
text = item.get("text")
else:
if getattr(item, "type", None) != "text":
return None
text = getattr(item, "text", None)
return text if isinstance(text, str) and text else None


def extract_adcp_success(result: Any) -> dict[str, Any] | None:
"""Extract AdCP success response data from an MCP tool result.

Implements the normative algorithm from AdCP spec §MCP Response Extraction
(docs/building/implementation/mcp-response-extraction.mdx):

1. If ``isError`` is truthy, return ``None`` — error extraction is a
separate path.
2. ``structuredContent`` — if present and a non-array object that is NOT
an ``adcp_error``-only payload, return it.
3. Text fallback — iterate ``content[]`` in order; for each ``type='text'``
item within the 1MB size limit, ``json.loads`` and return the result
if it is a non-array object that is NOT ``adcp_error``-only.
4. No structured data found — return ``None``.
"""
if getattr(result, "isError", False):
return None

sc = getattr(result, "structuredContent", None)
if isinstance(sc, dict) and not (len(sc) == 1 and "adcp_error" in sc):
return sc

for item in getattr(result, "content", None) or []:
text = _text_of(item)
if text is None or len(text) > _MAX_TEXT_SIZE_BYTES:
continue
try:
parsed = json.loads(text)
except (json.JSONDecodeError, ValueError):
continue
if (
isinstance(parsed, dict)
and not (len(parsed) == 1 and "adcp_error" in parsed)
):
return parsed
return None


def _validate_adcp_error(err: Any) -> dict[str, Any] | None:
"""Per transport-errors.mdx: ``code`` must be a non-empty string ≤ 64 chars,
total serialized size ≤ 4KB. Returns the validated error or None."""
if not isinstance(err, dict):
return None
code = err.get("code")
if not isinstance(code, str) or not (0 < len(code) <= _MAX_ERROR_CODE_LEN):
return None
try:
if len(json.dumps(err)) > _MAX_ERROR_SIZE_BYTES:
return None
except (TypeError, ValueError):
return None
return err


def extract_adcp_error(result: Any) -> dict[str, Any] | None:
"""Extract and validate an AdCP ``adcp_error`` object from an MCP result.

Implements AdCP spec §Client Detection Order (MCP paths 1 + 5) from
docs/building/implementation/transport-errors.mdx. Only applies when
``isError`` is truthy. Returns a validated error object or ``None``.
"""
if not getattr(result, "isError", False):
return None

sc = getattr(result, "structuredContent", None)
if isinstance(sc, dict):
validated = _validate_adcp_error(sc.get("adcp_error"))
if validated is not None:
return validated

for item in getattr(result, "content", None) or []:
text = _text_of(item)
# Apply the same 1MB pre-parse cap as the success path to prevent a
# malicious server returning ``isError=true`` plus a giant payload from
# forcing a multi-MB json.loads into memory before the 4KB validation
# would reject it.
if text is None or len(text) > _MAX_TEXT_SIZE_BYTES:
continue
try:
parsed = json.loads(text)
except (json.JSONDecodeError, ValueError):
continue
if isinstance(parsed, dict):
validated = _validate_adcp_error(parsed.get("adcp_error"))
if validated is not None:
return validated
return None


class MCPAdapter(ProtocolAdapter):
"""Adapter for MCP protocol using official Python MCP SDK."""
Expand Down Expand Up @@ -350,28 +460,41 @@ async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskRe
message_text = item["text"]
break

# Handle error responses
# Handle error responses per transport-errors.mdx §Client Detection
# Order. Extract the adcp_error object from structuredContent first,
# then from text fallback — whichever is present.
if is_error:
# For error responses, structuredContent is optional
# Use the error message from content as the error
error_message = message_text or "Tool execution failed"
structured_error = getattr(result, "structuredContent", None)
# Prefer structured error codes when present, then fall back to
# scanning the text content — many MCP servers (FastMCP default)
# return is_error=true with only a text body carrying the code.
_idempotency.raise_for_idempotency_error(
tool_name, structured_error, self.agent_config.id
)
adcp_error = extract_adcp_error(result)
# Raise typed idempotency exceptions before building a generic
# TaskResult(failed), so callers that catch them distinctly
# don't lose the signal.
if adcp_error and adcp_error.get("code") in (
"IDEMPOTENCY_CONFLICT",
"IDEMPOTENCY_EXPIRED",
):
from adcp.exceptions import classify_task_error

raise classify_task_error(
tool_name, [adcp_error], agent_id=self.agent_config.id
)
# FastMCP-style is_error with plain-text content: text-match
# fallback for the two idempotency codes.
_idempotency.raise_for_idempotency_text(
tool_name, message_text, self.agent_config.id
)
error_message = (
(adcp_error.get("message") if adcp_error else None)
or message_text
or "Tool execution failed"
)
if self.agent_config.debug and start_time:
duration_ms = (time.time() - start_time) * 1000
debug_info = DebugInfo(
request=debug_request,
response={
"error": error_message,
"is_error": True,
"adcp_error": adcp_error,
},
duration_ms=duration_ms,
)
Expand All @@ -383,18 +506,19 @@ async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskRe
idempotency_key=idempotency_key,
)

# For successful responses, structuredContent is required
if not hasattr(result, "structuredContent") or result.structuredContent is None:
# Success extraction per mcp-response-extraction.mdx §Extraction
# Algorithm: prefer structuredContent (MCP 2025-03-26+), fall back
# to JSON-parsing content[].text for older servers (including the
# AdCP reference training agent).
data_to_return = extract_adcp_success(result)
if data_to_return is None:
raise ValueError(
f"MCP tool {tool_name} did not return structuredContent. "
f"This SDK requires MCP tools to provide structured responses "
f"for successful calls. "
f"MCP tool {tool_name} returned no structured AdCP data. "
f"Neither structuredContent nor content[].text yielded a "
f"parseable non-adcp_error JSON object. "
f"Got content: {result.content if hasattr(result, 'content') else 'none'}"
)

# Extract the structured data (required for success)
data_to_return = result.structuredContent

if self.agent_config.debug and start_time:
duration_ms = (time.time() - start_time) * 1000
debug_info = DebugInfo(
Expand Down
Loading
Loading