Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
GraphObjectLocationOnDisk
>(ENTITY_LOCATION_ON_DISK_DEFAULT_MAP_KEY_SPACE);
private readonly logger?: IntegrationLogger;
private entityFileCache = new Map<string, FlushedEntityData>();
private static readonly ENTITY_FILE_CACHE_MAX_SIZE = 50;

constructor(params?: FileSystemGraphObjectStoreParams) {
this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY);
Expand Down Expand Up @@ -209,10 +211,22 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
const filePath = getRootStorageAbsolutePath(
entityLocationOnDisk.graphDataPath,
);
const { entities } = await readGraphObjectFile<FlushedEntityData>({
filePath,
});
return entities[entityLocationOnDisk.index];

let fileData = this.entityFileCache.get(filePath);
if (!fileData) {
fileData = await readGraphObjectFile<FlushedEntityData>({ filePath });

if (
this.entityFileCache.size >=
FileSystemGraphObjectStore.ENTITY_FILE_CACHE_MAX_SIZE
) {
const oldestKey = this.entityFileCache.keys().next().value;
this.entityFileCache.delete(oldestKey!);
}
this.entityFileCache.set(filePath, fileData);
}

return fileData.entities[entityLocationOnDisk.index];
}

async iterateEntities<T extends Entity = Entity>(
Expand Down Expand Up @@ -303,6 +317,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
force: Boolean = false,
) {
this.entityFileCache.clear();
await this.lockOperation(async () => {
// This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads
// During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,135 @@ describe('findEntity', () => {
});
});

/**
 * Covers the per-file cache used by `findEntity`: repeated lookups against
 * the same flushed file must not re-read it, flushing must invalidate the
 * cache, and the oldest entry must be evicted once the cache is full.
 *
 * Disk reads are counted by spying on `fs.readFile`.
 */
describe('findEntity caching', () => {
  /**
   * Runs `run` with a spy installed on `fs.readFile` and always restores the
   * spy afterwards, even when an assertion inside `run` throws.
   *
   * Without the `finally`, a failing test would leave the spy in place;
   * a later `jest.spyOn` on the same method would then reuse it together
   * with its accumulated call count, breaking absolute-count assertions in
   * subsequent tests.
   *
   * @param run test body; receives a getter for the number of `readFile`
   *   calls recorded since the spy was installed
   */
  const withReadFileSpy = async (
    run: (readCount: () => number) => Promise<void>,
  ): Promise<void> => {
    const readFileSpy = jest.spyOn(fs, 'readFile');
    try {
      await run(() => readFileSpy.mock.calls.length);
    } finally {
      readFileSpy.mockRestore();
    }
  };

  test('should serve second lookup from cache without re-reading file', async () => {
    const { storageDirectoryPath, store } = setupFileSystemObjectStore();
    const _type = uuid();
    const _key = uuid();

    const entities = [
      ...times(5, () => createTestEntity({ _type })),
      createTestEntity({ _type, _key }),
    ];

    await store.addEntities(storageDirectoryPath, entities);
    await store.flushEntitiesToDisk(undefined, true);

    await withReadFileSpy(async (readCount) => {
      // First lookup — should read from disk
      const entity1 = await store.findEntity(_key);
      expect(entity1).toBeDefined();
      const readCountAfterFirst = readCount();
      expect(readCountAfterFirst).toBeGreaterThanOrEqual(1);

      // Second lookup — should be served from cache (no additional reads)
      const entity2 = await store.findEntity(_key);
      expect(entity2).toEqual(entity1);
      expect(readCount()).toBe(readCountAfterFirst);
    });
  });

  test('should serve different entities from the same chunk file without re-reading', async () => {
    const { storageDirectoryPath, store } = setupFileSystemObjectStore();
    const _type = uuid();
    const keyA = uuid();
    const keyB = uuid();

    const entities = [
      createTestEntity({ _type, _key: keyA }),
      createTestEntity({ _type, _key: keyB }),
    ];

    await store.addEntities(storageDirectoryPath, entities);
    await store.flushEntitiesToDisk(undefined, true);

    await withReadFileSpy(async (readCount) => {
      // First lookup — reads file
      const entityA = await store.findEntity(keyA);
      expect(entityA).toBeDefined();
      const readCountAfterFirst = readCount();

      // Second lookup for different entity in same chunk — should use cache
      const entityB = await store.findEntity(keyB);
      expect(entityB).toBeDefined();
      expect(entityB!._key).not.toBe(entityA!._key);
      expect(readCount()).toBe(readCountAfterFirst);
    });
  });

  test('should invalidate cache when entities are flushed to disk', async () => {
    const { storageDirectoryPath, store } = setupFileSystemObjectStore();
    const _type = uuid();
    const _key = uuid();

    await store.addEntities(storageDirectoryPath, [
      createTestEntity({ _type, _key }),
    ]);
    await store.flushEntitiesToDisk(undefined, true);

    await withReadFileSpy(async (readCount) => {
      // Populate cache
      await store.findEntity(_key);
      const readCountAfterFirst = readCount();

      // Flush more entities — should clear cache
      await store.addEntities(storageDirectoryPath, [
        createTestEntity({ _type }),
      ]);
      await store.flushEntitiesToDisk(undefined, true);

      // Lookup again — cache was cleared, should read from disk again
      await store.findEntity(_key);
      expect(readCount()).toBeGreaterThan(readCountAfterFirst);
    });
  });

  test('should evict oldest cache entry when cache exceeds max size', async () => {
    // Mirrors FileSystemGraphObjectStore.ENTITY_FILE_CACHE_MAX_SIZE, which is
    // private and therefore cannot be referenced from the test.
    const cacheMaxSize = 50;
    // Two more files than the cache can hold, so the first two get evicted.
    const entityCount = cacheMaxSize + 2;

    const { storageDirectoryPath, store } = setupFileSystemObjectStore({
      graphObjectFileSize: 1, // 1 entity per file to force many files
    });

    const entityKeys: string[] = [];

    // Create the entities, each with a unique type (forces separate files)
    for (let i = 0; i < entityCount; i++) {
      const _type = uuid();
      const _key = uuid();
      entityKeys.push(_key);
      await store.addEntities(storageDirectoryPath, [
        createTestEntity({ _type, _key }),
      ]);
    }
    await store.flushEntitiesToDisk(undefined, true);

    await withReadFileSpy(async (readCount) => {
      // Lookup every entity to fill and overflow the cache
      for (const key of entityKeys) {
        await store.findEntity(key);
      }
      const readCountAfterAll = readCount();
      expect(readCountAfterAll).toBe(entityCount);

      // The first two entries should have been evicted
      // Looking up the first entity again should require a file read
      await store.findEntity(entityKeys[0]);
      expect(readCount()).toBe(readCountAfterAll + 1);

      // But looking up the last entity should still be cached
      await store.findEntity(entityKeys[entityCount - 1]);
      expect(readCount()).toBe(readCountAfterAll + 1);
    });
  });
});

describe('iterateEntities', () => {
test('iterated entities are mutable, but only if TS error is ignored.', async () => {
const { storageDirectoryPath, store } = setupFileSystemObjectStore();
Expand Down
Loading