Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/sim/app/api/copilot/chat/update-messages/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { updateCopilotMessagesContract } from '@/lib/api/contracts/copilot'
import { parseRequest } from '@/lib/api/server'
import { getAccessibleCopilotChatAuth } from '@/lib/copilot/chat/lifecycle'
import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
import {
authenticateCopilotRequestSessionOnly,
Expand Down Expand Up @@ -87,6 +88,7 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
}

await db.update(copilotChats).set(updateData).where(eq(copilotChats.id, chatId))
await replaceCopilotChatMessages(chatId, normalizedMessages)
Comment thread
waleedlatif1 marked this conversation as resolved.

logger.info(`[${tracker.requestId}] Successfully updated chat`, {
chatId,
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { forkMothershipChatContract } from '@/lib/api/contracts/mothership-tasks'
import { parseRequest } from '@/lib/api/server'
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
import { fetchGo } from '@/lib/copilot/request/go/fetch'
import {
Expand Down Expand Up @@ -102,6 +103,8 @@ export const POST = withRouteHandler(
return createInternalServerErrorResponse('Failed to create forked chat')
}

await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model })

// Clone copilot-service conversation state (messages, active_messages, memory files).
// Best-effort: if the copilot service doesn't have a row for the source chat yet, skip.
try {
Expand Down
33 changes: 20 additions & 13 deletions apps/sim/app/api/superuser/import-workflow/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { importWorkflowAsSuperuserContract } from '@/lib/api/contracts/workflows'
import { parseRequest } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
import { parseWorkflowJson } from '@/lib/workflows/operations/import-export'
Expand Down Expand Up @@ -172,19 +173,25 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
let copilotChatsImported = 0

for (const chat of sourceCopilotChats) {
await db.insert(copilotChats).values({
userId: session.user.id,
workflowId: newWorkflowId,
title: chat.title ? `[Import] ${chat.title}` : null,
messages: chat.messages,
model: chat.model,
conversationId: null, // Don't copy conversation ID
previewYaml: chat.previewYaml,
planArtifact: chat.planArtifact,
config: chat.config,
createdAt: new Date(),
updatedAt: new Date(),
})
const [imported] = await db
.insert(copilotChats)
.values({
userId: session.user.id,
workflowId: newWorkflowId,
title: chat.title ? `[Import] ${chat.title}` : null,
messages: chat.messages,
model: chat.model,
conversationId: null, // Don't copy conversation ID
previewYaml: chat.previewYaml,
planArtifact: chat.planArtifact,
config: chat.config,
createdAt: new Date(),
updatedAt: new Date(),
})
.returning({ id: copilotChats.id })
if (imported && Array.isArray(chat.messages) && chat.messages.length > 0) {
await appendCopilotChatMessages(imported.id, chat.messages, { chatModel: chat.model })
}
copilotChatsImported++
}

Expand Down
8 changes: 8 additions & 0 deletions apps/sim/instrumentation-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,12 @@ export async function register() {

const { startMemoryTelemetry } = await import('./lib/monitoring/memory-telemetry')
startMemoryTelemetry()

// Fire-and-forget catch-up sweep: bounded to the last 7 days, idempotent,
// runs in the background so server boot isn't blocked.
void import('./lib/copilot/chat/messages-catchup')
.then((mod) => mod.catchUpCopilotChatMessages())
.catch((err) => {
logger.warn('Failed to schedule copilot chat messages catch-up', err)
})
}
48 changes: 48 additions & 0 deletions apps/sim/lib/copilot/chat/messages-catchup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { db } from '@sim/db'
import { createLogger } from '@sim/logger'
import { getErrorMessage } from '@sim/utils/errors'
import { sql } from 'drizzle-orm'

const logger = createLogger('CopilotChatMessagesCatchup')

/**
* Sweep recently-active chats from `copilot_chats.messages` JSONB into the new
* `copilot_chat_messages` table. Idempotent via `ON CONFLICT DO NOTHING`.
*
* Bounded to the last 7 days of activity so the cost is bounded regardless of
* total table size. The migration handles initial backfill; this sweep only
* exists to close the rolling-deploy window where old code may write to JSONB
* before the new dual-write code is live on every server.
*/
export async function catchUpCopilotChatMessages(): Promise<void> {
try {
const result = await db.execute(sql`
INSERT INTO copilot_chat_messages (chat_id, message_id, role, content, model, created_at, updated_at)
SELECT
c.id,
COALESCE(msg.value->>'id', gen_random_uuid()::text),
Comment thread
waleedlatif1 marked this conversation as resolved.
COALESCE(msg.value->>'role', 'user'),
msg.value,
COALESCE(msg.value->>'model', c.model),
COALESCE(
NULLIF(msg.value->>'createdAt','')::timestamp,
c.created_at + (msg.ord * interval '1 microsecond')
),
COALESCE(
NULLIF(msg.value->>'createdAt','')::timestamp,
c.created_at + (msg.ord * interval '1 microsecond')
)
FROM copilot_chats c
CROSS JOIN LATERAL jsonb_array_elements(c.messages) WITH ORDINALITY AS msg(value, ord)
WHERE c.updated_at > now() - interval '7 days'
AND jsonb_typeof(c.messages) = 'array'
AND jsonb_array_length(c.messages) > 0
ON CONFLICT (chat_id, message_id) DO NOTHING
`)
logger.info('Copilot chat messages catch-up completed', {
rowCount: (result as { rowCount?: number }).rowCount ?? 0,
})
} catch (err) {
logger.warn('Copilot chat messages catch-up failed', { error: getErrorMessage(err) })
}
}
107 changes: 107 additions & 0 deletions apps/sim/lib/copilot/chat/messages-dual-write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { db } from '@sim/db'
import { copilotChatMessages } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { getErrorMessage } from '@sim/utils/errors'
import { generateShortId } from '@sim/utils/id'
import { eq, sql } from 'drizzle-orm'

const logger = createLogger('CopilotChatMessagesDualWrite')

/**
* Build a row payload for `copilot_chat_messages` from a JSONB message blob.
* The blob format mirrors what's stored in the legacy `copilot_chats.messages`
* array — `id`, `role`, and optionally `model`/`createdAt` at the top level —
* with the entire blob preserved as the row's `content` for forward compat.
*/
function toMessageRow(
chatId: string,
rawMessage: unknown,
options?: { chatModel?: string | null; streamId?: string | null }
): typeof copilotChatMessages.$inferInsert | null {
if (!rawMessage || typeof rawMessage !== 'object') return null
const msg = rawMessage as Record<string, unknown>
const id = typeof msg.id === 'string' && msg.id.length > 0 ? msg.id : generateShortId()
const role = typeof msg.role === 'string' ? msg.role : 'user'
const model =
typeof msg.model === 'string' && msg.model.length > 0 ? msg.model : (options?.chatModel ?? null)
return {
chatId,
messageId: id,
role,
content: msg,
model,
streamId: options?.streamId ?? null,
}
Comment thread
waleedlatif1 marked this conversation as resolved.
}

/**
* Append messages to the new `copilot_chat_messages` table. Best-effort —
* errors are logged but never thrown, since the legacy `copilot_chats.messages`
* JSONB column remains the source of truth during the dual-write rollout.
*/
export async function appendCopilotChatMessages(
chatId: string,
messages: unknown[],
options?: { chatModel?: string | null; streamId?: string | null }
): Promise<void> {
if (!Array.isArray(messages) || messages.length === 0) return

const rows = messages
.map((msg) => toMessageRow(chatId, msg, options))
.filter((row): row is typeof copilotChatMessages.$inferInsert => row !== null)

if (rows.length === 0) return

try {
await db
.insert(copilotChatMessages)
.values(rows)
.onConflictDoUpdate({
target: [copilotChatMessages.chatId, copilotChatMessages.messageId],
set: {
content: sql`excluded.content`,
role: sql`excluded.role`,
// Preserve existing non-null model if the incoming row lacks one.
model: sql`COALESCE(excluded.model, ${copilotChatMessages.model})`,
updatedAt: sql`now()`,
},
Comment thread
waleedlatif1 marked this conversation as resolved.
})
} catch (err) {
logger.warn('Failed to append copilot chat messages', {
chatId,
messageCount: rows.length,
error: getErrorMessage(err),
})
}
}

/**
* Replace all messages for a chat. Used by the update-messages endpoint that
* receives a full snapshot of the conversation state. Best-effort.
*/
export async function replaceCopilotChatMessages(
chatId: string,
messages: unknown[],
options?: { chatModel?: string | null }
): Promise<void> {
if (!Array.isArray(messages)) return

const rows = messages
.map((msg) => toMessageRow(chatId, msg, options))
.filter((row): row is typeof copilotChatMessages.$inferInsert => row !== null)

try {
await db.transaction(async (tx) => {
await tx.delete(copilotChatMessages).where(eq(copilotChatMessages.chatId, chatId))
if (rows.length > 0) {
await tx.insert(copilotChatMessages).values(rows)
}
})
Comment thread
waleedlatif1 marked this conversation as resolved.
} catch (err) {
logger.warn('Failed to replace copilot chat messages', {
chatId,
messageCount: rows.length,
error: getErrorMessage(err),
})
}
}
5 changes: 5 additions & 0 deletions apps/sim/lib/copilot/chat/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { z } from 'zod'
import { isZodError, validationErrorResponse } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload'
import {
buildPersistedAssistantMessage,
Expand Down Expand Up @@ -332,6 +333,10 @@ async function persistUserMessage(params: {
.where(eq(copilotChats.id, chatId))
.returning({ messages: copilotChats.messages })

if (updated) {
await appendCopilotChatMessages(chatId, [userMsg], { streamId: userMessageId })
}

const messagesAfter = Array.isArray(updated?.messages) ? updated.messages : undefined
span.setAttributes({
[TraceAttr.ChatPersistOutcome]: updated
Expand Down
9 changes: 9 additions & 0 deletions apps/sim/lib/copilot/chat/terminal-state.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { and, eq, sql } from 'drizzle-orm'
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
import { CopilotChatFinalizeOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1'
import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
Expand Down Expand Up @@ -47,6 +48,7 @@ export async function finalizeAssistantTurn({
[TraceAttr.ChatHasAssistantMessage]: !!assistantMessage,
},
async (span) => {
let appendedAssistantMessage: PersistedMessage | undefined
const result = await db.transaction(async (tx) => {
const where = userId
? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId))
Expand Down Expand Up @@ -113,6 +115,7 @@ export async function finalizeAssistantTurn({
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
})
.where(updateWhere)
appendedAssistantMessage = assistantMessage
return {
found: true,
updated: true,
Expand Down Expand Up @@ -148,6 +151,12 @@ export async function finalizeAssistantTurn({
}
})

if (appendedAssistantMessage) {
await appendCopilotChatMessages(chatId, [appendedAssistantMessage], {
streamId: userMessageId,
})
}

span.setAttribute(TraceAttr.ChatFinalizeOutcome, result.outcome)
return result
}
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/lib/mothership/inbox/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { getErrorMessage } from '@sim/utils/errors'
import { generateId } from '@sim/utils/id'
import { and, eq, sql } from 'drizzle-orm'
import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
import {
buildPersistedAssistantMessage,
Expand Down Expand Up @@ -350,6 +351,7 @@ async function persistChatMessages(
updatedAt: new Date(),
})
.where(eq(copilotChats.id, chatId))
await appendCopilotChatMessages(chatId, [userMessage, assistantMessage])
} catch (err) {
logger.warn('Failed to persist chat messages', {
chatId,
Expand Down
42 changes: 42 additions & 0 deletions packages/db/migrations/0212_cynical_jack_murdock.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
CREATE TABLE "copilot_chat_messages" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"chat_id" uuid NOT NULL,
"message_id" text NOT NULL,
"role" text NOT NULL,
"content" jsonb NOT NULL,
"stream_id" text,
"parent_message_id" text,
"model" text,
"tokens_in" integer,
"tokens_out" integer,
"deleted_at" timestamp,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "copilot_chat_messages" ADD CONSTRAINT "copilot_chat_messages_chat_id_copilot_chats_id_fk" FOREIGN KEY ("chat_id") REFERENCES "public"."copilot_chats"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "copilot_chat_messages_chat_message_unique" ON "copilot_chat_messages" USING btree ("chat_id","message_id");--> statement-breakpoint
CREATE INDEX "copilot_chat_messages_chat_created_at_idx" ON "copilot_chat_messages" USING btree ("chat_id","created_at","id") WHERE "copilot_chat_messages"."deleted_at" IS NULL;--> statement-breakpoint
CREATE INDEX "copilot_chat_messages_chat_stream_idx" ON "copilot_chat_messages" USING btree ("chat_id","stream_id") WHERE "copilot_chat_messages"."stream_id" IS NOT NULL;--> statement-breakpoint
INSERT INTO "copilot_chat_messages" (
"chat_id", "message_id", "role", "content", "model", "created_at", "updated_at"
)
SELECT
c."id",
COALESCE(msg.value->>'id', gen_random_uuid()::text),
COALESCE(msg.value->>'role', 'user'),
msg.value,
COALESCE(msg.value->>'model', c."model"),
COALESCE(
NULLIF(msg.value->>'createdAt','')::timestamp,
c."created_at" + (msg.ord * interval '1 microsecond')
),
COALESCE(
NULLIF(msg.value->>'createdAt','')::timestamp,
c."created_at" + (msg.ord * interval '1 microsecond')
)
FROM "copilot_chats" c
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS msg(value, ord)
WHERE jsonb_typeof(c."messages") = 'array'
AND jsonb_array_length(c."messages") > 0
ON CONFLICT ("chat_id", "message_id") DO NOTHING;
Loading
Loading