From 5cef97b1cfc80f9fbb3f362e10805b6c00d2f83a Mon Sep 17 00:00:00 2001 From: yumakakuya Date: Mon, 25 May 2026 19:51:57 +0900 Subject: [PATCH] fix(session): classify ReadableStream locked as retryable and add stream trace types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extend OPENAI_TRANSIENT_PATTERNS with ReadableStream locked / Invalid state / ERR_INVALID_STATE patterns - Check APICallError cause chain in isOpenAiGatewayTransient so stream-level TypeErrors buried in cause are classified as retryable - Add fromError path for raw Error/TypeError with ReadableStream locked → retryable APIError 'Provider stream connection lost' - Add StreamTrace type (session/stream-log.ts) for structured stream lifecycle observability — types only, integration in next WAVE - Tests: +3 (error classification ×2, fromError conversion ×1) --- packages/opencode/src/provider/error.ts | 7 +- packages/opencode/src/session/message-v2.ts | 9 ++ packages/opencode/src/session/stream-log.ts | 92 +++++++++++++++++++ packages/opencode/test/provider/error.test.ts | 29 ++++++ packages/opencode/test/session/retry.test.ts | 7 ++ 5 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 packages/opencode/src/session/stream-log.ts diff --git a/packages/opencode/src/provider/error.ts b/packages/opencode/src/provider/error.ts index f06d732d1311..68a47dc4af1e 100644 --- a/packages/opencode/src/provider/error.ts +++ b/packages/opencode/src/provider/error.ts @@ -48,15 +48,20 @@ export namespace ProviderError { /reset reason:\s*overflow/i, // envoy/Istio buffer overflow — gateway, not context /stream was reset/i, /upstream reset/i, + /ReadableStream is locked/i, + /Invalid state/i, + /ERR_INVALID_STATE/i, ] function isOpenAiGatewayTransient(e: APICallError) { if (e.statusCode === 503) return true const body = typeof e.responseBody === "string" ? e.responseBody : "" const msg = e.message ?? "" + const causeMsg = e.cause instanceof Error ? e.cause.message : "" return ( OPENAI_TRANSIENT_PATTERNS.some((p) => p.test(msg)) || - OPENAI_TRANSIENT_PATTERNS.some((p) => p.test(body)) + OPENAI_TRANSIENT_PATTERNS.some((p) => p.test(body)) || + OPENAI_TRANSIENT_PATTERNS.some((p) => p.test(causeMsg)) ) } diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index b53ca529f4bb..315b4b875c8c 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -1041,6 +1041,15 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() + case e instanceof Error && /ReadableStream is locked|Invalid state|ERR_INVALID_STATE/i.test(e.message): + return new MessageV2.APIError( + { + message: "Provider stream connection lost", + isRetryable: true, + metadata: { cause: e.message }, + }, + { cause: e }, + ).toObject() case e instanceof Error: return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject() default: diff --git a/packages/opencode/src/session/stream-log.ts b/packages/opencode/src/session/stream-log.ts new file mode 100644 index 000000000000..33b2ccefb603 --- /dev/null +++ b/packages/opencode/src/session/stream-log.ts @@ -0,0 +1,92 @@ +import { Log } from "@/util/log" +import type { ProviderID } from "@/provider/schema" + +export namespace StreamLog { + const log = Log.create({ service: "session.stream" }) + + export interface Boundary { + type: "text" | "tool_call" | "tool_result" | "finish" | "incomplete" | "error" + at: number + elapsed: number + detail?: string + } + + export interface Trace { + streamID: string + providerID: ProviderID + modelID: string + sessionID: string + startedAt: number + firstByteAt?: number + boundaries: Boundary[] + terminatedAt?: number + terminationReason?: string + retryCount: number + } + + export function start(input: { + providerID: ProviderID + modelID: string + sessionID: string + }): Trace { + const now = Date.now() + const streamID = `${input.sessionID}-${now.toString(36)}` + const trace: Trace = { + streamID, + providerID: input.providerID, + modelID: input.modelID, + sessionID: input.sessionID, + startedAt: now, + boundaries: [], + retryCount: 0, + } + log.info("stream started", { + streamID, + providerID: input.providerID, + modelID: input.modelID, + sessionID: input.sessionID, + }) + return trace + } + + export function firstByte(trace: Trace) { + trace.firstByteAt = Date.now() + log.info("stream first byte", { + streamID: trace.streamID, + elapsed: trace.firstByteAt - trace.startedAt, + }) + } + + export function boundary(trace: Trace, b: Omit) { + const entry: Boundary = { + ...b, + at: Date.now(), + elapsed: Date.now() - trace.startedAt, + } + trace.boundaries.push(entry) + log.info("stream boundary", { + streamID: trace.streamID, + ...entry, + }) + } + + export function end(trace: Trace, reason?: string) { + trace.terminatedAt = Date.now() + trace.terminationReason = reason ?? "completed" + log.info("stream ended", { + streamID: trace.streamID, + reason: trace.terminationReason, + elapsed: trace.terminatedAt - trace.startedAt, + boundaries: trace.boundaries.length, + }) + } + + export function retry(trace: Trace) { + trace.retryCount++ + log.info("stream retry", { + streamID: trace.streamID, + retryCount: trace.retryCount, + boundaries: trace.boundaries.length, + }) + } +} diff --git a/packages/opencode/test/provider/error.test.ts b/packages/opencode/test/provider/error.test.ts index cfd6994c25e6..d3a9378611a4 100644 --- a/packages/opencode/test/provider/error.test.ts +++ b/packages/opencode/test/provider/error.test.ts @@ -87,6 +87,35 @@ describe("ProviderError.parseAPICallError — OpenAI gateway transient errors", expect(result.message).not.toBe("Provider temporarily unavailable") }) + test("classifies ReadableStream is locked in message as retryable (statusCode 200)", () => { + const error = makeAPICallError({ + message: "Invalid state: ReadableStream is locked", + statusCode: 200, + isRetryable: false, + }) + const result = ProviderError.parseAPICallError({ providerID: openaiID, error }) + expect(result.type).toBe("api_error") + expect((result as any).isRetryable).toBe(true) + expect(result.message).toBe("Provider temporarily unavailable") + }) + + test("classifies ReadableStream is locked in cause as retryable", () => { + const streamErr = new TypeError("Invalid state: ReadableStream is locked") + const error = new APICallError({ + message: "Not Found", + url: "https://api.openai.com/v1/responses", + requestBodyValues: {}, + statusCode: 200, + responseHeaders: {}, + responseBody: "", + isRetryable: false, + cause: streamErr, + }) + const result = ProviderError.parseAPICallError({ providerID: openaiID, error }) + expect(result.type).toBe("api_error") + expect((result as any).isRetryable).toBe(true) + }) + test("classifies 502 with upstream overflow body as retryable api_error via body-pattern fallback", () => { // 502 does not hit the status===503 early return; must rely on body-pattern detection. const error = makeAPICallError({ diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts index 747414280ad3..daa894af3460 100644 --- a/packages/opencode/test/session/retry.test.ts +++ b/packages/opencode/test/session/retry.test.ts @@ -338,4 +338,11 @@ describe("session.message-v2.fromError", () => { expect(result.data.isRetryable).toBe(false) expect(result.data.message).toContain("gpt-5.5") }) + + test("converts ReadableStream is locked raw Error to retryable APIError", () => { + const error = new TypeError("Invalid state: ReadableStream is locked") + const result = MessageV2.fromError(error, { providerID }) as MessageV2.APIError + expect(result.data.isRetryable).toBe(true) + expect(result.data.message).toBe("Provider stream connection lost") + }) })