diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 575183519..e571190fa 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -142,6 +142,8 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { GraphObjectLocationOnDisk >(ENTITY_LOCATION_ON_DISK_DEFAULT_MAP_KEY_SPACE); private readonly logger?: IntegrationLogger; + private entityFileCache = new Map(); + private static readonly ENTITY_FILE_CACHE_MAX_SIZE = 50; constructor(params?: FileSystemGraphObjectStoreParams) { this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY); @@ -209,10 +211,22 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { const filePath = getRootStorageAbsolutePath( entityLocationOnDisk.graphDataPath, ); - const { entities } = await readGraphObjectFile({ - filePath, - }); - return entities[entityLocationOnDisk.index]; + + let fileData = this.entityFileCache.get(filePath); + if (!fileData) { + fileData = await readGraphObjectFile({ filePath }); + + if ( + this.entityFileCache.size >= + FileSystemGraphObjectStore.ENTITY_FILE_CACHE_MAX_SIZE + ) { + const oldestKey = this.entityFileCache.keys().next().value; + this.entityFileCache.delete(oldestKey!); + } + this.entityFileCache.set(filePath, fileData); + } + + return fileData.entities[entityLocationOnDisk.index]; } async iterateEntities( @@ -303,6 +317,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { onEntitiesFlushed?: (entities: Entity[]) => Promise, force: Boolean = false, ) { + this.entityFileCache.clear(); await this.lockOperation(async () => { // This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads // During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index a60f320bf..8c96a82c2 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -505,6 +505,135 @@ describe('findEntity', () => { }); }); +describe('findEntity caching', () => { + test('should serve second lookup from cache without re-reading file', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore(); + const _type = uuid(); + const _key = uuid(); + + const entities = [ + ...times(5, () => createTestEntity({ _type })), + createTestEntity({ _type, _key }), + ]; + + await store.addEntities(storageDirectoryPath, entities); + await store.flushEntitiesToDisk(undefined, true); + + const readFileSpy = jest.spyOn(fs, 'readFile'); + + // First lookup — should read from disk + const entity1 = await store.findEntity(_key); + expect(entity1).toBeDefined(); + const readCountAfterFirst = readFileSpy.mock.calls.length; + expect(readCountAfterFirst).toBeGreaterThanOrEqual(1); + + // Second lookup — should be served from cache (no additional reads) + const entity2 = await store.findEntity(_key); + expect(entity2).toEqual(entity1); + expect(readFileSpy.mock.calls.length).toBe(readCountAfterFirst); + + readFileSpy.mockRestore(); + }); + + test('should serve different entities from the same chunk file without re-reading', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore(); + const _type = uuid(); + const keyA = uuid(); + const keyB = uuid(); + + const entities = [ + createTestEntity({ _type, _key: keyA }), + createTestEntity({ _type, _key: keyB }), + ]; + + await store.addEntities(storageDirectoryPath, entities); + await store.flushEntitiesToDisk(undefined, true); + + const readFileSpy = jest.spyOn(fs, 'readFile'); + + // First lookup — reads file + const entityA = await store.findEntity(keyA); + expect(entityA).toBeDefined(); + const readCountAfterFirst = readFileSpy.mock.calls.length; + + // Second lookup for different entity in same chunk — should use cache + const entityB = await store.findEntity(keyB); + expect(entityB).toBeDefined(); + expect(entityB!._key).not.toBe(entityA!._key); + expect(readFileSpy.mock.calls.length).toBe(readCountAfterFirst); + + readFileSpy.mockRestore(); + }); + + test('should invalidate cache when entities are flushed to disk', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore(); + const _type = uuid(); + const _key = uuid(); + + await store.addEntities(storageDirectoryPath, [ + createTestEntity({ _type, _key }), + ]); + await store.flushEntitiesToDisk(undefined, true); + + const readFileSpy = jest.spyOn(fs, 'readFile'); + + // Populate cache + await store.findEntity(_key); + const readCountAfterFirst = readFileSpy.mock.calls.length; + + // Flush more entities — should clear cache + await store.addEntities(storageDirectoryPath, [ + createTestEntity({ _type }), + ]); + await store.flushEntitiesToDisk(undefined, true); + + // Lookup again — cache was cleared, should read from disk again + await store.findEntity(_key); + expect(readFileSpy.mock.calls.length).toBeGreaterThan(readCountAfterFirst); + + readFileSpy.mockRestore(); + }); + + test('should evict oldest cache entry when cache exceeds max size', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectFileSize: 1, // 1 entity per file to force many files + }); + + const entityKeys: string[] = []; + + // Create 52 entities each with a unique type (forces separate files) + for (let i = 0; i < 52; i++) { + const _type = uuid(); + const _key = uuid(); + entityKeys.push(_key); + await store.addEntities(storageDirectoryPath, [ + createTestEntity({ _type, _key }), + ]); + } + await store.flushEntitiesToDisk(undefined, true); + + const readFileSpy = jest.spyOn(fs, 'readFile'); + + // Lookup all 52 entities to fill and overflow the cache (max 50) + for (const key of entityKeys) { + await store.findEntity(key); + } + const readCountAfterAll = readFileSpy.mock.calls.length; + expect(readCountAfterAll).toBe(52); + + // The first two entries should have been evicted + // Looking up the first entity again should require a file read + await store.findEntity(entityKeys[0]); + expect(readFileSpy.mock.calls.length).toBe(readCountAfterAll + 1); + + // But looking up the last entity should still be cached + await store.findEntity(entityKeys[51]); + expect(readFileSpy.mock.calls.length).toBe(readCountAfterAll + 1); + + readFileSpy.mockRestore(); + }); +}); + describe('iterateEntities', () => { test('iterated entities are mutable, but only if TS error is ignored.', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore();