Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/transports/__tests__/file.transport.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Tests for FileTransport batching + flush correctness.
*
* Regression coverage for the batch-flush race: addToBatch() fires flush()
* un-awaited on every Nth entry, so a synchronous burst of writes triggers
* many overlapping flushes. Before the fix each overlapping flush snapshotted
* the not-yet-cleared batch and wrote it again, turning N log calls into N²
* file lines. The fix detaches the batch synchronously before awaiting and
* serializes concurrent flushes.
*/

import * as fs from 'node:fs';
import { mkdtempSync, readFileSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import * as path from 'node:path';

import type { TransportLogEntry } from '../../types/transport.types';
import { FileTransport } from '../file.transport';

function makeEntry(index: number): TransportLogEntry {
return {
timestamp: new Date('2026-01-01T00:00:00.000Z'),
level: 'info',
message: `burst-line-${index}`,
};
}

function readLines(filePath: string): string[] {
if (!fs.existsSync(filePath)) return [];
return readFileSync(filePath, 'utf8').split('\n').filter(Boolean);
}

describe('FileTransport — batch flush', () => {
let dir: string;

beforeEach(() => {
dir = mkdtempSync(path.join(tmpdir(), 'logixia-file-transport-'));
});

afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});

it('writes each entry exactly once on a synchronous un-awaited burst far larger than batchSize', async () => {
const filename = 'burst.log';
const transport = new FileTransport({ dirname: dir, filename, batchSize: 100 });

const total = 500;
// Fire the whole burst synchronously without awaiting each write — exactly like
// a buffered-log flush replaying many entries. write() returns immediately while
// addToBatch fires its threshold flushes in the background.
const writes: Array<Promise<void>> = [];
for (let index = 0; index < total; index += 1) {
writes.push(transport.write(makeEntry(index)));
}
await Promise.allSettled(writes);
await transport.flush();

const lines = readLines(path.join(dir, filename));
expect(lines).toHaveLength(total);
expect(new Set(lines).size).toBe(total);
});

it('flushes any partial batch left below the threshold', async () => {
const filename = 'partial.log';
const transport = new FileTransport({ dirname: dir, filename, batchSize: 100 });

for (let index = 0; index < 7; index += 1) {
await transport.write(makeEntry(index));
}

await transport.flush();

expect(readLines(path.join(dir, filename))).toHaveLength(7);
});

it('is a no-op when flushed with an empty batch', async () => {
const filename = 'empty.log';
const transport = new FileTransport({ dirname: dir, filename, batchSize: 100 });

await transport.flush();
await transport.flush();

expect(readLines(path.join(dir, filename))).toHaveLength(0);
});

it('does not re-write entries when flush() is called concurrently', async () => {
const filename = 'concurrent.log';
const transport = new FileTransport({ dirname: dir, filename, batchSize: 1000 });

const writes: Array<Promise<void>> = [];
for (let index = 0; index < 50; index += 1) {
writes.push(transport.write(makeEntry(index)));
}
await Promise.allSettled(writes);

// Several overlapping flushes must collapse to one logical drain.
await Promise.all([transport.flush(), transport.flush(), transport.flush()]);
await transport.flush();

const lines = readLines(path.join(dir, filename));
expect(lines).toHaveLength(50);
expect(new Set(lines).size).toBe(50);
});
});
57 changes: 47 additions & 10 deletions src/transports/file.transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ export class FileTransport implements ITransport, IBatchTransport {
public readonly flushInterval?: number;
public readonly filter?: (entry: TransportLogEntry) => boolean;

private config: FileTransportConfig;
private readonly config: FileTransportConfig;
private writeStream: WriteStream | undefined;
private batch: TransportLogEntry[] = [];
private batchTimer?: NodeJS.Timeout | undefined;
private lastRotation: Date = new Date();
private currentFilePath: string;
/** Guards against concurrent rotation — two simultaneous writes both seeing shouldRotateNow(). */
private isRotating = false;
/**
* The in-flight drain promise, or undefined when idle. Concurrent flush() calls
* (e.g. the un-awaited flush addToBatch fires on every Nth entry) all await this
* single promise instead of starting their own drain, so the batch is never
* snapshotted and written twice.
*/
private flushPromise: Promise<void> | undefined;

constructor(config: FileTransportConfig) {
this.config = {
Expand Down Expand Up @@ -80,15 +87,39 @@ export class FileTransport implements ITransport, IBatchTransport {
}

async flush(): Promise<void> {
if (this.batch.length > 0) {
await this.writeBatch([...this.batch]);
this.batch = [];
}

if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = undefined;
}

// Serialize concurrent flushes. addToBatch() calls flush() un-awaited on every
// Nth entry, so a synchronous burst of writes can fire many overlapping flushes.
// Every caller (awaited or not) joins the SAME in-flight drain, so the batch is
// never snapshotted twice — which previously turned N log calls into N² file
// lines (the batch-flush duplication bug). Callers that await flush() still see
// the batch fully drained because they await the shared drain promise.
if (!this.flushPromise) {
this.flushPromise = this.drain().finally(() => {
this.flushPromise = undefined;
});
}

await this.flushPromise;
}

/**
* Drains the batch to disk one snapshot at a time. Entries appended while a
* write is in flight are picked up by the next loop iteration, so a single
* drain() empties the batch completely regardless of concurrent writes.
*/
private async drain(): Promise<void> {
while (this.batch.length > 0) {
// Detach the current batch SYNCHRONOUSLY before awaiting, so entries pushed
// during the write land in a fresh array and are never re-written.
const pending = this.batch;
this.batch = [];
await this.writeBatch(pending);
}
}

private formatEntry(entry: TransportLogEntry): string {
Expand Down Expand Up @@ -124,6 +155,12 @@ export class FileTransport implements ITransport, IBatchTransport {
this.batch.push(entry);

if (this.batch.length >= (this.config.batchSize || 100)) {
// Cancel any pending interval flush — the threshold flush supersedes it, and
// a leftover timer would fire a redundant flush after the batch is already gone.
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = undefined;
}
this.flush().catch((err: unknown) => internalError('FileTransport batch flush failed', err));
} else if (!this.batchTimer && this.config.flushInterval) {
this.batchTimer = setTimeout(() => {
Expand Down Expand Up @@ -152,18 +189,18 @@ export class FileTransport implements ITransport, IBatchTransport {
}

private shouldRotateNow(): boolean {
if (!this.config.rotation) return false;
if (!this.config.rotation?.interval) return false;

const now = new Date();
const timeDiff = now.getTime() - this.lastRotation.getTime();
const rotationInterval = this.parseInterval(this.config.rotation.interval!);
const rotationInterval = this.parseInterval(this.config.rotation.interval);

return timeDiff >= rotationInterval;
}

private parseInterval(interval: string): number {
// eslint-disable-next-line sonarjs/slow-regex -- simple bounded pattern for interval parsing
const match = interval.match(/(\d+)([hdwmy])/i);
const match = new RegExp(/(\d+)([hdwmy])/i).exec(interval);
if (!match) return 24 * 60 * 60 * 1000; // Default 1 day

const value = Number.parseInt(match[1] || '1', 10);
Expand Down Expand Up @@ -268,7 +305,7 @@ export class FileTransport implements ITransport, IBatchTransport {
const fileStats = await Promise.all(logFiles);
const sortedFiles = fileStats.sort((a, b) => b.mtime.getTime() - a.mtime.getTime());

const filesToDelete = sortedFiles.slice(this.config.rotation!.maxFiles!);
const filesToDelete = sortedFiles.slice(this.config.rotation.maxFiles);

for (const file of filesToDelete) {
await unlink(file.path);
Expand Down
Loading