Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/web/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ COPY --from=builder --chown=nextjs:nodejs /app/apps/web/.next/static ./apps/web/
COPY --from=builder --chown=nextjs:nodejs /app/apps/web/public ./apps/web/public
COPY --from=builder --chown=nextjs:nodejs /app/packages/database/migrations ./apps/web/migrations

RUN chown nextjs:nodejs /app

USER nextjs

Expand Down
152 changes: 110 additions & 42 deletions apps/web/__tests__/unit/video-processing.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { Effect } from "effect";
import { beforeEach, describe, expect, it, vi } from "vitest";

const updateWhereMock = vi.fn();
const selectWhereMock = vi.fn();
const startMock = vi.fn();
const runPromiseMock = vi.fn();
const fetchMock = vi.fn();

const dbMock = vi.fn(() => ({
update: vi.fn(() => ({
Expand All @@ -23,20 +25,49 @@ vi.mock("@cap/database", () => ({

vi.mock("server-only", () => ({}));

vi.mock("workflow/api", () => ({
start: startMock,
const serverEnvMock = vi.fn(() => ({
MEDIA_SERVER_URL: "http://media-server:3000",
MEDIA_SERVER_WEBHOOK_SECRET: undefined as string | undefined,
MEDIA_SERVER_WEBHOOK_URL: undefined as string | undefined,
WEB_URL: "http://localhost:3000",
}));

vi.mock("@/workflows/process-video", () => ({
processVideoWorkflow: Symbol("processVideoWorkflow"),
vi.mock("@cap/env", () => ({
serverEnv: serverEnvMock,
}));

const mockBucket = {
getInternalSignedObjectUrl: vi.fn(),
getInternalPresignedPutUrl: vi.fn(),
};

const getBucketAccessMock = vi.fn();

vi.mock("@cap/web-backend", () => ({
S3Buckets: {
getBucketAccess: getBucketAccessMock,
},
}));

vi.mock("@/lib/server", () => ({
runPromise: runPromiseMock,
}));

describe("video processing starts", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.stubGlobal("fetch", fetchMock);

getBucketAccessMock.mockReturnValue(Effect.succeed([mockBucket, null]));
mockBucket.getInternalSignedObjectUrl.mockReturnValue(
Effect.succeed("https://signed-url"),
);
mockBucket.getInternalPresignedPutUrl.mockReturnValue(
Effect.succeed("https://presigned-url"),
);
});

it("does not start a duplicate workflow when processing is already running", async () => {
it("does not start processing when already running", async () => {
updateWhereMock.mockResolvedValueOnce({ affectedRows: 0 });
selectWhereMock.mockResolvedValueOnce([
{
Expand All @@ -46,12 +77,12 @@ describe("video processing starts", () => {
},
]);

const { startVideoProcessingWorkflow } = await import(
const { startVideoProcessingDirect } = await import(
"@/lib/video-processing"
);

await expect(
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: "video-123" as never,
userId: "user-123",
rawFileKey: "user-123/video-123/raw-upload.webm",
Expand All @@ -61,42 +92,58 @@ describe("video processing starts", () => {
}),
).resolves.toBe("already-processing");

expect(startMock).not.toHaveBeenCalled();
expect(fetchMock).not.toHaveBeenCalled();
expect(runPromiseMock).not.toHaveBeenCalled();
});

it("starts the workflow after claiming the upload row", async () => {
updateWhereMock.mockResolvedValueOnce({ affectedRows: 1 });
startMock.mockResolvedValueOnce(undefined);
it("throws and marks error when MEDIA_SERVER_URL is not configured", async () => {
serverEnvMock.mockReturnValue({
MEDIA_SERVER_URL: undefined,
MEDIA_SERVER_WEBHOOK_SECRET: undefined,
MEDIA_SERVER_WEBHOOK_URL: undefined,
WEB_URL: "http://localhost:3000",
});
updateWhereMock
.mockResolvedValueOnce({ affectedRows: 1 })
.mockResolvedValueOnce({ affectedRows: 1 });

const { startVideoProcessingWorkflow } = await import(
const { startVideoProcessingDirect } = await import(
"@/lib/video-processing"
);

await expect(
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: "video-123" as never,
userId: "user-123",
rawFileKey: "user-123/video-123/raw-upload.webm",
bucketId: null,
processingMessage: "Starting video processing...",
startFailureMessage: "Video processing could not start.",
mode: "multipart",
startFailureMessage: "MEDIA_SERVER_URL not configured.",
}),
).resolves.toBe("started");
).rejects.toThrow("MEDIA_SERVER_URL is not configured");

expect(startMock).toHaveBeenCalledTimes(1);
expect(updateWhereMock).toHaveBeenCalledTimes(2);
expect(fetchMock).not.toHaveBeenCalled();
});

it("starts the workflow when mysql returns affectedRows in the first tuple slot", async () => {
updateWhereMock.mockResolvedValueOnce([{ affectedRows: 1 }]);
startMock.mockResolvedValueOnce(undefined);

const { startVideoProcessingWorkflow } = await import(
it("calls media server and returns started when processing succeeds", async () => {
updateWhereMock.mockResolvedValueOnce({ affectedRows: 1 });
runPromiseMock
.mockResolvedValueOnce([mockBucket, null])
.mockResolvedValueOnce("https://raw-signed-url")
.mockResolvedValueOnce("https://output-presigned-url")
.mockResolvedValueOnce("https://thumbnail-presigned-url");
fetchMock.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ jobId: "job-abc" }),
});

const { startVideoProcessingDirect } = await import(
"@/lib/video-processing"
);

await expect(
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: "video-123" as never,
userId: "user-123",
rawFileKey: "user-123/video-123/raw-upload.webm",
Expand All @@ -107,54 +154,75 @@ describe("video processing starts", () => {
}),
).resolves.toBe("started");

expect(startMock).toHaveBeenCalledTimes(1);
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock).toHaveBeenCalledWith(
"http://media-server:3000/video/process",
expect.objectContaining({ method: "POST" }),
);
});

it("force restarts a stale processing row", async () => {
updateWhereMock.mockResolvedValueOnce({ affectedRows: 1 });
startMock.mockResolvedValueOnce(undefined);

const { startVideoProcessingWorkflow } = await import(
it("works when mysql returns affectedRows in the first tuple slot", async () => {
updateWhereMock.mockResolvedValueOnce([{ affectedRows: 1 }]);
runPromiseMock
.mockResolvedValueOnce([mockBucket, null])
.mockResolvedValueOnce("https://raw-signed-url")
.mockResolvedValueOnce("https://output-presigned-url")
.mockResolvedValueOnce("https://thumbnail-presigned-url");
fetchMock.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ jobId: "job-xyz" }),
});

const { startVideoProcessingDirect } = await import(
"@/lib/video-processing"
);

await expect(
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: "video-123" as never,
userId: "user-123",
rawFileKey: "user-123/video-123/raw-upload.webm",
bucketId: null,
processingMessage: "Retrying video processing...",
startFailureMessage: "Video processing could not restart.",
forceRestart: true,
processingMessage: "Starting video processing...",
startFailureMessage: "Video processing could not start.",
mode: "multipart",
}),
).resolves.toBe("started");

expect(startMock).toHaveBeenCalledTimes(1);
expect(fetchMock).toHaveBeenCalledTimes(1);
});

it("marks the upload as errored when workflow start fails", async () => {
it("marks the upload as errored when media server call fails", async () => {
updateWhereMock
.mockResolvedValueOnce({ affectedRows: 1 })
.mockResolvedValueOnce({ affectedRows: 1 });
startMock.mockRejectedValueOnce(new Error("temporary failure"));

const { startVideoProcessingWorkflow } = await import(
runPromiseMock
.mockResolvedValueOnce([mockBucket, null])
.mockResolvedValueOnce("https://raw-signed-url")
.mockResolvedValueOnce("https://output-presigned-url")
.mockResolvedValueOnce("https://thumbnail-presigned-url");
fetchMock.mockResolvedValueOnce({
ok: false,
status: 500,
json: () => Promise.resolve({ error: "media server error" }),
});

const { startVideoProcessingDirect } = await import(
"@/lib/video-processing"
);

await expect(
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: "video-123" as never,
userId: "user-123",
rawFileKey: "user-123/video-123/raw-upload.webm",
bucketId: null,
processingMessage: "Starting video processing...",
startFailureMessage: "Video processing could not start.",
}),
).rejects.toThrow("temporary failure");
).rejects.toThrow("media server error");

expect(startMock).toHaveBeenCalledTimes(1);
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(updateWhereMock).toHaveBeenCalledTimes(2);
});
});
4 changes: 2 additions & 2 deletions apps/web/actions/video/retry-processing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { videos, videoUploads } from "@cap/database/schema";
import type { Video } from "@cap/web-domain";
import { eq } from "drizzle-orm";
import {
startVideoProcessingWorkflow,
startVideoProcessingDirect,
type VideoProcessingStartStatus,
} from "@/lib/video-processing";

Expand Down Expand Up @@ -62,7 +62,7 @@ export async function retryVideoProcessing({
if (!upload) throw new Error("No upload record found");
if (!upload.rawFileKey) throw new Error("No raw file key found for retry");

const status = await startVideoProcessingWorkflow({
const status = await startVideoProcessingDirect({
videoId,
userId: user.id,
rawFileKey: upload.rawFileKey,
Expand Down
4 changes: 2 additions & 2 deletions apps/web/actions/video/trigger-processing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { getCurrentUser } from "@cap/database/auth/session";
import { videos } from "@cap/database/schema";
import type { Video } from "@cap/web-domain";
import { eq } from "drizzle-orm";
import { startVideoProcessingWorkflow } from "@/lib/video-processing";
import { startVideoProcessingDirect } from "@/lib/video-processing";

export async function triggerVideoProcessing({
videoId,
Expand All @@ -27,7 +27,7 @@ export async function triggerVideoProcessing({
if (!video) throw new Error("Video not found");
if (video.ownerId !== user.id) throw new Error("Unauthorized");

await startVideoProcessingWorkflow({
await startVideoProcessingDirect({
videoId,
userId: user.id,
rawFileKey,
Expand Down
5 changes: 4 additions & 1 deletion apps/web/actions/videos/get-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ export async function getVideoStatus(

if (!video.transcriptionStatus && serverEnv().DEEPGRAM_API_KEY) {
const activeUpload = await db()
.select({ videoId: videoUploads.videoId })
.select({ videoId: videoUploads.videoId, phase: videoUploads.phase })
.from(videoUploads)
.where(eq(videoUploads.videoId, videoId))
.limit(1);

if (activeUpload.length > 0) {
console.log(
`[Get Status] Video ${videoId} has active upload (phase=${activeUpload[0]?.phase}), waiting before transcription`,
);
return {
transcriptionStatus: null,
aiGenerationStatus:
Expand Down
29 changes: 24 additions & 5 deletions apps/web/app/api/upload/[...route]/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Hono, type MiddlewareHandler } from "hono";
import { z } from "zod";
import { withAuth } from "@/app/api/utils";
import { runPromise } from "@/lib/server";
import { startVideoProcessingWorkflow } from "@/lib/video-processing";
import { startVideoProcessingDirect } from "@/lib/video-processing";
import { stringOrNumberOptional } from "@/utils/zod";
import {
getMultipartFileKey,
Expand Down Expand Up @@ -377,6 +377,9 @@ app.post(
);

if (isRawRecorderUpload(subpath)) {
console.log(
`[multipart] Raw recorder upload complete: videoId=${videoId} fileKey=${fileKey}`,
);
yield* db.use((db) =>
db
.update(Db.videos)
Expand All @@ -398,7 +401,7 @@ app.post(
);

const processingStarted = yield* Effect.tryPromise(() =>
startVideoProcessingWorkflow({
startVideoProcessingDirect({
videoId: Video.VideoId.make(videoId),
userId: user.id,
rawFileKey: fileKey,
Expand All @@ -409,10 +412,15 @@ app.post(
mode: "multipart",
}),
).pipe(
Effect.map(() => true),
Effect.map(() => {
console.log(
`[multipart] Processing job started for raw upload ${videoId}`,
);
return true;
}),
Effect.catchAll((error) =>
Effect.logError(
"Failed to start video processing workflow after raw upload completion",
"Failed to start video processing after raw upload completion",
error,
).pipe(Effect.map(() => false)),
),
Expand Down Expand Up @@ -480,6 +488,10 @@ app.post(
),
);

console.log(
`[multipart] Deleted videoUploads for webMP4 ${videoId} (transcription now unblocked)`,
);

const mediaServerUrl = serverEnv().MEDIA_SERVER_URL;
if (video.source.type === "webMP4" && mediaServerUrl) {
const inputUrl = yield* bucket.getInternalSignedObjectUrl(fileKey);
Expand All @@ -497,11 +509,18 @@ app.post(

yield* Effect.tryPromise({
try: async () => {
const webhookSecret = serverEnv().MEDIA_SERVER_WEBHOOK_SECRET;
const remuxHeaders: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookSecret) {
remuxHeaders["x-media-server-secret"] = webhookSecret;
}
const response = await fetch(
`${mediaServerUrl}/video/process`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
headers: remuxHeaders,
body: JSON.stringify({
videoId,
userId: user.id,
Expand Down
Loading
Loading