From e97e96c0b46dfc0523e30d24085a6d003f51a5e5 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Sun, 17 May 2026 19:40:58 +0000 Subject: [PATCH 1/4] feat(telemetry): add OTLP export writer --- src/telemetry/export/otlp.ts | 334 +++++++++++++++++++++++++++++++++++ 1 file changed, 334 insertions(+) create mode 100644 src/telemetry/export/otlp.ts diff --git a/src/telemetry/export/otlp.ts b/src/telemetry/export/otlp.ts new file mode 100644 index 000000000..233369a1c --- /dev/null +++ b/src/telemetry/export/otlp.ts @@ -0,0 +1,334 @@ +import type { TelemetryEvent } from "../event"; + +type JsonPrimitive = string | number | boolean | null; +type JsonValue = + | JsonPrimitive + | JsonValue[] + | { readonly [key: string]: JsonValue }; +type JsonObject = Record; + +const STATUS_CODE_UNSET = 0; +const STATUS_CODE_OK = 1; +const STATUS_CODE_ERROR = 2; +const SPAN_KIND_INTERNAL = 1; + +const SEVERITY_NUMBER_INFO = 9; +const SEVERITY_NUMBER_ERROR = 17; + +const AGGREGATION_TEMPORALITY_DELTA = 1; + +const METRIC_EVENT_NAMES = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +export function isMetricEvent(event: TelemetryEvent): boolean { + return METRIC_EVENT_NAMES.has(event.eventName); +} + +export function toOtlpLogResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeLogs: [ + { + scope: instrumentationScope(), + logRecords: [toLogRecord(event)], + }, + ], + }; +} + +export function toOtlpSpanResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeSpans: [ + { + scope: instrumentationScope(), + spans: [toSpan(event)], + }, + ], + }; +} + +export function toOtlpMetricResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeMetrics: [ + { + scope: instrumentationScope(), + metrics: toMetrics(event), + }, + ], + }; +} + +function toLogRecord(event: TelemetryEvent): JsonObject { + const timeUnixNano = toUnixNano(event.timestamp); + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: + event.error === undefined ? SEVERITY_NUMBER_INFO : SEVERITY_NUMBER_ERROR, + severityText: event.error === undefined ? "INFO" : "ERROR", + body: { stringValue: event.eventName }, + attributes: eventAttributes(event), + }; +} + +function toSpan(event: TelemetryEvent): JsonObject { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = toSpanStartUnixNano(event, endTimeUnixNano); + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: spanName(event.eventName), + kind: SPAN_KIND_INTERNAL, + startTimeUnixNano, + endTimeUnixNano, + attributes: spanAttributes(event), + status: spanStatus(event), + ...(event.error !== undefined && { + events: [exceptionSpanEvent(event, endTimeUnixNano)], + }), + }; +} + +function toMetrics(event: TelemetryEvent): JsonObject[] { + if (event.eventName === "http.requests") { + return toHttpRequestMetrics(event); + } + return toGaugeMetrics(event, Object.entries(event.measurements)); +} + +function toHttpRequestMetrics(event: TelemetryEvent): JsonObject[] { + const windowSeconds = event.measurements.window_seconds; + const measurements = Object.entries(event.measurements).filter( + ([name]) => name !== "window_seconds", + ); + const countMetrics = measurements.filter(([name]) => + name.startsWith("count_"), + ); + const gaugeMetrics = measurements.filter( + ([name]) => !name.startsWith("count_"), + ); + const timeUnixNano = toUnixNano(event.timestamp); + return [ + ...countMetrics.map(([name, value]) => + toSumMetric(event, name, value, timeUnixNano, windowSeconds), + ), + ...toGaugeMetrics(event, gaugeMetrics, { + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + }), + ]; +} + +function toSumMetric( + event: TelemetryEvent, + measurementName: string, + value: number, + timeUnixNano: string, + windowSeconds: number | undefined, +): JsonObject { + return { + name: `${event.eventName}.${measurementName}`, + description: event.eventName, + unit: "{request}", + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, + isMonotonic: true, + dataPoints: [ + { + attributes: metricAttributes(event), + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + asInt: String(Math.trunc(value)), + }, + ], + }, + }; +} + +function toGaugeMetrics( + event: TelemetryEvent, + measurements: Array<[string, number]>, + times: { + readonly startTimeUnixNano?: string; + readonly timeUnixNano: string; + } = { + timeUnixNano: toUnixNano(event.timestamp), + }, +): JsonObject[] { + return measurements.map(([name, value]) => ({ + name: `${event.eventName}.${name}`, + description: event.eventName, + unit: metricUnit(name), + gauge: { + dataPoints: [ + { + attributes: metricAttributes(event), + ...(times.startTimeUnixNano !== undefined && { + startTimeUnixNano: times.startTimeUnixNano, + }), + timeUnixNano: times.timeUnixNano, + asDouble: value, + }, + ], + }, + })); +} + +function resourceAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "service.name": "coder-vscode-extension", + "service.version": event.context.extensionVersion, + "coder.machine.id": event.context.machineId, + "coder.session.id": event.context.sessionId, + "os.type": event.context.osType, + "os.version": event.context.osVersion, + "host.arch": event.context.hostArch, + "vscode.platform.name": event.context.platformName, + "vscode.platform.version": event.context.platformVersion, + "coder.deployment.url": event.context.deploymentUrl, + }); +} + +function eventAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error !== undefined && { + "exception.message": event.error.message, + ...(event.error.type !== undefined && { + "exception.type": event.error.type, + }), + ...(event.error.code !== undefined && { + "exception.code": event.error.code, + }), + }), + }); +} + +function spanAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...Object.fromEntries( + Object.entries(event.measurements).filter( + ([name]) => name !== "durationMs", + ), + ), + }); +} + +function metricAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); +} + +function keyValues( + values: Readonly>, +): JsonObject[] { + return Object.entries(values).map(([key, value]) => { + const otlpValue: JsonObject = + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }; + return { key, value: otlpValue }; + }); +} + +function instrumentationScope(): JsonObject { + return { + name: "coder.vscode-coder.telemetry.export", + }; +} + +function spanStatus(event: TelemetryEvent): JsonObject { + if (event.properties.result === "success") { + return { code: STATUS_CODE_OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: STATUS_CODE_ERROR, + ...(event.error !== undefined && { message: event.error.message }), + }; + } + return { code: STATUS_CODE_UNSET }; +} + +function exceptionSpanEvent( + event: TelemetryEvent, + timeUnixNano: string, +): JsonObject { + const error = event.error; + if (error === undefined) { + throw new Error("Cannot build exception event without an error."); + } + return { + name: "exception", + timeUnixNano, + attributes: keyValues({ + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }), + }; +} + +function toSpanStartUnixNano( + event: TelemetryEvent, + endTimeUnixNano: string, +): string { + const durationMs = event.measurements.durationMs; + if (durationMs === undefined) { + return endTimeUnixNano; + } + return String(BigInt(endTimeUnixNano) - msToNanos(durationMs)); +} + +function windowStartUnixNano( + timeUnixNano: string, + windowSeconds: number | undefined, +): string { + if (windowSeconds === undefined) { + return timeUnixNano; + } + return String(BigInt(timeUnixNano) - secondsToNanos(windowSeconds)); +} + +function toUnixNano(timestamp: string): string { + const ms = Date.parse(timestamp); + if (!Number.isFinite(ms)) { + throw new Error(`Invalid telemetry timestamp '${timestamp}'.`); + } + return String(BigInt(ms) * 1_000_000n); +} + +function msToNanos(ms: number): bigint { + return BigInt(Math.max(0, Math.round(ms * 1_000_000))); +} + +function secondsToNanos(seconds: number): bigint { + return BigInt(Math.max(0, Math.round(seconds * 1_000_000_000))); +} + +function spanName(eventName: string): string { + return eventName.split(".").at(-1) ?? eventName; +} + +function metricUnit(measurementName: string): string { + if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { + return "ms"; + } + if (measurementName.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} From d18fd3b0f07a19ac0b10ca14edbaf253cdb93286 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Thu, 21 May 2026 12:44:16 +0300 Subject: [PATCH 2/4] refactor(telemetry): tighten OTLP export writer and tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restructure: writers.ts → writers/json.ts; combine OTLP mapping and writer into writers/otlp.ts; lift signal classification into export/signal.ts. - Group all records of each signal under a single Resource block per file (was one Resource per event), matching the OTLP spec's recommended shape and shrinking exports at scale. - Adopt OTLP enums from @opentelemetry/api + api-logs; document the +1 wire offset on SpanKind. - Tighten the public surface: only writeOtlpZipExport and OtlpExportCounts are exported; record mappers are module-private. - Hoist per-event metric attributes once; single-pass partition of http.requests measurements into counts and gauges. - Share parseTelemetryTimestampMs between range.ts and the OTLP writer's nanos conversion (was duplicated with identical error). - Extract asyncIterable test helper to test/mocks/; inline the memfs vi.mock pair across affected test files. - Tests now exercise the public API only, dropping side-effect assertions about staging cleanup and atomic semantics already covered by writeAtomically's own tests. --- package.json | 2 + pnpm-lock.yaml | 27 +- src/telemetry/export/otlp.ts | 334 ------------- src/telemetry/export/signal.ts | 24 + src/telemetry/export/writers/otlp.ts | 437 ++++++++++++++++++ test/unit/remote/sshOverrides.test.ts | 7 +- test/unit/remote/sshProcess.test.ts | 5 +- .../telemetry/export/writers/otlp.test.ts | 355 ++++++++++++++ .../telemetry/sinks/localJsonlSink.test.ts | 7 +- test/unit/util/fileCleanup.test.ts | 7 +- 10 files changed, 846 insertions(+), 359 deletions(-) delete mode 100644 src/telemetry/export/otlp.ts create mode 100644 src/telemetry/export/signal.ts create mode 100644 src/telemetry/export/writers/otlp.ts create mode 100644 test/unit/telemetry/export/writers/otlp.test.ts diff --git a/package.json b/package.json index 9e3f78113..30e5bb3eb 100644 --- a/package.json +++ b/package.json @@ -675,6 +675,8 @@ ], "dependencies": { "@abraham/reflection": "^0.13.0", + "@opentelemetry/api": "^1.9.1", + "@opentelemetry/api-logs": "^0.218.0", "@peculiar/x509": "^2.0.0", "@repo/shared": "workspace:*", "axios": "^1.16.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4b2288751..f5bb93131 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -78,6 +78,12 @@ importers: '@abraham/reflection': specifier: ^0.13.0 version: 0.13.0 + '@opentelemetry/api': + specifier: ^1.9.1 + version: 1.9.1 + '@opentelemetry/api-logs': + specifier: ^0.218.0 + version: 0.218.0 '@peculiar/x509': specifier: ^2.0.0 version: 2.0.0 @@ -303,7 +309,7 @@ importers: version: 8.0.13(@types/node@22.19.19)(esbuild@0.28.0) vitest: specifier: ^4.1.6 - version: 4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) + version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) packages/chat: dependencies: @@ -1307,6 +1313,14 @@ packages: resolution: {integrity: sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg==} engines: {node: '>= 8'} + '@opentelemetry/api-logs@0.218.0': + resolution: {integrity: sha512-fmEWp5kXlGEc3i/lR698Hz41DfGyN4Tbe4g7L1AxSc7fF8Xeh/FQ9Quqpa9dVA413Q1Ad43QOLzU4JoXgbFPWw==} + engines: {node: '>=8.0.0'} + + '@opentelemetry/api@1.9.1': + resolution: {integrity: sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==} + engines: {node: '>=8.0.0'} + '@oxc-parser/binding-android-arm-eabi@0.127.0': resolution: {integrity: sha512-0LC7ye4hvqbIKxAzThzvswgHLFu2AURKzYLeSVvLdu2TBOYWQDmHnTqPLeA597BcUCxiLqLsS4CJ5uoI5WYWCQ==} engines: {node: ^20.19.0 || >=22.12.0} @@ -5999,6 +6013,12 @@ snapshots: '@nodelib/fs.scandir': 2.1.5 fastq: 1.20.1 + '@opentelemetry/api-logs@0.218.0': + dependencies: + '@opentelemetry/api': 1.9.1 + + '@opentelemetry/api@1.9.1': {} + '@oxc-parser/binding-android-arm-eabi@0.127.0': optional: true @@ -6857,7 +6877,7 @@ snapshots: obug: 2.1.1 std-env: 4.1.0 tinyrainbow: 3.1.0 - vitest: 4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) + vitest: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) '@vitest/expect@3.2.4': dependencies: @@ -10121,7 +10141,7 @@ snapshots: esbuild: 0.28.0 fsevents: 2.3.3 - vitest@4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13): + vitest@4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13): dependencies: '@vitest/expect': 4.1.6 '@vitest/mocker': 4.1.6(vite@8.0.13) @@ -10144,6 +10164,7 @@ snapshots: vite: 8.0.13(@types/node@22.19.19)(esbuild@0.28.0) why-is-node-running: 2.3.0 optionalDependencies: + '@opentelemetry/api': 1.9.1 '@types/node': 22.19.19 '@vitest/coverage-v8': 4.1.6(vitest@4.1.6) jsdom: 29.1.1 diff --git a/src/telemetry/export/otlp.ts b/src/telemetry/export/otlp.ts deleted file mode 100644 index 233369a1c..000000000 --- a/src/telemetry/export/otlp.ts +++ /dev/null @@ -1,334 +0,0 @@ -import type { TelemetryEvent } from "../event"; - -type JsonPrimitive = string | number | boolean | null; -type JsonValue = - | JsonPrimitive - | JsonValue[] - | { readonly [key: string]: JsonValue }; -type JsonObject = Record; - -const STATUS_CODE_UNSET = 0; -const STATUS_CODE_OK = 1; -const STATUS_CODE_ERROR = 2; -const SPAN_KIND_INTERNAL = 1; - -const SEVERITY_NUMBER_INFO = 9; -const SEVERITY_NUMBER_ERROR = 17; - -const AGGREGATION_TEMPORALITY_DELTA = 1; - -const METRIC_EVENT_NAMES = new Set([ - "http.requests", - "ssh.network.info", - "ssh.network.sampled", -]); - -export function isMetricEvent(event: TelemetryEvent): boolean { - return METRIC_EVENT_NAMES.has(event.eventName); -} - -export function toOtlpLogResource(event: TelemetryEvent): JsonObject { - return { - resource: { attributes: resourceAttributes(event) }, - scopeLogs: [ - { - scope: instrumentationScope(), - logRecords: [toLogRecord(event)], - }, - ], - }; -} - -export function toOtlpSpanResource(event: TelemetryEvent): JsonObject { - return { - resource: { attributes: resourceAttributes(event) }, - scopeSpans: [ - { - scope: instrumentationScope(), - spans: [toSpan(event)], - }, - ], - }; -} - -export function toOtlpMetricResource(event: TelemetryEvent): JsonObject { - return { - resource: { attributes: resourceAttributes(event) }, - scopeMetrics: [ - { - scope: instrumentationScope(), - metrics: toMetrics(event), - }, - ], - }; -} - -function toLogRecord(event: TelemetryEvent): JsonObject { - const timeUnixNano = toUnixNano(event.timestamp); - return { - timeUnixNano, - observedTimeUnixNano: timeUnixNano, - severityNumber: - event.error === undefined ? SEVERITY_NUMBER_INFO : SEVERITY_NUMBER_ERROR, - severityText: event.error === undefined ? "INFO" : "ERROR", - body: { stringValue: event.eventName }, - attributes: eventAttributes(event), - }; -} - -function toSpan(event: TelemetryEvent): JsonObject { - const endTimeUnixNano = toUnixNano(event.timestamp); - const startTimeUnixNano = toSpanStartUnixNano(event, endTimeUnixNano); - return { - traceId: event.traceId ?? "", - spanId: event.eventId, - ...(event.parentEventId !== undefined && { - parentSpanId: event.parentEventId, - }), - name: spanName(event.eventName), - kind: SPAN_KIND_INTERNAL, - startTimeUnixNano, - endTimeUnixNano, - attributes: spanAttributes(event), - status: spanStatus(event), - ...(event.error !== undefined && { - events: [exceptionSpanEvent(event, endTimeUnixNano)], - }), - }; -} - -function toMetrics(event: TelemetryEvent): JsonObject[] { - if (event.eventName === "http.requests") { - return toHttpRequestMetrics(event); - } - return toGaugeMetrics(event, Object.entries(event.measurements)); -} - -function toHttpRequestMetrics(event: TelemetryEvent): JsonObject[] { - const windowSeconds = event.measurements.window_seconds; - const measurements = Object.entries(event.measurements).filter( - ([name]) => name !== "window_seconds", - ); - const countMetrics = measurements.filter(([name]) => - name.startsWith("count_"), - ); - const gaugeMetrics = measurements.filter( - ([name]) => !name.startsWith("count_"), - ); - const timeUnixNano = toUnixNano(event.timestamp); - return [ - ...countMetrics.map(([name, value]) => - toSumMetric(event, name, value, timeUnixNano, windowSeconds), - ), - ...toGaugeMetrics(event, gaugeMetrics, { - startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), - timeUnixNano, - }), - ]; -} - -function toSumMetric( - event: TelemetryEvent, - measurementName: string, - value: number, - timeUnixNano: string, - windowSeconds: number | undefined, -): JsonObject { - return { - name: `${event.eventName}.${measurementName}`, - description: event.eventName, - unit: "{request}", - sum: { - aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, - isMonotonic: true, - dataPoints: [ - { - attributes: metricAttributes(event), - startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), - timeUnixNano, - asInt: String(Math.trunc(value)), - }, - ], - }, - }; -} - -function toGaugeMetrics( - event: TelemetryEvent, - measurements: Array<[string, number]>, - times: { - readonly startTimeUnixNano?: string; - readonly timeUnixNano: string; - } = { - timeUnixNano: toUnixNano(event.timestamp), - }, -): JsonObject[] { - return measurements.map(([name, value]) => ({ - name: `${event.eventName}.${name}`, - description: event.eventName, - unit: metricUnit(name), - gauge: { - dataPoints: [ - { - attributes: metricAttributes(event), - ...(times.startTimeUnixNano !== undefined && { - startTimeUnixNano: times.startTimeUnixNano, - }), - timeUnixNano: times.timeUnixNano, - asDouble: value, - }, - ], - }, - })); -} - -function resourceAttributes(event: TelemetryEvent): JsonObject[] { - return keyValues({ - "service.name": "coder-vscode-extension", - "service.version": event.context.extensionVersion, - "coder.machine.id": event.context.machineId, - "coder.session.id": event.context.sessionId, - "os.type": event.context.osType, - "os.version": event.context.osVersion, - "host.arch": event.context.hostArch, - "vscode.platform.name": event.context.platformName, - "vscode.platform.version": event.context.platformVersion, - "coder.deployment.url": event.context.deploymentUrl, - }); -} - -function eventAttributes(event: TelemetryEvent): JsonObject[] { - return keyValues({ - ...event.properties, - ...event.measurements, - ...(event.error !== undefined && { - "exception.message": event.error.message, - ...(event.error.type !== undefined && { - "exception.type": event.error.type, - }), - ...(event.error.code !== undefined && { - "exception.code": event.error.code, - }), - }), - }); -} - -function spanAttributes(event: TelemetryEvent): JsonObject[] { - return keyValues({ - "coder.event_name": event.eventName, - ...event.properties, - ...Object.fromEntries( - Object.entries(event.measurements).filter( - ([name]) => name !== "durationMs", - ), - ), - }); -} - -function metricAttributes(event: TelemetryEvent): JsonObject[] { - return keyValues({ - "coder.event_name": event.eventName, - ...event.properties, - }); -} - -function keyValues( - values: Readonly>, -): JsonObject[] { - return Object.entries(values).map(([key, value]) => { - const otlpValue: JsonObject = - typeof value === "number" - ? { doubleValue: value } - : { stringValue: value }; - return { key, value: otlpValue }; - }); -} - -function instrumentationScope(): JsonObject { - return { - name: "coder.vscode-coder.telemetry.export", - }; -} - -function spanStatus(event: TelemetryEvent): JsonObject { - if (event.properties.result === "success") { - return { code: STATUS_CODE_OK }; - } - if (event.properties.result === "error" || event.error !== undefined) { - return { - code: STATUS_CODE_ERROR, - ...(event.error !== undefined && { message: event.error.message }), - }; - } - return { code: STATUS_CODE_UNSET }; -} - -function exceptionSpanEvent( - event: TelemetryEvent, - timeUnixNano: string, -): JsonObject { - const error = event.error; - if (error === undefined) { - throw new Error("Cannot build exception event without an error."); - } - return { - name: "exception", - timeUnixNano, - attributes: keyValues({ - "exception.message": error.message, - ...(error.type !== undefined && { "exception.type": error.type }), - ...(error.code !== undefined && { "exception.code": error.code }), - }), - }; -} - -function toSpanStartUnixNano( - event: TelemetryEvent, - endTimeUnixNano: string, -): string { - const durationMs = event.measurements.durationMs; - if (durationMs === undefined) { - return endTimeUnixNano; - } - return String(BigInt(endTimeUnixNano) - msToNanos(durationMs)); -} - -function windowStartUnixNano( - timeUnixNano: string, - windowSeconds: number | undefined, -): string { - if (windowSeconds === undefined) { - return timeUnixNano; - } - return String(BigInt(timeUnixNano) - secondsToNanos(windowSeconds)); -} - -function toUnixNano(timestamp: string): string { - const ms = Date.parse(timestamp); - if (!Number.isFinite(ms)) { - throw new Error(`Invalid telemetry timestamp '${timestamp}'.`); - } - return String(BigInt(ms) * 1_000_000n); -} - -function msToNanos(ms: number): bigint { - return BigInt(Math.max(0, Math.round(ms * 1_000_000))); -} - -function secondsToNanos(seconds: number): bigint { - return BigInt(Math.max(0, Math.round(seconds * 1_000_000_000))); -} - -function spanName(eventName: string): string { - return eventName.split(".").at(-1) ?? eventName; -} - -function metricUnit(measurementName: string): string { - if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { - return "ms"; - } - if (measurementName.endsWith("Mbits")) { - return "Mbit/s"; - } - return "1"; -} diff --git a/src/telemetry/export/signal.ts b/src/telemetry/export/signal.ts new file mode 100644 index 000000000..b580dbc29 --- /dev/null +++ b/src/telemetry/export/signal.ts @@ -0,0 +1,24 @@ +import type { TelemetryEvent } from "../event"; + +/** Telemetry signal an event maps to in an export. */ +export type ExportSignal = "log" | "trace" | "metric"; + +const METRIC_EVENT_NAMES = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +/** + * Classify a `TelemetryEvent` for export. Metric-shaped event names map to + * `metric`; everything with a `traceId` is a `trace`; otherwise `log`. + */ +export function classifyEvent(event: TelemetryEvent): ExportSignal { + if (METRIC_EVENT_NAMES.has(event.eventName)) { + return "metric"; + } + if (event.traceId !== undefined) { + return "trace"; + } + return "log"; +} diff --git a/src/telemetry/export/writers/otlp.ts b/src/telemetry/export/writers/otlp.ts new file mode 100644 index 000000000..fc771107e --- /dev/null +++ b/src/telemetry/export/writers/otlp.ts @@ -0,0 +1,437 @@ +import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; +import { SeverityNumber } from "@opentelemetry/api-logs"; +import { zip } from "fflate"; +import { createWriteStream } from "node:fs"; +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { promisify } from "node:util"; + +import { writeAtomically } from "../../../util/fs"; +import { parseTelemetryTimestampMs } from "../range"; +import { classifyEvent, type ExportSignal } from "../signal"; + +import type { TelemetryContext, TelemetryEvent } from "../../event"; + +type JsonPrimitive = string | number | boolean | null; +type JsonValue = + | JsonPrimitive + | JsonValue[] + | { readonly [key: string]: JsonValue }; +type JsonObject = Record; +type EventError = NonNullable; + +export interface OtlpExportCounts { + readonly logs: number; + readonly traces: number; + readonly metrics: number; +} + +// OTLP proto SpanKind reserves 0 for UNSPECIFIED, so api values shift by 1 +// on the wire. AGGREGATION_TEMPORALITY has no enum in @opentelemetry/api. +const otlpSpanKind = (kind: SpanKind): number => kind + 1; +const AGGREGATION_TEMPORALITY_DELTA = 1; +const zipAsync = promisify(zip); + +/** + * Writes `events` as an OTLP/JSON zip (`logs.json`, `traces.json`, + * `metrics.json`) to `outputPath`. Each file groups all records of one + * signal under a single Resource block built from `context`. Events stream + * into a sibling staging directory with backpressure; the directory is then + * packed in-memory and the zip is atomically renamed onto `outputPath`. + * `onCleanupError` is forwarded to `writeAtomically`. + */ +export async function writeOtlpZipExport( + outputPath: string, + events: AsyncIterable, + context: TelemetryContext, + onCleanupError: (err: unknown, tempPath: string) => void, +): Promise { + return writeAtomically( + outputPath, + async (zipPath) => { + const stagingDir = await fs.mkdtemp(`${outputPath}.staging-`); + try { + const counts = await writeOtlpJsonFiles(stagingDir, events, context); + await packZip( + zipPath, + stagingDir, + ENVELOPES.map((e) => e.file), + ); + return counts; + } finally { + await fs.rm(stagingDir, { recursive: true, force: true }); + } + }, + onCleanupError, + ); +} + +// Per-signal layout driving file names, envelope JSON keys, routing, and counts. +const ENVELOPES = [ + { + signal: "log", + counter: "logs", + file: "logs.json", + resourceKey: "resourceLogs", + scopeKey: "scopeLogs", + recordsKey: "logRecords", + toRecords: (e: TelemetryEvent) => [toOtlpLogRecord(e)], + }, + { + signal: "trace", + counter: "traces", + file: "traces.json", + resourceKey: "resourceSpans", + scopeKey: "scopeSpans", + recordsKey: "spans", + toRecords: (e: TelemetryEvent) => [toOtlpSpan(e)], + }, + { + signal: "metric", + counter: "metrics", + file: "metrics.json", + resourceKey: "resourceMetrics", + scopeKey: "scopeMetrics", + recordsKey: "metrics", + toRecords: toOtlpMetricRecords, + }, +] as const satisfies ReadonlyArray<{ + signal: ExportSignal; + counter: keyof OtlpExportCounts; + file: string; + resourceKey: string; + scopeKey: string; + recordsKey: string; + toRecords: (event: TelemetryEvent) => readonly unknown[]; +}>; + +async function writeOtlpJsonFiles( + dir: string, + events: AsyncIterable, + context: TelemetryContext, +): Promise { + const resource = JSON.stringify(toOtlpResource(context)); + const scope = JSON.stringify(toOtlpScope()); + const entries = await Promise.all( + ENVELOPES.map(async (envelope) => ({ + ...envelope, + writer: await openEnvelope( + path.join(dir, envelope.file), + `{"${envelope.resourceKey}":[{"resource":${resource},"${envelope.scopeKey}":[{"scope":${scope},"${envelope.recordsKey}":[`, + "]}]}]}\n", + ), + })), + ); + const counts: Record = { + logs: 0, + traces: 0, + metrics: 0, + }; + + try { + for await (const event of events) { + const signal = classifyEvent(event); + const entry = entries.find((e) => e.signal === signal); + if (!entry) { + continue; + } + counts[entry.counter] += 1; + for (const record of entry.toRecords(event)) { + await entry.writer.write(record); + } + } + } finally { + await Promise.all(entries.map((e) => e.writer.close())); + } + + return counts; +} + +interface EnvelopeWriter { + write(value: unknown): Promise; + close(): Promise; +} + +/** Streams a `v1,v2,...` JSON envelope to disk. */ +async function openEnvelope( + filePath: string, + prefix: string, + suffix: string, +): Promise { + const stream = createWriteStream(filePath, { encoding: "utf8" }); + await writeChunk(stream, prefix); + let written = 0; + + return { + async write(value) { + await writeChunk( + stream, + (written === 0 ? "" : ",") + JSON.stringify(value), + ); + written += 1; + }, + async close() { + await writeChunk(stream, suffix); + await new Promise((resolve, reject) => { + stream.end((err?: Error | null) => (err ? reject(err) : resolve())); + }); + }, + }; +} + +function writeChunk( + stream: NodeJS.WritableStream, + chunk: string, +): Promise { + return new Promise((resolve, reject) => { + stream.write(chunk, "utf8", (err) => (err ? reject(err) : resolve())); + }); +} + +async function packZip( + outputPath: string, + sourceDir: string, + names: readonly string[], +): Promise { + const entries = await Promise.all( + names.map( + async (name) => + [name, await fs.readFile(path.join(sourceDir, name))] as const, + ), + ); + const archive = await zipAsync(Object.fromEntries(entries)); + await fs.writeFile(outputPath, archive); +} + +/** OTLP `Resource` block built once per export from the session context. */ +function toOtlpResource(context: TelemetryContext): JsonObject { + return { + attributes: keyValues({ + "service.name": "coder-vscode-extension", + "service.version": context.extensionVersion, + "coder.machine.id": context.machineId, + "coder.session.id": context.sessionId, + "os.type": context.osType, + "os.version": context.osVersion, + "host.arch": context.hostArch, + "vscode.platform.name": context.platformName, + "vscode.platform.version": context.platformVersion, + "coder.deployment.url": context.deploymentUrl, + }), + }; +} + +/** OTLP `InstrumentationScope` shared by every record. */ +function toOtlpScope(): JsonObject { + return { name: "coder.vscode-coder.telemetry.export" }; +} + +function toOtlpLogRecord(event: TelemetryEvent): JsonObject { + const timeUnixNano = toUnixNano(event.timestamp); + const errored = event.error !== undefined; + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: errored ? SeverityNumber.ERROR : SeverityNumber.INFO, + severityText: errored ? "ERROR" : "INFO", + body: { stringValue: event.eventName }, + attributes: keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error && exceptionAttributes(event.error)), + }), + }; +} + +function toOtlpSpan(event: TelemetryEvent): JsonObject { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = String( + BigInt(endTimeUnixNano) - nanosFromMs(event.measurements.durationMs ?? 0), + ); + + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: event.eventName, + kind: otlpSpanKind(SpanKind.INTERNAL), + startTimeUnixNano, + endTimeUnixNano, + attributes: keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...withoutKey(event.measurements, "durationMs"), + }), + status: spanStatus(event), + ...(event.error && { + events: [ + { + name: "exception", + timeUnixNano: endTimeUnixNano, + attributes: keyValues(exceptionAttributes(event.error)), + }, + ], + }), + }; +} + +function spanStatus(event: TelemetryEvent): JsonObject { + if (event.properties.result === "success") { + return { code: SpanStatusCode.OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: SpanStatusCode.ERROR, + ...(event.error && { message: event.error.message }), + }; + } + return { code: SpanStatusCode.UNSET }; +} + +/** A metric event yields one record per measurement (http.requests fans out). */ +function toOtlpMetricRecords(event: TelemetryEvent): JsonObject[] { + const timeUnixNano = toUnixNano(event.timestamp); + const attributes = metricAttributes(event); + if (event.eventName === "http.requests") { + return toHttpRequestMetrics(event, timeUnixNano, attributes); + } + return toGaugeMetrics( + event, + Object.entries(event.measurements), + attributes, + timeUnixNano, + ); +} + +function toHttpRequestMetrics( + event: TelemetryEvent, + timeUnixNano: string, + attributes: JsonObject[], +): JsonObject[] { + const counts: Array<[string, number]> = []; + const gauges: Array<[string, number]> = []; + let windowSeconds: number | undefined; + for (const [name, value] of Object.entries(event.measurements)) { + if (name === "window_seconds") { + windowSeconds = value; + } else if (name.startsWith("count_")) { + counts.push([name, value]); + } else { + gauges.push([name, value]); + } + } + const startTimeUnixNano = String( + BigInt(timeUnixNano) - nanosFromSeconds(windowSeconds ?? 0), + ); + + return [ + ...counts.map(([name, value]) => ({ + name: `${event.eventName}.${name}`, + description: event.eventName, + unit: "{request}", + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, + isMonotonic: true, + dataPoints: [ + { + attributes, + startTimeUnixNano, + timeUnixNano, + asInt: String(Math.trunc(value)), + }, + ], + }, + })), + ...toGaugeMetrics( + event, + gauges, + attributes, + timeUnixNano, + startTimeUnixNano, + ), + ]; +} + +function toGaugeMetrics( + event: TelemetryEvent, + measurements: Array<[string, number]>, + attributes: JsonObject[], + timeUnixNano: string, + startTimeUnixNano?: string, +): JsonObject[] { + return measurements.map(([name, value]) => ({ + name: `${event.eventName}.${name}`, + description: event.eventName, + unit: metricUnit(name), + gauge: { + dataPoints: [ + { + attributes, + ...(startTimeUnixNano !== undefined && { startTimeUnixNano }), + timeUnixNano, + asDouble: value, + }, + ], + }, + })); +} + +function metricAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); +} + +function metricUnit(measurementName: string): string { + if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { + return "ms"; + } + if (measurementName.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} + +function exceptionAttributes(error: EventError): Record { + return { + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }; +} + +function keyValues( + values: Readonly>, +): JsonObject[] { + return Object.entries(values).map(([key, value]) => { + const otlpValue: JsonObject = + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }; + return { key, value: otlpValue }; + }); +} + +function withoutKey( + values: Readonly>, + excluded: string, +): Record { + return Object.fromEntries( + Object.entries(values).filter(([name]) => name !== excluded), + ); +} + +function toUnixNano(timestamp: string): string { + return String(BigInt(parseTelemetryTimestampMs(timestamp)) * 1_000_000n); +} + +function nanosFromMs(ms: number): bigint { + return BigInt(Math.max(0, Math.round(ms * 1e6))); +} + +function nanosFromSeconds(seconds: number): bigint { + return BigInt(Math.max(0, Math.round(seconds * 1e9))); +} diff --git a/test/unit/remote/sshOverrides.test.ts b/test/unit/remote/sshOverrides.test.ts index 0c60117dd..08d7f78f7 100644 --- a/test/unit/remote/sshOverrides.test.ts +++ b/test/unit/remote/sshOverrides.test.ts @@ -12,12 +12,7 @@ import { createMockLogger, } from "../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); /** Helper to extract a single override by key from the result. */ function findOverride( diff --git a/test/unit/remote/sshProcess.test.ts b/test/unit/remote/sshProcess.test.ts index 44139b8e0..3ab3a61a1 100644 --- a/test/unit/remote/sshProcess.test.ts +++ b/test/unit/remote/sshProcess.test.ts @@ -32,10 +32,7 @@ function makeNetworkJson(overrides: Partial = {}): string { vi.mock("find-process", () => ({ default: vi.fn() })); -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); describe("SshProcessMonitor", () => { let activeMonitors: SshProcessMonitor[] = []; diff --git a/test/unit/telemetry/export/writers/otlp.test.ts b/test/unit/telemetry/export/writers/otlp.test.ts new file mode 100644 index 000000000..53aa3a9d1 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp.test.ts @@ -0,0 +1,355 @@ +import { unzipSync } from "fflate"; +import { vol } from "memfs"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { writeOtlpZipExport } from "@/telemetry/export/writers/otlp"; + +import { asyncIterable } from "../../../../mocks/asyncIterable"; +import { createTelemetryEventFactory } from "../../../../mocks/telemetry"; + +import type { TelemetryContext, TelemetryEvent } from "@/telemetry/event"; + +vi.mock("node:fs", async () => (await import("memfs")).fs); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); + +const OUT = "/exports/telemetry.otlp.zip"; +const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + +let makeEvent: ReturnType; +let context: TelemetryContext; + +beforeEach(() => { + vol.reset(); + vol.mkdirSync("/exports", { recursive: true }); + makeEvent = createTelemetryEventFactory(); + context = makeEvent().context; +}); + +afterEach(() => vol.reset()); + +/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ +function attrs(raw: unknown): Record { + const list = raw as Array<{ + key: string; + value: { stringValue?: string; doubleValue?: number }; + }>; + return Object.fromEntries( + list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), + ); +} + +type RawRecord = Record; + +interface LogsFile { + resourceLogs: [ + { + resource: { attributes: unknown }; + scopeLogs: [{ logRecords: RawRecord[] }]; + }, + ]; +} +interface TracesFile { + resourceSpans: [{ scopeSpans: [{ spans: RawRecord[] }] }]; +} +interface MetricsFile { + resourceMetrics: [{ scopeMetrics: [{ metrics: RawRecord[] }] }]; +} + +interface Captured { + counts: Awaited>; + logResource: { attributes: unknown }; + logs: RawRecord[]; + spans: RawRecord[]; + metrics: RawRecord[]; +} + +const noopCleanup = () => {}; + +/** Writes `events` through the public API and reads the resulting envelopes. */ +async function capture(events: readonly TelemetryEvent[]): Promise { + const counts = await writeOtlpZipExport( + OUT, + asyncIterable(events), + context, + noopCleanup, + ); + const zip = unzipSync(vol.readFileSync(OUT) as Uint8Array); + const parse = (file: string): T => + JSON.parse(new TextDecoder().decode(zip[file])) as T; + + const logs = parse("logs.json").resourceLogs[0]; + const traces = parse("traces.json").resourceSpans[0]; + const metrics = parse("metrics.json").resourceMetrics[0]; + + return { + counts, + logResource: logs.resource, + logs: logs.scopeLogs[0].logRecords, + spans: traces.scopeSpans[0].spans, + metrics: metrics.scopeMetrics[0].metrics, + }; +} + +describe("writeOtlpZipExport: resource", () => { + it("emits service, OS, host, vscode, and coder attributes from the context", async () => { + const { logResource } = await capture([makeEvent()]); + + expect(attrs(logResource.attributes)).toEqual({ + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "coder.machine.id": "machine-id", + "coder.session.id": "session-id", + "os.type": "linux", + "os.version": "6.0.0", + "host.arch": "x64", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }); + }); +}); + +describe("writeOtlpZipExport: logs", () => { + it("emits INFO records with merged properties and measurements", async () => { + const { logs } = await capture([ + makeEvent({ + eventName: "log.info", + properties: { source: "unit" }, + measurements: { count: 3 }, + }), + ]); + + expect(logs[0]).toMatchObject({ + severityNumber: 9, + severityText: "INFO", + body: { stringValue: "log.info" }, + }); + expect(logs[0].timeUnixNano).toBe(logs[0].observedTimeUnixNano); + expect(attrs(logs[0].attributes)).toEqual({ source: "unit", count: 3 }); + }); + + it("emits ERROR records with optional exception fields skipped when unset", async () => { + const { logs } = await capture([ + makeEvent({ + error: { message: "boom", type: "RangeError", code: "E_RANGE" }, + }), + makeEvent({ error: { message: "boom" } }), + ]); + + expect(logs[0]).toMatchObject({ + severityNumber: 17, + severityText: "ERROR", + }); + expect(attrs(logs[0].attributes)).toMatchObject({ + "exception.message": "boom", + "exception.type": "RangeError", + "exception.code": "E_RANGE", + }); + expect(attrs(logs[1].attributes)).toEqual({ "exception.message": "boom" }); + }); +}); + +describe("writeOtlpZipExport: spans", () => { + it("emits an INTERNAL span with derived start time and parent linkage", async () => { + const { spans } = await capture([ + makeEvent({ + eventName: "remote.setup.workspace_ready", + traceId: TRACE_ID, + parentEventId: "parent-span-id", + properties: { result: "success", route: "/api" }, + measurements: { durationMs: 250, retries: 2 }, + }), + ]); + + expect(spans[0]).toMatchObject({ + traceId: TRACE_ID, + parentSpanId: "parent-span-id", + name: "remote.setup.workspace_ready", + kind: 1, // OTel api SpanKind.INTERNAL (0) + 1 for the OTLP proto offset + status: { code: 1 }, + }); + expect( + BigInt(spans[0].endTimeUnixNano as string) - + BigInt(spans[0].startTimeUnixNano as string), + ).toBe(250_000_000n); + expect(attrs(spans[0].attributes)).toEqual({ + "coder.event_name": "remote.setup.workspace_ready", + result: "success", + route: "/api", + retries: 2, + }); + }); + + it("collapses start to end and omits parentSpanId on a minimal span", async () => { + const { spans } = await capture([makeEvent({ traceId: TRACE_ID })]); + + expect(spans[0]).not.toHaveProperty("parentSpanId"); + expect(spans[0].startTimeUnixNano).toBe(spans[0].endTimeUnixNano); + }); + + it.each([ + [{ properties: { result: "success" } }, { code: 1 }], + [{ properties: { result: "error" } }, { code: 2 }], + [{ error: { message: "boom" } }, { code: 2, message: "boom" }], + [{}, { code: 0 }], + ])("maps span status: %j -> %j", async (overrides, expected) => { + const { spans } = await capture([ + makeEvent({ traceId: TRACE_ID, ...overrides }), + ]); + expect(spans[0].status).toEqual(expected); + }); + + it("attaches an `exception` event when the event has an error", async () => { + const { spans } = await capture([ + makeEvent({ + traceId: TRACE_ID, + error: { message: "boom", type: "Error" }, + }), + ]); + const [exceptionEvent] = spans[0].events as Array<{ + name: string; + timeUnixNano: string; + attributes: unknown; + }>; + + expect(exceptionEvent.name).toBe("exception"); + expect(exceptionEvent.timeUnixNano).toBe(spans[0].endTimeUnixNano); + expect(attrs(exceptionEvent.attributes)).toEqual({ + "exception.message": "boom", + "exception.type": "Error", + }); + }); +}); + +describe("writeOtlpZipExport: metrics", () => { + it("emits one gauge per measurement for non-http events, with no startTimeUnixNano", async () => { + const { metrics } = await capture([ + makeEvent({ + eventName: "ssh.network.sampled", + properties: { p2p: "true" }, + measurements: { latencyMs: 35, downloadMbits: 10 }, + }), + ]); + + expect(metrics.map((m) => [m.name, m.unit])).toEqual([ + ["ssh.network.sampled.latencyMs", "ms"], + ["ssh.network.sampled.downloadMbits", "Mbit/s"], + ]); + const [point] = (metrics[0].gauge as { dataPoints: [unknown] }).dataPoints; + expect(point).not.toHaveProperty("startTimeUnixNano"); + expect(point).toMatchObject({ asDouble: 35 }); + expect(attrs((point as { attributes: unknown }).attributes)).toMatchObject({ + "coder.event_name": "ssh.network.sampled", + p2p: "true", + }); + }); + + it("splits http.requests into monotonic count sums and gauges sharing the window", async () => { + const { metrics } = await capture([ + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 2, p95_duration_ms: 42 }, + }), + ]); + const count = metrics[0].sum as { + aggregationTemporality: number; + isMonotonic: boolean; + dataPoints: [ + { asInt: string; startTimeUnixNano: string; timeUnixNano: string }, + ]; + }; + const p95Point = ( + metrics[1].gauge as { dataPoints: [{ startTimeUnixNano: string }] } + ).dataPoints[0]; + + expect(metrics.map((m) => [m.name, m.unit])).toEqual([ + ["http.requests.count_2xx", "{request}"], + ["http.requests.p95_duration_ms", "ms"], + ]); + expect(count).toMatchObject({ + aggregationTemporality: 1, + isMonotonic: true, + }); + expect(count.dataPoints[0].asInt).toBe("2"); + expect( + BigInt(count.dataPoints[0].timeUnixNano) - + BigInt(count.dataPoints[0].startTimeUnixNano), + ).toBe(60_000_000_000n); + expect(p95Point.startTimeUnixNano).toBe( + count.dataPoints[0].startTimeUnixNano, + ); + }); + + it("treats http.requests without window_seconds as a zero-width window", async () => { + const { metrics } = await capture([ + makeEvent({ + eventName: "http.requests", + measurements: { count_2xx: 1 }, + }), + ]); + const point = ( + metrics[0].sum as { + dataPoints: [{ startTimeUnixNano: string; timeUnixNano: string }]; + } + ).dataPoints[0]; + + expect(point.startTimeUnixNano).toBe(point.timeUnixNano); + }); + + it.each([ + ["latency_ms", "ms"], + ["durationMs", "ms"], + ["downloadMbits", "Mbit/s"], + ["something", "1"], + ])("derives unit for measurement '%s' -> '%s'", async (measurement, unit) => { + const { metrics } = await capture([ + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { [measurement]: 1 }, + }), + ]); + expect(metrics[0].unit).toBe(unit); + }); +}); + +describe("writeOtlpZipExport: routing & counts", () => { + it("counts events routed to each signal, even when one event fans out into multiple records", async () => { + const { counts, logs, spans, metrics } = await capture([ + makeEvent({ eventName: "log.info" }), + makeEvent({ eventName: "log.warn" }), + makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 1, p95_duration_ms: 5 }, + }), + ]); + + expect(counts).toEqual({ logs: 2, traces: 1, metrics: 1 }); + expect(logs).toHaveLength(2); + expect(spans).toHaveLength(1); + expect(metrics).toHaveLength(2); + }); + + it("propagates midstream errors", async () => { + const failing = (async function* () { + yield makeEvent(); + await Promise.resolve(); + throw new Error("boom"); + })(); + + await expect( + writeOtlpZipExport(OUT, failing, context, noopCleanup), + ).rejects.toThrow(/boom/); + }); + + it("rejects on an unparseable event timestamp", async () => { + await expect( + writeOtlpZipExport( + OUT, + asyncIterable([makeEvent({ timestamp: "not-a-date" })]), + context, + noopCleanup, + ), + ).rejects.toThrow(/Invalid telemetry timestamp/); + }); +}); diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index c2191f4a2..9c0c31e4a 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -14,12 +14,7 @@ import { MockConfigurationProvider, } from "../../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); const BASE_DIR = "/telemetry"; const SESSION_ID = "12345678-aaaa-bbbb-cccc-dddddddddddd"; diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts index a765669b7..b363f3ef1 100644 --- a/test/unit/util/fileCleanup.test.ts +++ b/test/unit/util/fileCleanup.test.ts @@ -6,12 +6,7 @@ import { cleanupFiles } from "@/util/fileCleanup"; import { createMockLogger } from "../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); describe("cleanupFiles", () => { beforeEach(() => { From 6ac471545ad74c082959abcec485f4344c92f0fb Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Thu, 21 May 2026 13:10:12 +0300 Subject: [PATCH 3/4] fix(telemetry): make OTLP export ingestable by Prom-family backends Wire-format fixes: - Switch http.requests count_* sums from DELTA to CUMULATIVE temporality (Prom/Mimir/Grafana Cloud reject the delta+sum combination). - Maintain per-series running totals across the export, anchored at the first observed window's startTimeUnixNano. - Suppress zero-cumulative counters so routes that never errored don't ship empty data points. Semantic-convention compliance: - Use OTel-standard `service.instance.id` (was `coder.session.id`) and `host.id` (was `coder.machine.id`) for portability with OTel-aware backends. The previous vendor keys were pure aliases. - Add `schemaUrl` to each ResourceLogs/ResourceSpans/ResourceMetrics container and each scope container. - Stamp scope.version with the extension version. --- src/telemetry/export/writers/otlp.ts | 88 ++++++++--- .../telemetry/export/writers/otlp.test.ts | 141 +++++++++++++----- 2 files changed, 170 insertions(+), 59 deletions(-) diff --git a/src/telemetry/export/writers/otlp.ts b/src/telemetry/export/writers/otlp.ts index fc771107e..68e14969b 100644 --- a/src/telemetry/export/writers/otlp.ts +++ b/src/telemetry/export/writers/otlp.ts @@ -29,7 +29,8 @@ export interface OtlpExportCounts { // OTLP proto SpanKind reserves 0 for UNSPECIFIED, so api values shift by 1 // on the wire. AGGREGATION_TEMPORALITY has no enum in @opentelemetry/api. const otlpSpanKind = (kind: SpanKind): number => kind + 1; -const AGGREGATION_TEMPORALITY_DELTA = 1; +const AGGREGATION_TEMPORALITY_CUMULATIVE = 2; +const OTLP_SCHEMA_URL = "https://opentelemetry.io/schemas/1.24.0"; const zipAsync = promisify(zip); /** @@ -66,6 +67,14 @@ export async function writeOtlpZipExport( ); } +/** Per-export state threaded through the metric mapper for cumulative sums. */ +interface ExportState { + readonly cumulative: { + startTimeUnixNano: string | undefined; + readonly totals: Map; + }; +} + // Per-signal layout driving file names, envelope JSON keys, routing, and counts. const ENVELOPES = [ { @@ -75,7 +84,7 @@ const ENVELOPES = [ resourceKey: "resourceLogs", scopeKey: "scopeLogs", recordsKey: "logRecords", - toRecords: (e: TelemetryEvent) => [toOtlpLogRecord(e)], + toRecords: (e: TelemetryEvent, _state: ExportState) => [toOtlpLogRecord(e)], }, { signal: "trace", @@ -84,7 +93,7 @@ const ENVELOPES = [ resourceKey: "resourceSpans", scopeKey: "scopeSpans", recordsKey: "spans", - toRecords: (e: TelemetryEvent) => [toOtlpSpan(e)], + toRecords: (e: TelemetryEvent, _state: ExportState) => [toOtlpSpan(e)], }, { signal: "metric", @@ -102,7 +111,7 @@ const ENVELOPES = [ resourceKey: string; scopeKey: string; recordsKey: string; - toRecords: (event: TelemetryEvent) => readonly unknown[]; + toRecords: (event: TelemetryEvent, state: ExportState) => readonly unknown[]; }>; async function writeOtlpJsonFiles( @@ -111,17 +120,20 @@ async function writeOtlpJsonFiles( context: TelemetryContext, ): Promise { const resource = JSON.stringify(toOtlpResource(context)); - const scope = JSON.stringify(toOtlpScope()); + const scope = JSON.stringify(toOtlpScope(context.extensionVersion)); const entries = await Promise.all( ENVELOPES.map(async (envelope) => ({ ...envelope, writer: await openEnvelope( path.join(dir, envelope.file), - `{"${envelope.resourceKey}":[{"resource":${resource},"${envelope.scopeKey}":[{"scope":${scope},"${envelope.recordsKey}":[`, + `{"${envelope.resourceKey}":[{"resource":${resource},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.scopeKey}":[{"scope":${scope},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.recordsKey}":[`, "]}]}]}\n", ), })), ); + const state: ExportState = { + cumulative: { startTimeUnixNano: undefined, totals: new Map() }, + }; const counts: Record = { logs: 0, traces: 0, @@ -136,7 +148,7 @@ async function writeOtlpJsonFiles( continue; } counts[entry.counter] += 1; - for (const record of entry.toRecords(event)) { + for (const record of entry.toRecords(event, state)) { await entry.writer.write(record); } } @@ -209,11 +221,11 @@ function toOtlpResource(context: TelemetryContext): JsonObject { attributes: keyValues({ "service.name": "coder-vscode-extension", "service.version": context.extensionVersion, - "coder.machine.id": context.machineId, - "coder.session.id": context.sessionId, + "service.instance.id": context.sessionId, + "host.id": context.machineId, + "host.arch": context.hostArch, "os.type": context.osType, "os.version": context.osVersion, - "host.arch": context.hostArch, "vscode.platform.name": context.platformName, "vscode.platform.version": context.platformVersion, "coder.deployment.url": context.deploymentUrl, @@ -222,8 +234,8 @@ function toOtlpResource(context: TelemetryContext): JsonObject { } /** OTLP `InstrumentationScope` shared by every record. */ -function toOtlpScope(): JsonObject { - return { name: "coder.vscode-coder.telemetry.export" }; +function toOtlpScope(version: string): JsonObject { + return { name: "coder.vscode-coder.telemetry.export", version }; } function toOtlpLogRecord(event: TelemetryEvent): JsonObject { @@ -291,11 +303,14 @@ function spanStatus(event: TelemetryEvent): JsonObject { } /** A metric event yields one record per measurement (http.requests fans out). */ -function toOtlpMetricRecords(event: TelemetryEvent): JsonObject[] { +function toOtlpMetricRecords( + event: TelemetryEvent, + state: ExportState, +): JsonObject[] { const timeUnixNano = toUnixNano(event.timestamp); const attributes = metricAttributes(event); if (event.eventName === "http.requests") { - return toHttpRequestMetrics(event, timeUnixNano, attributes); + return toHttpRequestMetrics(event, timeUnixNano, attributes, state); } return toGaugeMetrics( event, @@ -309,6 +324,7 @@ function toHttpRequestMetrics( event: TelemetryEvent, timeUnixNano: string, attributes: JsonObject[], + state: ExportState, ): JsonObject[] { const counts: Array<[string, number]> = []; const gauges: Array<[string, number]> = []; @@ -322,38 +338,64 @@ function toHttpRequestMetrics( gauges.push([name, value]); } } - const startTimeUnixNano = String( + const windowStartUnixNano = String( BigInt(timeUnixNano) - nanosFromSeconds(windowSeconds ?? 0), ); - - return [ - ...counts.map(([name, value]) => ({ + // Cumulative series anchor: first observed window's start, stable across + // every subsequent http.requests data point in this export. + state.cumulative.startTimeUnixNano ??= windowStartUnixNano; + const cumulativeStart = state.cumulative.startTimeUnixNano; + + const sumRecords: JsonObject[] = []; + for (const [name, value] of counts) { + const seriesKey = `${name}|${stableSeriesKey(event.properties)}`; + const total = + (state.cumulative.totals.get(seriesKey) ?? 0n) + + BigInt(Math.trunc(value)); + state.cumulative.totals.set(seriesKey, total); + // Suppress unchanged-from-zero counters to save bytes; receivers + // interpret an absent series as "no events yet". + if (total === 0n) { + continue; + } + sumRecords.push({ name: `${event.eventName}.${name}`, description: event.eventName, unit: "{request}", sum: { - aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, + aggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE, isMonotonic: true, dataPoints: [ { attributes, - startTimeUnixNano, + startTimeUnixNano: cumulativeStart, timeUnixNano, - asInt: String(Math.trunc(value)), + asInt: String(total), }, ], }, - })), + }); + } + + return [ + ...sumRecords, ...toGaugeMetrics( event, gauges, attributes, timeUnixNano, - startTimeUnixNano, + windowStartUnixNano, ), ]; } +function stableSeriesKey(properties: Readonly>): string { + return Object.entries(properties) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}=${v}`) + .join("|"); +} + function toGaugeMetrics( event: TelemetryEvent, measurements: Array<[string, number]>, diff --git a/test/unit/telemetry/export/writers/otlp.test.ts b/test/unit/telemetry/export/writers/otlp.test.ts index 53aa3a9d1..f4fff9585 100644 --- a/test/unit/telemetry/export/writers/otlp.test.ts +++ b/test/unit/telemetry/export/writers/otlp.test.ts @@ -52,12 +52,19 @@ interface TracesFile { resourceSpans: [{ scopeSpans: [{ spans: RawRecord[] }] }]; } interface MetricsFile { - resourceMetrics: [{ scopeMetrics: [{ metrics: RawRecord[] }] }]; + resourceMetrics: [ + { + resource: { attributes: unknown }; + scopeMetrics: [{ scope: RawRecord; metrics: RawRecord[] }]; + }, + ]; } interface Captured { counts: Awaited>; logResource: { attributes: unknown }; + metricsResource: { attributes: unknown }; + logsScope: RawRecord; logs: RawRecord[]; spans: RawRecord[]; metrics: RawRecord[]; @@ -84,6 +91,8 @@ async function capture(events: readonly TelemetryEvent[]): Promise { return { counts, logResource: logs.resource, + metricsResource: metrics.resource, + logsScope: (logs.scopeLogs[0] as unknown as { scope: RawRecord }).scope, logs: logs.scopeLogs[0].logRecords, spans: traces.scopeSpans[0].spans, metrics: metrics.scopeMetrics[0].metrics, @@ -91,20 +100,37 @@ async function capture(events: readonly TelemetryEvent[]): Promise { } describe("writeOtlpZipExport: resource", () => { - it("emits service, OS, host, vscode, and coder attributes from the context", async () => { - const { logResource } = await capture([makeEvent()]); - - expect(attrs(logResource.attributes)).toEqual({ - "service.name": "coder-vscode-extension", - "service.version": "1.14.5", - "coder.machine.id": "machine-id", - "coder.session.id": "session-id", - "os.type": "linux", - "os.version": "6.0.0", - "host.arch": "x64", - "vscode.platform.name": "Visual Studio Code", - "vscode.platform.version": "1.106.0", - "coder.deployment.url": "https://coder.example.com", + const EXPECTED_RESOURCE = { + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "service.instance.id": "session-id", + "host.id": "machine-id", + "host.arch": "x64", + "os.type": "linux", + "os.version": "6.0.0", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }; + + it("uses OTel-standard semconv keys on the resource", async () => { + const { logResource, metricsResource } = await capture([ + makeEvent(), + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { latencyMs: 1 }, + }), + ]); + + expect(attrs(logResource.attributes)).toEqual(EXPECTED_RESOURCE); + expect(attrs(metricsResource.attributes)).toEqual(EXPECTED_RESOURCE); + }); + + it("stamps each scope with the extension version", async () => { + const { logsScope } = await capture([makeEvent()]); + expect(logsScope).toMatchObject({ + name: "coder.vscode-coder.telemetry.export", + version: "1.14.5", }); }); }); @@ -244,40 +270,83 @@ describe("writeOtlpZipExport: metrics", () => { }); }); - it("splits http.requests into monotonic count sums and gauges sharing the window", async () => { + it("emits http.requests counts as cumulative monotonic sums with a stable start time", async () => { + const { metrics } = await capture([ + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:01:00.000Z", + measurements: { window_seconds: 60, count_2xx: 2 }, + }), + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:02:00.000Z", + measurements: { window_seconds: 60, count_2xx: 3 }, + }), + ]); + const counts = metrics.map( + (m) => + m.sum as { + aggregationTemporality: number; + isMonotonic: boolean; + dataPoints: [ + { + asInt: string; + startTimeUnixNano: string; + timeUnixNano: string; + }, + ]; + }, + ); + + expect(counts.map((c) => c.dataPoints[0].asInt)).toEqual(["2", "5"]); + expect(counts.every((c) => c.aggregationTemporality === 2)).toBe(true); + expect(counts.every((c) => c.isMonotonic)).toBe(true); + // startTimeUnixNano is set once on the first event and stays fixed. + expect(counts[1].dataPoints[0].startTimeUnixNano).toBe( + counts[0].dataPoints[0].startTimeUnixNano, + ); + }); + + it("keeps gauges windowed alongside the cumulative count sums", async () => { const { metrics } = await capture([ makeEvent({ eventName: "http.requests", measurements: { window_seconds: 60, count_2xx: 2, p95_duration_ms: 42 }, }), ]); - const count = metrics[0].sum as { - aggregationTemporality: number; - isMonotonic: boolean; - dataPoints: [ - { asInt: string; startTimeUnixNano: string; timeUnixNano: string }, - ]; - }; - const p95Point = ( - metrics[1].gauge as { dataPoints: [{ startTimeUnixNano: string }] } - ).dataPoints[0]; expect(metrics.map((m) => [m.name, m.unit])).toEqual([ ["http.requests.count_2xx", "{request}"], ["http.requests.p95_duration_ms", "ms"], ]); - expect(count).toMatchObject({ - aggregationTemporality: 1, - isMonotonic: true, - }); - expect(count.dataPoints[0].asInt).toBe("2"); + const p95Point = ( + metrics[1].gauge as { + dataPoints: [{ startTimeUnixNano: string; timeUnixNano: string }]; + } + ).dataPoints[0]; expect( - BigInt(count.dataPoints[0].timeUnixNano) - - BigInt(count.dataPoints[0].startTimeUnixNano), + BigInt(p95Point.timeUnixNano) - BigInt(p95Point.startTimeUnixNano), ).toBe(60_000_000_000n); - expect(p95Point.startTimeUnixNano).toBe( - count.dataPoints[0].startTimeUnixNano, - ); + }); + + it("suppresses zero-valued cumulative counters", async () => { + const { metrics } = await capture([ + makeEvent({ + eventName: "http.requests", + measurements: { + window_seconds: 60, + count_2xx: 0, + count_5xx: 0, + p95_duration_ms: 10, + }, + }), + ]); + + expect(metrics.map((m) => m.name)).toEqual([ + "http.requests.p95_duration_ms", + ]); }); it("treats http.requests without window_seconds as a zero-width window", async () => { From 5f665609a0dffb41a44b060508f4013a90418316 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Fri, 22 May 2026 17:59:02 +0300 Subject: [PATCH 4/4] refactor(telemetry): split OTLP writer into per-layer modules Group OTLP code into src/telemetry/export/writers/otlp/ with one file per concern, and hoist metric classification to src/telemetry/export/metrics.ts so it can be shared by future writers. Layout: - metrics.ts: domain (isMetricEvent, describeMetricEvent, units) - otlp/writer.ts: public API + orchestration (signal routing, packZip) - otlp/envelope.ts: streaming JSON envelope writer + error wrapping - otlp/records.ts: log/span/metric record builders + helpers - otlp/types.ts: OTLP wire-format interfaces (pinned to proto v1.10.0) Safety fixes surfaced by the layer split: - envelope: listen for stream 'error' events so failed opens reject pending writes instead of hanging; close() is idempotent; suffix-write failures during close are labeled as close failures. - writer: on loop failure, use Promise.allSettled to close streams without masking the original error. - records: coerce NaN/Infinity numeric inputs to 0n instead of throwing from BigInt(); clamp startTimeUnixNano <= timeUnixNano for out-of-order events; include eventName in the cumulative-series key. Each layer has its own test file; total grows from 99 to 131 cases. --- src/telemetry/export/metrics.ts | 82 +++ src/telemetry/export/signal.ts | 24 - src/telemetry/export/writers/otlp.ts | 479 ------------------ src/telemetry/export/writers/otlp/envelope.ts | 97 ++++ src/telemetry/export/writers/otlp/records.ts | 269 ++++++++++ src/telemetry/export/writers/otlp/types.ts | 74 +++ src/telemetry/export/writers/otlp/writer.ts | 185 +++++++ test/unit/telemetry/export/metrics.test.ts | 95 ++++ .../telemetry/export/writers/otlp.test.ts | 424 ---------------- .../export/writers/otlp/envelope.test.ts | 120 +++++ .../export/writers/otlp/records.test.ts | 347 +++++++++++++ .../export/writers/otlp/writer.test.ts | 198 ++++++++ 12 files changed, 1467 insertions(+), 927 deletions(-) create mode 100644 src/telemetry/export/metrics.ts delete mode 100644 src/telemetry/export/signal.ts delete mode 100644 src/telemetry/export/writers/otlp.ts create mode 100644 src/telemetry/export/writers/otlp/envelope.ts create mode 100644 src/telemetry/export/writers/otlp/records.ts create mode 100644 src/telemetry/export/writers/otlp/types.ts create mode 100644 src/telemetry/export/writers/otlp/writer.ts create mode 100644 test/unit/telemetry/export/metrics.test.ts delete mode 100644 test/unit/telemetry/export/writers/otlp.test.ts create mode 100644 test/unit/telemetry/export/writers/otlp/envelope.test.ts create mode 100644 test/unit/telemetry/export/writers/otlp/records.test.ts create mode 100644 test/unit/telemetry/export/writers/otlp/writer.test.ts diff --git a/src/telemetry/export/metrics.ts b/src/telemetry/export/metrics.ts new file mode 100644 index 000000000..6b7f34daf --- /dev/null +++ b/src/telemetry/export/metrics.ts @@ -0,0 +1,82 @@ +import type { TelemetryEvent } from "../event"; + +/** One measurement, classified for export. */ +export interface MetricMeasurement { + readonly name: string; + readonly value: number; + readonly kind: "gauge" | "counter"; + /** OTel/UCUM unit (e.g. "ms", "Mbit/s", "{request}", "1"). */ + readonly unit: string; +} + +/** + * Typed view of a metric event. `windowSeconds` is set on windowed events + * (`http.requests`) and absent on point-in-time samples; exporters use it to + * stamp gauge start times and anchor cumulative counters. + */ +export interface MetricDescriptor { + readonly windowSeconds?: number; + readonly measurements: readonly MetricMeasurement[]; +} + +// Single source of truth for which event names are metric series. +const METRIC_EVENT_NAMES: ReadonlySet = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +export function isMetricEvent(event: TelemetryEvent): boolean { + return METRIC_EVENT_NAMES.has(event.eventName); +} + +/** Typed layout for a metric event, or `undefined` if it isn't a metric. */ +export function describeMetricEvent( + event: TelemetryEvent, +): MetricDescriptor | undefined { + if (!isMetricEvent(event)) { + return undefined; + } + if (event.eventName === "http.requests") { + return describeHttpRequests(event); + } + return { + measurements: Object.entries(event.measurements).map(([name, value]) => ({ + name, + value, + kind: "gauge", + unit: measurementUnit(name), + })), + }; +} + +// `window_seconds` is metadata, `count_*` are cumulative counters, the rest gauges. +function describeHttpRequests(event: TelemetryEvent): MetricDescriptor { + let windowSeconds = 0; + const measurements: MetricMeasurement[] = []; + for (const [name, value] of Object.entries(event.measurements)) { + if (name === "window_seconds") { + windowSeconds = value; + } else if (name.startsWith("count_")) { + measurements.push({ name, value, kind: "counter", unit: "{request}" }); + } else { + measurements.push({ + name, + value, + kind: "gauge", + unit: measurementUnit(name), + }); + } + } + return { windowSeconds, measurements }; +} + +function measurementUnit(name: string): string { + if (name.endsWith("_ms") || name.endsWith("Ms")) { + return "ms"; + } + if (name.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} diff --git a/src/telemetry/export/signal.ts b/src/telemetry/export/signal.ts deleted file mode 100644 index b580dbc29..000000000 --- a/src/telemetry/export/signal.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type { TelemetryEvent } from "../event"; - -/** Telemetry signal an event maps to in an export. */ -export type ExportSignal = "log" | "trace" | "metric"; - -const METRIC_EVENT_NAMES = new Set([ - "http.requests", - "ssh.network.info", - "ssh.network.sampled", -]); - -/** - * Classify a `TelemetryEvent` for export. Metric-shaped event names map to - * `metric`; everything with a `traceId` is a `trace`; otherwise `log`. - */ -export function classifyEvent(event: TelemetryEvent): ExportSignal { - if (METRIC_EVENT_NAMES.has(event.eventName)) { - return "metric"; - } - if (event.traceId !== undefined) { - return "trace"; - } - return "log"; -} diff --git a/src/telemetry/export/writers/otlp.ts b/src/telemetry/export/writers/otlp.ts deleted file mode 100644 index 68e14969b..000000000 --- a/src/telemetry/export/writers/otlp.ts +++ /dev/null @@ -1,479 +0,0 @@ -import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; -import { SeverityNumber } from "@opentelemetry/api-logs"; -import { zip } from "fflate"; -import { createWriteStream } from "node:fs"; -import * as fs from "node:fs/promises"; -import * as path from "node:path"; -import { promisify } from "node:util"; - -import { writeAtomically } from "../../../util/fs"; -import { parseTelemetryTimestampMs } from "../range"; -import { classifyEvent, type ExportSignal } from "../signal"; - -import type { TelemetryContext, TelemetryEvent } from "../../event"; - -type JsonPrimitive = string | number | boolean | null; -type JsonValue = - | JsonPrimitive - | JsonValue[] - | { readonly [key: string]: JsonValue }; -type JsonObject = Record; -type EventError = NonNullable; - -export interface OtlpExportCounts { - readonly logs: number; - readonly traces: number; - readonly metrics: number; -} - -// OTLP proto SpanKind reserves 0 for UNSPECIFIED, so api values shift by 1 -// on the wire. AGGREGATION_TEMPORALITY has no enum in @opentelemetry/api. -const otlpSpanKind = (kind: SpanKind): number => kind + 1; -const AGGREGATION_TEMPORALITY_CUMULATIVE = 2; -const OTLP_SCHEMA_URL = "https://opentelemetry.io/schemas/1.24.0"; -const zipAsync = promisify(zip); - -/** - * Writes `events` as an OTLP/JSON zip (`logs.json`, `traces.json`, - * `metrics.json`) to `outputPath`. Each file groups all records of one - * signal under a single Resource block built from `context`. Events stream - * into a sibling staging directory with backpressure; the directory is then - * packed in-memory and the zip is atomically renamed onto `outputPath`. - * `onCleanupError` is forwarded to `writeAtomically`. - */ -export async function writeOtlpZipExport( - outputPath: string, - events: AsyncIterable, - context: TelemetryContext, - onCleanupError: (err: unknown, tempPath: string) => void, -): Promise { - return writeAtomically( - outputPath, - async (zipPath) => { - const stagingDir = await fs.mkdtemp(`${outputPath}.staging-`); - try { - const counts = await writeOtlpJsonFiles(stagingDir, events, context); - await packZip( - zipPath, - stagingDir, - ENVELOPES.map((e) => e.file), - ); - return counts; - } finally { - await fs.rm(stagingDir, { recursive: true, force: true }); - } - }, - onCleanupError, - ); -} - -/** Per-export state threaded through the metric mapper for cumulative sums. */ -interface ExportState { - readonly cumulative: { - startTimeUnixNano: string | undefined; - readonly totals: Map; - }; -} - -// Per-signal layout driving file names, envelope JSON keys, routing, and counts. -const ENVELOPES = [ - { - signal: "log", - counter: "logs", - file: "logs.json", - resourceKey: "resourceLogs", - scopeKey: "scopeLogs", - recordsKey: "logRecords", - toRecords: (e: TelemetryEvent, _state: ExportState) => [toOtlpLogRecord(e)], - }, - { - signal: "trace", - counter: "traces", - file: "traces.json", - resourceKey: "resourceSpans", - scopeKey: "scopeSpans", - recordsKey: "spans", - toRecords: (e: TelemetryEvent, _state: ExportState) => [toOtlpSpan(e)], - }, - { - signal: "metric", - counter: "metrics", - file: "metrics.json", - resourceKey: "resourceMetrics", - scopeKey: "scopeMetrics", - recordsKey: "metrics", - toRecords: toOtlpMetricRecords, - }, -] as const satisfies ReadonlyArray<{ - signal: ExportSignal; - counter: keyof OtlpExportCounts; - file: string; - resourceKey: string; - scopeKey: string; - recordsKey: string; - toRecords: (event: TelemetryEvent, state: ExportState) => readonly unknown[]; -}>; - -async function writeOtlpJsonFiles( - dir: string, - events: AsyncIterable, - context: TelemetryContext, -): Promise { - const resource = JSON.stringify(toOtlpResource(context)); - const scope = JSON.stringify(toOtlpScope(context.extensionVersion)); - const entries = await Promise.all( - ENVELOPES.map(async (envelope) => ({ - ...envelope, - writer: await openEnvelope( - path.join(dir, envelope.file), - `{"${envelope.resourceKey}":[{"resource":${resource},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.scopeKey}":[{"scope":${scope},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.recordsKey}":[`, - "]}]}]}\n", - ), - })), - ); - const state: ExportState = { - cumulative: { startTimeUnixNano: undefined, totals: new Map() }, - }; - const counts: Record = { - logs: 0, - traces: 0, - metrics: 0, - }; - - try { - for await (const event of events) { - const signal = classifyEvent(event); - const entry = entries.find((e) => e.signal === signal); - if (!entry) { - continue; - } - counts[entry.counter] += 1; - for (const record of entry.toRecords(event, state)) { - await entry.writer.write(record); - } - } - } finally { - await Promise.all(entries.map((e) => e.writer.close())); - } - - return counts; -} - -interface EnvelopeWriter { - write(value: unknown): Promise; - close(): Promise; -} - -/** Streams a `v1,v2,...` JSON envelope to disk. */ -async function openEnvelope( - filePath: string, - prefix: string, - suffix: string, -): Promise { - const stream = createWriteStream(filePath, { encoding: "utf8" }); - await writeChunk(stream, prefix); - let written = 0; - - return { - async write(value) { - await writeChunk( - stream, - (written === 0 ? "" : ",") + JSON.stringify(value), - ); - written += 1; - }, - async close() { - await writeChunk(stream, suffix); - await new Promise((resolve, reject) => { - stream.end((err?: Error | null) => (err ? reject(err) : resolve())); - }); - }, - }; -} - -function writeChunk( - stream: NodeJS.WritableStream, - chunk: string, -): Promise { - return new Promise((resolve, reject) => { - stream.write(chunk, "utf8", (err) => (err ? reject(err) : resolve())); - }); -} - -async function packZip( - outputPath: string, - sourceDir: string, - names: readonly string[], -): Promise { - const entries = await Promise.all( - names.map( - async (name) => - [name, await fs.readFile(path.join(sourceDir, name))] as const, - ), - ); - const archive = await zipAsync(Object.fromEntries(entries)); - await fs.writeFile(outputPath, archive); -} - -/** OTLP `Resource` block built once per export from the session context. */ -function toOtlpResource(context: TelemetryContext): JsonObject { - return { - attributes: keyValues({ - "service.name": "coder-vscode-extension", - "service.version": context.extensionVersion, - "service.instance.id": context.sessionId, - "host.id": context.machineId, - "host.arch": context.hostArch, - "os.type": context.osType, - "os.version": context.osVersion, - "vscode.platform.name": context.platformName, - "vscode.platform.version": context.platformVersion, - "coder.deployment.url": context.deploymentUrl, - }), - }; -} - -/** OTLP `InstrumentationScope` shared by every record. */ -function toOtlpScope(version: string): JsonObject { - return { name: "coder.vscode-coder.telemetry.export", version }; -} - -function toOtlpLogRecord(event: TelemetryEvent): JsonObject { - const timeUnixNano = toUnixNano(event.timestamp); - const errored = event.error !== undefined; - return { - timeUnixNano, - observedTimeUnixNano: timeUnixNano, - severityNumber: errored ? SeverityNumber.ERROR : SeverityNumber.INFO, - severityText: errored ? "ERROR" : "INFO", - body: { stringValue: event.eventName }, - attributes: keyValues({ - ...event.properties, - ...event.measurements, - ...(event.error && exceptionAttributes(event.error)), - }), - }; -} - -function toOtlpSpan(event: TelemetryEvent): JsonObject { - const endTimeUnixNano = toUnixNano(event.timestamp); - const startTimeUnixNano = String( - BigInt(endTimeUnixNano) - nanosFromMs(event.measurements.durationMs ?? 0), - ); - - return { - traceId: event.traceId ?? "", - spanId: event.eventId, - ...(event.parentEventId !== undefined && { - parentSpanId: event.parentEventId, - }), - name: event.eventName, - kind: otlpSpanKind(SpanKind.INTERNAL), - startTimeUnixNano, - endTimeUnixNano, - attributes: keyValues({ - "coder.event_name": event.eventName, - ...event.properties, - ...withoutKey(event.measurements, "durationMs"), - }), - status: spanStatus(event), - ...(event.error && { - events: [ - { - name: "exception", - timeUnixNano: endTimeUnixNano, - attributes: keyValues(exceptionAttributes(event.error)), - }, - ], - }), - }; -} - -function spanStatus(event: TelemetryEvent): JsonObject { - if (event.properties.result === "success") { - return { code: SpanStatusCode.OK }; - } - if (event.properties.result === "error" || event.error !== undefined) { - return { - code: SpanStatusCode.ERROR, - ...(event.error && { message: event.error.message }), - }; - } - return { code: SpanStatusCode.UNSET }; -} - -/** A metric event yields one record per measurement (http.requests fans out). */ -function toOtlpMetricRecords( - event: TelemetryEvent, - state: ExportState, -): JsonObject[] { - const timeUnixNano = toUnixNano(event.timestamp); - const attributes = metricAttributes(event); - if (event.eventName === "http.requests") { - return toHttpRequestMetrics(event, timeUnixNano, attributes, state); - } - return toGaugeMetrics( - event, - Object.entries(event.measurements), - attributes, - timeUnixNano, - ); -} - -function toHttpRequestMetrics( - event: TelemetryEvent, - timeUnixNano: string, - attributes: JsonObject[], - state: ExportState, -): JsonObject[] { - const counts: Array<[string, number]> = []; - const gauges: Array<[string, number]> = []; - let windowSeconds: number | undefined; - for (const [name, value] of Object.entries(event.measurements)) { - if (name === "window_seconds") { - windowSeconds = value; - } else if (name.startsWith("count_")) { - counts.push([name, value]); - } else { - gauges.push([name, value]); - } - } - const windowStartUnixNano = String( - BigInt(timeUnixNano) - nanosFromSeconds(windowSeconds ?? 0), - ); - // Cumulative series anchor: first observed window's start, stable across - // every subsequent http.requests data point in this export. - state.cumulative.startTimeUnixNano ??= windowStartUnixNano; - const cumulativeStart = state.cumulative.startTimeUnixNano; - - const sumRecords: JsonObject[] = []; - for (const [name, value] of counts) { - const seriesKey = `${name}|${stableSeriesKey(event.properties)}`; - const total = - (state.cumulative.totals.get(seriesKey) ?? 0n) + - BigInt(Math.trunc(value)); - state.cumulative.totals.set(seriesKey, total); - // Suppress unchanged-from-zero counters to save bytes; receivers - // interpret an absent series as "no events yet". - if (total === 0n) { - continue; - } - sumRecords.push({ - name: `${event.eventName}.${name}`, - description: event.eventName, - unit: "{request}", - sum: { - aggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE, - isMonotonic: true, - dataPoints: [ - { - attributes, - startTimeUnixNano: cumulativeStart, - timeUnixNano, - asInt: String(total), - }, - ], - }, - }); - } - - return [ - ...sumRecords, - ...toGaugeMetrics( - event, - gauges, - attributes, - timeUnixNano, - windowStartUnixNano, - ), - ]; -} - -function stableSeriesKey(properties: Readonly>): string { - return Object.entries(properties) - .sort(([a], [b]) => a.localeCompare(b)) - .map(([k, v]) => `${k}=${v}`) - .join("|"); -} - -function toGaugeMetrics( - event: TelemetryEvent, - measurements: Array<[string, number]>, - attributes: JsonObject[], - timeUnixNano: string, - startTimeUnixNano?: string, -): JsonObject[] { - return measurements.map(([name, value]) => ({ - name: `${event.eventName}.${name}`, - description: event.eventName, - unit: metricUnit(name), - gauge: { - dataPoints: [ - { - attributes, - ...(startTimeUnixNano !== undefined && { startTimeUnixNano }), - timeUnixNano, - asDouble: value, - }, - ], - }, - })); -} - -function metricAttributes(event: TelemetryEvent): JsonObject[] { - return keyValues({ - "coder.event_name": event.eventName, - ...event.properties, - }); -} - -function metricUnit(measurementName: string): string { - if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { - return "ms"; - } - if (measurementName.endsWith("Mbits")) { - return "Mbit/s"; - } - return "1"; -} - -function exceptionAttributes(error: EventError): Record { - return { - "exception.message": error.message, - ...(error.type !== undefined && { "exception.type": error.type }), - ...(error.code !== undefined && { "exception.code": error.code }), - }; -} - -function keyValues( - values: Readonly>, -): JsonObject[] { - return Object.entries(values).map(([key, value]) => { - const otlpValue: JsonObject = - typeof value === "number" - ? { doubleValue: value } - : { stringValue: value }; - return { key, value: otlpValue }; - }); -} - -function withoutKey( - values: Readonly>, - excluded: string, -): Record { - return Object.fromEntries( - Object.entries(values).filter(([name]) => name !== excluded), - ); -} - -function toUnixNano(timestamp: string): string { - return String(BigInt(parseTelemetryTimestampMs(timestamp)) * 1_000_000n); -} - -function nanosFromMs(ms: number): bigint { - return BigInt(Math.max(0, Math.round(ms * 1e6))); -} - -function nanosFromSeconds(seconds: number): bigint { - return BigInt(Math.max(0, Math.round(seconds * 1e9))); -} diff --git a/src/telemetry/export/writers/otlp/envelope.ts b/src/telemetry/export/writers/otlp/envelope.ts new file mode 100644 index 000000000..82f2b9797 --- /dev/null +++ b/src/telemetry/export/writers/otlp/envelope.ts @@ -0,0 +1,97 @@ +import { createWriteStream } from "node:fs"; + +import { toError } from "../../../../error/errorUtils"; + +/** Append-only writer for one OTLP/JSON envelope file. `append` is not re-entrant. */ +export interface EnvelopeFile { + append(value: unknown): Promise; + close(): Promise; +} + +/** Streams `v1,v2,...` JSON into `filePath`. */ +export async function openEnvelopeFile( + filePath: string, + prefix: string, + suffix: string, +): Promise { + const stream = createWriteStream(filePath, { encoding: "utf8" }); + // Open failures (ENOENT/EACCES) surface as 'error' events, not write + // callbacks; capture them so pending writes reject instead of hanging. + let asyncError: Error | undefined; + stream.once("error", (err) => { + asyncError ??= err; + }); + + await write(stream, prefix, filePath, () => asyncError); + let written = 0; + let closed = false; + return { + async append(value) { + await write( + stream, + (written === 0 ? "" : ",") + JSON.stringify(value), + filePath, + () => asyncError, + ); + written += 1; + }, + async close() { + if (closed) { + return; + } + closed = true; + try { + await write(stream, suffix, filePath, () => asyncError); + } catch (err) { + // Re-label suffix-write failures as a close failure. + const inner = (err as { cause?: unknown }).cause; + const msg = + inner instanceof Error ? inner.message : toError(err).message; + throw new Error(`Failed to close ${filePath}: ${msg}`, { cause: err }); + } + await new Promise((resolve, reject) => { + stream.end((err?: Error | null) => { + const failure = err ?? asyncError; + if (failure) { + reject( + new Error(`Failed to close ${filePath}: ${failure.message}`, { + cause: failure, + }), + ); + } else { + resolve(); + } + }); + }); + }, + }; +} + +function write( + stream: NodeJS.WritableStream, + chunk: string, + filePath: string, + asyncError: () => Error | undefined, +): Promise { + return new Promise((resolve, reject) => { + const reject_ = (err: unknown) => + reject( + new Error(`Failed to write ${filePath}: ${toError(err).message}`, { + cause: err, + }), + ); + const existing = asyncError(); + if (existing) { + reject_(existing); + return; + } + stream.write(chunk, "utf8", (err) => { + const failure = err ?? asyncError(); + if (failure) { + reject_(failure); + } else { + resolve(); + } + }); + }); +} diff --git a/src/telemetry/export/writers/otlp/records.ts b/src/telemetry/export/writers/otlp/records.ts new file mode 100644 index 000000000..94c58f51e --- /dev/null +++ b/src/telemetry/export/writers/otlp/records.ts @@ -0,0 +1,269 @@ +import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; +import { SeverityNumber } from "@opentelemetry/api-logs"; + +import { type MetricDescriptor, type MetricMeasurement } from "../../metrics"; +import { parseTelemetryTimestampMs } from "../../range"; + +import type { TelemetryContext, TelemetryEvent } from "../../../event"; + +import type { + OtlpKeyValue, + OtlpLogRecord, + OtlpMetric, + OtlpSpan, + OtlpStatus, +} from "./types"; + +/** Per-export state for cumulative HTTP counters. */ +export interface ExportState { + cumulativeStart: string | undefined; + readonly cumulativeTotals: Map; +} + +export function newExportState(): ExportState { + return { cumulativeStart: undefined, cumulativeTotals: new Map() }; +} + +/** OTLP `Resource`, one per export. */ +export function otlpResource(context: TelemetryContext) { + return { + attributes: keyValues({ + "service.name": "coder-vscode-extension", + "service.version": context.extensionVersion, + "service.instance.id": context.sessionId, + "host.id": context.machineId, + "host.arch": context.hostArch, + "os.type": context.osType, + "os.version": context.osVersion, + "vscode.platform.name": context.platformName, + "vscode.platform.version": context.platformVersion, + "coder.deployment.url": context.deploymentUrl, + }), + }; +} + +/** OTLP `InstrumentationScope`, shared by every record. */ +export function otlpScope(version: string) { + return { name: "coder.vscode-coder.telemetry.export", version }; +} + +export function logRecord(event: TelemetryEvent): OtlpLogRecord { + const timeUnixNano = toUnixNano(event.timestamp); + const errored = event.error !== undefined; + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: errored ? SeverityNumber.ERROR : SeverityNumber.INFO, + severityText: errored ? "ERROR" : "INFO", + body: { stringValue: event.eventName }, + attributes: keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error && exceptionAttributes(event.error)), + }), + }; +} + +export function spanRecord(event: TelemetryEvent): OtlpSpan { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = String( + BigInt(endTimeUnixNano) - nanosFromMs(event.measurements.durationMs ?? 0), + ); + // durationMs is encoded as start/end times; don't repeat it as an attribute. + const { durationMs: _durationMs, ...measurements } = event.measurements; + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: event.eventName, + // OTLP proto SpanKind reserves 0 for UNSPECIFIED; api values shift by 1. + kind: SpanKind.INTERNAL + 1, + startTimeUnixNano, + endTimeUnixNano, + attributes: keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...measurements, + }), + status: spanStatus(event), + ...(event.error && { + events: [ + { + name: "exception", + timeUnixNano: endTimeUnixNano, + attributes: keyValues(exceptionAttributes(event.error)), + }, + ], + }), + }; +} + +function spanStatus(event: TelemetryEvent): OtlpStatus { + if (event.properties.result === "success") { + return { code: SpanStatusCode.OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: SpanStatusCode.ERROR, + ...(event.error && { message: event.error.message }), + }; + } + return { code: SpanStatusCode.UNSET }; +} + +/** Gauge and cumulative-sum records for one classified metric event. */ +export function metricRecords( + event: TelemetryEvent, + descriptor: MetricDescriptor, + state: ExportState, +): OtlpMetric[] { + const timeUnixNano = toUnixNano(event.timestamp); + const attributes = keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); + const windowStart = + descriptor.windowSeconds !== undefined + ? String( + BigInt(timeUnixNano) - nanosFromSeconds(descriptor.windowSeconds), + ) + : undefined; + // Anchor cumulative series on the first event seen; reused across the export. + state.cumulativeStart ??= windowStart ?? timeUnixNano; + + const records: OtlpMetric[] = []; + for (const m of descriptor.measurements) { + if (m.kind === "counter") { + const sum = cumulativeSum(event, m, attributes, timeUnixNano, state); + if (sum) { + records.push(sum); + } + } else { + records.push( + gaugeRecord(event.eventName, m, attributes, timeUnixNano, windowStart), + ); + } + } + return records; +} + +function gaugeRecord( + eventName: string, + measurement: MetricMeasurement, + attributes: readonly OtlpKeyValue[], + timeUnixNano: string, + startTimeUnixNano?: string, +): OtlpMetric { + return { + name: `${eventName}.${measurement.name}`, + description: eventName, + unit: measurement.unit, + gauge: { + dataPoints: [ + { + attributes, + ...(startTimeUnixNano !== undefined && { startTimeUnixNano }), + timeUnixNano, + asDouble: measurement.value, + }, + ], + }, + }; +} + +// No enum in @opentelemetry/api; 2 == CUMULATIVE. +const AGGREGATION_TEMPORALITY_CUMULATIVE = 2; + +function cumulativeSum( + event: TelemetryEvent, + measurement: MetricMeasurement, + attributes: readonly OtlpKeyValue[], + timeUnixNano: string, + state: ExportState, +): OtlpMetric | undefined { + // Clamp the anchor so out-of-order events can't emit startTime > timeTime. + const anchor = state.cumulativeStart ?? timeUnixNano; + const startTimeUnixNano = + BigInt(anchor) <= BigInt(timeUnixNano) ? anchor : timeUnixNano; + const key = `${event.eventName}|${measurement.name}|${seriesKey(event.properties)}`; + const total = + (state.cumulativeTotals.get(key) ?? 0n) + + toIntegerBigInt(measurement.value); + state.cumulativeTotals.set(key, total); + // Suppress zero counters; absence reads as "no events". + if (total === 0n) { + return undefined; + } + return { + name: `${event.eventName}.${measurement.name}`, + description: event.eventName, + unit: measurement.unit, + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE, + isMonotonic: true, + dataPoints: [ + { attributes, startTimeUnixNano, timeUnixNano, asInt: String(total) }, + ], + }, + }; +} + +/** Stable key for property labels so identical labels share a series. */ +function seriesKey(properties: Readonly>): string { + return Object.entries(properties) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}=${v}`) + .join("|"); +} + +function exceptionAttributes( + error: NonNullable, +): Record { + return { + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }; +} + +function keyValues( + values: Readonly>, +): OtlpKeyValue[] { + return Object.entries(values).map(([key, value]) => ({ + key, + value: + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }, + })); +} + +function toUnixNano(timestamp: string): string { + return String(BigInt(parseTelemetryTimestampMs(timestamp)) * 1_000_000n); +} + +function nanosFromMs(ms: number): bigint { + return toNonNegativeBigInt(ms * 1e6); +} + +function nanosFromSeconds(seconds: number): bigint { + return toNonNegativeBigInt(seconds * 1e9); +} + +// Coerce non-finite/negative to 0n; round the rest. +function toNonNegativeBigInt(n: number): bigint { + if (!Number.isFinite(n) || n <= 0) { + return 0n; + } + return BigInt(Math.round(n)); +} + +// Counter increments must be integers; coerce NaN/Infinity to 0n. +function toIntegerBigInt(n: number): bigint { + if (!Number.isFinite(n)) { + return 0n; + } + return BigInt(Math.round(n)); +} diff --git a/src/telemetry/export/writers/otlp/types.ts b/src/telemetry/export/writers/otlp/types.ts new file mode 100644 index 000000000..697a5ddf9 --- /dev/null +++ b/src/telemetry/export/writers/otlp/types.ts @@ -0,0 +1,74 @@ +/** + * Local TypeScript shapes for the OTLP/JSON wire format. Upstream TS types + * live in `@opentelemetry/otlp-transformer`, which pulls in the OTel SDK; we + * mirror the proto schema instead so the writer stays SDK-free. + * + * Spec (opentelemetry-proto v1.10.0): + * https://github.com/open-telemetry/opentelemetry-proto/tree/ca839c51f706f5d53bfb46f06c3e90c3af3a52c6/opentelemetry/proto + */ + +export interface OtlpKeyValue { + readonly key: string; + readonly value: + | { readonly stringValue: string } + | { readonly doubleValue: number }; +} + +export interface OtlpLogRecord { + readonly timeUnixNano: string; + readonly observedTimeUnixNano: string; + readonly severityNumber: number; + readonly severityText: string; + readonly body: { readonly stringValue: string }; + readonly attributes: readonly OtlpKeyValue[]; +} + +export interface OtlpStatus { + readonly code: number; + readonly message?: string; +} + +export interface OtlpSpanEvent { + readonly name: string; + readonly timeUnixNano: string; + readonly attributes: readonly OtlpKeyValue[]; +} + +export interface OtlpSpan { + readonly traceId: string; + readonly spanId: string; + readonly parentSpanId?: string; + readonly name: string; + readonly kind: number; + readonly startTimeUnixNano: string; + readonly endTimeUnixNano: string; + readonly attributes: readonly OtlpKeyValue[]; + readonly status: OtlpStatus; + readonly events?: readonly OtlpSpanEvent[]; +} + +export interface OtlpGaugePoint { + readonly attributes: readonly OtlpKeyValue[]; + readonly startTimeUnixNano?: string; + readonly timeUnixNano: string; + readonly asDouble: number; +} + +export interface OtlpSumPoint { + readonly attributes: readonly OtlpKeyValue[]; + readonly startTimeUnixNano: string; + readonly timeUnixNano: string; + readonly asInt: string; +} + +export interface OtlpMetric { + readonly name: string; + readonly description: string; + readonly unit: string; + readonly gauge?: { readonly dataPoints: readonly OtlpGaugePoint[] }; + readonly sum?: { + readonly aggregationTemporality: number; + readonly isMonotonic: boolean; + readonly dataPoints: readonly OtlpSumPoint[]; + }; +} diff --git a/src/telemetry/export/writers/otlp/writer.ts b/src/telemetry/export/writers/otlp/writer.ts new file mode 100644 index 000000000..bc16a97dc --- /dev/null +++ b/src/telemetry/export/writers/otlp/writer.ts @@ -0,0 +1,185 @@ +import { zip } from "fflate"; +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { promisify } from "node:util"; + +import { toError } from "../../../../error/errorUtils"; +import { writeAtomically } from "../../../../util/fs"; +import { describeMetricEvent } from "../../metrics"; + +import { openEnvelopeFile, type EnvelopeFile } from "./envelope"; +import { + type ExportState, + logRecord, + metricRecords, + newExportState, + otlpResource, + otlpScope, + spanRecord, +} from "./records"; + +import type { TelemetryContext, TelemetryEvent } from "../../../event"; + +export interface OtlpExportCounts { + readonly logs: number; + readonly traces: number; + readonly metrics: number; +} + +interface Envelope { + readonly file: string; + readonly resourceKey: string; + readonly scopeKey: string; + readonly recordsKey: string; +} + +const ENVELOPES = { + logs: { + file: "logs.json", + resourceKey: "resourceLogs", + scopeKey: "scopeLogs", + recordsKey: "logRecords", + }, + traces: { + file: "traces.json", + resourceKey: "resourceSpans", + scopeKey: "scopeSpans", + recordsKey: "spans", + }, + metrics: { + file: "metrics.json", + resourceKey: "resourceMetrics", + scopeKey: "scopeMetrics", + recordsKey: "metrics", + }, +} as const satisfies Record; + +const OTLP_SCHEMA_URL = "https://opentelemetry.io/schemas/1.24.0"; +const ENVELOPE_SUFFIX = "]}]}]}\n"; + +const zipAsync = promisify(zip); + +/** + * Writes `events` as an OTLP/JSON zip (`logs.json`, `traces.json`, + * `metrics.json`) to `outputPath`. Records stream into a staging directory + * then get packed in-memory; the zip is atomically renamed at the end. + */ +export async function writeOtlpZipExport( + outputPath: string, + events: AsyncIterable, + context: TelemetryContext, + onCleanupError: (err: unknown, tempPath: string) => void, +): Promise { + return writeAtomically( + outputPath, + async (zipPath) => { + const stagingDir = await fs.mkdtemp(`${outputPath}.staging-`); + try { + const counts = await writeStagedFiles(stagingDir, events, context); + await packZip(zipPath, stagingDir); + return counts; + } finally { + await fs.rm(stagingDir, { recursive: true, force: true }); + } + }, + onCleanupError, + ); +} + +async function writeStagedFiles( + dir: string, + events: AsyncIterable, + context: TelemetryContext, +): Promise { + const resource = JSON.stringify(otlpResource(context)); + const scope = JSON.stringify(otlpScope(context.extensionVersion)); + const open = (e: Envelope) => + openEnvelopeFile( + path.join(dir, e.file), + envelopePrefix(e, resource, scope), + ENVELOPE_SUFFIX, + ); + const [logs, traces, metrics] = await Promise.all([ + open(ENVELOPES.logs), + open(ENVELOPES.traces), + open(ENVELOPES.metrics), + ]); + const state = newExportState(); + const counts = { logs: 0, traces: 0, metrics: 0 }; + + try { + for await (const event of events) { + await routeEvent(event, { logs, traces, metrics }, counts, state); + } + // Success path: surface close failures. + await Promise.all([logs.close(), traces.close(), metrics.close()]); + } catch (loopError) { + // Failure path: close quietly so the original error isn't masked. + await Promise.allSettled([logs.close(), traces.close(), metrics.close()]); + throw loopError; + } + + return counts; +} + +async function routeEvent( + event: TelemetryEvent, + files: { logs: EnvelopeFile; traces: EnvelopeFile; metrics: EnvelopeFile }, + counts: { logs: number; traces: number; metrics: number }, + state: ExportState, +): Promise { + try { + const metric = describeMetricEvent(event); + if (metric) { + counts.metrics += 1; + for (const record of metricRecords(event, metric, state)) { + await files.metrics.append(record); + } + } else if (event.traceId !== undefined) { + counts.traces += 1; + await files.traces.append(spanRecord(event)); + } else { + counts.logs += 1; + await files.logs.append(logRecord(event)); + } + } catch (err) { + throw new Error( + `Failed to export event ${event.eventId} (${event.eventName}): ${toError(err).message}`, + { cause: err }, + ); + } +} + +async function packZip(outputPath: string, sourceDir: string): Promise { + const files = await Promise.all( + Object.values(ENVELOPES).map(async (e) => { + try { + return [ + e.file, + await fs.readFile(path.join(sourceDir, e.file)), + ] as const; + } catch (err) { + throw new Error( + `Failed to read staged ${e.file}: ${toError(err).message}`, + { cause: err }, + ); + } + }), + ); + try { + await fs.writeFile(outputPath, await zipAsync(Object.fromEntries(files))); + } catch (err) { + throw new Error( + `Failed to pack OTLP zip ${path.basename(outputPath)}: ${toError(err).message}`, + { cause: err }, + ); + } +} + +function envelopePrefix( + envelope: Envelope, + resource: string, + scope: string, +): string { + return `{"${envelope.resourceKey}":[{"resource":${resource},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.scopeKey}":[{"scope":${scope},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.recordsKey}":[`; +} diff --git a/test/unit/telemetry/export/metrics.test.ts b/test/unit/telemetry/export/metrics.test.ts new file mode 100644 index 000000000..f6a3f33cf --- /dev/null +++ b/test/unit/telemetry/export/metrics.test.ts @@ -0,0 +1,95 @@ +import { describe, expect, it } from "vitest"; + +import { describeMetricEvent, isMetricEvent } from "@/telemetry/export/metrics"; + +import { createTelemetryEventFactory } from "../../../mocks/telemetry"; + +const makeEvent = createTelemetryEventFactory(); + +describe("isMetricEvent", () => { + it.each([ + ["http.requests", true], + ["ssh.network.info", true], + ["ssh.network.sampled", true], + ["log.something", false], + ["remote.setup.workspace_ready", false], + ])("returns %p for %s", (name, expected) => { + expect(isMetricEvent(makeEvent({ eventName: name }))).toBe(expected); + }); +}); + +describe("describeMetricEvent", () => { + it("returns undefined for non-metric events", () => { + expect( + describeMetricEvent(makeEvent({ eventName: "log.info" })), + ).toBeUndefined(); + }); + + it("classifies all measurements as gauges for non-http metric events", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { latencyMs: 35, downloadMbits: 10, custom: 1 }, + }), + ); + expect(descriptor).toEqual({ + measurements: [ + { name: "latencyMs", value: 35, kind: "gauge", unit: "ms" }, + { name: "downloadMbits", value: 10, kind: "gauge", unit: "Mbit/s" }, + { name: "custom", value: 1, kind: "gauge", unit: "1" }, + ], + }); + }); + + it("partitions http.requests into counters and gauges and extracts the window", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "http.requests", + measurements: { + window_seconds: 60, + count_2xx: 5, + count_5xx: 1, + p95_duration_ms: 42, + }, + }), + ); + expect(descriptor).toEqual({ + windowSeconds: 60, + measurements: [ + { name: "count_2xx", value: 5, kind: "counter", unit: "{request}" }, + { name: "count_5xx", value: 1, kind: "counter", unit: "{request}" }, + { + name: "p95_duration_ms", + value: 42, + kind: "gauge", + unit: "ms", + }, + ], + }); + }); + + it("defaults http.requests windowSeconds to 0 when absent", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "http.requests", + measurements: { count_2xx: 1 }, + }), + ); + expect(descriptor?.windowSeconds).toBe(0); + }); + + it.each([ + ["latency_ms", "ms"], + ["durationMs", "ms"], + ["downloadMbits", "Mbit/s"], + ["something_else", "1"], + ])("derives unit for measurement '%s' -> '%s'", (name, unit) => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { [name]: 1 }, + }), + ); + expect(descriptor?.measurements[0].unit).toBe(unit); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp.test.ts b/test/unit/telemetry/export/writers/otlp.test.ts deleted file mode 100644 index f4fff9585..000000000 --- a/test/unit/telemetry/export/writers/otlp.test.ts +++ /dev/null @@ -1,424 +0,0 @@ -import { unzipSync } from "fflate"; -import { vol } from "memfs"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; - -import { writeOtlpZipExport } from "@/telemetry/export/writers/otlp"; - -import { asyncIterable } from "../../../../mocks/asyncIterable"; -import { createTelemetryEventFactory } from "../../../../mocks/telemetry"; - -import type { TelemetryContext, TelemetryEvent } from "@/telemetry/event"; - -vi.mock("node:fs", async () => (await import("memfs")).fs); -vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); - -const OUT = "/exports/telemetry.otlp.zip"; -const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - -let makeEvent: ReturnType; -let context: TelemetryContext; - -beforeEach(() => { - vol.reset(); - vol.mkdirSync("/exports", { recursive: true }); - makeEvent = createTelemetryEventFactory(); - context = makeEvent().context; -}); - -afterEach(() => vol.reset()); - -/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ -function attrs(raw: unknown): Record { - const list = raw as Array<{ - key: string; - value: { stringValue?: string; doubleValue?: number }; - }>; - return Object.fromEntries( - list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), - ); -} - -type RawRecord = Record; - -interface LogsFile { - resourceLogs: [ - { - resource: { attributes: unknown }; - scopeLogs: [{ logRecords: RawRecord[] }]; - }, - ]; -} -interface TracesFile { - resourceSpans: [{ scopeSpans: [{ spans: RawRecord[] }] }]; -} -interface MetricsFile { - resourceMetrics: [ - { - resource: { attributes: unknown }; - scopeMetrics: [{ scope: RawRecord; metrics: RawRecord[] }]; - }, - ]; -} - -interface Captured { - counts: Awaited>; - logResource: { attributes: unknown }; - metricsResource: { attributes: unknown }; - logsScope: RawRecord; - logs: RawRecord[]; - spans: RawRecord[]; - metrics: RawRecord[]; -} - -const noopCleanup = () => {}; - -/** Writes `events` through the public API and reads the resulting envelopes. */ -async function capture(events: readonly TelemetryEvent[]): Promise { - const counts = await writeOtlpZipExport( - OUT, - asyncIterable(events), - context, - noopCleanup, - ); - const zip = unzipSync(vol.readFileSync(OUT) as Uint8Array); - const parse = (file: string): T => - JSON.parse(new TextDecoder().decode(zip[file])) as T; - - const logs = parse("logs.json").resourceLogs[0]; - const traces = parse("traces.json").resourceSpans[0]; - const metrics = parse("metrics.json").resourceMetrics[0]; - - return { - counts, - logResource: logs.resource, - metricsResource: metrics.resource, - logsScope: (logs.scopeLogs[0] as unknown as { scope: RawRecord }).scope, - logs: logs.scopeLogs[0].logRecords, - spans: traces.scopeSpans[0].spans, - metrics: metrics.scopeMetrics[0].metrics, - }; -} - -describe("writeOtlpZipExport: resource", () => { - const EXPECTED_RESOURCE = { - "service.name": "coder-vscode-extension", - "service.version": "1.14.5", - "service.instance.id": "session-id", - "host.id": "machine-id", - "host.arch": "x64", - "os.type": "linux", - "os.version": "6.0.0", - "vscode.platform.name": "Visual Studio Code", - "vscode.platform.version": "1.106.0", - "coder.deployment.url": "https://coder.example.com", - }; - - it("uses OTel-standard semconv keys on the resource", async () => { - const { logResource, metricsResource } = await capture([ - makeEvent(), - makeEvent({ - eventName: "ssh.network.sampled", - measurements: { latencyMs: 1 }, - }), - ]); - - expect(attrs(logResource.attributes)).toEqual(EXPECTED_RESOURCE); - expect(attrs(metricsResource.attributes)).toEqual(EXPECTED_RESOURCE); - }); - - it("stamps each scope with the extension version", async () => { - const { logsScope } = await capture([makeEvent()]); - expect(logsScope).toMatchObject({ - name: "coder.vscode-coder.telemetry.export", - version: "1.14.5", - }); - }); -}); - -describe("writeOtlpZipExport: logs", () => { - it("emits INFO records with merged properties and measurements", async () => { - const { logs } = await capture([ - makeEvent({ - eventName: "log.info", - properties: { source: "unit" }, - measurements: { count: 3 }, - }), - ]); - - expect(logs[0]).toMatchObject({ - severityNumber: 9, - severityText: "INFO", - body: { stringValue: "log.info" }, - }); - expect(logs[0].timeUnixNano).toBe(logs[0].observedTimeUnixNano); - expect(attrs(logs[0].attributes)).toEqual({ source: "unit", count: 3 }); - }); - - it("emits ERROR records with optional exception fields skipped when unset", async () => { - const { logs } = await capture([ - makeEvent({ - error: { message: "boom", type: "RangeError", code: "E_RANGE" }, - }), - makeEvent({ error: { message: "boom" } }), - ]); - - expect(logs[0]).toMatchObject({ - severityNumber: 17, - severityText: "ERROR", - }); - expect(attrs(logs[0].attributes)).toMatchObject({ - "exception.message": "boom", - "exception.type": "RangeError", - "exception.code": "E_RANGE", - }); - expect(attrs(logs[1].attributes)).toEqual({ "exception.message": "boom" }); - }); -}); - -describe("writeOtlpZipExport: spans", () => { - it("emits an INTERNAL span with derived start time and parent linkage", async () => { - const { spans } = await capture([ - makeEvent({ - eventName: "remote.setup.workspace_ready", - traceId: TRACE_ID, - parentEventId: "parent-span-id", - properties: { result: "success", route: "/api" }, - measurements: { durationMs: 250, retries: 2 }, - }), - ]); - - expect(spans[0]).toMatchObject({ - traceId: TRACE_ID, - parentSpanId: "parent-span-id", - name: "remote.setup.workspace_ready", - kind: 1, // OTel api SpanKind.INTERNAL (0) + 1 for the OTLP proto offset - status: { code: 1 }, - }); - expect( - BigInt(spans[0].endTimeUnixNano as string) - - BigInt(spans[0].startTimeUnixNano as string), - ).toBe(250_000_000n); - expect(attrs(spans[0].attributes)).toEqual({ - "coder.event_name": "remote.setup.workspace_ready", - result: "success", - route: "/api", - retries: 2, - }); - }); - - it("collapses start to end and omits parentSpanId on a minimal span", async () => { - const { spans } = await capture([makeEvent({ traceId: TRACE_ID })]); - - expect(spans[0]).not.toHaveProperty("parentSpanId"); - expect(spans[0].startTimeUnixNano).toBe(spans[0].endTimeUnixNano); - }); - - it.each([ - [{ properties: { result: "success" } }, { code: 1 }], - [{ properties: { result: "error" } }, { code: 2 }], - [{ error: { message: "boom" } }, { code: 2, message: "boom" }], - [{}, { code: 0 }], - ])("maps span status: %j -> %j", async (overrides, expected) => { - const { spans } = await capture([ - makeEvent({ traceId: TRACE_ID, ...overrides }), - ]); - expect(spans[0].status).toEqual(expected); - }); - - it("attaches an `exception` event when the event has an error", async () => { - const { spans } = await capture([ - makeEvent({ - traceId: TRACE_ID, - error: { message: "boom", type: "Error" }, - }), - ]); - const [exceptionEvent] = spans[0].events as Array<{ - name: string; - timeUnixNano: string; - attributes: unknown; - }>; - - expect(exceptionEvent.name).toBe("exception"); - expect(exceptionEvent.timeUnixNano).toBe(spans[0].endTimeUnixNano); - expect(attrs(exceptionEvent.attributes)).toEqual({ - "exception.message": "boom", - "exception.type": "Error", - }); - }); -}); - -describe("writeOtlpZipExport: metrics", () => { - it("emits one gauge per measurement for non-http events, with no startTimeUnixNano", async () => { - const { metrics } = await capture([ - makeEvent({ - eventName: "ssh.network.sampled", - properties: { p2p: "true" }, - measurements: { latencyMs: 35, downloadMbits: 10 }, - }), - ]); - - expect(metrics.map((m) => [m.name, m.unit])).toEqual([ - ["ssh.network.sampled.latencyMs", "ms"], - ["ssh.network.sampled.downloadMbits", "Mbit/s"], - ]); - const [point] = (metrics[0].gauge as { dataPoints: [unknown] }).dataPoints; - expect(point).not.toHaveProperty("startTimeUnixNano"); - expect(point).toMatchObject({ asDouble: 35 }); - expect(attrs((point as { attributes: unknown }).attributes)).toMatchObject({ - "coder.event_name": "ssh.network.sampled", - p2p: "true", - }); - }); - - it("emits http.requests counts as cumulative monotonic sums with a stable start time", async () => { - const { metrics } = await capture([ - makeEvent({ - eventName: "http.requests", - properties: { method: "GET", route: "/a" }, - timestamp: "2026-05-04T12:01:00.000Z", - measurements: { window_seconds: 60, count_2xx: 2 }, - }), - makeEvent({ - eventName: "http.requests", - properties: { method: "GET", route: "/a" }, - timestamp: "2026-05-04T12:02:00.000Z", - measurements: { window_seconds: 60, count_2xx: 3 }, - }), - ]); - const counts = metrics.map( - (m) => - m.sum as { - aggregationTemporality: number; - isMonotonic: boolean; - dataPoints: [ - { - asInt: string; - startTimeUnixNano: string; - timeUnixNano: string; - }, - ]; - }, - ); - - expect(counts.map((c) => c.dataPoints[0].asInt)).toEqual(["2", "5"]); - expect(counts.every((c) => c.aggregationTemporality === 2)).toBe(true); - expect(counts.every((c) => c.isMonotonic)).toBe(true); - // startTimeUnixNano is set once on the first event and stays fixed. - expect(counts[1].dataPoints[0].startTimeUnixNano).toBe( - counts[0].dataPoints[0].startTimeUnixNano, - ); - }); - - it("keeps gauges windowed alongside the cumulative count sums", async () => { - const { metrics } = await capture([ - makeEvent({ - eventName: "http.requests", - measurements: { window_seconds: 60, count_2xx: 2, p95_duration_ms: 42 }, - }), - ]); - - expect(metrics.map((m) => [m.name, m.unit])).toEqual([ - ["http.requests.count_2xx", "{request}"], - ["http.requests.p95_duration_ms", "ms"], - ]); - const p95Point = ( - metrics[1].gauge as { - dataPoints: [{ startTimeUnixNano: string; timeUnixNano: string }]; - } - ).dataPoints[0]; - expect( - BigInt(p95Point.timeUnixNano) - BigInt(p95Point.startTimeUnixNano), - ).toBe(60_000_000_000n); - }); - - it("suppresses zero-valued cumulative counters", async () => { - const { metrics } = await capture([ - makeEvent({ - eventName: "http.requests", - measurements: { - window_seconds: 60, - count_2xx: 0, - count_5xx: 0, - p95_duration_ms: 10, - }, - }), - ]); - - expect(metrics.map((m) => m.name)).toEqual([ - "http.requests.p95_duration_ms", - ]); - }); - - it("treats http.requests without window_seconds as a zero-width window", async () => { - const { metrics } = await capture([ - makeEvent({ - eventName: "http.requests", - measurements: { count_2xx: 1 }, - }), - ]); - const point = ( - metrics[0].sum as { - dataPoints: [{ startTimeUnixNano: string; timeUnixNano: string }]; - } - ).dataPoints[0]; - - expect(point.startTimeUnixNano).toBe(point.timeUnixNano); - }); - - it.each([ - ["latency_ms", "ms"], - ["durationMs", "ms"], - ["downloadMbits", "Mbit/s"], - ["something", "1"], - ])("derives unit for measurement '%s' -> '%s'", async (measurement, unit) => { - const { metrics } = await capture([ - makeEvent({ - eventName: "ssh.network.sampled", - measurements: { [measurement]: 1 }, - }), - ]); - expect(metrics[0].unit).toBe(unit); - }); -}); - -describe("writeOtlpZipExport: routing & counts", () => { - it("counts events routed to each signal, even when one event fans out into multiple records", async () => { - const { counts, logs, spans, metrics } = await capture([ - makeEvent({ eventName: "log.info" }), - makeEvent({ eventName: "log.warn" }), - makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), - makeEvent({ - eventName: "http.requests", - measurements: { window_seconds: 60, count_2xx: 1, p95_duration_ms: 5 }, - }), - ]); - - expect(counts).toEqual({ logs: 2, traces: 1, metrics: 1 }); - expect(logs).toHaveLength(2); - expect(spans).toHaveLength(1); - expect(metrics).toHaveLength(2); - }); - - it("propagates midstream errors", async () => { - const failing = (async function* () { - yield makeEvent(); - await Promise.resolve(); - throw new Error("boom"); - })(); - - await expect( - writeOtlpZipExport(OUT, failing, context, noopCleanup), - ).rejects.toThrow(/boom/); - }); - - it("rejects on an unparseable event timestamp", async () => { - await expect( - writeOtlpZipExport( - OUT, - asyncIterable([makeEvent({ timestamp: "not-a-date" })]), - context, - noopCleanup, - ), - ).rejects.toThrow(/Invalid telemetry timestamp/); - }); -}); diff --git a/test/unit/telemetry/export/writers/otlp/envelope.test.ts b/test/unit/telemetry/export/writers/otlp/envelope.test.ts new file mode 100644 index 000000000..f45a25cc9 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/envelope.test.ts @@ -0,0 +1,120 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const { stream } = vi.hoisted(() => ({ + stream: { + chunks: [] as string[], + writeFailMsg: null as string | null, + endFailMsg: null as string | null, + listeners: new Map void>(), + write(chunk: string, _enc: string, cb: (err?: Error | null) => void) { + if (stream.writeFailMsg) { + cb(new Error(stream.writeFailMsg)); + return; + } + stream.chunks.push(chunk); + cb(null); + }, + end(cb: (err?: Error | null) => void) { + if (stream.endFailMsg) { + cb(new Error(stream.endFailMsg)); + return; + } + cb(null); + }, + once(event: string, listener: (err: Error) => void) { + stream.listeners.set(event, listener); + return stream; + }, + emit(event: string, err: Error) { + stream.listeners.get(event)?.(err); + }, + }, +})); + +vi.mock("node:fs", () => ({ + createWriteStream: () => stream, +})); + +const { openEnvelopeFile } = + await import("@/telemetry/export/writers/otlp/envelope"); + +beforeEach(() => { + stream.chunks = []; + stream.writeFailMsg = null; + stream.endFailMsg = null; + stream.listeners = new Map(); +}); + +afterEach(() => vi.clearAllMocks()); + +describe("openEnvelopeFile", () => { + it("writes only the prefix and suffix when no values are appended", async () => { + const env = await openEnvelopeFile("/x.json", "PRE", "SUF"); + await env.close(); + expect(stream.chunks.join("")).toBe("PRESUF"); + }); + + it("serializes appended values as JSON, comma-separated", async () => { + const env = await openEnvelopeFile("/x.json", "[", "]"); + await env.append({ a: 1 }); + await env.append("two"); + await env.append([3, 4]); + await env.close(); + expect(stream.chunks.join("")).toBe('[{"a":1},"two",[3,4]]'); + }); + + it("inserts commas between values but not before the first", async () => { + const env = await openEnvelopeFile("/x.json", "[", "]"); + await env.append(1); + await env.append(2); + await env.close(); + expect(stream.chunks).toEqual(["[", "1", ",2", "]"]); + }); + + it("wraps prefix-write failures with the file path", async () => { + stream.writeFailMsg = "disk full"; + await expect(openEnvelopeFile("/foo.json", "[", "]")).rejects.toThrow( + "Failed to write /foo.json: disk full", + ); + }); + + it("wraps append-time write failures with the file path", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.writeFailMsg = "disk full"; + await expect(env.append({ a: 1 })).rejects.toThrow( + "Failed to write /foo.json: disk full", + ); + }); + + it("wraps suffix-write failures during close as a close failure", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.writeFailMsg = "disk full"; + await expect(env.close()).rejects.toThrow( + "Failed to close /foo.json: disk full", + ); + }); + + it("wraps stream.end failures with the file path", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.endFailMsg = "stream gone"; + await expect(env.close()).rejects.toThrow( + "Failed to close /foo.json: stream gone", + ); + }); + + it("rejects subsequent writes after the stream emits 'error'", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.emit("error", new Error("ENOENT")); + await expect(env.append({ a: 1 })).rejects.toThrow( + "Failed to write /foo.json: ENOENT", + ); + }); + + it("is idempotent: calling close() twice is safe", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + await env.close(); + // Second close is a no-op and does not double-write the suffix. + await env.close(); + expect(stream.chunks).toEqual(["[", "]"]); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp/records.test.ts b/test/unit/telemetry/export/writers/otlp/records.test.ts new file mode 100644 index 000000000..414dbf498 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/records.test.ts @@ -0,0 +1,347 @@ +import { describe, expect, it } from "vitest"; + +import { type MetricDescriptor } from "@/telemetry/export/metrics"; +import { + type ExportState, + logRecord, + metricRecords, + newExportState, + otlpResource, + otlpScope, + spanRecord, +} from "@/telemetry/export/writers/otlp/records"; + +import { createTelemetryEventFactory } from "../../../../../mocks/telemetry"; + +const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + +const makeEvent = createTelemetryEventFactory(); + +/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ +function attrs(raw: unknown): Record { + const list = raw as Array<{ + key: string; + value: { stringValue?: string; doubleValue?: number }; + }>; + return Object.fromEntries( + list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), + ); +} + +describe("otlpResource", () => { + it("maps the session context onto OTel-standard semconv keys", () => { + const { context } = makeEvent(); + expect(attrs(otlpResource(context).attributes)).toEqual({ + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "service.instance.id": "session-id", + "host.id": "machine-id", + "host.arch": "x64", + "os.type": "linux", + "os.version": "6.0.0", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }); + }); +}); + +describe("otlpScope", () => { + it("names the scope and stamps the extension version", () => { + expect(otlpScope("9.9.9")).toEqual({ + name: "coder.vscode-coder.telemetry.export", + version: "9.9.9", + }); + }); +}); + +describe("logRecord", () => { + it("emits INFO records merging properties and measurements", () => { + const record = logRecord( + makeEvent({ + eventName: "log.info", + properties: { source: "unit" }, + measurements: { count: 3 }, + }), + ); + + expect(record).toMatchObject({ + severityNumber: 9, + severityText: "INFO", + body: { stringValue: "log.info" }, + }); + expect(record.timeUnixNano).toBe(record.observedTimeUnixNano); + expect(attrs(record.attributes)).toEqual({ source: "unit", count: 3 }); + }); + + it("emits ERROR records and omits optional exception fields when unset", () => { + const full = logRecord( + makeEvent({ + error: { message: "boom", type: "RangeError", code: "E_RANGE" }, + }), + ); + const minimal = logRecord(makeEvent({ error: { message: "boom" } })); + + expect(full).toMatchObject({ severityNumber: 17, severityText: "ERROR" }); + expect(attrs(full.attributes)).toMatchObject({ + "exception.message": "boom", + "exception.type": "RangeError", + "exception.code": "E_RANGE", + }); + expect(attrs(minimal.attributes)).toEqual({ "exception.message": "boom" }); + }); +}); + +describe("spanRecord", () => { + it("encodes an INTERNAL span with derived start time and parent linkage", () => { + const span = spanRecord( + makeEvent({ + eventName: "remote.setup.workspace_ready", + traceId: TRACE_ID, + parentEventId: "parent-span-id", + properties: { result: "success", route: "/api" }, + measurements: { durationMs: 250, retries: 2 }, + }), + ); + + expect(span).toMatchObject({ + traceId: TRACE_ID, + parentSpanId: "parent-span-id", + name: "remote.setup.workspace_ready", + kind: 1, // OTel SpanKind.INTERNAL (0) + 1 OTLP proto offset. + status: { code: 1 }, + }); + expect(BigInt(span.endTimeUnixNano) - BigInt(span.startTimeUnixNano)).toBe( + 250_000_000n, + ); + expect(attrs(span.attributes)).toEqual({ + "coder.event_name": "remote.setup.workspace_ready", + result: "success", + route: "/api", + retries: 2, + }); + }); + + it("collapses start to end and omits parentSpanId on a minimal span", () => { + const span = spanRecord(makeEvent({ traceId: TRACE_ID })); + + expect(span).not.toHaveProperty("parentSpanId"); + expect(span.startTimeUnixNano).toBe(span.endTimeUnixNano); + }); + + it.each([ + [{ properties: { result: "success" } }, { code: 1 }], + [{ properties: { result: "error" } }, { code: 2 }], + [{ error: { message: "boom" } }, { code: 2, message: "boom" }], + [{}, { code: 0 }], + ])("maps span status: %j -> %j", (overrides, expected) => { + const span = spanRecord(makeEvent({ traceId: TRACE_ID, ...overrides })); + expect(span.status).toEqual(expected); + }); + + it("attaches an `exception` event to errored spans", () => { + const span = spanRecord( + makeEvent({ + traceId: TRACE_ID, + error: { message: "boom", type: "Error" }, + }), + ); + const [exception] = span.events ?? []; + + expect(exception.name).toBe("exception"); + expect(exception.timeUnixNano).toBe(span.endTimeUnixNano); + expect(attrs(exception.attributes)).toEqual({ + "exception.message": "boom", + "exception.type": "Error", + }); + }); +}); + +describe("metricRecords", () => { + const gauge = (name: string, value: number, unit = "1") => + ({ name, value, kind: "gauge", unit }) as const; + const counter = (name: string, value: number) => + ({ name, value, kind: "counter", unit: "{request}" }) as const; + + it("emits one gauge per measurement when the descriptor has no window", () => { + const event = makeEvent({ + eventName: "ssh.network.sampled", + properties: { p2p: "true" }, + }); + const descriptor: MetricDescriptor = { + measurements: [ + gauge("latencyMs", 35, "ms"), + gauge("downloadMbits", 10, "Mbit/s"), + ], + }; + + const records = metricRecords(event, descriptor, newExportState()); + + expect(records.map((r) => [r.name, r.unit])).toEqual([ + ["ssh.network.sampled.latencyMs", "ms"], + ["ssh.network.sampled.downloadMbits", "Mbit/s"], + ]); + const point = records[0].gauge!.dataPoints[0]; + expect(point).not.toHaveProperty("startTimeUnixNano"); + expect(point).toMatchObject({ asDouble: 35 }); + expect(attrs(point.attributes)).toMatchObject({ + "coder.event_name": "ssh.network.sampled", + p2p: "true", + }); + }); + + it("accumulates counter values into cumulative monotonic sums anchored at the first window", () => { + const state = newExportState(); + const first = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:01:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 2)] }, + state, + ); + const second = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:02:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 3)] }, + state, + ); + // 2026-05-04T12:00:00.000Z (window start = first event time − 60s) in ns: + const expectedStart = String( + BigInt(Date.parse("2026-05-04T12:00:00.000Z")) * 1_000_000n, + ); + + expect([ + first[0].sum!.dataPoints[0].asInt, + second[0].sum!.dataPoints[0].asInt, + ]).toEqual(["2", "5"]); + expect(first[0].sum!.aggregationTemporality).toBe(2); + expect(first[0].sum!.isMonotonic).toBe(true); + expect(first[0].sum!.dataPoints[0].startTimeUnixNano).toBe(expectedStart); + expect(second[0].sum!.dataPoints[0].startTimeUnixNano).toBe(expectedStart); + }); + + it("clamps startTimeUnixNano <= timeUnixNano for events that arrive before the anchor", () => { + const state = newExportState(); + // First event lands at T=12:03 with a 60s window → anchor = 12:02. + metricRecords( + makeEvent({ + eventName: "http.requests", + timestamp: "2026-05-04T12:03:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 1)] }, + state, + ); + // Out-of-order event at T=12:01:30 (earlier than the anchor). + const records = metricRecords( + makeEvent({ + eventName: "http.requests", + timestamp: "2026-05-04T12:01:30.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 1)] }, + state, + ); + const point = records[0].sum!.dataPoints[0]; + + expect(BigInt(point.startTimeUnixNano)).toBeLessThanOrEqual( + BigInt(point.timeUnixNano), + ); + }); + + it("keeps cumulative totals separate by eventName when measurement names collide", () => { + const state = newExportState(); + const first = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { route: "/a" }, + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 3)] }, + state, + ); + // A second metric event type that happens to share a counter name and + // matching properties must not accumulate into the first series. + const second = metricRecords( + makeEvent({ + eventName: "ssh.network.info", + properties: { route: "/a" }, + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 5)] }, + state, + ); + + expect(first[0].sum!.dataPoints[0].asInt).toBe("3"); + expect(second[0].sum!.dataPoints[0].asInt).toBe("5"); + }); + + it("coerces NaN/Infinity inputs to safe zeros instead of throwing", () => { + // durationMs=NaN in a span; counter=Infinity in a metric; windowSeconds=NaN. + expect(() => + spanRecord( + makeEvent({ + traceId: TRACE_ID, + measurements: { durationMs: NaN, retries: 1 }, + }), + ), + ).not.toThrow(); + + const state = newExportState(); + const records = metricRecords( + makeEvent({ eventName: "http.requests" }), + { windowSeconds: NaN, measurements: [counter("count_2xx", Infinity)] }, + state, + ); + + // Infinity counter coerces to 0 → suppressed. + expect(records).toEqual([]); + }); + + it("stamps gauges with a window start when the descriptor declares one", () => { + const event = makeEvent({ eventName: "http.requests" }); + const descriptor: MetricDescriptor = { + windowSeconds: 60, + measurements: [gauge("p95_duration_ms", 42, "ms")], + }; + + const [record] = metricRecords(event, descriptor, newExportState()); + const point = record.gauge!.dataPoints[0]; + + expect(BigInt(point.timeUnixNano) - BigInt(point.startTimeUnixNano!)).toBe( + 60_000_000_000n, + ); + }); + + it("suppresses zero-valued cumulative counters", () => { + const state: ExportState = newExportState(); + const records = metricRecords( + makeEvent({ eventName: "http.requests" }), + { + windowSeconds: 60, + measurements: [ + counter("count_2xx", 0), + counter("count_5xx", 0), + gauge("p95_duration_ms", 10, "ms"), + ], + }, + state, + ); + + expect(records.map((r) => r.name)).toEqual([ + "http.requests.p95_duration_ms", + ]); + }); + + it("treats windowSeconds=0 as a zero-width window", () => { + const [record] = metricRecords( + makeEvent({ eventName: "http.requests" }), + { windowSeconds: 0, measurements: [counter("count_2xx", 1)] }, + newExportState(), + ); + const point = record.sum!.dataPoints[0]; + + expect(point.startTimeUnixNano).toBe(point.timeUnixNano); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp/writer.test.ts b/test/unit/telemetry/export/writers/otlp/writer.test.ts new file mode 100644 index 000000000..fcc6d24b1 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/writer.test.ts @@ -0,0 +1,198 @@ +import { unzipSync } from "fflate"; +import { vol } from "memfs"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { writeOtlpZipExport } from "@/telemetry/export/writers/otlp/writer"; + +import { asyncIterable } from "../../../../../mocks/asyncIterable"; +import { createTelemetryEventFactory } from "../../../../../mocks/telemetry"; + +import type { TelemetryContext, TelemetryEvent } from "@/telemetry/event"; + +vi.mock("node:fs", async () => (await import("memfs")).fs); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); + +const OUT = "/exports/telemetry.otlp.zip"; +const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + +let makeEvent: ReturnType; +let context: TelemetryContext; + +beforeEach(() => { + vol.reset(); + vol.mkdirSync("/exports", { recursive: true }); + makeEvent = createTelemetryEventFactory(); + context = makeEvent().context; +}); + +afterEach(() => vol.reset()); + +type Rec = Record; + +interface ParsedEnvelope { + resource: { attributes: unknown }; + schemaUrl: unknown; + scope: { name: string; version: string }; + records: unknown[]; +} + +function parseEnvelope( + files: Record, + name: string, + resourceKey: string, + scopeKey: string, + recordsKey: string, +): ParsedEnvelope { + const env = JSON.parse(new TextDecoder().decode(files[name])) as Rec; + const wrapper = (env[resourceKey] as Rec[])[0]; + const scopeWrapper = (wrapper[scopeKey] as Rec[])[0]; + return { + resource: wrapper.resource as { attributes: unknown }, + schemaUrl: wrapper.schemaUrl, + scope: scopeWrapper.scope as { name: string; version: string }, + records: scopeWrapper[recordsKey] as unknown[], + }; +} + +/** Reads the zip and returns the parsed envelope for each signal. */ +async function exportAndRead(events: readonly TelemetryEvent[]) { + const counts = await writeOtlpZipExport( + OUT, + asyncIterable(events), + context, + () => {}, + ); + const files = unzipSync(vol.readFileSync(OUT) as Uint8Array); + return { + counts, + logs: parseEnvelope( + files, + "logs.json", + "resourceLogs", + "scopeLogs", + "logRecords", + ), + traces: parseEnvelope( + files, + "traces.json", + "resourceSpans", + "scopeSpans", + "spans", + ), + metrics: parseEnvelope( + files, + "metrics.json", + "resourceMetrics", + "scopeMetrics", + "metrics", + ), + }; +} + +/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ +function attrs(raw: unknown): Record { + const list = raw as Array<{ + key: string; + value: { stringValue?: string; doubleValue?: number }; + }>; + return Object.fromEntries( + list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), + ); +} + +describe("writeOtlpZipExport", () => { + it("packs logs.json, traces.json, and metrics.json into the zip", async () => { + await writeOtlpZipExport( + OUT, + asyncIterable([makeEvent()]), + context, + () => {}, + ); + const files = unzipSync(vol.readFileSync(OUT) as Uint8Array); + expect(Object.keys(files).sort()).toEqual([ + "logs.json", + "metrics.json", + "traces.json", + ]); + }); + + it("counts events by signal even when a metric event fans out into multiple records", async () => { + const { counts, logs, traces, metrics } = await exportAndRead([ + makeEvent({ eventName: "log.info" }), + makeEvent({ eventName: "log.warn" }), + makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 1, p95_duration_ms: 5 }, + }), + ]); + + expect(counts).toEqual({ logs: 2, traces: 1, metrics: 1 }); + expect([ + logs.records.length, + traces.records.length, + metrics.records.length, + ]).toEqual([2, 1, 2]); + }); + + it("writes the same resource and scope into every envelope file", async () => { + const { logs, traces, metrics } = await exportAndRead([ + makeEvent({ eventName: "log.info" }), + makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 1 }, + }), + ]); + + const expectedResource = { + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "service.instance.id": "session-id", + "host.id": "machine-id", + "host.arch": "x64", + "os.type": "linux", + "os.version": "6.0.0", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }; + const expectedScope = { + name: "coder.vscode-coder.telemetry.export", + version: "1.14.5", + }; + + for (const env of [logs, traces, metrics]) { + expect(attrs(env.resource.attributes)).toEqual(expectedResource); + expect(env.scope).toEqual(expectedScope); + expect(env.schemaUrl).toBe("https://opentelemetry.io/schemas/1.24.0"); + } + }); + + it("propagates midstream iterator errors", async () => { + const failing = (async function* () { + yield makeEvent(); + await Promise.resolve(); + throw new Error("boom"); + })(); + + await expect( + writeOtlpZipExport(OUT, failing, context, () => {}), + ).rejects.toThrow(/boom/); + }); + + it("wraps per-event mapping failures with the event identity", async () => { + await expect( + writeOtlpZipExport( + OUT, + asyncIterable([ + makeEvent({ eventId: "id-bad", timestamp: "not-a-date" }), + ]), + context, + () => {}, + ), + ).rejects.toThrow( + /Failed to export event id-bad .*Invalid telemetry timestamp/, + ); + }); +});