diff --git a/.server-changes/llm-pricing-registry-reload-channel.md b/.server-changes/llm-pricing-registry-reload-channel.md new file mode 100644 index 0000000000..ec1daad0a3 --- /dev/null +++ b/.server-changes/llm-pricing-registry-reload-channel.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +The LLM pricing registry can now reload from the database whenever a message is published on the worker Redis channel named by `LLM_PRICING_RELOAD_CHANNEL`, instead of waiting for the next 5-minute interval. The subscription is opt-in per process via `LLM_PRICING_RELOAD_PUBSUB_ENABLED` (default off), and bursts of publishes are coalesced using `LLM_PRICING_RELOAD_DEBOUNCE_MS`. On processes that enable it, LLM model and pricing changes are reflected in cost enrichment within seconds; all other processes keep the periodic reload. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 13e9e5dacb..6b58896429 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1424,6 +1424,14 @@ const EnvironmentSchema = z // LLM cost tracking LLM_COST_TRACKING_ENABLED: BoolEnv.default(true), LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes + LLM_PRICING_RELOAD_CHANNEL: z.string().default("llm-registry:reload"), + LLM_PRICING_RELOAD_DEBOUNCE_MS: z.coerce.number().int().default(1000), + // Whether to subscribe this process to the LLM_PRICING_RELOAD_CHANNEL. + // Default off — only OTel-ingesting services need real-time pricing + // freshness; dashboard/worker processes are fine on the existing + // 5-minute periodic reload. In multi-service deployments, set this to + // true on the span-ingesting services. 
+ LLM_PRICING_RELOAD_PUBSUB_ENABLED: BoolEnv.default(false), LLM_PRICING_SEED_ON_STARTUP: BoolEnv.default(false), LLM_PRICING_READY_TIMEOUT_MS: z.coerce.number().int().default(500), LLM_METRICS_BATCH_SIZE: z.coerce.number().int().default(5000), diff --git a/apps/webapp/app/v3/llmPricingRegistry.server.ts b/apps/webapp/app/v3/llmPricingRegistry.server.ts index 2212c41779..eb186e1521 100644 --- a/apps/webapp/app/v3/llmPricingRegistry.server.ts +++ b/apps/webapp/app/v3/llmPricingRegistry.server.ts @@ -1,7 +1,9 @@ import { ModelPricingRegistry, seedLlmPricing } from "@internal/llm-model-catalog"; import { prisma, $replica } from "~/db.server"; import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; import { signalsEmitter } from "~/services/signals.server"; +import { createRedisClient } from "~/redis.server"; import { singleton } from "~/utils/singleton"; import { setLlmPricingRegistry } from "./utils/enrichCreatableEvents.server"; @@ -27,7 +29,7 @@ export const llmPricingRegistry = singleton("llmPricingRegistry", () => { console.error("Failed to initialize LLM pricing registry", err); }); - // Periodic reload + // Periodic reload (backstop for the pub/sub path below) const reloadInterval = env.LLM_PRICING_RELOAD_INTERVAL_MS; const interval = setInterval(() => { registry.reload().catch((err) => { @@ -35,12 +37,69 @@ export const llmPricingRegistry = singleton("llmPricingRegistry", () => { }); }, reloadInterval); - signalsEmitter.on("SIGTERM", () => { - clearInterval(interval); - }); - signalsEmitter.on("SIGINT", () => { - clearInterval(interval); - }); + // Pub/sub reload is opt-in per process (default off). Without it, the + // registry stays accurate via the existing 5-minute interval. Enable on + // the OTel-ingesting services where pricing freshness directly affects + // span cost enrichment; dashboard and worker services don't need it and + // shouldn't pile onto each publish with a full-table reload. 
+ if (env.LLM_PRICING_RELOAD_PUBSUB_ENABLED) { + const subscriber = createRedisClient("llm-pricing:subscriber", { + keyPrefix: "llm-pricing:subscriber:", + host: env.COMMON_WORKER_REDIS_HOST, + port: env.COMMON_WORKER_REDIS_PORT, + username: env.COMMON_WORKER_REDIS_USERNAME, + password: env.COMMON_WORKER_REDIS_PASSWORD, + tlsDisabled: env.COMMON_WORKER_REDIS_TLS_DISABLED === "true", + clusterMode: env.COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED === "1", + }); + + subscriber.subscribe(env.LLM_PRICING_RELOAD_CHANNEL).catch((err) => { + logger.warn("Failed to subscribe to LLM pricing reload channel", { + channel: env.LLM_PRICING_RELOAD_CHANNEL, + error: err instanceof Error ? err.message : String(err), + }); + }); + + // Coalesce reload calls so a burst of publishes only triggers one + // reload. The first publish schedules a reload at + // T+LLM_PRICING_RELOAD_DEBOUNCE_MS; subsequent publishes during that + // window are no-ops because the trailing reload picks up everything + // when it queries the DB. Bounds reload rate to at most 1 per debounce + // window regardless of publisher chattiness. + const debounceMs = env.LLM_PRICING_RELOAD_DEBOUNCE_MS; + let pendingReloadTimer: NodeJS.Timeout | null = null; + + function scheduleReload() { + if (pendingReloadTimer) return; + pendingReloadTimer = setTimeout(() => { + pendingReloadTimer = null; + registry.reload().catch((err) => { + logger.warn("Failed to reload LLM pricing registry from pub/sub", { + error: err instanceof Error ? 
err.message : String(err), + }); + }); + }, debounceMs); + } + + subscriber.on("message", (channel) => { + if (channel !== env.LLM_PRICING_RELOAD_CHANNEL) return; + scheduleReload(); + }); + + signalsEmitter.on("SIGTERM", () => { + clearInterval(interval); + if (pendingReloadTimer) clearTimeout(pendingReloadTimer); + void subscriber.quit().catch(() => {}); + }); + signalsEmitter.on("SIGINT", () => { + clearInterval(interval); + if (pendingReloadTimer) clearTimeout(pendingReloadTimer); + void subscriber.quit().catch(() => {}); + }); + } else { + signalsEmitter.on("SIGTERM", () => clearInterval(interval)); + signalsEmitter.on("SIGINT", () => clearInterval(interval)); + } return registry; });