diff --git a/packages/agent/src/adapters/claude/claude-agent.refresh.test.ts b/packages/agent/src/adapters/claude/claude-agent.refresh.test.ts index 3f6b26002..a70aab87d 100644 --- a/packages/agent/src/adapters/claude/claude-agent.refresh.test.ts +++ b/packages/agent/src/adapters/claude/claude-agent.refresh.test.ts @@ -1,4 +1,7 @@ -import type { AgentSideConnection } from "@agentclientprotocol/sdk"; +import { + type AgentSideConnection, + RequestError, +} from "@agentclientprotocol/sdk"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { POSTHOG_METHODS } from "../../acp-extensions"; import { Pushable } from "../../utils/streams"; @@ -15,6 +18,7 @@ type SdkQueryHandle = { setMcpServers: ReturnType; supportedCommands: ReturnType; initializationResult: ReturnType; + close: ReturnType; [Symbol.asyncIterator]: () => AsyncIterator; }; @@ -31,6 +35,7 @@ function makeQueryHandle(): SdkQueryHandle { setMcpServers: vi.fn().mockResolvedValue(undefined), supportedCommands: vi.fn().mockResolvedValue([]), initializationResult: vi.fn().mockImplementation(() => nextInitPromise), + close: vi.fn(), [Symbol.asyncIterator]: async function* () { /* never yields */ } as never, @@ -192,7 +197,7 @@ describe("ClaudeAcpAgent.extMethod refresh_session", () => { ).rejects.toThrow(/does not support MCP injection/); }); - it("throws when initialization of the new query times out", async () => { + it("throws a RequestError and closes the timed-out query so it cannot leak", async () => { vi.useFakeTimers(); try { const agent = makeAgent(); @@ -209,9 +214,12 @@ describe("ClaudeAcpAgent.extMethod refresh_session", () => { await vi.advanceTimersByTimeAsync(30_001); - await expect(promise).rejects.toThrow( - /Session refresh timed out for s-timeout/, - ); + // A RequestError (not a plain Error) is what survives the ACP layer + // instead of being collapsed into a generic "Internal error". + await expect(promise).rejects.toBeInstanceOf(RequestError); + await expect(promise).rejects.toThrow(/Session refresh timed out after/); + // The new query is closed so its CLI subprocess does not leak. + expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1); } finally { vi.useRealTimers(); } diff --git a/packages/agent/src/adapters/claude/claude-agent.resume-model.test.ts b/packages/agent/src/adapters/claude/claude-agent.resume-model.test.ts index 65597c4b7..bec1885f8 100644 --- a/packages/agent/src/adapters/claude/claude-agent.resume-model.test.ts +++ b/packages/agent/src/adapters/claude/claude-agent.resume-model.test.ts @@ -10,20 +10,24 @@ type SdkQueryHandle = { setMcpServers: ReturnType; supportedCommands: ReturnType; initializationResult: ReturnType; + close: ReturnType; [Symbol.asyncIterator]: () => AsyncIterator; }; +let nextInitPromise: Promise = Promise.resolve({ + result: "success", + commands: [], + models: [], +}); + function makeQueryHandle(): SdkQueryHandle { return { interrupt: vi.fn().mockResolvedValue(undefined), setModel: vi.fn().mockResolvedValue(undefined), setMcpServers: vi.fn().mockResolvedValue(undefined), supportedCommands: vi.fn().mockResolvedValue([]), - initializationResult: vi.fn().mockResolvedValue({ - result: "success", - commands: [], - models: [], - }), + initializationResult: vi.fn().mockImplementation(() => nextInitPromise), + close: vi.fn(), [Symbol.asyncIterator]: async function* () { /* never yields */ } as never, @@ -101,6 +105,11 @@ afterAll(() => { describe("ClaudeAcpAgent session model on resume", () => { beforeEach(() => { createdQueries.length = 0; + nextInitPromise = Promise.resolve({ + result: "success", + commands: [], + models: [], + }); // No gateway: fetchGatewayModels returns [] and the requested model is // kept as a custom option — mirrors the gateway-outage failure mode. delete process.env.ANTHROPIC_BASE_URL; @@ -157,4 +166,43 @@ describe("ClaudeAcpAgent session model on resume", () => { expect(createdQueries).toHaveLength(1); expect(createdQueries[0].setModel).not.toHaveBeenCalled(); }); + + // The timeout *message* (RequestError "... timed out after ...") is covered + // by claude-agent.refresh.test.ts. Here we cover the leak fix on the + // new-session and resume paths: any init failure must close the query so the + // CLI subprocess can't leak and be multiplied by the retry loop. + it("closes the query and rethrows when new-session init fails", async () => { + const failedInit = Promise.reject(new Error("init boom")); + failedInit.catch(() => {}); + nextInitPromise = failedInit; + const agent = makeAgent(); + + await expect( + agent.newSession({ + cwd, + mcpServers: [], + _meta: { taskRunId: "run-init-fail-new" }, + }), + ).rejects.toThrow(/init boom/); + + expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1); + }); + + it("closes the query and rethrows when resume init fails", async () => { + const failedInit = Promise.reject(new Error("resume boom")); + failedInit.catch(() => {}); + nextInitPromise = failedInit; + const agent = makeAgent(); + + await expect( + agent.resumeSession({ + sessionId: "0197a000-0000-7000-8000-0000000000ff", + cwd, + mcpServers: [], + _meta: { taskRunId: "run-init-fail-resume" }, + }), + ).rejects.toThrow(/resume boom/); + + expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1); + }); }); diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index 6b1a5291b..8df465ae2 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -1286,7 +1286,12 @@ export class ClaudeAcpAgent extends BaseAcpAgent { SESSION_VALIDATION_TIMEOUT_MS, ); if (result.result === "timeout") { - throw new Error(`Session refresh timed out for ${this.sessionId}`); + this.terminateQuery(newQuery, newAbortController); + throw new RequestError( + -32603, + `Session refresh timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`, + { sessionId: this.sessionId }, + ); } // Re-fetch MCP tool metadata + slash commands — the server list changed. @@ -1467,6 +1472,20 @@ export class ClaudeAcpAgent extends BaseAcpAgent { } } + /** + * Without this, a timed-out session leaks an orphaned `claude` process that + * the retry loop then multiplies. Aborting the controller kills the + * subprocess via the spawn signal; closing the query stops further reads. + */ + private terminateQuery(sdkQuery: Query, controller: AbortController): void { + controller.abort(); + try { + sdkQuery.close(); + } catch { + // Query may already be closed. + } + } + private async createSession( params: { cwd: string; @@ -1655,8 +1674,10 @@ export class ClaudeAcpAgent extends BaseAcpAgent { SESSION_VALIDATION_TIMEOUT_MS, ); if (result.result === "timeout") { - throw new Error( - `Session ${forkSession ? "fork" : "resumption"} timed out for sessionId=${sessionId}`, + throw new RequestError( + -32603, + `Session ${forkSession ? "fork" : "resumption"} timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`, + { sessionId, taskId, taskRunId: meta?.taskRunId }, ); } session.knownSlashCommands = collectKnownSlashCommands( @@ -1664,6 +1685,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { ); } catch (err) { settingsManager.dispose(); + this.terminateQuery(q, abortController); if ( err instanceof Error && err.message === "Query closed before response received" @@ -1718,9 +1740,10 @@ export class ClaudeAcpAgent extends BaseAcpAgent { try { const initResult = await initPromise; if (initResult.result === "timeout") { - settingsManager.dispose(); - throw new Error( - `Session initialization timed out for sessionId=${sessionId}`, + throw new RequestError( + -32603, + `Session initialization timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`, + { sessionId, taskId, taskRunId: meta?.taskRunId }, ); } session.knownSlashCommands = collectKnownSlashCommands( @@ -1728,6 +1751,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { ); } catch (err) { settingsManager.dispose(); + this.terminateQuery(q, abortController); this.logger.error("Session initialization failed", { sessionId, taskId, diff --git a/packages/agent/src/gateway-models.test.ts b/packages/agent/src/gateway-models.test.ts index 37f354807..ee83e138b 100644 --- a/packages/agent/src/gateway-models.test.ts +++ b/packages/agent/src/gateway-models.test.ts @@ -1,5 +1,7 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { + fetchGatewayModels, + fetchModelsList, formatGatewayModelName, getClaudeModelRecency, isBlockedModelId, @@ -109,3 +111,34 @@ describe("getClaudeModelRecency", () => { ]); }); }); + +describe("gateway model fetch timeout", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + // Both fetches run inside the Promise.all that gates session-init, so a + // stalled gateway must degrade to "no models" rather than hang. + it.each([ + { name: "fetchGatewayModels", fn: fetchGatewayModels }, + { name: "fetchModelsList", fn: fetchModelsList }, + ])( + "$name bounds the request and returns [] when it times out", + async ({ fn }) => { + // Reject the way AbortSignal.timeout would once the deadline passes. + const fetchMock = vi + .spyOn(globalThis, "fetch") + .mockRejectedValue( + new DOMException("The operation was aborted.", "TimeoutError"), + ); + + await expect( + fn({ gatewayUrl: "https://gateway.timeout-test" }), + ).resolves.toEqual([]); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const init = fetchMock.mock.calls[0]?.[1] as RequestInit | undefined; + expect(init?.signal).toBeInstanceOf(AbortSignal); + }, + ); +}); diff --git a/packages/agent/src/gateway-models.ts b/packages/agent/src/gateway-models.ts index 82accd258..cacc55e51 100644 --- a/packages/agent/src/gateway-models.ts +++ b/packages/agent/src/gateway-models.ts @@ -51,6 +51,12 @@ type ModelsListResponse = const CACHE_TTL = 10 * 60 * 1000; // 10 minutes +// Bound the gateway /v1/models request so a stalled connection cannot hold up +// session init: this fetch runs inside the Promise.all that gates the 30s SDK +// initialization timeout, so it must resolve well within that window. On abort +// the callers fall through to `return []`. +const GATEWAY_FETCH_TIMEOUT_MS = 10_000; + let gatewayModelsCache: { models: GatewayModel[]; expiry: number; @@ -76,7 +82,9 @@ export async function fetchGatewayModels( const modelsUrl = `${gatewayUrl}/v1/models`; try { - const response = await fetch(modelsUrl); + const response = await fetch(modelsUrl, { + signal: AbortSignal.timeout(GATEWAY_FETCH_TIMEOUT_MS), + }); if (!response.ok) { return []; @@ -138,7 +146,9 @@ export async function fetchModelsList( try { const modelsUrl = `${gatewayUrl}/v1/models`; - const response = await fetch(modelsUrl); + const response = await fetch(modelsUrl, { + signal: AbortSignal.timeout(GATEWAY_FETCH_TIMEOUT_MS), + }); if (!response.ok) { return []; } diff --git a/packages/workspace-server/src/services/agent/agent.ts b/packages/workspace-server/src/services/agent/agent.ts index 7b1b59468..dc8f8d66c 100644 --- a/packages/workspace-server/src/services/agent/agent.ts +++ b/packages/workspace-server/src/services/agent/agent.ts @@ -921,10 +921,19 @@ When creating pull requests, add the following footer at the end of the PR descr ); return this.getOrCreateSession(config, isReconnect, true); } + // When the in-process ACP layer masks a thrown error as a generic + // "Internal error", the real text survives in `data.details`. Surface it + // here (host-side, before the tRPC boundary drops `data`) so the exported + // log names the actual cause. + const maskedDetail = (err as { data?: { details?: unknown } })?.data + ?.details; + const detailSuffix = + typeof maskedDetail === "string" && maskedDetail + ? `: ${maskedDetail}` + : ""; + const action = isReconnect ? "reconnect" : "create"; this.log.error( - `Failed to ${isReconnect ? "reconnect" : "create"} session${ - isRetry ? " after retry" : "" - }`, + `Failed to ${action} session${isRetry ? " after retry" : ""}${detailSuffix}`, err, ); // Non-auth reconnect failure on first attempt: fall back to a fresh session.