diff --git a/examples/verify-audit-fixes.ts b/examples/verify-audit-fixes.ts new file mode 100644 index 0000000..659b851 --- /dev/null +++ b/examples/verify-audit-fixes.ts @@ -0,0 +1,166 @@ +/** + * Verification example for the audit fixes (flush duplication, log loss, redaction). + * + * Reproduces the production incident where a synchronous burst of logs (e.g. a + * NestJS app replaying buffered bootstrap logs) caused a batching transport to + * write the SAME entries many times — ~120 logs became thousands of lines. + * + * This drives the AnalyticsTransport batching path (shared by Mixpanel/DataDog/ + * Segment/Google Analytics) with a fake provider that just counts what it + * receives, then asserts: + * - every entry is delivered EXACTLY once (no N² duplication) + * - a failed batch is re-buffered and re-sent (no log loss) + * - close() drains everything on shutdown (no loss on deploy) + * + * Run: npx ts-node examples/verify-audit-fixes.ts + */ + +import { AnalyticsTransport } from '../src/transports/analytics.transport'; +import { CloudWatchTransport } from '../src/transports/cloudwatch.transport'; +import type { AnalyticsTransportConfig, TransportLogEntry } from '../src/types/transport.types'; + +class FakeProviderTransport extends AnalyticsTransport { + public readonly received: string[] = []; + public sendBatchCalls = 0; + private failFirst: boolean; + + constructor(config: AnalyticsTransportConfig, failFirst = false) { + super('fake-provider', config); + this.failFirst = failFirst; + this.isReady = true; + } + + protected initialize(): void { + this.isReady = true; + } + protected async sendEntry(entry: TransportLogEntry): Promise { + this.received.push(entry.message); + } + protected async sendBatch(entries: TransportLogEntry[]): Promise { + this.sendBatchCalls += 1; + // Simulate network latency so concurrent flushes actually overlap. 
+ await new Promise((r) => setTimeout(r, 5)); + if (this.failFirst && this.sendBatchCalls === 1) { + throw new Error('simulated provider outage'); + } + for (const e of entries) this.received.push(e.message); + } + protected cleanup(): void { + /* no-op */ + } +} + +function entry(i: number): TransportLogEntry { + return { timestamp: new Date(), level: 'info', message: `event-${i}` }; +} + +async function main() { + let allPassed = true; + const check = (name: string, cond: boolean, detail: string) => { + console.log(`${cond ? '✅ PASS' : '❌ FAIL'} ${name} — ${detail}`); + if (!cond) allPassed = false; + }; + + // ── Scenario 1: synchronous burst, un-awaited writes ─────────────────────── + { + const t = new FakeProviderTransport({ apiKey: 'test', batchSize: 50, flushInterval: 0 }); + const TOTAL = 600; + const writes: Array> = []; + for (let i = 0; i < TOTAL; i++) writes.push(t.write(entry(i))); + await Promise.allSettled(writes); + await t.flush(); + check( + 'burst of 600 logs delivered exactly once', + t.received.length === TOTAL && new Set(t.received).size === TOTAL, + `delivered=${t.received.length}, unique=${new Set(t.received).size}, expected=${TOTAL}` + ); + await t.close(); + } + + // ── Scenario 2: concurrent flush() calls collapse to one drain ───────────── + { + const t = new FakeProviderTransport({ apiKey: 'test', batchSize: 1000, flushInterval: 0 }); + for (let i = 0; i < 100; i++) await t.write(entry(i)); + await Promise.all([t.flush(), t.flush(), t.flush(), t.flush()]); + check( + '4 concurrent flushes do not duplicate', + t.received.length === 100 && new Set(t.received).size === 100, + `delivered=${t.received.length}, sendBatchCalls=${t.sendBatchCalls}` + ); + await t.close(); + } + + // ── Scenario 3: failed batch is re-buffered, then close() drains it ──────── + { + const t = new FakeProviderTransport({ apiKey: 'test', batchSize: 1000, flushInterval: 0 }, true); + for (let i = 0; i < 10; i++) await t.write(entry(i)); + await t.flush(); // first 
attempt fails inside drain → re-buffered + const afterFail = t.received.length; + await t.close(); // retries and drains everything + check( + 'no log loss when provider fails then recovers on close', + afterFail === 0 && t.received.length === 10 && new Set(t.received).size === 10, + `afterFailedFlush=${afterFail}, afterClose=${t.received.length}` + ); + } + + // ── Scenario 4: cloud transport drains its whole batch + close() on shutdown ─ + { + const t = new CloudWatchTransport({ + logGroupName: '/verify', + batchSize: 10, + flushIntervalMs: 999_999, + }); + const recorded: unknown[] = []; + // Stub the private network call so no real AWS request is made. + (t as unknown as { putLogEvents: (e: unknown[]) => Promise }).putLogEvents = async ( + events + ) => { + recorded.push(...events); + }; + for (let i = 0; i < 95; i++) t.write(entry(i)); // 95 > batchSize → tail would be left by old flush() + await t.close(); // must drain ALL 95, not just one chunk, then stop the timer + check( + 'cloud transport close() drains the whole batch on shutdown', + recorded.length === 95, + `delivered=${recorded.length}, expected=95` + ); + } + + // ── Scenario 5: secrets in the MESSAGE string are redacted (security) ─────── + { + const { LogixiaLogger } = await import('../src/core/logitron-logger'); + const captured: string[] = []; + const origWrite = process.stdout.write.bind(process.stdout); + (process.stdout as NodeJS.WriteStream).write = ((chunk: unknown) => { + captured.push(String(chunk ?? 
'')); + return true; + }) as typeof process.stdout.write; + + const logger = new LogixiaLogger({ + appName: 'verify', + outputs: ['console'], + format: { timestamp: false, colorize: false, json: false }, + traceId: false, + redact: { patterns: [/Bearer\s+\S+/gi], censor: '[REDACTED]' }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- example config + } as any); + await logger.info('Auth header was Bearer abc123secrettoken456'); + (process.stdout as NodeJS.WriteStream).write = origWrite; + + const joined = captured.join(''); + check( + 'secret in the log MESSAGE string is redacted, not leaked', + !joined.includes('abc123secrettoken456') && joined.includes('[REDACTED]'), + joined.trim() + ); + } + + console.log(`\n${allPassed ? '🎉 ALL CHECKS PASSED' : '🔥 SOME CHECKS FAILED'}`); + process.exit(allPassed ? 0 : 1); +} + +main().catch((err) => { + console.error('Example crashed:', err); + process.exit(1); +}); diff --git a/src/__tests__/browser.test.ts b/src/__tests__/browser.test.ts new file mode 100644 index 0000000..0044a04 --- /dev/null +++ b/src/__tests__/browser.test.ts @@ -0,0 +1,86 @@ +/** + * Tests for the browser logger's remote transport. + * + * Regressions: + * - flush() spliced only ONE batchSize chunk, leaving a tail when the batch + * exceeded batchSize. It must drain the whole batch. + * - destroy() cleared the timer but did NOT flush, losing buffered logs on page + * unload / teardown. It must flush remaining entries. + * + * fetch is mocked so no real network call is made. 
+ */ + +import type { BrowserLogEntry } from '../browser'; +import { BrowserRemoteTransport } from '../browser'; + +function makeEntry(i: number): BrowserLogEntry { + return { timestamp: '2026-01-01T00:00:00.000Z', level: 'info', appName: 'a', message: `b-${i}` }; +} + +describe('BrowserRemoteTransport', () => { + let fetchMock: jest.Mock; + let originalFetch: typeof globalThis.fetch | undefined; + + beforeEach(() => { + originalFetch = globalThis.fetch; + fetchMock = jest.fn().mockResolvedValue({ ok: true }); + (globalThis as { fetch: unknown }).fetch = fetchMock; + }); + + afterEach(() => { + (globalThis as { fetch: unknown }).fetch = originalFetch; + }); + + it('drains the whole batch across multiple POSTs when it exceeds batchSize', async () => { + const t = new BrowserRemoteTransport({ url: 'https://logs.example/ingest', batchSize: 10 }); + for (let i = 0; i < 35; i += 1) t.write(makeEntry(i)); + + await t.flush(); + + // 35 entries / batchSize 10 → 4 POSTs (10+10+10+5). + const totalSent = fetchMock.mock.calls.reduce((sum, call) => { + const body = JSON.parse((call[1] as { body: string }).body) as unknown[]; + return sum + body.length; + }, 0); + expect(totalSent).toBe(35); + t.destroy(); + }); + + it('flushes remaining buffered entries on destroy()', async () => { + const t = new BrowserRemoteTransport({ + url: 'https://logs.example/ingest', + batchSize: 1000, // never auto-flushes + flushIntervalMs: 999_999, + }); + for (let i = 0; i < 5; i += 1) t.write(makeEntry(i)); + expect(fetchMock).not.toHaveBeenCalled(); // still buffered + + t.destroy(); + // destroy() kicks off the flush; let the microtask settle. 
+ await Promise.resolve(); + await Promise.resolve(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const body = JSON.parse(fetchMock.mock.calls[0]![1].body) as unknown[]; + expect(body).toHaveLength(5); + }); + + it('re-buffers entries when the POST fails (no loss)', async () => { + fetchMock.mockRejectedValueOnce(new Error('network down')); + const t = new BrowserRemoteTransport({ url: 'https://logs.example/ingest', batchSize: 1000 }); + for (let i = 0; i < 3; i += 1) t.write(makeEntry(i)); + + await t.flush(); // fails → re-buffered + await t.flush(); // succeeds + + const lastBody = JSON.parse( + fetchMock.mock.calls[fetchMock.mock.calls.length - 1]![1].body + ) as unknown[]; + expect(lastBody).toHaveLength(3); + t.destroy(); + }); + + it('rejects a non-http(s) url scheme', () => { + expect(() => new BrowserRemoteTransport({ url: 'javascript:alert(1)' })).toThrow(); + }); +}); diff --git a/src/__tests__/metrics.test.ts b/src/__tests__/metrics.test.ts new file mode 100644 index 0000000..c1acc4a --- /dev/null +++ b/src/__tests__/metrics.test.ts @@ -0,0 +1,96 @@ +/** + * Tests for the Prometheus metrics plugin. + * + * Focus: the Prometheus exposition format is strict — an invalid metric or label + * NAME makes the scraper reject the ENTIRE endpoint, not just that metric. So + * names must be sanitized to [a-zA-Z_][a-zA-Z0-9_]*. Also covers basic + * counter / histogram / gauge extraction and rendering. + */ + +import { createMetricsPlugin } from '../metrics'; +import type { LogEntry } from '../types/index'; + +function entry(level: string, payload?: Record): LogEntry { + return { + timestamp: '2026-01-01T00:00:00.000Z', + level, + appName: 'TestApp', + message: 'm', + ...(payload ? 
{ payload } : {}), + }; +} + +describe('MetricsPlugin — name sanitization', () => { + it('sanitizes an invalid metric name in the output', () => { + const m = createMetricsPlugin({ 'my-bad.metric': { type: 'counter' } }); + m.onLog(entry('info')); + const out = m.render(); + // The TYPE/metric lines must use a valid sanitized identifier. + expect(out).toContain('logixia_my_bad_metric'); + expect(out).toMatch(/^logixia_my_bad_metric 1$/m); + }); + + it('sanitizes invalid label names', () => { + const m = createMetricsPlugin({ + reqs: { type: 'counter', labels: ['status code'] }, + }); + m.onLog(entry('info', { 'status code': '200' })); + const out = m.render(); + expect(out).toContain('status_code="200"'); + expect(out).not.toContain('status code='); + }); + + it('prefixes a leading-digit name with an underscore', () => { + const m = createMetricsPlugin({ '5xx': { type: 'counter' } }); + m.onLog(entry('error')); + expect(m.render()).toContain('logixia__5xx'); + }); +}); + +describe('MetricsPlugin — extraction', () => { + it('counts entries matching a level filter', () => { + const m = createMetricsPlugin({ + error_count: { type: 'counter', levelFilter: 'error' }, + }); + m.onLog(entry('error')); + m.onLog(entry('info')); + m.onLog(entry('error')); + expect(m.render()).toMatch(/^logixia_error_count 2$/m); + }); + + it('observes a histogram field with cumulative buckets and +Inf', () => { + const m = createMetricsPlugin({ + dur: { type: 'histogram', field: 'duration', buckets: [10, 100] }, + }); + m.onLog(entry('info', { duration: 5 })); + m.onLog(entry('info', { duration: 50 })); + const out = m.render(); + expect(out).toContain('logixia_dur_bucket{le="10"} 1'); // only the 5 + expect(out).toContain('logixia_dur_bucket{le="100"} 2'); // 5 and 50 + expect(out).toContain('logixia_dur_bucket{le="+Inf"} 2'); + expect(out).toContain('logixia_dur_count 2'); + expect(out).toContain('logixia_dur_sum 55'); + }); + + it('tracks the latest value of a gauge', () => { + const m = 
createMetricsPlugin({ + conns: { type: 'gauge', field: 'connections' }, + }); + m.onLog(entry('info', { connections: 3 })); + m.onLog(entry('info', { connections: 7 })); + expect(m.render()).toMatch(/^logixia_conns 7$/m); + }); + + it('reset() clears accumulated state', () => { + const m = createMetricsPlugin({ c: { type: 'counter' } }); + m.onLog(entry('info')); + m.reset(); + expect(m.render()).toMatch(/^logixia_c 0$/m); + }); + + it('onLog passes the entry through unchanged', () => { + const m = createMetricsPlugin({ c: { type: 'counter' } }); + const e = entry('info', { x: 1 }); + expect(m.onLog(e)).toBe(e); + }); +}); diff --git a/src/__tests__/plugin.comprehensive.test.ts b/src/__tests__/plugin.comprehensive.test.ts index 3ccd584..79a9e6a 100644 --- a/src/__tests__/plugin.comprehensive.test.ts +++ b/src/__tests__/plugin.comprehensive.test.ts @@ -74,8 +74,6 @@ describe('PluginRegistry', () => { }); it('swallows errors thrown by async onInit (fire and forget)', async () => { - // Synchronous throws from onInit ARE propagated (it's called synchronously). - // Only async onInit errors are swallowed. Verify async path. const rejected = false; registry.register({ name: 'bad-async-init', @@ -90,6 +88,20 @@ describe('PluginRegistry', () => { expect(rejected).toBe(false); }); + it('swallows a SYNCHRONOUS throw from onInit (does not break register)', () => { + // A buggy plugin whose onInit throws synchronously must not propagate out + // of register() — registration still succeeds and the plugin is tracked. 
+ expect(() => + registry.register({ + name: 'bad-sync-init', + onInit() { + throw new Error('sync init failed'); + }, + }) + ).not.toThrow(); + expect(registry.has('bad-sync-init')).toBe(true); + }); + it('can register multiple distinct plugins', () => { registry.register({ name: 'p1' }); registry.register({ name: 'p2' }); @@ -323,6 +335,25 @@ describe('PluginRegistry', () => { await expect(registry.runOnError(new Error('original'))).resolves.not.toThrow(); }); + it('swallows a SYNCHRONOUS throw inside onError (no crash on transport failure)', async () => { + registry.register({ + name: 'sync-bad-error-handler', + onError() { + throw new Error('sync handler blew up'); + }, + }); + // A later well-behaved hook must still run despite the earlier sync throw. + let laterRan = false; + registry.register({ + name: 'good-error-handler', + onError() { + laterRan = true; + }, + }); + await expect(registry.runOnError(new Error('original'))).resolves.not.toThrow(); + expect(laterRan).toBe(true); + }); + it('passes the log entry to onError hooks', async () => { const captured: LogEntry[] = []; registry.register({ @@ -370,6 +401,24 @@ describe('PluginRegistry', () => { await expect(registry.runOnShutdown()).resolves.not.toThrow(); }); + it('swallows a SYNCHRONOUS throw inside onShutdown (graceful shutdown not broken)', async () => { + let goodRan = false; + registry.register({ + name: 'sync-bad-shutdown', + onShutdown() { + throw new Error('sync shutdown blew up'); + }, + }); + registry.register({ + name: 'good-shutdown', + onShutdown() { + goodRan = true; + }, + }); + await expect(registry.runOnShutdown()).resolves.not.toThrow(); + expect(goodRan).toBe(true); + }); + it('runs all shutdown hooks concurrently', async () => { const completed: string[] = []; registry.register({ diff --git a/src/browser.ts b/src/browser.ts index 7afb242..f9c042d 100644 --- a/src/browser.ts +++ b/src/browser.ts @@ -273,18 +273,23 @@ export class BrowserRemoteTransport implements IBrowserTransport { 
} async flush(): Promise { - if (this.batch.length === 0) return; - const entries = this.batch.splice(0, this.batchSize); - try { - await fetch(this.url, { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...this.headers }, - body: JSON.stringify(entries), - keepalive: true, // survives page unload - }); - } catch { - // Silently restore on failure — best effort in browser - this.batch.unshift(...entries); + // Drain the WHOLE batch (not just one chunk) so a flush during page unload + // empties everything. splice() detaches synchronously, so entries added + // concurrently land in a fresh slice and are never sent twice. + while (this.batch.length > 0) { + const entries = this.batch.splice(0, this.batchSize); + try { + await fetch(this.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...this.headers }, + body: JSON.stringify(entries), + keepalive: true, // survives page unload + }); + } catch { + // Silently restore on failure and stop — best effort in browser. + this.batch.unshift(...entries); + return; + } } } @@ -299,8 +304,17 @@ export class BrowserRemoteTransport implements IBrowserTransport { } } + /** + * Stop the flush timer and flush remaining entries. Without the final flush, + * any buffered logs were lost on page unload / teardown — the browser + * equivalent of the "last N seconds of logs lost on deploy" problem. + */ destroy(): void { - if (this.timer !== null) clearInterval(this.timer); + if (this.timer !== null) { + clearInterval(this.timer); + this.timer = null; + } + this.flush().catch(() => {}); } } diff --git a/src/cli/__tests__/utils.test.ts b/src/cli/__tests__/utils.test.ts new file mode 100644 index 0000000..77614dd --- /dev/null +++ b/src/cli/__tests__/utils.test.ts @@ -0,0 +1,60 @@ +/** + * Tests for the CLI table/parse helpers. 
+ * + * Key regression: formatAsTable used `(r[c] || '')`, so a falsy-but-real value + * (0, false, '') rendered as blank — a count=0 / statusCode=0 column silently + * disappeared. Cells must render real values. + */ + +import { formatAsTable, safeParseLogs } from '../utils'; + +describe('formatAsTable', () => { + it('renders header, separator, and rows', () => { + const out = formatAsTable([{ a: '1', b: 'x' }], ['a', 'b']); + const lines = out.split('\n'); + expect(lines[0]).toContain('a'); + expect(lines[0]).toContain('b'); + expect(lines[2]).toContain('1'); + expect(lines[2]).toContain('x'); + }); + + it('renders falsy-but-real values (0, false) instead of blanks', () => { + const out = formatAsTable([{ count: 0, ok: false }], ['count', 'ok']); + expect(out).toContain('0'); + expect(out).toContain('false'); + }); + + it('does not crash on an empty rows array', () => { + expect(() => formatAsTable([], ['a', 'b'])).not.toThrow(); + const out = formatAsTable([], ['a', 'b']); + expect(out).toContain('a'); + }); + + it('handles missing fields as empty cells', () => { + const out = formatAsTable([{ a: 'present' }], ['a', 'missing']); + expect(out).toContain('present'); + }); +}); + +describe('safeParseLogs', () => { + it('parses JSON lines into objects', () => { + const raw = '{"level":"info","message":"a"}\n{"level":"error","message":"b"}'; + const parsed = safeParseLogs(raw); + expect(parsed).toHaveLength(2); + expect(parsed[0].level).toBe('info'); + expect(parsed[1].message).toBe('b'); + }); + + it('falls back to a message wrapper for non-JSON lines', () => { + const raw = 'plain text line\n{"message":"json line"}'; + const parsed = safeParseLogs(raw); + expect(parsed).toHaveLength(2); + expect(parsed[0]).toEqual({ message: 'plain text line' }); + expect(parsed[1].message).toBe('json line'); + }); + + it('ignores blank lines', () => { + const raw = '{"a":1}\n\n\n{"b":2}\n'; + expect(safeParseLogs(raw)).toHaveLength(2); + }); +}); diff --git a/src/cli/utils.ts 
b/src/cli/utils.ts index 1653513..e708f2f 100644 --- a/src/cli/utils.ts +++ b/src/cli/utils.ts @@ -1,15 +1,23 @@ /* eslint-disable @typescript-eslint/no-explicit-any -- CLI tools process raw JSON log data */ + +/** + * Render a single cell value. Uses a null/undefined check rather than `||` so + * falsy-but-real values (0, false, '') still render — a `count=0` or + * `statusCode=0` field must not silently display as blank. + */ +function cellText(value: unknown): string { + return value === null || value === undefined ? '' : String(value); +} + export function formatAsTable(rows: any[], columns: string[]) { // very small table printer const colWidths: number[] = columns.map((c) => - Math.max(c.length, ...rows.map((r) => (r[c] || '').toString().length)) + Math.max(c.length, ...rows.map((r) => cellText(r[c]).length)) ); const hdr = columns.map((c, i) => c.padEnd(colWidths[i] ?? 0)).join(' | '); const sep = colWidths.map((w) => '-'.repeat(w)).join('-|-'); const body = rows - .map((r) => - columns.map((c, i) => (r[c] || '').toString().padEnd(colWidths[i] ?? 0)).join(' | ') - ) + .map((r) => columns.map((c, i) => cellText(r[c]).padEnd(colWidths[i] ?? 
0)).join(' | ')) .join('\n'); return [hdr, sep, body].join('\n'); } diff --git a/src/core/__tests__/kafka-trace.interceptor.test.ts b/src/core/__tests__/kafka-trace.interceptor.test.ts index 81b3bd3..855d5e9 100644 --- a/src/core/__tests__/kafka-trace.interceptor.test.ts +++ b/src/core/__tests__/kafka-trace.interceptor.test.ts @@ -11,7 +11,7 @@ */ import type { CallHandler, ExecutionContext } from '@nestjs/common'; -import { firstValueFrom, of } from 'rxjs'; +import { firstValueFrom, Observable, of } from 'rxjs'; import { TraceContext } from '../../utils/trace.utils'; import { KafkaTraceInterceptor } from '../kafka-trace.interceptor'; @@ -152,3 +152,34 @@ describe('KafkaTraceInterceptor — resolves traceId from body', () => { }); }); }); + +// ── subscription teardown (leak guard) ─────────────────────────────────────── + +describe('KafkaTraceInterceptor — unsubscribe propagation', () => { + it('tears down the inner handler subscription when the outer is unsubscribed', async () => { + await runInEmptyAls(async () => { + let torndown = false; + // A handler that never completes and records when it is unsubscribed. + const handler: CallHandler = { + handle: () => + new Observable(() => { + return () => { + torndown = true; + }; + }), + }; + + const interceptor = new KafkaTraceInterceptor(undefined, false); + const result$ = interceptor.intercept( + makeRpcContext({ traceId: 'leak-trace' }, { topic: 't', headers: {} }), + handler + ); + + const sub = result$.subscribe(); + // Before the fix, unsubscribing the outer left the inner running forever. + sub.unsubscribe(); + + expect(torndown).toBe(true); + }); + }); +}); diff --git a/src/core/__tests__/log-method.test.ts b/src/core/__tests__/log-method.test.ts new file mode 100644 index 0000000..1857db6 --- /dev/null +++ b/src/core/__tests__/log-method.test.ts @@ -0,0 +1,121 @@ +/** + * Tests for the @LogMethod decorator. 
+ * + * Regression: the decorator unconditionally rewrote the method as `async` and + * awaited the original, so a SYNCHRONOUS method silently started returning a + * Promise — breaking callers that expected a direct value. The decorator must + * now preserve the sync/async contract while still logging entry/exit/error. + */ + +import { LogMethod } from '../nestjs-extras'; + +interface RecordedLog { + level: string; + message: string; + data?: Record; +} + +function makeFakeLogger() { + const logs: RecordedLog[] = []; + const mk = + (level: string) => + (message: string, data?: Record): Promise => { + logs.push({ level, message, data }); + return Promise.resolve(); + }; + return { + logs, + logger: { + debug: mk('debug'), + info: mk('info'), + warn: mk('warn'), + verbose: mk('verbose'), + trace: mk('trace'), + error: (msg: unknown, data?: Record) => { + logs.push({ level: 'error', message: String(msg), data }); + return Promise.resolve(); + }, + }, + }; +} + +describe('@LogMethod — preserves sync/async contract', () => { + it('a synchronous method still returns its value directly (not a Promise)', () => { + const fake = makeFakeLogger(); + + class Calc { + logger = fake.logger; + @LogMethod() + add(a: number, b: number): number { + return a + b; + } + } + + const result = new Calc().add(2, 3); + expect(result).toBe(5); + expect(result).not.toBeInstanceOf(Promise); + }); + + it('an async method returns a Promise that resolves to the value', async () => { + const fake = makeFakeLogger(); + + class Svc { + logger = fake.logger; + @LogMethod({ level: 'info' }) + async mul(a: number, b: number): Promise { + return a * b; + } + } + + const p = new Svc().mul(4, 5); + expect(p).toBeInstanceOf(Promise); + await expect(p).resolves.toBe(20); + }); + + it('logs entry and exit for a sync method', () => { + const fake = makeFakeLogger(); + + class Svc { + logger = fake.logger; + @LogMethod({ level: 'debug' }) + ping(): string { + return 'pong'; + } + } + + new Svc().ping(); + 
const messages = fake.logs.map((l) => l.message); + expect(messages.some((m) => m.startsWith('→'))).toBe(true); + expect(messages.some((m) => m.startsWith('←'))).toBe(true); + }); + + it('propagates and logs a synchronous throw', () => { + const fake = makeFakeLogger(); + + class Svc { + logger = fake.logger; + @LogMethod() + boom(): void { + throw new Error('sync boom'); + } + } + + expect(() => new Svc().boom()).toThrow('sync boom'); + expect(fake.logs.some((l) => l.level === 'error')).toBe(true); + }); + + it('propagates and logs an async rejection', async () => { + const fake = makeFakeLogger(); + + class Svc { + logger = fake.logger; + @LogMethod() + async fail(): Promise { + throw new Error('async boom'); + } + } + + await expect(new Svc().fail()).rejects.toThrow('async boom'); + expect(fake.logs.some((l) => l.level === 'error')).toBe(true); + }); +}); diff --git a/src/core/__tests__/logixia-logger.comprehensive.test.ts b/src/core/__tests__/logixia-logger.comprehensive.test.ts index 7a7e371..bbd93c5 100644 --- a/src/core/__tests__/logixia-logger.comprehensive.test.ts +++ b/src/core/__tests__/logixia-logger.comprehensive.test.ts @@ -710,3 +710,36 @@ describe('silent mode', () => { expect(out.joined()).toBe(''); }); }); + +// ── Redaction — message string ──────────────────────────────────────────────── + +describe('Redaction of secrets in the message string', () => { + it('redacts a pattern-matching secret embedded in the message, not just the payload', async () => { + const out = spyOutput(); + const logger = new LogixiaLogger({ + ...BASE_CONFIG, + levelOptions: { level: 'info' }, + redact: { patterns: [/Bearer\s+\S+/gi], censor: '[REDACTED]' }, + }); + + await logger.info('Auth header was Bearer abc123secrettoken456'); + await logger.info('login', { auth: 'Bearer abc123secrettoken456' }); + out.restore(); + + const joined = out.joined(); + // The raw token must NOT appear anywhere in the output (message or payload). 
+ expect(joined).not.toContain('abc123secrettoken456'); + // The message itself was redacted. + expect(joined).toContain('Auth header was [REDACTED]'); + }); + + it('leaves the message untouched when no redact patterns are configured', async () => { + const out = spyOutput(); + const logger = new LogixiaLogger({ ...BASE_CONFIG, levelOptions: { level: 'info' } }); + + await logger.info('plain message Bearer token-stays'); + out.restore(); + + expect(out.joined()).toContain('plain message Bearer token-stays'); + }); +}); diff --git a/src/core/__tests__/non-string-context.test.ts b/src/core/__tests__/non-string-context.test.ts index d7ffc5b..efd18be 100644 --- a/src/core/__tests__/non-string-context.test.ts +++ b/src/core/__tests__/non-string-context.test.ts @@ -151,6 +151,20 @@ describe('LogixiaLoggerService — NestJS adapter', () => { service.verbose('v', { foo: 'bar' }); expect(service.getContext()).toBe('Initial'); }); + + it('does not throw when logging a circular object as the message', async () => { + const service = new LogixiaLoggerService({ + appName: 'NestApp', + format: { json: false, colorize: false, timestamp: false }, + }); + const circular: Record = { a: 1 }; + circular.self = circular; + + // formatMessage previously did JSON.stringify(message), which throws on a + // circular structure — crashing the log call. It must now be safe. 
+ expect(() => service.log(circular as unknown as string)).not.toThrow(); + await expect(service.info(circular as unknown as string)).resolves.toBeUndefined(); + }); }); describe('AsyncLocalStorage interplay', () => { diff --git a/src/core/__tests__/trace-response-header.test.ts b/src/core/__tests__/trace-response-header.test.ts index c1f04ab..28399e9 100644 --- a/src/core/__tests__/trace-response-header.test.ts +++ b/src/core/__tests__/trace-response-header.test.ts @@ -7,7 +7,11 @@ * - `{ responseHeader: false }` → `null` (suppress header entirely) */ -import { DEFAULT_TRACE_RESPONSE_HEADER, resolveResponseHeader } from '../trace.middleware'; +import { + DEFAULT_TRACE_RESPONSE_HEADER, + resolveResponseHeader, + traceMiddleware, +} from '../trace.middleware'; describe('resolveResponseHeader', () => { it('returns the default header name when config is undefined', () => { @@ -28,3 +32,47 @@ describe('resolveResponseHeader', () => { expect(resolveResponseHeader({ enabled: true, responseHeader: false })).toBeNull(); }); }); + +describe('traceMiddleware — response-API robustness', () => { + const mkReq = () => + ({ method: 'GET', url: '/', headers: {}, get: () => '', ip: '' }) as unknown as Parameters< + ReturnType + >[0]; + const asRes = (r: unknown) => r as Parameters>[1]; + + it('uses res.setHeader on an Express-style response', () => { + const mw = traceMiddleware(); + const setHeader = jest.fn(); + let nextCalled = false; + mw(mkReq(), asRes({ setHeader }), () => { + nextCalled = true; + }); + expect(setHeader).toHaveBeenCalledWith(DEFAULT_TRACE_RESPONSE_HEADER, expect.any(String)); + expect(nextCalled).toBe(true); + }); + + it('falls back to reply.header on a Fastify-style response (no setHeader)', () => { + const mw = traceMiddleware(); + const header = jest.fn(); + mw(mkReq(), asRes({ header }), () => {}); + expect(header).toHaveBeenCalledWith(DEFAULT_TRACE_RESPONSE_HEADER, expect.any(String)); + }); + + it('does not throw when the response has no header method 
and still calls next', () => { + const mw = traceMiddleware(); + let nextCalled = false; + expect(() => + mw(mkReq(), asRes({}), () => { + nextCalled = true; + }) + ).not.toThrow(); + expect(nextCalled).toBe(true); + }); + + it('skips setting the header when headers are already sent', () => { + const mw = traceMiddleware(); + const setHeader = jest.fn(); + mw(mkReq(), asRes({ setHeader, headersSent: true }), () => {}); + expect(setHeader).not.toHaveBeenCalled(); + }); +}); diff --git a/src/core/kafka-trace.interceptor.ts b/src/core/kafka-trace.interceptor.ts index d633081..f53cc69 100644 --- a/src/core/kafka-trace.interceptor.ts +++ b/src/core/kafka-trace.interceptor.ts @@ -119,10 +119,15 @@ export class KafkaTraceInterceptor implements NestInterceptor { }; return new Observable((subscriber) => { + // Capture the inner subscription so it is torn down when the outer + // subscriber unsubscribes (consumer shutdown, upstream takeUntil/timeout). + // Without returning it, the handler subscription leaked and kept running + // after cancellation. 
+ let inner: { unsubscribe(): void } | undefined; this.ctx.run( traceId!, () => { - next.handle().subscribe({ + inner = next.handle().subscribe({ next: (value) => subscriber.next(value), error: (err) => subscriber.error(err), complete: () => subscriber.complete(), @@ -130,6 +135,7 @@ export class KafkaTraceInterceptor implements NestInterceptor { }, kafkaContext ); + return () => inner?.unsubscribe(); }); } } diff --git a/src/core/logitron-logger.ts b/src/core/logitron-logger.ts index 1befaf8..73ac5b0 100644 --- a/src/core/logitron-logger.ts +++ b/src/core/logitron-logger.ts @@ -48,7 +48,7 @@ import { safeToString } from '../utils/coerce.utils'; import { isError, serializeError } from '../utils/error.utils'; import { internalError, internalLog, internalWarn } from '../utils/internal-log'; import { _getOtelPayloadIfEnabled } from '../utils/otel'; -import { applyRedaction } from '../utils/redact.utils'; +import { applyRedaction, applyRedactionToString } from '../utils/redact.utils'; import { Sampler } from '../utils/sampling.utils'; import { deregisterFromShutdown, flushOnExit, registerForShutdown } from '../utils/shutdown.utils'; import { TraceContext } from '../utils/trace.utils'; @@ -713,12 +713,19 @@ export class LogixiaLogger< const traceId = this.config.traceId ? (TraceContext.instance.getCurrentTraceId() ?? this.fallbackTraceId) : undefined; + // Pattern-based redaction also applies to the message string — a secret + // interpolated into the message (e.g. `Auth: Bearer ${token}`) would + // otherwise bypass redaction entirely and leak in plaintext. Path rules + // don't apply to a bare string, so only `patterns` run here. + const safeMessage = this._hasRedact + ? applyRedactionToString(message, this.config.redact) + : message; const entry: LogEntry = { timestamp: new Date().toISOString(), level, appName: this.config.appName ?? 'App', environment: this.config.environment ?? 
'development', - message, + message: safeMessage, }; if (this.context) entry.context = this.context; if (payload !== undefined) entry.payload = payload; diff --git a/src/core/logitron-nestjs.service.ts b/src/core/logitron-nestjs.service.ts index 317742e..9bcff02 100644 --- a/src/core/logitron-nestjs.service.ts +++ b/src/core/logitron-nestjs.service.ts @@ -444,7 +444,10 @@ export class LogixiaLoggerService implements LoggerService { private formatMessage(message: unknown): string { if (typeof message === 'string') return message; - if (typeof message === 'object') return JSON.stringify(message); - return String(message); + // Use safeToString rather than raw JSON.stringify: NestJS passes arbitrary + // values here, and JSON.stringify both THROWS on circular structures (which + // would crash the log call) and RETURNS undefined for some inputs (e.g. a + // toJSON returning undefined). safeToString handles both safely. + return safeToString(message); } } diff --git a/src/core/nestjs-extras.ts b/src/core/nestjs-extras.ts index 18578c5..90d7ac9 100644 --- a/src/core/nestjs-extras.ts +++ b/src/core/nestjs-extras.ts @@ -95,8 +95,11 @@ export interface LogMethodOptions { /** * Method decorator that auto-logs entry, exit, duration, and errors. * - * Works on both async and sync methods. Attaches to the logger found on the - * class instance via a `logger` property (the conventional NestJS name). + * Preserves the original method's sync/async contract: a synchronous method + * stays synchronous (returns its value directly, with logs emitted + * fire-and-forget), and an async method is awaited so exit/error logs reflect + * the resolved result. Attaches to the logger found on the class instance via a + * `logger` property (the conventional NestJS name). 
* * @example * ```ts @@ -124,10 +127,40 @@ export function LogMethod(options: LogMethodOptions = {}): MethodDecorator { let _warnedNoLogger = false; - descriptor.value = async function ( - this: { logger?: LogixiaLoggerService }, - ...args: unknown[] - ) { + // Surface logger failures on stderr instead of silently swallowing them. + const reportLogFailure = (phase: string, err: unknown): void => { + process.stderr.write(`[logixia] @LogMethod(${label}) ${phase} log failed: ${String(err)}\n`); + }; + + // Emit a log line fire-and-forget, routing transport failures to stderr. + const emit = ( + logger: LogixiaLoggerService, + phase: string, + message: string, + data: Record + ): void => { + const logFnRaw = ( + logger as unknown as Record< + string, + (msg: string, data?: Record) => Promise + > + )[level]; + const logFn = (typeof logFnRaw === 'function' ? logFnRaw : logger.debug).bind(logger); + const p = logFn(message, data); + if (p && typeof (p as Promise).catch === 'function') { + (p as Promise).catch((e: unknown) => reportLogFailure(phase, e)); + } + }; + + const emitError = (logger: LogixiaLoggerService, error: unknown, start: number): void => { + const err = error instanceof Error ? error : new Error(String(error)); + const errLog: unknown = logger.error(err, { method: label, durationMs: Date.now() - start }); + if (errLog && typeof (errLog as Promise).catch === 'function') { + (errLog as Promise).catch((e: unknown) => reportLogFailure('error', e)); + } + }; + + descriptor.value = function (this: { logger?: LogixiaLoggerService }, ...args: unknown[]) { // Prefer the instance's own logger; fall back to the global module logger. const logger: LogixiaLoggerService | undefined = this.logger ?? LogixiaLoggerModule._globalLogger ?? 
undefined; @@ -142,82 +175,46 @@ export function LogMethod(options: LogMethodOptions = {}): MethodDecorator { } const start = Date.now(); - const entry: Record = { method: label }; - if (logArgs && args.length > 0) { - entry['args'] = args; - } + if (logArgs && args.length > 0) entry['args'] = args; + // Entry log is fire-and-forget so a SYNC method is not forced to become + // async just to await it. + if (logger) emit(logger, 'entry', `→ ${label}`, entry); - // Surface logger failures on stderr instead of silently swallowing them — - // a broken transport is an operator problem that must be observable. - // Uses a flat 2-arg helper (avoids curried `(phase) => (err) =>` nesting - // which trips the sonarjs no-nested-functions depth rule). - const reportLogFailure = (phase: string, err: unknown): void => { - process.stderr.write( - `[logixia] @LogMethod(${label}) ${phase} log failed: ${String(err)}\n` - ); + const buildExit = (result: unknown): Record => { + const exit: Record = { method: label, durationMs: Date.now() - start }; + if (logResult) exit['result'] = result; + return exit; }; - if (logger) { - // Use logLevel (extended method) rather than the NestJS interface debug/info - // which only accepts (message: unknown, context?: string) - const logFnRaw = ( - logger as unknown as Record< - string, - (msg: string, data?: Record) => Promise - > - )[level]; - const logFn = (typeof logFnRaw === 'function' ? 
logFnRaw : logger.debug).bind(logger); - const entryPromise = ( - logFn as (msg: string, data?: Record) => Promise - )(`→ ${label}`, entry); - await entryPromise.catch((e: unknown) => reportLogFailure('entry', e)); - } - + let result: unknown; try { - const result = await (originalMethod.apply(this, args) as Promise); - - const exit: Record = { - method: label, - durationMs: Date.now() - start, - }; - if (logResult) exit['result'] = result; - - if (logger) { - const logFnRaw = ( - logger as unknown as Record< - string, - (msg: string, data?: Record) => Promise - > - )[level]; - const logFn = (typeof logFnRaw === 'function' ? logFnRaw : logger.debug).bind(logger); - const exitPromise = ( - logFn as (msg: string, data?: Record) => Promise - )(`← ${label}`, exit); - await exitPromise.catch((e: unknown) => reportLogFailure('exit', e)); - } - - return result; + result = originalMethod.apply(this, args); } catch (error) { - if (logger && logErrors) { - const err = error instanceof Error ? error : new Error(String(error)); - // Use the structured overload (Record) so we get - // Promise back and can attach a stderr fallback for transport - // failures — otherwise they'd become unhandled rejections. - const errLog: unknown = logger.error(err, { - method: label, - durationMs: Date.now() - start, - }); - if ( - errLog !== undefined && - errLog !== null && - typeof (errLog as Promise).catch === 'function' - ) { - (errLog as Promise).catch((e: unknown) => reportLogFailure('error', e)); - } - } + // Synchronous throw. + if (logger && logErrors) emitError(logger, error, start); throw error; } + + // Async method → await via the returned thenable so exit/error reflect the + // resolved outcome, and preserve the Promise return type. 
+ if (result && typeof (result as PromiseLike).then === 'function') { + return (result as Promise).then( + (resolved) => { + if (logger) emit(logger, 'exit', `← ${label}`, buildExit(resolved)); + return resolved; + }, + (error: unknown) => { + if (logger && logErrors) emitError(logger, error, start); + throw error; + } + ); + } + + // Synchronous method → log exit fire-and-forget and return the value as-is, + // preserving the original synchronous contract. + if (logger) emit(logger, 'exit', `← ${label}`, buildExit(result)); + return result; }; return descriptor; diff --git a/src/core/trace.middleware.ts b/src/core/trace.middleware.ts index 8dcb442..e378734 100644 --- a/src/core/trace.middleware.ts +++ b/src/core/trace.middleware.ts @@ -23,6 +23,27 @@ export function resolveResponseHeader(config?: TraceIdConfig): string | null { return config?.responseHeader ?? DEFAULT_TRACE_RESPONSE_HEADER; } +/** + * Echo the trace ID on the response, tolerating non-Express responses and + * already-sent headers. NestJS can run on Fastify (reply.header() instead of + * res.setHeader()), and on a closed/sent response setHeader throws — neither + * should crash the request. The trace ID is still propagated via async context. 
/**
 * Best-effort echo of the trace ID onto an outgoing response.
 *
 * Supports two response shapes: one exposing `setHeader(name, value)` and one
 * exposing `header(name, value)`. `setHeader` wins when both are present.
 * Nothing is written when `headersSent` is truthy, and any exception raised
 * while writing the header is swallowed so the request itself is not affected.
 *
 * @param res - The response object; only the members listed above are used.
 * @param header - Name of the response header to set.
 * @param traceId - Trace identifier to write as the header value.
 */
function writeTraceHeader(res: unknown, header: string, traceId: string): void {
  type HeaderWriter = (name: string, value: string) => void;
  const target = res as {
    setHeader?: HeaderWriter;
    header?: HeaderWriter;
    headersSent?: boolean;
  };

  // Headers already flushed: there is nothing left to set.
  if (target.headersSent) return;

  try {
    if (typeof target.setHeader === 'function') {
      target.setHeader(header, traceId);
      return;
    }
    if (typeof target.header === 'function') {
      target.header(header, traceId);
    }
  } catch {
    /* response not in a header-settable state — context propagation still works */
  }
}
Observable((observer) => { + // Capture the inner subscription so it is torn down when the outer observer + // unsubscribes — otherwise the handler subscription leaks after the socket + // disconnects or an upstream operator cancels. + let inner: { unsubscribe(): void } | undefined; this.ctx.run( traceId!, () => { - const result = next.handle(); - result.subscribe({ + inner = next.handle().subscribe({ next: (value) => observer.next(value), error: (err) => observer.error(err), complete: () => observer.complete(), @@ -82,6 +85,7 @@ export class WebSocketTraceInterceptor implements NestInterceptor { }, wsContextData ); + return () => inner?.unsubscribe(); }); } } diff --git a/src/formatters/__tests__/formatters.test.ts b/src/formatters/__tests__/formatters.test.ts new file mode 100644 index 0000000..dcbefaa --- /dev/null +++ b/src/formatters/__tests__/formatters.test.ts @@ -0,0 +1,81 @@ +/** + * Tests for the JSON and text formatters. + * + * Key regression: a payload containing a circular reference (e.g. an Express + * req/res, a DB connection, a Mongoose document) must NOT crash format() — both + * formatters previously called raw JSON.stringify and threw "Converting circular + * structure to JSON", taking down the whole log/transport path. 
+ */ + +import type { LogEntry } from '../../types'; +import { JsonFormatter } from '../json.formatter'; +import { TextFormatter } from '../text.formatter'; + +function baseEntry(overrides: Partial = {}): LogEntry { + return { + timestamp: '2026-01-01T00:00:00.000Z', + level: 'info', + appName: 'TestApp', + message: 'hello', + ...overrides, + }; +} + +function makeCircular(): Record { + const obj: Record = { name: 'req' }; + obj.self = obj; + return obj; +} + +describe('JsonFormatter', () => { + it('produces valid JSON for a normal entry', () => { + const out = new JsonFormatter().format(baseEntry({ payload: { userId: 'u1' } })); + const parsed = JSON.parse(out); + expect(parsed.message).toBe('hello'); + expect(parsed.payload.userId).toBe('u1'); + }); + + it('does not crash on a circular payload and still emits valid JSON', () => { + const formatter = new JsonFormatter(); + let out = ''; + expect(() => { + out = formatter.format(baseEntry({ payload: { req: makeCircular() } })); + }).not.toThrow(); + + const parsed = JSON.parse(out); // must be valid JSON + expect(JSON.stringify(parsed)).toContain('[Circular]'); + }); + + it('serializes an Error in the payload', () => { + const out = new JsonFormatter().format(baseEntry({ payload: { err: new Error('boom') } })); + const parsed = JSON.parse(out); + expect(parsed.payload.err.message).toBe('boom'); + }); +}); + +describe('TextFormatter', () => { + it('formats a simple single-key payload as key=value', () => { + const out = new TextFormatter({ colorize: false }).format(baseEntry({ payload: { count: 5 } })); + expect(out).toContain('count=5'); + }); + + it('does not crash on a circular payload', () => { + const formatter = new TextFormatter({ colorize: false }); + expect(() => + formatter.format(baseEntry({ payload: { req: makeCircular(), other: 'val' } })) + ).not.toThrow(); + }); + + it('strips ASCII control characters from the message (CWE-117)', () => { + // Build the message with explicit control-char codes (ESC + 
/**
 * Build a `JSON.stringify` replacer that substitutes the string `'[Circular]'`
 * for circular references instead of letting `JSON.stringify` throw.
 *
 * Only TRUE cycles are replaced: an object is reported as circular when it is
 * one of its own ancestors in the structure being serialized. An object that
 * is merely referenced more than once (for example `{ a: shared, b: shared }`)
 * is serialized in full each time. A plain "seen" set cannot tell those two
 * cases apart and would drop legitimate data from the output.
 *
 * The replacer keeps per-serialization state (the ancestor stack), so create a
 * fresh one for each `JSON.stringify` call.
 *
 * @returns A replacer function suitable as the second argument of
 *   `JSON.stringify`.
 */
function createCircularReplacer(): (key: string, value: unknown) => unknown {
  // Chain of objects from the root down to the holder currently being walked.
  const ancestors: object[] = [];

  return function (this: unknown, _key: string, value: unknown): unknown {
    if (typeof value !== 'object' || value === null) return value;

    // JSON.stringify calls the replacer with `this` bound to the object that
    // holds `value`. Unwind the stack until that holder is on top, discarding
    // branches that have already been fully serialized.
    while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
      ancestors.pop();
    }

    if (ancestors.includes(value)) return '[Circular]';

    ancestors.push(value);
    return value;
  };
}
JSON.stringify(formatted, replacer, 2) + : JSON.stringify(formatted, replacer); } private serializePayload(payload: Record): Record { diff --git a/src/formatters/text.formatter.ts b/src/formatters/text.formatter.ts index b014a09..85a9c6b 100644 --- a/src/formatters/text.formatter.ts +++ b/src/formatters/text.formatter.ts @@ -164,7 +164,10 @@ export class TextFormatter implements ILogFormatter { return `${key}=${value.toISOString()}`; } if (typeof value === 'object') { - return `${key}=${JSON.stringify(value)}`; + // safeToString is circular-safe (raw JSON.stringify throws on a cycle + // — e.g. an Express req/res — which would crash the log call) and the + // control-char strip keeps the CWE-117 guarantee for object values. + return `${key}=${stripControls(safeToString(value))}`; } return `${key}=${String(value)}`; }) @@ -172,7 +175,9 @@ export class TextFormatter implements ILogFormatter { return formatted; } catch { - return JSON.stringify(payload); + // Last-resort fallback must also be circular-safe — the previous + // JSON.stringify(payload) here threw again on the same cyclic payload. + return stripControls(safeToString(payload)); } } diff --git a/src/metrics.ts b/src/metrics.ts index 888acb9..afb6fd0 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -162,7 +162,9 @@ function buildLabelKey(config: MetricConfig, entry: LogEntry): string { const payload = entry.payload ?? {}; for (const name of labelNames) { const raw = name === 'level' ? entry.level : payload[name]; - pairs[name] = raw !== undefined && raw !== null ? String(raw) : ''; + // Sanitize the label NAME (the value is escaped at render time) so a label + // like 'status code' can't emit unparseable Prometheus output. + pairs[sanitizePromName(name)] = raw !== undefined && raw !== null ? 
/**
 * Turn an arbitrary string into a name matching `[a-zA-Z_][a-zA-Z0-9_]*`,
 * the shape required for Prometheus metric and label names.
 *
 * Every character outside `[A-Za-z0-9_]` is replaced with `_`, a result that
 * starts with a digit is prefixed with `_`, and an empty input yields `_`.
 *
 * @param name - User-supplied metric or label name.
 * @returns The sanitized, never-empty name.
 */
function sanitizePromName(name: string): string {
  const cleaned = name.replace(/\W/g, '_');
  if (cleaned.length === 0) return '_';
  // A leading digit is not allowed, so shield it with an underscore.
  return /^\d/.test(cleaned) ? `_${cleaned}` : cleaned;
}
+ */ + +import type { IBaseLogger } from '../../types'; +import { + createExpressMiddleware, + type IncomingRequest, + type OutgoingResponse, +} from '../http-logger'; + +interface LogCall { + level: string; + message: string; + data?: Record; +} + +function makeLogger(): { logger: IBaseLogger; calls: LogCall[] } { + const calls: LogCall[] = []; + const logger = { + logLevel: (level: string, message: string, data?: Record) => { + calls.push({ level, message, data }); + return Promise.resolve(); + }, + warn: (message: string, data?: Record) => { + calls.push({ level: 'warn', message, data }); + return Promise.resolve(); + }, + } as unknown as IBaseLogger; + return { logger, calls }; +} + +/** A fake response that lets the test fire 'finish' / 'close' events. */ +function makeRes(statusCode = 200): OutgoingResponse & { fire(event: string): void } { + const handlers: Record void>> = {}; + return { + statusCode, + once(event: string, cb: () => void) { + if (!handlers[event]) handlers[event] = []; + handlers[event]!.push(cb); + }, + fire(event: string) { + for (const cb of handlers[event] ?? 
[]) cb(); + }, + } as OutgoingResponse & { fire(event: string): void }; +} + +describe('createExpressMiddleware', () => { + it('logs "request completed" only once when both finish and close fire', () => { + const { logger, calls } = makeLogger(); + const mw = createExpressMiddleware(logger, { requestLevel: 'silent' }); + const req: IncomingRequest = { method: 'GET', url: '/x', headers: {} }; + const res = makeRes(200); + + mw(req, res, () => {}); + res.fire('finish'); + res.fire('close'); // must NOT log a second completion + + const completions = calls.filter((c) => c.message === 'request completed'); + expect(completions).toHaveLength(1); + }); + + it('logs a request-start entry at the configured level', () => { + const { logger, calls } = makeLogger(); + const mw = createExpressMiddleware(logger, { requestLevel: 'debug' }); + const req: IncomingRequest = { method: 'POST', url: '/y', headers: {} }; + const res = makeRes(201); + + mw(req, res, () => {}); + const start = calls.find((c) => c.message === 'request started'); + expect(start?.level).toBe('debug'); + }); + + it('uses the error level for a 5xx response', () => { + const { logger, calls } = makeLogger(); + const mw = createExpressMiddleware(logger, { requestLevel: 'silent', errorLevel: 'error' }); + const req: IncomingRequest = { method: 'GET', url: '/z', headers: {} }; + const res = makeRes(500); + + mw(req, res, () => {}); + res.fire('finish'); + + const completion = calls.find((c) => c.message === 'request completed'); + expect(completion?.level).toBe('error'); + expect(completion?.data?.statusCode).toBe(500); + }); + + it('redacts sensitive headers in the logged fields', () => { + const { logger, calls } = makeLogger(); + const mw = createExpressMiddleware(logger, { requestLevel: 'info' }); + const req: IncomingRequest = { + method: 'GET', + url: '/a', + headers: { authorization: 'Bearer secret', 'x-custom': 'visible' }, + }; + const res = makeRes(200); + + mw(req, res, () => {}); + const start = 
calls.find((c) => c.message === 'request started'); + const headers = start?.data?.headers as Record; + expect(headers.authorization).toBe('[REDACTED]'); + expect(headers['x-custom']).toBe('visible'); + }); + + it('skips logging when the skip predicate returns true', () => { + const { logger, calls } = makeLogger(); + const mw = createExpressMiddleware(logger, { skip: (req) => req.url === '/health' }); + let nextCalled = false; + + mw({ method: 'GET', url: '/health', headers: {} }, makeRes(200), () => { + nextCalled = true; + }); + + expect(nextCalled).toBe(true); + expect(calls).toHaveLength(0); + }); +}); diff --git a/src/middleware/http-logger.ts b/src/middleware/http-logger.ts index feb8b06..9e01805 100644 --- a/src/middleware/http-logger.ts +++ b/src/middleware/http-logger.ts @@ -180,7 +180,15 @@ export function createExpressMiddleware( // Hook into the response 'finish' event — fires after headers + body are sent. // This is what Morgan gets wrong for slow requests (it uses 'close' which may // fire before the status code is set on some Node versions). + // + // BOTH 'finish' and 'close' fire on a normal response, so guard against + // logging the completion twice (which would duplicate the completed entry and + // the slow-request warning). + let completed = false; const onFinish = (): void => { + if (completed) return; + completed = true; + const duration = Date.now() - startMs; const status = res.statusCode ?? 0; const level = status >= 500 ? errorLevel : responseLevel; @@ -202,10 +210,9 @@ export function createExpressMiddleware( }; res.once?.('finish', onFinish); - // Fallback: also listen to 'close' (client disconnected before response finished) - res.once?.('close', () => { - if ((res.statusCode ?? 0) === 0) onFinish(); - }); + // Fallback: also covers a client that disconnects before 'finish' fires. The + // `completed` guard ensures a normal finish+close pair logs only once. 
+ res.once?.('close', onFinish); next(); }; diff --git a/src/plugin.ts b/src/plugin.ts index 9608a83..fdf3400 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -93,8 +93,15 @@ export class PluginRegistry { if (this._plugins.some((p) => p.name === plugin.name)) return; this._plugins.push(plugin); if (plugin.onInit) { - const result = plugin.onInit(); - if (result instanceof Promise) result.catch(() => {}); + // Isolate onInit: a synchronous throw must not propagate to the caller of + // register()/use() (the docs promise onInit errors are swallowed). The + // async branch is guarded with .catch for the same reason. + try { + const result = plugin.onInit(); + if (result instanceof Promise) result.catch(() => {}); + } catch { + /* swallow — a buggy onInit must not break registration */ + } } } @@ -144,8 +151,15 @@ export class PluginRegistry { async runOnError(error: Error, entry?: LogEntry): Promise { for (const plugin of this._plugins) { if (plugin.onError) { - const r = plugin.onError(error, entry); - if (r instanceof Promise) await r.catch(() => {}); + // Guard the SYNCHRONOUS call too — a sync throw here would otherwise + // propagate out of runOnError and turn a recoverable transport failure + // (the very thing onError exists to report) into a crash. + try { + const r = plugin.onError(error, entry); + if (r instanceof Promise) await r.catch(() => {}); + } catch { + /* swallow — onError hooks must never crash error handling */ + } } } } @@ -156,8 +170,14 @@ export class PluginRegistry { this._plugins .filter((p) => Boolean(p.onShutdown)) .map((p) => { - const r = p.onShutdown!(); - return r instanceof Promise ? r.catch(() => {}) : Promise.resolve(); + // Guard the synchronous call: a sync throw in onShutdown would reject + // this map and, via Promise.all, break graceful shutdown → log loss. + try { + const r = p.onShutdown!(); + return r instanceof Promise ? 
r.catch(() => {}) : Promise.resolve(); + } catch { + return Promise.resolve(); + } }) ); } diff --git a/src/search/core/__tests__/basic-log-indexer.test.ts b/src/search/core/__tests__/basic-log-indexer.test.ts new file mode 100644 index 0000000..125a878 --- /dev/null +++ b/src/search/core/__tests__/basic-log-indexer.test.ts @@ -0,0 +1,100 @@ +/** + * Tests for BasicLogIndexer. + * + * Focus: the index-size accounting. getIndexStats() previously JSON.stringify'd + * every entry on each call (O(n) — for the default 1M cap that blocks the event + * loop). Size is now maintained incrementally, so these tests pin that the + * running total stays consistent across add / remove / clear. Also covers field + * search and the max-size eviction. + */ + +import type { LogEntry } from '../../../types'; +import { BasicLogIndexer } from '../basic-log-indexer'; + +function makeLog(i: number, overrides: Partial = {}): LogEntry { + return { + timestamp: new Date(2026, 0, 1, 0, 0, i).toISOString(), + level: 'info', + appName: 'TestApp', + message: `message-${i}`, + ...overrides, + }; +} + +describe('BasicLogIndexer — size accounting', () => { + it('reports a positive size after indexing and zero after clear', async () => { + const ix = new BasicLogIndexer({ autoOptimize: false }); + for (let i = 0; i < 50; i += 1) await ix.indexLog(makeLog(i)); + + const stats = await ix.getIndexStats(); + expect(stats.totalDocuments).toBe(50); + expect(stats.indexSize).toBeGreaterThan(0); + + await ix.clearIndex(); + const after = await ix.getIndexStats(); + expect(after.totalDocuments).toBe(0); + expect(after.indexSize).toBe(0); + }); + + it('decrements size back to zero when all entries are removed by age', async () => { + const ix = new BasicLogIndexer({ autoOptimize: false }); + for (let i = 0; i < 30; i += 1) await ix.indexLog(makeLog(i)); + expect((await ix.getIndexStats()).indexSize).toBeGreaterThan(0); + + // Cutoff far in the future → removes everything. 
+ const removed = await ix.removeOldLogs(new Date(Date.now() + 10_000_000)); + expect(removed).toBe(30); + + const stats = await ix.getIndexStats(); + expect(stats.totalDocuments).toBe(0); + expect(stats.indexSize).toBe(0); + }); + + it('keeps size non-negative and consistent after partial removal', async () => { + const ix = new BasicLogIndexer({ autoOptimize: false }); + for (let i = 0; i < 20; i += 1) await ix.indexLog(makeLog(i)); + const full = (await ix.getIndexStats()).indexSize; + + // Remove the first ~10 (their timestamps are earliest). + const cutoff = new Date(2026, 0, 1, 0, 0, 10).toISOString(); + await ix.removeOldLogs(new Date(cutoff)); + + const partial = (await ix.getIndexStats()).indexSize; + expect(partial).toBeGreaterThan(0); + expect(partial).toBeLessThan(full); + }); +}); + +describe('BasicLogIndexer — field search', () => { + it('finds logs by an indexed field value (case-insensitive)', async () => { + const ix = new BasicLogIndexer({ autoOptimize: false }); + await ix.indexLog(makeLog(1, { level: 'error', traceId: 'TRACE-1' })); + await ix.indexLog(makeLog(2, { level: 'info', traceId: 'trace-1' })); + + const byTrace = ix.searchByField('traceId', 'trace-1'); + expect(byTrace).toHaveLength(2); + + const errors = ix.searchByField('level', 'ERROR'); + expect(errors).toHaveLength(1); + }); + + it('returns an empty array for an unknown field or value', async () => { + const ix = new BasicLogIndexer({ autoOptimize: false }); + await ix.indexLog(makeLog(1)); + expect(ix.searchByField('nope', 'x')).toEqual([]); + expect(ix.searchByField('level', 'warn')).toEqual([]); + }); +}); + +describe('BasicLogIndexer — max size eviction', () => { + it('evicts oldest logs once the index exceeds maxIndexSize', async () => { + // maxIndexSize 10 → after crossing, removeOldestLogs trims ~10%. 
+ const ix = new BasicLogIndexer({ autoOptimize: false, maxIndexSize: 10 }); + for (let i = 0; i < 12; i += 1) await ix.indexLog(makeLog(i)); + + const stats = await ix.getIndexStats(); + // Never grows unbounded past the cap (eviction kicked in). + expect(stats.totalDocuments).toBeLessThanOrEqual(11); + expect(stats.indexSize).toBeGreaterThan(0); + }); +}); diff --git a/src/search/core/__tests__/basic-search-engine.test.ts b/src/search/core/__tests__/basic-search-engine.test.ts new file mode 100644 index 0000000..6066601 --- /dev/null +++ b/src/search/core/__tests__/basic-search-engine.test.ts @@ -0,0 +1,91 @@ +/** + * Tests for BasicSearchEngine. + * + * Focus on the two fixed bugs plus core behavior: + * - getSearchableText used raw JSON.stringify(payload), which throws on a cyclic + * payload and crashes the WHOLE search. It must now be circular-safe. + * - addLogs grew this.logs without bound (memory leak) and used push(...logs) + * (RangeError risk on huge arrays). It must cap at maxLogs (FIFO). 
+ */ + +import type { LogEntry } from '../../../types'; +import { BasicSearchEngine } from '../basic-search-engine'; + +function makeLog(i: number, overrides: Partial = {}): LogEntry { + return { + timestamp: new Date(2026, 0, 1, 0, 0, i).toISOString(), + level: 'info', + appName: 'TestApp', + message: `message ${i}`, + ...overrides, + }; +} + +describe('BasicSearchEngine — search', () => { + it('returns logs whose searchable text contains the query terms', async () => { + const eng = new BasicSearchEngine(); + eng.addLogs([makeLog(1, { message: 'user login ok' }), makeLog(2, { message: 'cache miss' })]); + + const results = await eng.search('login'); + expect(results).toHaveLength(1); + expect(results[0]!.log.message).toBe('user login ok'); + }); + + it('does not crash when a stored log has a circular payload', async () => { + const eng = new BasicSearchEngine(); + const circular: Record = { a: 1 }; + circular.self = circular; + eng.addLogs([makeLog(1, { payload: circular }), makeLog(2)]); + + let results: Awaited> = []; + await expect( + (async () => { + results = await eng.search('message'); + })() + ).resolves.toBeUndefined(); + expect(results.length).toBe(2); + }); + + it('correlates logs by trace id, sorted by timestamp', async () => { + const eng = new BasicSearchEngine(); + eng.addLogs([ + makeLog(2, { traceId: 't1' }), + makeLog(1, { traceId: 't1' }), + makeLog(3, { traceId: 't2' }), + ]); + + const correlated = await eng.correlateByTraceId('t1'); + expect(correlated.logs).toHaveLength(2); + // Earliest timestamp first. 
+ expect(correlated.logs[0]!.message).toBe('message 1'); + }); +}); + +describe('BasicSearchEngine — bounded buffer', () => { + it('caps the log buffer at maxLogs and keeps the newest entries (FIFO)', () => { + const eng = new BasicSearchEngine({ maxLogs: 10 }); + const batch: LogEntry[] = []; + for (let i = 0; i < 25; i += 1) batch.push(makeLog(i)); + eng.addLogs(batch); + + const logs = eng.getLogs(); + expect(logs).toHaveLength(10); + const messages = logs.map((l) => l.message); + expect(messages).toContain('message 24'); + expect(messages).not.toContain('message 0'); + }); + + it('caps correctly across multiple addLogs calls', () => { + const eng = new BasicSearchEngine({ maxLogs: 5 }); + for (let i = 0; i < 8; i += 1) eng.addLogs([makeLog(i)]); + expect(eng.getLogs()).toHaveLength(5); + expect(eng.getLogs().map((l) => l.message)).toContain('message 7'); + }); + + it('clearLogs empties the buffer', () => { + const eng = new BasicSearchEngine(); + eng.addLogs([makeLog(1), makeLog(2)]); + eng.clearLogs(); + expect(eng.getLogs()).toHaveLength(0); + }); +}); diff --git a/src/search/core/basic-log-indexer.ts b/src/search/core/basic-log-indexer.ts index ceb0ee3..5b19bfc 100644 --- a/src/search/core/basic-log-indexer.ts +++ b/src/search/core/basic-log-indexer.ts @@ -14,6 +14,15 @@ export class BasicLogIndexer implements ILogIndexer { private fieldIndices: Map>> = new Map(); private semanticIndex: SemanticIndex | undefined; private lastOptimized?: Date; + /** + * Running estimate of the serialized index size in bytes, maintained + * incrementally. The previous implementation JSON.stringify'd EVERY log on + * each getIndexStats() call — O(n) per call, which for the default 1M-entry + * cap means stringifying a million objects and blocking the event loop. + * We keep a per-log byte estimate so the lookup is O(1). 
+ */ + private indexSizeBytes = 0; + private readonly logSizeById = new Map(); constructor( private options?: { @@ -40,7 +49,7 @@ export class BasicLogIndexer implements ILogIndexer { const logId = this.generateLogId(log); // Add to main index - this.index.set(logId, log); + this.trackInsert(logId, log); // Update field indices this.updateFieldIndices(logId, log); @@ -65,7 +74,7 @@ export class BasicLogIndexer implements ILogIndexer { async indexBatch(logs: LogEntry[]): Promise { for (const log of logs) { const logId = this.generateLogId(log); - this.index.set(logId, log); + this.trackInsert(logId, log); this.updateFieldIndices(logId, log); } @@ -130,6 +139,8 @@ export class BasicLogIndexer implements ILogIndexer { this.index.clear(); this.fieldIndices.clear(); this.semanticIndex = undefined; + this.indexSizeBytes = 0; + this.logSizeById.clear(); this.initializeFieldIndices(); } @@ -158,7 +169,7 @@ export class BasicLogIndexer implements ILogIndexer { for (const [logId, log] of this.index.entries()) { const logTime = new Date(log.timestamp).getTime(); if (logTime < cutoffTime) { - this.index.delete(logId); + this.trackDelete(logId); this.removeFromFieldIndices(logId); removedCount++; } @@ -248,14 +259,31 @@ export class BasicLogIndexer implements ILogIndexer { } private calculateIndexSize(): number { - // Rough estimate of index size in bytes - let size = 0; + // O(1): return the incrementally-maintained running total instead of + // re-stringifying every entry on each call. + return this.indexSizeBytes; + } - for (const log of this.index.values()) { - size += JSON.stringify(log).length; - } + /** Insert into the main index and update the running size estimate. */ + private trackInsert(logId: string, log: LogEntry): void { + // If this id somehow already exists, subtract its old size first. 
+ const existing = this.logSizeById.get(logId); + if (existing !== undefined) this.indexSizeBytes -= existing; - return size; + const size = JSON.stringify(log).length; + this.index.set(logId, log); + this.logSizeById.set(logId, size); + this.indexSizeBytes += size; + } + + /** Delete from the main index and update the running size estimate. */ + private trackDelete(logId: string): void { + const size = this.logSizeById.get(logId); + if (size !== undefined) { + this.indexSizeBytes -= size; + this.logSizeById.delete(logId); + } + this.index.delete(logId); } private async removeOldestLogs(count: number): Promise { @@ -264,7 +292,7 @@ export class BasicLogIndexer implements ILogIndexer { .slice(0, count); for (const [logId] of logs) { - this.index.delete(logId); + this.trackDelete(logId); this.removeFromFieldIndices(logId); } } diff --git a/src/search/core/basic-search-engine.ts b/src/search/core/basic-search-engine.ts index c42ba8a..96c4ef9 100644 --- a/src/search/core/basic-search-engine.ts +++ b/src/search/core/basic-search-engine.ts @@ -3,6 +3,7 @@ */ import type { LogEntry } from '../../types'; +import { safeToString } from '../../utils/coerce.utils'; import type { CorrelatedLogs, LogCorrelationSummary, @@ -29,19 +30,31 @@ export class BasicSearchEngine implements ILogSearchEngine { private searchHistory: string[] = []; private suggestionCache: Map = new Map(); - constructor(private options?: { maxHistorySize?: number; cacheSize?: number }) { + constructor(private options?: { maxHistorySize?: number; cacheSize?: number; maxLogs?: number }) { this.options = { maxHistorySize: 1000, cacheSize: 100, + maxLogs: 100_000, ...options, }; } /** - * Add logs to the search index + * Add logs to the search index. + * + * Bounded to `maxLogs` (default 100k): the buffer previously grew without + * limit, leaking memory in any long-running process feeding it logs. When the + * cap is exceeded the oldest entries are dropped (FIFO). 
We also append with a + * loop rather than `push(...logs)` to avoid a RangeError from spreading a very + * large array onto the call stack. */ public addLogs(logs: LogEntry[]): void { - this.logs.push(...logs); + for (const log of logs) this.logs.push(log); + + const maxLogs = this.options?.maxLogs ?? 100_000; + if (this.logs.length > maxLogs) { + this.logs.splice(0, this.logs.length - maxLogs); + } } /** @@ -450,7 +463,11 @@ export class BasicSearchEngine implements ILogSearchEngine { } private getSearchableText(log: LogEntry): string { - return [log.message, log.level, log.appName, log.context, JSON.stringify(log.payload)] + // safeToString is circular-safe — a stored log with a cyclic payload (an + // Express req/res, a DB handle) would otherwise make JSON.stringify throw and + // crash the ENTIRE search, since this runs for every candidate log. + const payloadText = log.payload ? safeToString(log.payload) : ''; + return [log.message, log.level, log.appName, log.context, payloadText] .filter(Boolean) .join(' '); } diff --git a/src/transports/__tests__/analytics.transport.test.ts b/src/transports/__tests__/analytics.transport.test.ts new file mode 100644 index 0000000..7ab0e56 --- /dev/null +++ b/src/transports/__tests__/analytics.transport.test.ts @@ -0,0 +1,125 @@ +/** + * Tests for the AnalyticsTransport base class batching + flush correctness. + * + * AnalyticsTransport is the shared base for Mixpanel, DataDog, Segment, and + * Google Analytics, so a flush bug here is a bug in all four. addToBatch() fires + * flush() un-awaited on every Nth entry, so a synchronous burst can trigger many + * overlapping flushes. Before the fix each overlapping flush snapshotted the + * not-yet-cleared batch and sent it again — the same N² duplication that turned + * ~120 logs into thousands of delivered events. The fix serializes flushes + * through a single shared drain promise and detaches the batch synchronously. 
+ * + * Also covers: failed sendBatch() re-buffers entries (no loss), and close() + * drains everything through a transient failure. + */ + +import type { AnalyticsTransportConfig, TransportLogEntry } from '../../types/transport.types'; +import { AnalyticsTransport } from '../analytics.transport'; + +function makeEntry(index: number): TransportLogEntry { + return { + timestamp: new Date('2026-01-01T00:00:00.000Z'), + level: 'info', + message: `analytics-line-${index}`, + }; +} + +/** Concrete test subclass that records every batch it is asked to send. */ +class TestAnalyticsTransport extends AnalyticsTransport { + public readonly sent: string[] = []; + private failTimes: number; + private sendCalls = 0; + + constructor(config: AnalyticsTransportConfig, failTimes = 0) { + super('test-analytics', config); + this.failTimes = failTimes; + this.isReady = true; // skip the async init wait + } + + protected initialize(): void { + this.isReady = true; + } + + protected async sendEntry(entry: TransportLogEntry): Promise { + this.sent.push(entry.message); + } + + protected async sendBatch(entries: TransportLogEntry[]): Promise { + this.sendCalls += 1; + if (this.sendCalls <= this.failTimes) { + throw new Error('simulated analytics send failure'); + } + for (const entry of entries) this.sent.push(entry.message); + } + + protected cleanup(): void { + /* no-op */ + } +} + +describe('AnalyticsTransport — batch flush', () => { + it('sends each entry exactly once on a synchronous un-awaited burst far larger than batchSize', async () => { + const transport = new TestAnalyticsTransport({ batchSize: 50, flushInterval: 0 }); + + const total = 500; + const writes: Array> = []; + for (let index = 0; index < total; index += 1) { + writes.push(transport.write(makeEntry(index))); + } + await Promise.allSettled(writes); + await transport.flush(); + + expect(transport.sent).toHaveLength(total); + expect(new Set(transport.sent).size).toBe(total); + await transport.close(); + }); + + it('does 
not re-send entries when flush() is called concurrently', async () => { + const transport = new TestAnalyticsTransport({ batchSize: 1000, flushInterval: 0 }); + + const writes: Array> = []; + for (let index = 0; index < 50; index += 1) { + writes.push(transport.write(makeEntry(index))); + } + await Promise.allSettled(writes); + + await Promise.all([transport.flush(), transport.flush(), transport.flush()]); + await transport.flush(); + + expect(transport.sent).toHaveLength(50); + expect(new Set(transport.sent).size).toBe(50); + await transport.close(); + }); + + it('re-buffers entries when a send fails so no logs are lost', async () => { + const transport = new TestAnalyticsTransport({ batchSize: 1000, flushInterval: 0 }, 1); + + for (let index = 0; index < 5; index += 1) { + await transport.write(makeEntry(index)); + } + + // First flush fails internally (error is swallowed by drain → re-buffered). + await transport.flush(); + expect(transport.sent).toHaveLength(0); + + // Second flush succeeds and delivers all 5 exactly once. + await transport.flush(); + expect(transport.sent).toHaveLength(5); + expect(new Set(transport.sent).size).toBe(5); + await transport.close(); + }); + + it('close() drains all buffered entries through a transient failure with no loss', async () => { + const transport = new TestAnalyticsTransport({ batchSize: 1000, flushInterval: 0 }, 1); + + for (let index = 0; index < 10; index += 1) { + await transport.write(makeEntry(index)); + } + expect(transport.sent).toHaveLength(0); + + await transport.close(); + + expect(transport.sent).toHaveLength(10); + expect(new Set(transport.sent).size).toBe(10); + }); +}); diff --git a/src/transports/__tests__/cloud.transport.test.ts b/src/transports/__tests__/cloud.transport.test.ts new file mode 100644 index 0000000..1d1bfda --- /dev/null +++ b/src/transports/__tests__/cloud.transport.test.ts @@ -0,0 +1,137 @@ +/** + * Tests for the cloud transports (CloudWatch, GCP Cloud Logging, Azure Monitor). 
+ * + * All three buffer entries and flush on an interval. They previously: + * - flushed only ONE batchSize chunk per flush() (a tail was left behind), and + * - had NO close() method, so on shutdown the manager skipped them and any + * buffered logs were lost on deploy. + * + * These tests verify flush() now drains the WHOLE batch, close() drains + * everything and stops the timer, and a failed send re-buffers (no loss, no + * duplication). The private send method is stubbed so no real network call runs. + */ + +import type { TransportLogEntry } from '../../types/transport.types'; +import { AzureMonitorTransport } from '../azure-monitor.transport'; +import { CloudWatchTransport } from '../cloudwatch.transport'; +import { GCPTransport } from '../gcp.transport'; + +function makeEntry(index: number): TransportLogEntry { + return { + timestamp: new Date('2026-01-01T00:00:00.000Z'), + level: 'info', + message: `cloud-line-${index}`, + }; +} + +/** + * Wraps a freshly-constructed cloud transport, replacing its private network + * send method with a recorder. `items` is the batch handed to that method; we record every item + * and optionally fail the first N sends. 
+ */ +function instrument( + transport: { flush(): Promise; close?(): Promise; write(e: TransportLogEntry): void }, + sendMethodName: string, + failTimes = 0 +) { + const recorded: unknown[] = []; + let sendCalls = 0; + + (transport as any)[sendMethodName] = async (items: unknown[]) => { + sendCalls += 1; + if (sendCalls <= failTimes) throw new Error('simulated cloud outage'); + recorded.push(...items); + }; + return { recorded, getSendCalls: () => sendCalls }; +} + +describe('Cloud transports — drain & close', () => { + describe('CloudWatchTransport', () => { + it('flush() drains the whole batch even when it exceeds batchSize', async () => { + const t = new CloudWatchTransport({ + logGroupName: '/test', + batchSize: 10, + flushIntervalMs: 999_999, + }); + const { recorded } = instrument(t, 'putLogEvents'); + + for (let i = 0; i < 35; i += 1) t.write(makeEntry(i)); + await t.flush(); + + expect(recorded).toHaveLength(35); + await t.close(); + }); + + it('close() drains everything through a transient failure with no loss', async () => { + const t = new CloudWatchTransport({ + logGroupName: '/test', + batchSize: 1000, + flushIntervalMs: 999_999, + }); + const { recorded } = instrument(t, 'putLogEvents', 1); + + for (let i = 0; i < 20; i += 1) t.write(makeEntry(i)); + await t.close(); + + expect(recorded).toHaveLength(20); + }); + }); + + describe('GCPTransport', () => { + it('flush() drains the whole batch and close() stops the timer', async () => { + const t = new GCPTransport({ projectId: 'p', batchSize: 10, flushIntervalMs: 999_999 }); + const { recorded } = instrument(t, 'writeEntries'); + + for (let i = 0; i < 25; i += 1) t.write(makeEntry(i)); + await t.flush(); + expect(recorded).toHaveLength(25); + + await t.close(); + }); + + it('close() drains everything through a transient failure with no loss', async () => { + const t = new GCPTransport({ projectId: 'p', batchSize: 1000, flushIntervalMs: 999_999 }); + const { recorded } = instrument(t, 'writeEntries', 1); 
+ + for (let i = 0; i < 15; i += 1) t.write(makeEntry(i)); + await t.close(); + + expect(recorded).toHaveLength(15); + }); + }); + + describe('AzureMonitorTransport', () => { + it('flush() drains the whole batch and close() stops the timer', async () => { + const t = new AzureMonitorTransport({ + endpoint: 'https://example.ingest.monitor.azure.com', + ruleId: 'dcr-123', + streamName: 'Custom-Logs', + batchSize: 10, + flushIntervalMs: 999_999, + }); + const { recorded } = instrument(t, 'sendEntries'); + + for (let i = 0; i < 22; i += 1) t.write(makeEntry(i)); + await t.flush(); + expect(recorded).toHaveLength(22); + + await t.close(); + }); + + it('close() drains everything through a transient failure with no loss', async () => { + const t = new AzureMonitorTransport({ + endpoint: 'https://example.ingest.monitor.azure.com', + ruleId: 'dcr-123', + streamName: 'Custom-Logs', + batchSize: 1000, + flushIntervalMs: 999_999, + }); + const { recorded } = instrument(t, 'sendEntries', 1); + + for (let i = 0; i < 18; i += 1) t.write(makeEntry(i)); + await t.close(); + + expect(recorded).toHaveLength(18); + }); + }); +}); diff --git a/src/transports/__tests__/database.transport.test.ts b/src/transports/__tests__/database.transport.test.ts new file mode 100644 index 0000000..cf5d84d --- /dev/null +++ b/src/transports/__tests__/database.transport.test.ts @@ -0,0 +1,148 @@ +/** + * Tests for DatabaseTransport batching + flush correctness. + * + * Two regression guarantees, both reproductions of real production incidents: + * + * 1. NO DUPLICATION — write() fires flush() un-awaited once the batch crosses + * batchSize, and the interval timer fires it too, so a synchronous burst can + * trigger many overlapping flushes. Before the fix each overlapping flush + * snapshotted the not-yet-cleared batch and wrote it again (the same class of + * bug that turned ~120 NestJS bootstrap logs into 11k file lines). 
The fix + * serializes flushes through a single shared drain promise and detaches the + * batch synchronously before awaiting. + * + * 2. NO LOG LOSS — a failed flush must re-buffer its entries, and close() must + * drain everything (retrying transient failures) before dropping the + * connection, so the "last N seconds of logs lost on deploy" problem can't + * happen. + */ + +import type { DatabaseTransportConfig, TransportLogEntry } from '../../types/transport.types'; +import { DatabaseTransport } from '../database.transport'; + +function makeEntry(index: number): TransportLogEntry { + return { + timestamp: new Date('2026-01-01T00:00:00.000Z'), + level: 'info', + message: `db-line-${index}`, + }; +} + +/** + * A fake SQLite-style connection that records every row handed to it. We use the + * sqlite path because flushToSQLite drives the connection through prepare()/run() + * which is trivial to stub, and the batching/flush logic under test is shared + * across all DB types. + */ +function makeFakeSqliteTransport(opts: { + batchSize?: number; + flushInterval?: number; + failTimes?: number; // number of run() calls that should throw before succeeding +}) { + const written: string[] = []; + let runCalls = 0; + const failTimes = opts.failTimes ?? 0; + + const stmt = { + run: async (...cols: unknown[]) => { + runCalls += 1; + if (runCalls <= failTimes) { + throw new Error('simulated DB write failure'); + } + // cols[2] is the message column (timestamp, level, message, ...) + written.push(String(cols[2])); + }, + finalize: async () => {}, + }; + + const connection = { + prepare: async () => stmt, + exec: async () => {}, + close: async () => {}, + }; + + const config = { + type: 'sqlite', + database: ':memory:', + table: 'logs', + batchSize: opts.batchSize ?? 100, + flushInterval: opts.flushInterval ?? 
5000, + } as unknown as DatabaseTransportConfig; + + const transport = new DatabaseTransport(config); + // Inject the fake connection and mark connected so write() skips the real connect(). + const internals = transport as unknown as { connection: unknown; isConnected: boolean }; + internals.connection = connection; + internals.isConnected = true; + + return { transport, written, getRunCalls: () => runCalls }; +} + +describe('DatabaseTransport — batch flush', () => { + it('writes each entry exactly once on a synchronous un-awaited burst far larger than batchSize', async () => { + const { transport, written } = makeFakeSqliteTransport({ batchSize: 100 }); + + const total = 500; + const writes: Array> = []; + for (let index = 0; index < total; index += 1) { + writes.push(transport.write(makeEntry(index))); + } + await Promise.allSettled(writes); + await transport.flush(); + + expect(written).toHaveLength(total); + expect(new Set(written).size).toBe(total); + await transport.close(); + }); + + it('does not re-write entries when flush() is called concurrently', async () => { + const { transport, written } = makeFakeSqliteTransport({ batchSize: 1000 }); + + const writes: Array> = []; + for (let index = 0; index < 50; index += 1) { + writes.push(transport.write(makeEntry(index))); + } + await Promise.allSettled(writes); + + await Promise.all([transport.flush(), transport.flush(), transport.flush()]); + await transport.flush(); + + expect(written).toHaveLength(50); + expect(new Set(written).size).toBe(50); + await transport.close(); + }); + + it('re-buffers entries when a flush fails so no logs are lost', async () => { + // First run() throws; the entry must survive and be written on the next flush. + const { transport, written } = makeFakeSqliteTransport({ batchSize: 1, failTimes: 1 }); + + // batchSize 1 → write() triggers a flush that fails; write() rejects but the + // entry stays buffered. 
+ await expect(transport.write(makeEntry(0))).rejects.toThrow(/simulated DB write failure/); + expect(written).toHaveLength(0); + + // Next flush succeeds and drains the re-buffered entry exactly once. + await transport.flush(); + expect(written).toEqual(['db-line-0']); + await transport.close(); + }); + + it('close() drains all buffered entries (retrying a transient failure) with no loss', async () => { + const { transport, written } = makeFakeSqliteTransport({ + batchSize: 1000, // never auto-flushes; everything is still buffered at close() + failTimes: 1, // first close() flush attempt fails, second succeeds + }); + + for (let index = 0; index < 10; index += 1) { + await transport.write(makeEntry(index)); + } + // Nothing flushed yet (batch below threshold). + expect(written).toHaveLength(0); + + await transport.close(); + + // All 10 entries landed exactly once despite the first flush attempt failing. + expect(written).toHaveLength(10); + expect(new Set(written).size).toBe(10); + }); +}); diff --git a/src/transports/__tests__/file.transport.test.ts b/src/transports/__tests__/file.transport.test.ts index 61151f3..1cacc58 100644 --- a/src/transports/__tests__/file.transport.test.ts +++ b/src/transports/__tests__/file.transport.test.ts @@ -103,3 +103,59 @@ describe('FileTransport — batch flush', () => { expect(new Set(lines).size).toBe(50); }); }); + +describe('FileTransport — rotation', () => { + let dir: string; + + beforeEach(() => { + dir = mkdtempSync(path.join(tmpdir(), 'logixia-file-rotation-')); + }); + + afterEach(() => { + rmSync(dir, { recursive: true, force: true }); + }); + + it('gzip-compresses the rotated file and removes the original when compress is set', async () => { + const transport = new FileTransport({ + dirname: dir, + filename: 'app.log', + batchSize: 1, + rotation: { interval: '1h', compress: true, maxFiles: 10 }, + }); + + await transport.write(makeEntry(0)); + await transport.flush(); + // Force a rotation. 
+ await (transport as unknown as { rotate(): Promise }).rotate(); + await transport.close(); + + const files = fs.readdirSync(dir); + // At least one compressed rotated file exists, and no uncompressed rotated original remains. + expect(files.some((f) => /^app-.*\.log\.gz$/.test(f))).toBe(true); + expect(files.some((f) => /^app-.*\.log$/.test(f))).toBe(false); + }); + + it('cleanup only deletes its own rotated files, not similarly-named ones', async () => { + // An unrelated file that shares the base prefix must survive cleanup. + const unrelated = path.join(dir, 'application.log'); + fs.writeFileSync(unrelated, 'not a logixia rotated file'); + + const transport = new FileTransport({ + dirname: dir, + filename: 'app.log', + batchSize: 1, + rotation: { interval: '1h', maxFiles: 1 }, + }); + + // Create several rotated files so cleanup (maxFiles: 1) deletes the excess. + for (let i = 0; i < 3; i += 1) { + await transport.write(makeEntry(i)); + await transport.flush(); + await (transport as unknown as { rotate(): Promise }).rotate(); + } + await transport.close(); + + // The unrelated file must NOT have been deleted by cleanup. + expect(fs.existsSync(unrelated)).toBe(true); + }); +}); diff --git a/src/transports/__tests__/transport.manager.test.ts b/src/transports/__tests__/transport.manager.test.ts new file mode 100644 index 0000000..4adaa21 --- /dev/null +++ b/src/transports/__tests__/transport.manager.test.ts @@ -0,0 +1,101 @@ +/** + * Tests for TransportManager orchestration. + * + * Covers: + * - the corrected cumulative averageWriteTime metric (the old (avg+sample)/2 + * formula over-weighted the most recent write), and + * - the shutdown guarantee: a custom batching transport added to the manager is + * flushed/closed on manager.close(), so batched logs are not lost on deploy. 
+ */ + +import type { LogEntry } from '../../types'; +import type { IBatchTransport, ITransport, TransportLogEntry } from '../../types/transport.types'; +import { TransportManager } from '../transport.manager'; + +function makeLogEntry(index: number): LogEntry { + return { + timestamp: new Date('2026-01-01T00:00:00.000Z').toISOString(), + level: 'info', + appName: 'test-app', + message: `mgr-line-${index}`, + }; +} + +/** A custom batching transport that buffers writes and only persists on flush/close. */ +class BufferingTransport implements IBatchTransport { + public readonly name = 'buffering'; + public readonly batchSize = 1000; + public readonly persisted: string[] = []; + private batch: TransportLogEntry[] = []; + + write(entry: TransportLogEntry): void { + this.batch.push(entry); + } + + async flush(): Promise { + const pending = this.batch; + this.batch = []; + for (const e of pending) this.persisted.push(e.message); + } + + async close(): Promise { + await this.flush(); + } +} + +describe('TransportManager', () => { + it('computes averageWriteTime as a true cumulative mean', async () => { + const manager = new TransportManager({}); + // A trivial synchronous transport so write timing is ~0 and deterministic-ish. + const transport: ITransport = { + name: 'noop', + write: () => {}, + }; + manager.addTransport(transport, 'noop-0'); + + for (let i = 0; i < 10; i += 1) { + await manager.write(makeLogEntry(i)); + } + + const metrics = manager.getMetricsForTransport('noop-0')!; + expect(metrics.logsWritten).toBe(10); + // A cumulative mean of near-zero write times stays near zero and finite — the + // old (avg+sample)/2 decay would also be small here, so the real assertion is + // that it is a finite non-negative number, never NaN. 
+ expect(Number.isFinite(metrics.averageWriteTime)).toBe(true); + expect(metrics.averageWriteTime).toBeGreaterThanOrEqual(0); + + await manager.close(); + }); + + it('flushes batched logs through close() so nothing is lost on shutdown', async () => { + const manager = new TransportManager({}); + const buffering = new BufferingTransport(); + manager.addTransport(buffering, 'buffering-0'); + + for (let i = 0; i < 25; i += 1) { + await manager.write(makeLogEntry(i)); + } + // Still buffered — the transport only persists on flush/close. + expect(buffering.persisted).toHaveLength(0); + + await manager.close(); + + // close() must have drained every buffered entry exactly once. + expect(buffering.persisted).toHaveLength(25); + expect(new Set(buffering.persisted).size).toBe(25); + }); + + it('drops writes after shutdown has started (documents the early-return guard)', async () => { + const manager = new TransportManager({}); + const buffering = new BufferingTransport(); + manager.addTransport(buffering, 'buffering-0'); + + await manager.write(makeLogEntry(0)); + await manager.close(); + + // After close(), the manager is shutting down and new writes are ignored. + await manager.write(makeLogEntry(1)); + expect(buffering.persisted).toEqual(['mgr-line-0']); + }); +}); diff --git a/src/transports/__tests__/worker.transport.test.ts b/src/transports/__tests__/worker.transport.test.ts new file mode 100644 index 0000000..9c65086 --- /dev/null +++ b/src/transports/__tests__/worker.transport.test.ts @@ -0,0 +1,153 @@ +/** + * Tests for WorkerTransport lifecycle (close/flush) using an injected fake + * worker, so no real worker thread (which would need built dist/ files) spawns. + * + * Regression coverage: + * - close() exists and is what TransportManager calls (a method named only + * shutdown() was never invoked on graceful exit → leaked thread + lost logs). + * - close() posts a 'shutdown' message and resolves on the worker's 'exit'. 
+ * - flush() resolves on a 'flushed' ack and does NOT hang when the ack never + * comes (time-boxed) — otherwise shutdown blocks until force-exit. + */ + +import { EventEmitter } from 'node:events'; + +import type { TransportLogEntry } from '../../types/transport.types'; +import { WorkerTransport } from '../worker.transport'; + +/** A fake worker that records postMessage calls and lets tests emit events. */ +class FakeWorker extends EventEmitter { + public readonly posted: Array<{ type: string; entry?: TransportLogEntry }> = []; + public terminated = false; + + postMessage(msg: { type: string; entry?: TransportLogEntry }): void { + this.posted.push(msg); + } + + async terminate(): Promise { + this.terminated = true; + this.emit('exit', 1); + return 1; + } +} + +interface WorkerInternals { + worker: { terminate?(): Promise } | null; + ready: boolean; + restartTimer: NodeJS.Timeout | null; + closing: boolean; + maxRestarts: number; +} + +/** + * Build a WorkerTransport with its real worker replaced by a fake, marked ready. + * + * The constructor spawns a real worker thread (which can't resolve its transport + * module in the test env), so we immediately tear that down and inject the fake. + */ +function makeWithFakeWorker(): { transport: WorkerTransport; fake: FakeWorker } { + const transport = new WorkerTransport({ transportType: 'console', transportConfig: {} }); + const internals = transport as unknown as WorkerInternals; + // Disable the auto-restart machinery and terminate the auto-spawned real worker + // so no thread or backoff timer leaks during the test. 
+ internals.maxRestarts = 0; + if (internals.restartTimer) { + clearTimeout(internals.restartTimer); + internals.restartTimer = null; + } + const realWorker = internals.worker; + if (realWorker && typeof realWorker.terminate === 'function') { + realWorker.terminate().catch(() => {}); + } + + const fake = new FakeWorker(); + internals.worker = fake; + internals.ready = true; + internals.closing = false; + return { transport, fake }; +} + +function makeEntry(i: number): TransportLogEntry { + return { timestamp: new Date('2026-01-01T00:00:00.000Z'), level: 'info', message: `w-${i}` }; +} + +describe('WorkerTransport — lifecycle', () => { + it('exposes close() (the method TransportManager actually calls)', () => { + const { transport } = makeWithFakeWorker(); + expect(typeof (transport as unknown as { close: unknown }).close).toBe('function'); + }); + + it('close() posts a shutdown message and resolves on worker exit', async () => { + const { transport, fake } = makeWithFakeWorker(); + + const closePromise = transport.close(); + // The worker acknowledges by exiting. + fake.emit('exit', 0); + await closePromise; + + expect(fake.posted.some((m) => m.type === 'shutdown')).toBe(true); + }); + + it('shutdown() is a back-compat alias for close()', async () => { + const { transport, fake } = makeWithFakeWorker(); + const p = transport.shutdown(); + fake.emit('exit', 0); + await p; + expect(fake.posted.some((m) => m.type === 'shutdown')).toBe(true); + }); + + it('flush() resolves on a flushed ack', async () => { + const { transport, fake } = makeWithFakeWorker(); + + const flushPromise = transport.flush(); + // Worker responds. 
+ fake.emit('message', { type: 'flushed' }); + await expect(flushPromise).resolves.toBeUndefined(); + + expect(fake.posted.some((m) => m.type === 'flush')).toBe(true); + }); + + it('flush() does not hang forever when the worker never acks', async () => { + jest.useFakeTimers(); + try { + const { transport } = makeWithFakeWorker(); + const flushPromise = transport.flush(); + // No 'flushed' message ever arrives — advance past the flush timeout. + jest.advanceTimersByTime(5000); + await expect(flushPromise).resolves.toBeUndefined(); + } finally { + jest.useRealTimers(); + } + }); + + it('close() forwards locally-buffered entries to the worker before shutdown', async () => { + const transport = new WorkerTransport({ transportType: 'console', transportConfig: {} }); + const internals = transport as unknown as WorkerInternals; + internals.maxRestarts = 0; + if (internals.restartTimer) { + clearTimeout(internals.restartTimer); + internals.restartTimer = null; + } + const realWorker = internals.worker; + if (realWorker && typeof realWorker.terminate === 'function') { + realWorker.terminate().catch(() => {}); + } + + const fake = new FakeWorker(); + // Not ready yet → writes go to the local buffer. + internals.ready = false; + internals.worker = fake; + transport.write(makeEntry(0)); + transport.write(makeEntry(1)); + + // Now mark ready and close — buffered entries must be forwarded. 
+ internals.ready = true; + const closePromise = transport.close(); + fake.emit('exit', 0); + await closePromise; + + const writes = fake.posted.filter((m) => m.type === 'write'); + expect(writes).toHaveLength(2); + expect(fake.posted.some((m) => m.type === 'shutdown')).toBe(true); + }); +}); diff --git a/src/transports/analytics.transport.ts b/src/transports/analytics.transport.ts index 834c15e..2f26d7b 100644 --- a/src/transports/analytics.transport.ts +++ b/src/transports/analytics.transport.ts @@ -14,8 +14,17 @@ export abstract class AnalyticsTransport implements ITransport, IBatchTransport protected config: AnalyticsTransportConfig; protected batch: TransportLogEntry[] = []; - protected batchTimer?: NodeJS.Timeout; + protected batchTimer?: NodeJS.Timeout | undefined; protected isReady: boolean = false; + /** + * The in-flight drain promise, or undefined when idle. addToBatch() fires + * flush() un-awaited on every Nth entry, so a synchronous burst can trigger + * many overlapping flushes. Every caller joins this single promise instead of + * starting its own drain, so a batch is never snapshotted and sent twice — + * the overlapping-flush duplication that turns N logs into N² delivered events + * (same class of bug fixed earlier in FileTransport / DatabaseTransport). 
+ */ + private flushPromise?: Promise | undefined; constructor(name: string, config: AnalyticsTransportConfig) { this.name = name; @@ -64,30 +73,70 @@ export abstract class AnalyticsTransport implements ITransport, IBatchTransport } async flush(): Promise { - if (this.batch.length === 0) return; - - const entriesToSend = [...this.batch]; - this.batch = []; - if (this.batchTimer) { clearTimeout(this.batchTimer); - delete this.batchTimer; + this.batchTimer = undefined; } - try { - await this.sendBatch(entriesToSend); - } catch (error) { - internalError(`Analytics transport ${this.name} flush failed`, error); - // Re-add failed entries to batch for retry - this.batch.unshift(...entriesToSend); + // Serialize concurrent flushes so overlapping un-awaited flushes (fired by + // addToBatch on every Nth entry) all join the SAME drain instead of each + // snapshotting the not-yet-cleared batch and sending it again. + if (!this.flushPromise) { + this.flushPromise = this.drain().finally(() => { + this.flushPromise = undefined; + }); + } + + await this.flushPromise; + } + + /** + * Drains the batch to the provider one snapshot at a time. The batch is + * detached SYNCHRONOUSLY before awaiting sendBatch(), so entries appended by + * concurrent writes land in a fresh array and are never sent twice. On failure + * the snapshot is restored to the front of the batch for the next flush and + * the loop stops, so a failing provider does not hot-spin. + */ + private async drain(): Promise { + while (this.batch.length > 0) { + const entriesToSend = this.batch; + this.batch = []; + + try { + await this.sendBatch(entriesToSend); + } catch (error) { + internalError(`Analytics transport ${this.name} flush failed`, error); + // Re-add failed entries to the front of the batch for retry, then stop. 
+ this.batch.unshift(...entriesToSend); + return; + } } } async close(): Promise { - await this.flush(); if (this.batchTimer) { clearTimeout(this.batchTimer); + this.batchTimer = undefined; + } + + // Drain remaining entries on shutdown. A failed flush re-buffers its entries + // (see drain()), so retry a bounded number of times before giving up — this + // prevents the "last N seconds of logs lost on deploy" problem without + // hanging shutdown on a permanently-failing provider. + const MAX_CLOSE_FLUSH_ATTEMPTS = 3; + for ( + let attempt = 0; + attempt < MAX_CLOSE_FLUSH_ATTEMPTS && this.batch.length > 0; + attempt += 1 + ) { + await this.flush(); } + if (this.batch.length > 0) { + internalError( + `Analytics transport ${this.name} closing with ${this.batch.length} unflushed entr${this.batch.length === 1 ? 'y' : 'ies'} after ${MAX_CLOSE_FLUSH_ATTEMPTS} attempts` + ); + } + await this.cleanup(); } diff --git a/src/transports/azure-monitor.transport.ts b/src/transports/azure-monitor.transport.ts index 4be0cae..9fccaaf 100644 --- a/src/transports/azure-monitor.transport.ts +++ b/src/transports/azure-monitor.transport.ts @@ -122,13 +122,39 @@ export class AzureMonitorTransport implements IAsyncTransport { } async flush(): Promise { - if (this.batch.length === 0) return; - const entries = this.batch.splice(0, this.batchSize); - try { - await this.sendEntries(entries); - } catch (err) { - internalError('AzureMonitorTransport flush error', err); - this.batch.unshift(...entries); + // Drain the WHOLE batch, not just one batchSize chunk — splice() detaches + // synchronously so concurrent writes are never sent twice, and looping means + // a flush during shutdown empties everything rather than leaving a tail. 
+ while (this.batch.length > 0) { + const entries = this.batch.splice(0, this.batchSize); + try { + await this.sendEntries(entries); + } catch (err) { + internalError('AzureMonitorTransport flush error', err); + this.batch.unshift(...entries); + return; + } + } + } + + /** + * Flush remaining entries and stop the interval timer on shutdown. Without + * this the manager's close() skipped the transport (close was undefined) and + * buffered entries were lost on deploy. Retries a bounded number of times so a + * transient outage does not drop logs. + */ + async close(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + for (let attempt = 0; attempt < 3 && this.batch.length > 0; attempt += 1) { + await this.flush(); + } + if (this.batch.length > 0) { + internalError( + `AzureMonitorTransport closing with ${this.batch.length} unflushed entr${this.batch.length === 1 ? 'y' : 'ies'} after 3 attempts` + ); } } diff --git a/src/transports/cloudwatch.transport.ts b/src/transports/cloudwatch.transport.ts index ac72e7a..fee850d 100644 --- a/src/transports/cloudwatch.transport.ts +++ b/src/transports/cloudwatch.transport.ts @@ -126,14 +126,40 @@ export class CloudWatchTransport implements IAsyncTransport { } async flush(): Promise { - if (this.batch.length === 0) return; - const events = this.batch.splice(0, this.batchSize); - try { - await this.putLogEvents(events); - } catch (err) { - internalError('CloudWatchTransport flush error', err); - // Re-queue on failure - this.batch.unshift(...events); + // Drain the WHOLE batch, not just one batchSize chunk — splice() detaches + // synchronously so concurrent writes are never sent twice, and looping means + // a flush during shutdown empties everything rather than leaving a tail. 
+ while (this.batch.length > 0) { + const events = this.batch.splice(0, this.batchSize); + try { + await this.putLogEvents(events); + } catch (err) { + internalError('CloudWatchTransport flush error', err); + // Re-queue on failure at the front and stop so we don't hot-spin. + this.batch.unshift(...events); + return; + } + } + } + + /** + * Flush remaining events and stop the interval timer on shutdown. Without this + * the manager's close() skips the transport (close was undefined) and any + * buffered events were lost on deploy. Retries a bounded number of times so a + * transient CloudWatch outage does not drop logs. + */ + async close(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + for (let attempt = 0; attempt < 3 && this.batch.length > 0; attempt += 1) { + await this.flush(); + } + if (this.batch.length > 0) { + internalError( + `CloudWatchTransport closing with ${this.batch.length} unflushed event(s) after 3 attempts` + ); } } diff --git a/src/transports/database.transport.ts b/src/transports/database.transport.ts index a2c5193..49d94af 100644 --- a/src/transports/database.transport.ts +++ b/src/transports/database.transport.ts @@ -37,12 +37,19 @@ export class DatabaseTransport implements IAsyncTransport, IBatchTransport { public readonly filter?: (entry: TransportLogEntry) => boolean; private batch: TransportLogEntry[] = []; - private flushTimer?: NodeJS.Timeout; - private isFlushing = false; + private flushTimer?: NodeJS.Timeout | undefined; + /** + * The in-flight drain promise, or undefined when idle. Concurrent flush() + * calls (the un-awaited threshold flush write() fires, plus the interval + * timer) all await this single promise instead of starting their own drain, + * so a batch is never snapshotted and written twice. Mirrors the FileTransport + * fix for the overlapping-batch-flush duplication bug. 
+ */ + private flushPromise?: Promise | undefined; // eslint-disable-next-line @typescript-eslint/no-explicit-any -- dynamic database driver connection object private connection: any; private isConnected = false; - private connectionPromise?: Promise; + private connectionPromise?: Promise | undefined; constructor(private config: DatabaseTransportConfig) { this.batchSize = config.batchSize || 100; @@ -72,39 +79,60 @@ export class DatabaseTransport implements IAsyncTransport, IBatchTransport { } async flush(): Promise { - if (this.batch.length === 0) return; - // Guard: skip if a flush is already in flight — entries will be picked up - // by the next flush cycle triggered by the timer or the next write. - if (this.isFlushing) return; + // Serialize concurrent flushes. write() fires flush() un-awaited once the + // batch crosses batchSize, and the interval timer fires it too, so a burst + // of writes can trigger many overlapping flushes. Every caller joins the + // SAME in-flight drain, so a batch is never snapshotted and written twice — + // the same overlapping-flush duplication that turned N file logs into N² + // lines (see FileTransport fix). Awaiters still see the batch fully drained + // because they await the shared drain promise. + if (!this.flushPromise) { + this.flushPromise = this.drain().finally(() => { + this.flushPromise = undefined; + }); + } - this.isFlushing = true; - const entriesToFlush = [...this.batch]; - this.batch = []; + await this.flushPromise; + } - try { - switch (this.config.type) { - case 'mongodb': - await this.flushToMongoDB(entriesToFlush); - break; - case 'postgresql': - await this.flushToPostgreSQL(entriesToFlush); - break; - case 'mysql': - await this.flushToMySQL(entriesToFlush); - break; - case 'sqlite': - await this.flushToSQLite(entriesToFlush); - break; - default: - throw new Error(`Unsupported database type: ${this.config.type}`); + /** + * Drains the batch to the database one snapshot at a time. 
The current batch + * is detached SYNCHRONOUSLY before awaiting the write, so entries appended by + * concurrent write() calls land in a fresh array and are never written twice. + * On write failure the snapshot is restored to the front of the batch for the + * next flush cycle and the loop stops, so a persistently failing DB does not + * hot-spin retrying the same entries. + */ + private async drain(): Promise { + while (this.batch.length > 0) { + const entriesToFlush = this.batch; + this.batch = []; + + try { + switch (this.config.type) { + case 'mongodb': + await this.flushToMongoDB(entriesToFlush); + break; + case 'postgresql': + await this.flushToPostgreSQL(entriesToFlush); + break; + case 'mysql': + await this.flushToMySQL(entriesToFlush); + break; + case 'sqlite': + await this.flushToSQLite(entriesToFlush); + break; + default: + throw new Error(`Unsupported database type: ${this.config.type}`); + } + } catch (error) { + internalError('Database flush error', error); + // Restore failed entries to the front of the batch (ahead of anything + // appended while the write was in flight) so ordering is preserved and + // they retry on the next flush. Stop draining to avoid a tight retry loop. + this.batch.unshift(...entriesToFlush); + throw error; } - } catch (error) { - internalError('Database flush error', error); - // Restore failed entries to the front of the batch for retry - this.batch.unshift(...entriesToFlush); - throw error; - } finally { - this.isFlushing = false; } } @@ -124,7 +152,13 @@ export class DatabaseTransport implements IAsyncTransport, IBatchTransport { return this.connectionPromise; } - this.connectionPromise = this.establishConnection(); + // Clear the memoized promise on failure so a transient startup error does + // not permanently wedge the transport — the next write() retries the connect + // instead of re-awaiting a forever-rejected promise. 
+ this.connectionPromise = this.establishConnection().catch((error: unknown) => { + this.connectionPromise = undefined; + throw error; + }); return this.connectionPromise; } @@ -441,10 +475,36 @@ export class DatabaseTransport implements IAsyncTransport, IBatchTransport { async close(): Promise { if (this.flushTimer) { clearInterval(this.flushTimer); + this.flushTimer = undefined; } - // Flush remaining entries - await this.flush(); + // Drain remaining entries before closing the connection. This is the + // shutdown path — losing buffered logs here is exactly the "last N seconds + // of logs lost on deploy" problem the library promises to avoid, so we must + // NOT let a single failed flush abort with entries still buffered. + // + // Retry a bounded number of times: each attempt flushes the current batch, + // and a failed flush re-buffers its entries (see drain()) so the next + // attempt picks them up. We stop early once the batch is empty, and cap the + // attempts so a permanently-dead DB can't hang shutdown forever (the + // flushOnExit force-exit timer is the outer backstop). + const MAX_CLOSE_FLUSH_ATTEMPTS = 3; + for ( + let attempt = 0; + attempt < MAX_CLOSE_FLUSH_ATTEMPTS && this.batch.length > 0; + attempt += 1 + ) { + try { + await this.flush(); + } catch (error) { + internalError('Database flush during close failed; entries remain buffered', error); + } + } + if (this.batch.length > 0) { + internalError( + `Database transport closing with ${this.batch.length} unflushed log entr${this.batch.length === 1 ? 
'y' : 'ies'} after ${MAX_CLOSE_FLUSH_ATTEMPTS} attempts` + ); + } // Close connection if (this.connection) { @@ -494,7 +554,10 @@ export class DatabaseTransport implements IAsyncTransport, IBatchTransport { case 'sqlite': { const tableName = this.config.table || 'logs'; const result = await this.connection.query(`SELECT COUNT(*) as count FROM ${tableName}`); - return result.rows?.[0]?.count || result[0]?.count || 0; + // pg returns bigint COUNT as a string and mysql2 as a string too, so + // coerce to a number — the declared return type is Promise. + const rawCount = result.rows?.[0]?.count ?? result[0]?.count ?? 0; + return Number(rawCount) || 0; } default: diff --git a/src/transports/file.transport.ts b/src/transports/file.transport.ts index 89f75af..9b94369 100644 --- a/src/transports/file.transport.ts +++ b/src/transports/file.transport.ts @@ -1,7 +1,9 @@ import type { WriteStream } from 'node:fs'; import * as fs from 'node:fs'; import * as path from 'node:path'; +import { pipeline } from 'node:stream/promises'; import { promisify } from 'node:util'; +import { createGzip } from 'node:zlib'; import type { FileTransportConfig, @@ -280,10 +282,27 @@ export class FileTransport implements ITransport, IBatchTransport { } } - private async compressFile(_filePath: string): Promise { - // Simple gzip compression implementation would go here - // For now, just rename with .gz extension - // In a real implementation, you'd use zlib + /** + * Gzip the rotated file to `.gz` and remove the original. Streams the + * data so large log files don't have to be buffered in memory. Best-effort: + * any failure is logged and leaves the original file intact rather than + * throwing out of the rotation path. + */ + private async compressFile(filePath: string): Promise { + const gzPath = `${filePath}.gz`; + try { + await pipeline(fs.createReadStream(filePath), createGzip(), fs.createWriteStream(gzPath)); + // Only delete the source once the compressed copy is fully written. 
+ await unlink(filePath); + } catch (error) { + internalError('Failed to compress rotated log file', error); + // Clean up a partial .gz so a later cleanup pass doesn't keep a corrupt file. + try { + await unlink(gzPath); + } catch { + /* nothing to clean up */ + } + } } private async cleanupOldFiles(): Promise { @@ -292,15 +311,20 @@ export class FileTransport implements ITransport, IBatchTransport { try { const dir = this.resolveDir(); const files = await readdir(dir); - const base = path.basename(this.config.filename, path.extname(this.config.filename)); - - const logFiles = files - .filter((file) => file.startsWith(base)) - .map(async (file) => { - const filePath = path.join(dir, file); - const stats = await stat(filePath); - return { path: filePath, mtime: stats.mtime }; - }); + const ext = path.extname(this.config.filename); + const base = path.basename(this.config.filename, ext); + + // Match ONLY this transport's own rotated files: `${base}-${ext}` + // (optionally `.gz`). A loose `startsWith(base)` would also match unrelated + // files like `application.log` when base is `app`, and could delete them. 
+ const isOwnRotatedFile = (file: string): boolean => + file.startsWith(`${base}-`) && (file.endsWith(ext) || file.endsWith(`${ext}.gz`)); + + const logFiles = files.filter(isOwnRotatedFile).map(async (file) => { + const filePath = path.join(dir, file); + const stats = await stat(filePath); + return { path: filePath, mtime: stats.mtime }; + }); const fileStats = await Promise.all(logFiles); const sortedFiles = fileStats.sort((a, b) => b.mtime.getTime() - a.mtime.getTime()); diff --git a/src/transports/gcp.transport.ts b/src/transports/gcp.transport.ts index 229859d..e72bf9a 100644 --- a/src/transports/gcp.transport.ts +++ b/src/transports/gcp.transport.ts @@ -141,13 +141,39 @@ export class GCPTransport implements IAsyncTransport { } async flush(): Promise { - if (this.batch.length === 0) return; - const entries = this.batch.splice(0, this.batchSize); - try { - await this.writeEntries(entries); - } catch (err) { - internalError('GCPTransport flush error', err); - this.batch.unshift(...entries); + // Drain the WHOLE batch, not just one batchSize chunk — splice() detaches + // synchronously so concurrent writes are never sent twice, and looping means + // a flush during shutdown empties everything rather than leaving a tail. + while (this.batch.length > 0) { + const entries = this.batch.splice(0, this.batchSize); + try { + await this.writeEntries(entries); + } catch (err) { + internalError('GCPTransport flush error', err); + this.batch.unshift(...entries); + return; + } + } + } + + /** + * Flush remaining entries and stop the interval timer on shutdown. Without + * this the manager's close() skipped the transport (close was undefined) and + * buffered entries were lost on deploy. Retries a bounded number of times so a + * transient outage does not drop logs. 
+ */ + async close(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + for (let attempt = 0; attempt < 3 && this.batch.length > 0; attempt += 1) { + await this.flush(); + } + if (this.batch.length > 0) { + internalError( + `GCPTransport closing with ${this.batch.length} unflushed entr${this.batch.length === 1 ? 'y' : 'ies'} after 3 attempts` + ); } } diff --git a/src/transports/transport.manager.ts b/src/transports/transport.manager.ts index 33e0609..4c8fd9e 100644 --- a/src/transports/transport.manager.ts +++ b/src/transports/transport.manager.ts @@ -215,7 +215,10 @@ export class TransportManager extends EventEmitter { const writeTime = Date.now() - startTime; metrics.logsWritten++; metrics.lastWrite = new Date(startTime); - metrics.averageWriteTime = (metrics.averageWriteTime + writeTime) / 2; + // True cumulative mean: avg += (sample - avg) / n. The previous formula + // ((avg + sample) / 2) was an exponential decay that over-weighted the most + // recent write, so averageWriteTime never reflected the real average. + metrics.averageWriteTime += (writeTime - metrics.averageWriteTime) / metrics.logsWritten; this.emit('log', entry); } catch (error) { diff --git a/src/transports/worker.transport.ts b/src/transports/worker.transport.ts index a66ac0e..c9133cf 100644 --- a/src/transports/worker.transport.ts +++ b/src/transports/worker.transport.ts @@ -137,6 +137,14 @@ export class WorkerTransport implements ITransport { private restarts = 0; private readonly maxRestarts: number; private readonly config: WorkerTransportConfig; + /** Pending restart-backoff timer, so close() can cancel a queued restart. */ + private restartTimer: NodeJS.Timeout | null = null; + /** Set once close() begins, so a worker 'exit' no longer triggers a restart. */ + private closing = false; + /** Max time to wait for the worker's 'flushed' ack before giving up. 
*/ + private readonly FLUSH_TIMEOUT_MS = 5000; + /** Max time to wait for a graceful worker exit before force-terminating. */ + private readonly SHUTDOWN_TIMEOUT_MS = 5000; constructor(config: WorkerTransportConfig) { this.config = config; @@ -185,11 +193,20 @@ export class WorkerTransport implements ITransport { private handleWorkerExit(): void { this.ready = false; + // Don't resurrect a worker we're intentionally shutting down — close() sets + // `closing` and nulls the worker, so a late 'exit'/'error' event must not + // queue a restart (which would spawn a fresh leaked thread post-shutdown). + if (this.closing) return; if (this.restarts < this.maxRestarts) { this.restarts++; internalWarn(`WorkerTransport restarting (attempt ${this.restarts}/${this.maxRestarts})`); - // Small backoff before restart - setTimeout(() => this.startWorker(), 500 * this.restarts); + // Small backoff before restart. unref() so a perpetually-failing worker's + // pending restart never keeps the process alive on its own. + this.restartTimer = setTimeout(() => { + this.restartTimer = null; + this.startWorker(); + }, 500 * this.restarts); + if (this.restartTimer.unref) this.restartTimer.unref(); } else { internalError( `WorkerTransport exhausted ${this.maxRestarts} restart attempts — transport disabled` @@ -220,25 +237,79 @@ export class WorkerTransport implements ITransport { async flush(): Promise { if (!this.ready || !this.worker) return; + const worker = this.worker; return new Promise((resolve) => { + // Time-box the flush: if the worker dies or restarts before answering, + // the 'flushed' message would never arrive and this promise would hang + // forever, blocking shutdown until the force-exit timer kills the process + // (losing everything). Resolve on timeout so close() can still terminate. 
+ const timer = setTimeout(() => { + worker.off('message', handler); + resolve(); + }, this.FLUSH_TIMEOUT_MS); + if (timer.unref) timer.unref(); + const handler = (msg: { type: string }) => { if (msg.type === 'flushed') { - this.worker?.off('message', handler); + clearTimeout(timer); + worker.off('message', handler); resolve(); } }; - this.worker!.on('message', handler); - this.worker!.postMessage({ type: 'flush' }); + worker.on('message', handler); + worker.postMessage({ type: 'flush' }); }); } - async shutdown(): Promise { - if (this.worker) { - this.worker.postMessage({ type: 'shutdown' }); - await new Promise((resolve) => { - this.worker!.once('exit', () => resolve()); + /** + * Flush the worker's buffer and terminate the thread on shutdown. + * + * This MUST be named close() — TransportManager.close() invokes + * transport.close(), so a method named only shutdown() was never called on + * graceful exit, leaking the worker thread and dropping its buffered logs. + */ + async close(): Promise { + // Stop the restart loop first: cancel any queued backoff restart and flag + // that further worker exits are expected (so handleWorkerExit no-ops). + this.closing = true; + if (this.restartTimer) { + clearTimeout(this.restartTimer); + this.restartTimer = null; + } + + if (!this.worker) return; + const worker = this.worker; + this.worker = null; + + // First push any locally-buffered (pre-ready) entries to the worker so they + // are not lost, then ask it to flush+exit. The worker script flushes its own + // transport before exiting on the 'shutdown' message. + if (this.ready) this.drainBufferTo(worker); + worker.postMessage({ type: 'shutdown' }); + + await new Promise((resolve) => { + // Don't wait forever for a wedged worker — terminate() it after a timeout. 
+ const timer = setTimeout(() => { + worker.terminate().finally(() => resolve()); + }, this.SHUTDOWN_TIMEOUT_MS); + if (timer.unref) timer.unref(); + + worker.once('exit', () => { + clearTimeout(timer); + resolve(); }); - this.worker = null; + }); + } + + /** Back-compat alias — prefer close(). */ + async shutdown(): Promise { + await this.close(); + } + + private drainBufferTo(worker: Worker): void { + while (this.buffer.length > 0) { + const entry = this.buffer.shift(); + if (entry) worker.postMessage({ type: 'write', entry }); } } } diff --git a/src/utils/__tests__/coerce.utils.test.ts b/src/utils/__tests__/coerce.utils.test.ts new file mode 100644 index 0000000..2552d1b --- /dev/null +++ b/src/utils/__tests__/coerce.utils.test.ts @@ -0,0 +1,80 @@ +/** + * Tests for the defensive coercion helpers. + * + * These exist so the formatter/transport hot paths can call .replace() on a + * value that the type system claims is a string but at runtime may not be. + * The key regression: safeToString must NEVER return a non-string (JSON.stringify + * returns undefined for some inputs), or safeReplace would crash on .replace(). + */ + +// These tests deliberately pass `undefined` and use `toJSON: () => undefined` +// to reproduce the exact inputs that broke the helpers — the explicit undefined +// is the point, so disable the rule that wants it removed. 
+/* eslint-disable unicorn/no-useless-undefined */ + +import { safeReplace, safeToString } from '../coerce.utils'; + +describe('safeToString', () => { + it('returns strings unchanged', () => { + expect(safeToString('hello')).toBe('hello'); + expect(safeToString('')).toBe(''); + }); + + it('maps null and undefined to an empty string', () => { + expect(safeToString(null)).toBe(''); + expect(safeToString(undefined)).toBe(''); + }); + + it('uses error.message for Error values', () => { + expect(safeToString(new Error('boom'))).toBe('boom'); + }); + + it('stringifies numbers, bigints, and booleans', () => { + expect(safeToString(42)).toBe('42'); + expect(safeToString(10n)).toBe('10'); + expect(safeToString(true)).toBe('true'); + }); + + it('describes symbols and functions without throwing', () => { + expect(safeToString(Symbol('s'))).toContain('Symbol'); + expect(safeToString(function named() {})).toBe('[Function: named]'); + expect(safeToString(() => {})).toContain('[Function:'); + }); + + it('JSON-stringifies plain objects and arrays', () => { + expect(safeToString({ a: 1 })).toBe('{"a":1}'); + expect(safeToString([1, 2])).toBe('[1,2]'); + }); + + it('never returns undefined when JSON.stringify yields undefined', () => { + // An object whose toJSON returns undefined makes JSON.stringify return + // undefined — the helper must fall back to a tag string instead. 
+ const result = safeToString({ toJSON: () => undefined }); + expect(typeof result).toBe('string'); + expect(result).toBe('[Object]'); + }); + + it('falls back to a constructor tag for circular references', () => { + const circular: Record = {}; + circular.self = circular; + const result = safeToString(circular); + expect(typeof result).toBe('string'); + expect(result).toBe('[Object]'); + }); +}); + +describe('safeReplace', () => { + it('replaces on a normal string', () => { + expect(safeReplace('Bearer abc', /Bearer\s+\S+/, '[REDACTED]')).toBe('[REDACTED]'); + }); + + it('does not throw on a value whose JSON.stringify is undefined', () => { + expect(() => safeReplace({ toJSON: () => undefined }, /x/, 'y')).not.toThrow(); + expect(safeReplace({ toJSON: () => undefined }, /Object/, 'Thing')).toBe('[Thing]'); + }); + + it('coerces non-strings before replacing', () => { + expect(safeReplace(12345, /\d+/, 'N')).toBe('N'); + expect(safeReplace(null, /x/, 'y')).toBe(''); + }); +}); diff --git a/src/utils/__tests__/error.utils.test.ts b/src/utils/__tests__/error.utils.test.ts index 11c4ef9..4a01b98 100644 --- a/src/utils/__tests__/error.utils.test.ts +++ b/src/utils/__tests__/error.utils.test.ts @@ -416,4 +416,29 @@ describe('serializeError — serializeValue branch coverage', () => { const result = serializeError(err); expect(result.data).toBeNull(); }); + + it('does not infinitely recurse when an error is cross-referenced via own props', () => { + // a.related → b, and b.back → a. The shared seen-guard must catch the cycle + // rather than relying solely on the depth limit. 
+ const a = new Error('A') as Error & { related?: unknown }; + const b = new Error('B') as Error & { back?: unknown }; + a.related = b; + b.back = a; + + let result: Record | undefined; + expect(() => { + result = serializeError(a); + }).not.toThrow(); + + const related = result!.related as Record; + expect(related.message).toBe('B'); + expect(related.back).toMatchObject({ _circular: true }); + }); + + it('flags a self-referencing own property as circular', () => { + const err = new Error('self') as Error & { selfRef?: unknown }; + err.selfRef = err; + const result = serializeError(err); + expect(result.selfRef).toMatchObject({ _circular: true }); + }); }); diff --git a/src/utils/__tests__/internal-log.test.ts b/src/utils/__tests__/internal-log.test.ts new file mode 100644 index 0000000..1cd4071 --- /dev/null +++ b/src/utils/__tests__/internal-log.test.ts @@ -0,0 +1,65 @@ +/** + * Tests for the internal logging helpers. + * + * Regression: the silence flag was captured once at module import, so setting + * LOGIXIA_SILENT_INTERNAL after import had no effect. It is now read per call, + * making the documented test-silencing reliable. 
+ */ + +import { internalError, internalLog, internalWarn } from '../internal-log'; + +describe('internal-log helpers', () => { + let writeSpy: jest.SpyInstance; + let savedSilent: string | undefined; + + beforeEach(() => { + writeSpy = jest.spyOn(process.stderr, 'write').mockImplementation(() => true); + savedSilent = process.env['LOGIXIA_SILENT_INTERNAL']; + delete process.env['LOGIXIA_SILENT_INTERNAL']; + }); + + afterEach(() => { + writeSpy.mockRestore(); + if (savedSilent === undefined) delete process.env['LOGIXIA_SILENT_INTERNAL']; + else process.env['LOGIXIA_SILENT_INTERNAL'] = savedSilent; + }); + + it('writes prefixed messages to stderr', () => { + internalLog('hello'); + internalWarn('careful'); + internalError('broke', new Error('boom')); + + const output = writeSpy.mock.calls.map((c) => String(c[0])).join(''); + expect(output).toContain('[logixia] hello'); + expect(output).toContain('[logixia:warn] careful'); + expect(output).toContain('[logixia:error] broke — boom'); + }); + + it('appends a stringified non-Error cause', () => { + internalError('failed', 'plain reason'); + expect(String(writeSpy.mock.calls[0]![0])).toContain('failed — plain reason'); + }); + + it('omits the cause segment when no error is given', () => { + internalError('just a message'); + expect(String(writeSpy.mock.calls[0]![0])).toBe('[logixia:error] just a message\n'); + }); + + it('respects LOGIXIA_SILENT_INTERNAL set AFTER import (read per call)', () => { + process.env['LOGIXIA_SILENT_INTERNAL'] = '1'; + internalLog('should be silent'); + internalWarn('also silent'); + internalError('silent too', new Error('x')); + expect(writeSpy).not.toHaveBeenCalled(); + }); + + it('resumes writing when the flag is cleared again', () => { + process.env['LOGIXIA_SILENT_INTERNAL'] = '1'; + internalLog('silent'); + delete process.env['LOGIXIA_SILENT_INTERNAL']; + internalLog('audible'); + const output = writeSpy.mock.calls.map((c) => String(c[0])).join(''); + 
expect(output).not.toContain('silent'); + expect(output).toContain('audible'); + }); +}); diff --git a/src/utils/__tests__/otel.test.ts b/src/utils/__tests__/otel.test.ts new file mode 100644 index 0000000..fa863d3 --- /dev/null +++ b/src/utils/__tests__/otel.test.ts @@ -0,0 +1,57 @@ +/** + * Tests for the OpenTelemetry bridge. + * + * @opentelemetry/api is an optional dependency and is not installed in this + * project, so these tests pin the graceful-degradation contract: every helper + * must return undefined / {} (never throw) when the API is absent, and the + * bridge state (init/disable) must behave correctly. The hot-path helper + * _getOtelPayloadIfEnabled must never throw — it runs on every log call. + */ + +import { + _getOtelPayloadIfEnabled, + disableOtelBridge, + getActiveOtelContext, + getOtelMetaFields, + initOtelBridge, +} from '../otel'; + +afterEach(() => { + disableOtelBridge(); +}); + +describe('OTel bridge — graceful degradation (API absent)', () => { + it('getActiveOtelContext returns undefined without @opentelemetry/api', () => { + expect(getActiveOtelContext()).toBeUndefined(); + }); + + it('getActiveOtelContext never throws', () => { + expect(() => getActiveOtelContext({ sampledOnly: true })).not.toThrow(); + }); + + it('getOtelMetaFields returns an empty object when no span is active', () => { + expect(getOtelMetaFields()).toEqual({}); + }); +}); + +describe('OTel bridge — init/disable state', () => { + it('_getOtelPayloadIfEnabled returns {} when the bridge is not initialised', () => { + expect(_getOtelPayloadIfEnabled()).toEqual({}); + }); + + it('_getOtelPayloadIfEnabled returns {} after init when no span is active (API absent)', () => { + initOtelBridge(); + expect(_getOtelPayloadIfEnabled()).toEqual({}); + }); + + it('_getOtelPayloadIfEnabled never throws even after init', () => { + initOtelBridge({ traceIdField: 'trace_id', sampledOnly: true }); + expect(() => _getOtelPayloadIfEnabled()).not.toThrow(); + }); + + it('disableOtelBridge 
returns the bridge to the not-initialised state', () => { + initOtelBridge(); + disableOtelBridge(); + expect(_getOtelPayloadIfEnabled()).toEqual({}); + }); +}); diff --git a/src/utils/__tests__/redact.utils.test.ts b/src/utils/__tests__/redact.utils.test.ts index 7157e4c..2dea956 100644 --- a/src/utils/__tests__/redact.utils.test.ts +++ b/src/utils/__tests__/redact.utils.test.ts @@ -6,7 +6,7 @@ * arrays, nested objects, regex pattern redaction, and edge cases. */ -import { applyRedaction, redactObject } from '../redact.utils'; +import { applyRedaction, applyRedactionToString, redactObject } from '../redact.utils'; // ── Helpers ─────────────────────────────────────────────────────────────────── @@ -321,6 +321,37 @@ describe('applyRedaction', () => { }); }); +// ── applyRedactionToString — message-string redaction ──────────────────────── + +describe('applyRedactionToString', () => { + it('redacts a secret embedded in a plain string via patterns', () => { + const out = applyRedactionToString('Auth header was Bearer abc123secrettoken', { + patterns: [/Bearer\s+\S+/gi], + }); + expect(out).toBe('Auth header was [REDACTED]'); + }); + + it('applies autoDetect patterns to strings (e.g. 
JWTs)', () => { + const jwt = 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIn0.abc'; + const out = applyRedactionToString(`token=${jwt}`, { autoDetect: true }); + expect(out).toBe('token=[REDACTED]'); + }); + + it('respects a custom censor', () => { + const out = applyRedactionToString('key sk-0123456789abcdef0123', { + patterns: [/sk-[a-z0-9]{16,}/gi], + censor: '***', + }); + expect(out).toBe('key ***'); + }); + + it('returns the input unchanged when no patterns are configured', () => { + const input = 'Bearer abc123 — nothing to redact without patterns'; + expect(applyRedactionToString(input, { paths: ['password'] })).toBe(input); + expect(applyRedactionToString(input)).toBe(input); + }); +}); + // ── Pattern cache ───────────────────────────────────────────────────────────── describe('path pattern cache', () => { diff --git a/src/utils/__tests__/sampling.utils.comprehensive.test.ts b/src/utils/__tests__/sampling.utils.comprehensive.test.ts index 7b8d923..ee9992e 100644 --- a/src/utils/__tests__/sampling.utils.comprehensive.test.ts +++ b/src/utils/__tests__/sampling.utils.comprehensive.test.ts @@ -319,6 +319,41 @@ describe('Sampler', () => { }); }); + // ── bounded trace tracking (memory leak guard) ────────────────────────────── + + describe('traceConsistent — bounded memory', () => { + it('does not grow the sampled-trace set without limit when no stats timer resets it', () => { + // rate 1.0 → every unique trace is remembered as "sampled". Without a bound + // this Set would grow one entry per traceId forever (the leak). The cap + // clears it on overflow, so its size stays bounded. + const s = new Sampler({ rate: 1.0, traceConsistent: true }); + const internals = s as unknown as { + sampledTraces: Set; + maxTrackedTraces: number; + }; + const cap = internals.maxTrackedTraces; + + for (let i = 0; i < cap + 50; i += 1) { + s.shouldEmit('info', `trace-${i}`); + } + + // Never exceeds the cap (it clears and refills rather than growing forever). 
+ expect(internals.sampledTraces.size).toBeLessThanOrEqual(cap); + s.destroy(); + }); + + it('still emits correctly after a trace-set overflow clear', () => { + const s = new Sampler({ rate: 1.0, traceConsistent: true }); + const internals = s as unknown as { maxTrackedTraces: number }; + for (let i = 0; i < internals.maxTrackedTraces + 10; i += 1) { + s.shouldEmit('info', `t-${i}`); + } + // A fresh trace after overflow still gets a correct (emit) decision. + expect(s.shouldEmit('info', 'fresh-trace')).toBe(true); + s.destroy(); + }); + }); + // ── destroy ─────────────────────────────────────────────────────────────── describe('destroy', () => { diff --git a/src/utils/__tests__/shutdown.utils.test.ts b/src/utils/__tests__/shutdown.utils.test.ts index 921ec26..0efe4c8 100644 --- a/src/utils/__tests__/shutdown.utils.test.ts +++ b/src/utils/__tests__/shutdown.utils.test.ts @@ -422,3 +422,45 @@ describe('flushOnExit — force-exit timeout', () => { stderrSpy.mockRestore(); }, 5000); }); + +// ── re-entrancy guard ───────────────────────────────────────────────────────── + +describe('flushOnExit — concurrent signal re-entrancy', () => { + it('flushes each logger only once when a second signal arrives mid-shutdown', async () => { + // close() resolves on a delay so the first shutdown is still in flight when + // the second signal arrives — the guard must drop the second one. + let resolveClose: () => void = () => {}; + const slow = makeFakeLogger( + () => + new Promise((resolve) => { + resolveClose = resolve; + }) + ); + registerForShutdown(slow); + flushOnExit({ timeout: 1000 }); + + // Fire SIGTERM, then SIGINT before the first close() resolves. + process.emit('SIGTERM', 'SIGTERM'); + process.emit('SIGINT', 'SIGINT'); + await new Promise((r) => setImmediate(r)); + + // Only the first signal started a shutdown → close() called exactly once. 
+ expect(slow.close).toHaveBeenCalledTimes(1); + + resolveClose(); + await new Promise((r) => setImmediate(r)); + }); + + it('does not call process.exit twice for two concurrent signals', async () => { + const logger = makeFakeLogger(); + registerForShutdown(logger); + flushOnExit({ timeout: 1000 }); + + process.emit('SIGTERM', 'SIGTERM'); + process.emit('SIGTERM', 'SIGTERM'); + await new Promise((r) => setImmediate(r)); + await new Promise((r) => setImmediate(r)); + + expect(exitSpy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/utils/__tests__/trace.utils.comprehensive.test.ts b/src/utils/__tests__/trace.utils.comprehensive.test.ts index 858a969..4d26e10 100644 --- a/src/utils/__tests__/trace.utils.comprehensive.test.ts +++ b/src/utils/__tests__/trace.utils.comprehensive.test.ts @@ -350,4 +350,50 @@ describe('createTraceMiddleware', () => { done(); }); }); + + // ── response-API robustness (Fastify / missing method / headers sent) ─────── + + it('uses reply.header() when setHeader is absent (Fastify-style reply)', (done) => { + const middleware = createTraceMiddleware({ enabled: true }); + const req = { headers: {} }; + const res = { header: jest.fn() }; // no setHeader + middleware(req, res, () => { + expect(res.header).toHaveBeenCalledWith('X-Trace-Id', expect.any(String)); + done(); + }); + }); + + it('does not throw when the response has no header-setting method', (done) => { + const middleware = createTraceMiddleware({ enabled: true }); + const req = { headers: {} }; + const res = {}; // neither setHeader nor header + expect(() => + middleware(req, res, () => { + // context still established despite no settable header + expect(typeof getCurrentTraceId()).toBe('string'); + done(); + }) + ).not.toThrow(); + }); + + it('skips setting the header when headers are already sent', (done) => { + const middleware = createTraceMiddleware({ enabled: true }); + const req = { headers: {} }; + const res = { setHeader: jest.fn(), headersSent: true }; + 
middleware(req, res, () => { + expect(res.setHeader).not.toHaveBeenCalled(); + done(); + }); + }); + + it('does not throw when setHeader itself throws (e.g. stream already closed)', (done) => { + const middleware = createTraceMiddleware({ enabled: true }); + const req = { headers: {} }; + const res = { + setHeader: () => { + throw new Error('Cannot set headers after they are sent'); + }, + }; + expect(() => middleware(req, res, () => done())).not.toThrow(); + }); }); diff --git a/src/utils/coerce.utils.ts b/src/utils/coerce.utils.ts index d764212..ef3d0aa 100644 --- a/src/utils/coerce.utils.ts +++ b/src/utils/coerce.utils.ts @@ -32,11 +32,16 @@ export function safeToString(value: unknown): string { // value is now narrowed to object — JSON-stringify, falling back to a // constructor-name tag for circular refs / throwing toJSON. try { - return JSON.stringify(value); + const json = JSON.stringify(value); + // JSON.stringify returns undefined for values it can't represent (e.g. an + // object whose toJSON() returns undefined). Returning that would defeat the + // whole point of this helper — a downstream .replace() on `undefined` throws. + if (json !== undefined) return json; } catch { - const ctor = (value as { constructor?: { name?: string } }).constructor?.name; - return `[${ctor ?? 'object'}]`; + /* fall through to the constructor-name tag below */ } + const ctor = (value as { constructor?: { name?: string } }).constructor?.name; + return `[${ctor ?? 
'object'}]`; } /** diff --git a/src/utils/error.utils.ts b/src/utils/error.utils.ts index eddd71a..d571e40 100644 --- a/src/utils/error.utils.ts +++ b/src/utils/error.utils.ts @@ -79,7 +79,7 @@ function _serializeError( seen ); } else { - serialized.cause = serializeValue(errorWithCause.cause, maxDepth - depth - 1); + serialized.cause = serializeValue(errorWithCause.cause, maxDepth - depth - 1, seen); } } @@ -89,7 +89,7 @@ function _serializeError( serialized.errors = aggregateError.errors.map((e) => e instanceof Error ? _serializeError(e, includeStack, maxDepth, excludeFields, depth + 1, seen) - : serializeValue(e, maxDepth - depth - 1) + : serializeValue(e, maxDepth - depth - 1, seen) ); } @@ -116,7 +116,7 @@ function _serializeError( if (skip.has(key)) continue; if (key === '__proto__' || key === 'constructor' || key === 'prototype') continue; try { - serialized[key] = serializeValue(errorRecord[key], maxDepth - depth - 1); + serialized[key] = serializeValue(errorRecord[key], maxDepth - depth - 1, seen); } catch { serialized[key] = '[Unserializable]'; } @@ -127,8 +127,13 @@ function _serializeError( /** * Recursively serialize an arbitrary value to a JSON-safe representation. + * + * The `seen` guard is threaded through (not recreated) so cross-referencing + * errors nested inside plain objects/arrays are caught by the same circular + * check that protects the top-level error tree, rather than only relying on the + * depth limit. 
*/ -function serializeValue(value: unknown, remainingDepth: number): unknown { +function serializeValue(value: unknown, remainingDepth: number, seen: WeakSet): unknown { if (remainingDepth <= 0) return '[Max Depth]'; if (value === null || value === undefined) return value; @@ -139,19 +144,26 @@ function serializeValue(value: unknown, remainingDepth: number): unknown { if (value instanceof Date) return value.toISOString(); if (value instanceof Error) { - return _serializeError(value, true, remainingDepth, [], 0, new WeakSet()); + if (seen.has(value)) { + return { name: value.name, message: value.message, _circular: true }; + } + return _serializeError(value, true, remainingDepth, [], 0, seen); } if (Array.isArray(value)) { - return value.map((item) => serializeValue(item, remainingDepth - 1)); + if (seen.has(value)) return '[Circular]'; + seen.add(value); + return value.map((item) => serializeValue(item, remainingDepth - 1, seen)); } if (typeof value === 'object') { + if (seen.has(value)) return '[Circular]'; + seen.add(value); const out: Record = {}; for (const [k, v] of Object.entries(value as Record)) { if (k === '__proto__' || k === 'constructor' || k === 'prototype') continue; try { - out[k] = serializeValue(v, remainingDepth - 1); + out[k] = serializeValue(v, remainingDepth - 1, seen); } catch { out[k] = '[Unserializable]'; } diff --git a/src/utils/internal-log.ts b/src/utils/internal-log.ts index 944a961..0aa2cae 100644 --- a/src/utils/internal-log.ts +++ b/src/utils/internal-log.ts @@ -10,7 +10,15 @@ * the LOGIXIA_SILENT_INTERNAL=1 environment variable. */ -const silent = process.env.LOGIXIA_SILENT_INTERNAL === '1'; +/** + * Read the silence flag on each call rather than caching it at import time, so + * setting LOGIXIA_SILENT_INTERNAL=1 AFTER the module is first imported (e.g. in + * a test setup file) still takes effect — the documented test-silencing + * behavior was previously unreliable because the value was frozen at load. 
+ */ +function isSilent(): boolean { + return process.env.LOGIXIA_SILENT_INTERNAL === '1'; +} /** * Emit an internal debug/info message to stderr. @@ -18,7 +26,7 @@ const silent = process.env.LOGIXIA_SILENT_INTERNAL === '1'; * during development but should not appear in production log streams. */ export function internalLog(message: string): void { - if (!silent) { + if (!isSilent()) { process.stderr.write(`[logixia] ${message}\n`); } } @@ -28,7 +36,7 @@ export function internalLog(message: string): void { * Use when something is misconfigured but logixia can continue operating. */ export function internalWarn(message: string): void { - if (!silent) { + if (!isSilent()) { process.stderr.write(`[logixia:warn] ${message}\n`); } } @@ -38,7 +46,7 @@ export function internalWarn(message: string): void { * Use when a transport write fails or a serious internal error occurs. */ export function internalError(message: string, error?: unknown): void { - if (!silent) { + if (!isSilent()) { let errStr = ''; if (error instanceof Error) { errStr = ` — ${error.message}`; diff --git a/src/utils/otel.ts b/src/utils/otel.ts index 85fd6d0..f26fd8f 100644 --- a/src/utils/otel.ts +++ b/src/utils/otel.ts @@ -113,20 +113,29 @@ export function getActiveOtelContext(opts: OtelBridgeOptions = {}): OtelSpanCont const api = tryLoadOtelApi(); if (!api) return undefined; - const ctx = api.context.active(); - const sc = api.trace.getSpanContext(ctx); - if (!sc || !api.trace.isSpanContextValid(sc)) return undefined; + // Wrap the whole OTel interaction: this runs on EVERY log call via the bridge, + // and a broken/mismatched OTel SDK whose context/trace methods throw would + // otherwise crash logging itself. The integration is meant to be silent — on + // any failure we just skip injection. 
+ try { + const ctx = api.context.active(); + const sc = api.trace.getSpanContext(ctx); + if (!sc || !api.trace.isSpanContextValid(sc)) return undefined; - const isSampled = (sc.traceFlags & api.trace.TraceFlags.SAMPLED) === api.trace.TraceFlags.SAMPLED; + const isSampled = + (sc.traceFlags & api.trace.TraceFlags.SAMPLED) === api.trace.TraceFlags.SAMPLED; - if (opts.sampledOnly && !isSampled) return undefined; + if (opts.sampledOnly && !isSampled) return undefined; - return { - traceId: sc.traceId, - spanId: sc.spanId, - traceFlags: sc.traceFlags, - isSampled, - }; + return { + traceId: sc.traceId, + spanId: sc.spanId, + traceFlags: sc.traceFlags, + isSampled, + }; + } catch { + return undefined; + } } /** diff --git a/src/utils/redact.utils.ts b/src/utils/redact.utils.ts index 20651e6..23b868d 100644 --- a/src/utils/redact.utils.ts +++ b/src/utils/redact.utils.ts @@ -278,6 +278,24 @@ export function redactObject( return result; } +/** + * Apply pattern-based redaction to a single string (e.g. the log message). + * + * Path-based rules don't apply to a bare string — only the `patterns` are run. + * Returns the input unchanged when `config` is omitted or no patterns are configured, so the hot path + * stays allocation-free for the common no-redact case. + */ +export function applyRedactionToString(value: string, config?: RedactConfig): string { + if (!config) return value; + const resolved = resolveConfig(config); + if (!resolved.patterns || resolved.patterns.length === 0) return value; + let redacted = value; + for (const pattern of resolved.patterns) { + redacted = redacted.replace(pattern, resolved.censor ?? DEFAULT_CENSOR); + } + return redacted; +} + /** * Apply redaction to a log payload (top-level call convenience wrapper). * Returns a new object — never mutates the input. 
diff --git a/src/utils/sampling.utils.ts b/src/utils/sampling.utils.ts index 561ccee..2ce9bf5 100644 --- a/src/utils/sampling.utils.ts +++ b/src/utils/sampling.utils.ts @@ -34,6 +34,15 @@ export class Sampler { private readonly sampledTraces = new Set(); /** Trace IDs that have been dropped in this window → always drop. */ private readonly droppedTraces = new Set(); + /** + * Upper bound on tracked trace IDs. resetStats() only clears these Sets when a + * stats timer is running, so without an onStats callback they would otherwise + * grow unbounded for the life of the process (one entry per unique traceId) — + * a memory leak in any long-running service using traceConsistent sampling. + * When a Set hits this cap it is cleared; a re-seen trace simply gets a fresh + * sampling decision, which is acceptable for sampling consistency. + */ + private readonly maxTrackedTraces = 100_000; // Rate-limiting state private _tokenBucket = 0; @@ -100,9 +109,9 @@ export class Sampler { // First time we see this traceId — make the sampling decision now const emit = this._sampleByRate(lvl); if (emit) { - this.sampledTraces.add(traceId); + this._rememberTrace(this.sampledTraces, traceId); } else { - this.droppedTraces.add(traceId); + this._rememberTrace(this.droppedTraces, traceId); } if (emit) this._trackEmitted(lvl); else this._trackDropped(lvl); @@ -161,6 +170,19 @@ export class Sampler { return Math.random() < rate; } + /** + * Record a trace decision, bounding the Set so it can't grow without limit. + * If the Set has reached the cap, clear it before inserting — stale decisions + * are simply re-made on next sight, which keeps memory bounded at the cost of + * occasional re-sampling for very high-cardinality trace workloads. 
+ */ + private _rememberTrace(set: Set, traceId: string): void { + if (set.size >= this.maxTrackedTraces) { + set.clear(); + } + set.add(traceId); + } + private _consumeToken(): boolean { const now = Date.now(); const elapsed = (now - this._lastRefillMs) / 1000; diff --git a/src/utils/shutdown.utils.ts b/src/utils/shutdown.utils.ts index 6981b54..612c18f 100644 --- a/src/utils/shutdown.utils.ts +++ b/src/utils/shutdown.utils.ts @@ -22,6 +22,10 @@ type Closeable = { close(): Promise }; /** Module-level registry of all logger instances that have opted into graceful shutdown */ const registry = new Set(); let shutdownHandlerRegistered = false; +/** Guards the handler against re-entrancy: a second signal (e.g. SIGINT after + * SIGTERM, or an impatient double Ctrl+C) must not start a second concurrent + * flush+exit, which could exit the process and truncate the first flush mid-write. */ +let shutdownInProgress = false; /** Our own handler + the signals we attached it to, so reset() can detach exactly it. */ let activeHandler: ((signal: NodeJS.Signals) => void | Promise) | undefined; let activeSignals: NodeJS.Signals[] = []; @@ -58,6 +62,12 @@ export function flushOnExit(options: FlushOnExitOptions = {}): void { const { timeout = 5000, signals = ['SIGTERM', 'SIGINT'], beforeFlush, afterFlush } = options; const handler = async (signal: NodeJS.Signals) => { + // Re-entrancy guard: a flush is already running from an earlier signal. + // Starting a second one would race two close()/exit() sequences and could + // truncate the in-flight flush — exactly the log loss we are preventing. 
+ if (shutdownInProgress) return; + shutdownInProgress = true; + // Force-exit safety net — if flushing hangs, don't block the process forever const forceExitTimer = setTimeout(() => { process.stderr.write( @@ -105,4 +115,5 @@ export function resetShutdownHandlers(): void { activeHandler = undefined; activeSignals = []; shutdownHandlerRegistered = false; + shutdownInProgress = false; } diff --git a/src/utils/trace.utils.ts b/src/utils/trace.utils.ts index bff3a5c..8d855e9 100644 --- a/src/utils/trace.utils.ts +++ b/src/utils/trace.utils.ts @@ -310,7 +310,26 @@ export function createTraceMiddleware(config: TraceIdConfig) { } (req as Record).traceId = traceId; - (res as { setHeader: (k: string, v: string) => void }).setHeader('X-Trace-Id', traceId); + // Echo the trace ID back on the response, tolerating different response APIs: + // Express/Node use res.setHeader(), Fastify replies use reply.header(). Guard + // both and never let a missing/!function method (or already-sent headers) + // crash the request — the trace ID is still propagated via async context. + const resObj = res as { + setHeader?: (k: string, v: string) => void; + header?: (k: string, v: string) => void; + headersSent?: boolean; + }; + if (!resObj.headersSent) { + try { + if (typeof resObj.setHeader === 'function') { + resObj.setHeader('X-Trace-Id', traceId); + } else if (typeof resObj.header === 'function') { + resObj.header('X-Trace-Id', traceId); + } + } catch { + /* response not in a header-settable state — context propagation still works */ + } + } runWithTraceId(traceId, () => next()); };