diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index b05dd86259d9..5724f12a0c13 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -258,13 +258,20 @@ export namespace Plugin { } // Subscribe to bus events, fiber interrupted when scope closes + // Run plugin event handlers in parallel so slow handlers don't block the main event stream yield* bus.subscribeAll().pipe( Stream.runForEach((input) => - Effect.sync(() => { - for (const hook of hooks) { - hook["event"]?.({ event: input as any }) - } - }), + Effect.forEach( + hooks, + (hook) => + Effect.tryPromise({ + try: () => Promise.resolve(hook["event"]?.({ event: input as any })), + catch: (err) => { + log.error("plugin event handler failed", { error: err }) + }, + }).pipe(Effect.ignore), + { concurrency: "unbounded", discard: true }, + ), ), Effect.forkScoped, ) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b632a61a18e8..0852f1a82426 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -46,12 +46,14 @@ export namespace SessionProcessor { interface ProcessorContext extends Input { toolcalls: Record + toolCallHistory: MessageV2.ToolPart[] shouldBreak: boolean snapshot: string | undefined blocked: boolean needsCompaction: boolean currentText: MessageV2.TextPart | undefined reasoningMap: Record + stepCount: number } type StreamEvent = Event @@ -89,12 +91,14 @@ export namespace SessionProcessor { sessionID: input.sessionID, model: input.model, toolcalls: {}, + toolCallHistory: [], shouldBreak: false, snapshot: undefined, blocked: false, needsCompaction: false, currentText: undefined, reasoningMap: {}, + stepCount: 0, } let aborted = false @@ -180,12 +184,13 @@ export namespace SessionProcessor { metadata: value.providerMetadata, } satisfies MessageV2.ToolPart) - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) - const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) + // Track tool call history in-memory for doom loop detection + ctx.toolCallHistory.push(ctx.toolcalls[value.toolCallId]) + const recentCalls = ctx.toolCallHistory.slice(-DOOM_LOOP_THRESHOLD) if ( - recentParts.length !== DOOM_LOOP_THRESHOLD || - !recentParts.every( + recentCalls.length !== DOOM_LOOP_THRESHOLD || + !recentCalls.every( (part) => part.type === "tool" && part.tool === value.toolName && @@ -294,12 +299,15 @@ export namespace SessionProcessor { } ctx.snapshot = undefined } - yield* Effect.promise(() => - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }), - ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + ctx.stepCount++ + if (ctx.stepCount === 1) { + yield* Effect.promise(() => + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }), + ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + } if ( !ctx.assistantMessage.summary && isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })