From 1e2606b0246b27e689e55c0e07a1460b04f8cccf Mon Sep 17 00:00:00 2001 From: Joaquim d'Souza Date: Thu, 16 Apr 2026 12:46:35 +0200 Subject: [PATCH] feat: dont regeocode records that havent changed in importdatarecords --- src/server/adaptors/csv.ts | 18 +- src/server/jobs/importDataRecords.ts | 22 ++- src/server/jobs/importDataSource.ts | 50 +++-- tests/feature/importDataRecords.test.ts | 239 ++++++++++++++++++++++++ tests/setup.ts | 2 - 5 files changed, 311 insertions(+), 20 deletions(-) create mode 100644 tests/feature/importDataRecords.test.ts diff --git a/src/server/adaptors/csv.ts b/src/server/adaptors/csv.ts index 69112dc96..50e3e2c8f 100644 --- a/src/server/adaptors/csv.ts +++ b/src/server/adaptors/csv.ts @@ -128,9 +128,21 @@ export class CSVAdaptor implements DataSourceAdaptor { return null; } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - fetchByExternalId(externalIds: string[]): Promise { - throw new Error("Method not implemented."); + async fetchByExternalId(externalIds: string[]): Promise { + const idSet = new Set(externalIds); + const results: ExternalRecord[] = []; + const content = await this.createReadStream(); + const parser = content.pipe(parse({ columns: true })); + let row = 1; + for await (const record of parser) { + if (Object.keys(record).length) { + if (idSet.has(String(row))) { + results.push({ externalId: String(row), json: record }); + } + row++; + } + } + return results; } removeDevWebhooks(): Promise { diff --git a/src/server/jobs/importDataRecords.ts b/src/server/jobs/importDataRecords.ts index c8d2bc6c4..85c0d1e12 100644 --- a/src/server/jobs/importDataRecords.ts +++ b/src/server/jobs/importDataRecords.ts @@ -8,6 +8,8 @@ import { db } from "@/server/services/database"; import logger from "@/server/services/logger"; import { batchAsync } from "../utils"; import { importBatch, inferColumnSemanticTypes } from "./importDataSource"; +import type { GeocodeResult } from "@/models/DataRecord"; +import type { Point } from "@/models/shared"; const importDataRecords = async (args: object | null): Promise => { if (!args || !("dataSourceId" in args)) { @@ -17,7 +19,7 @@ const importDataRecords = async (args: object | null): Promise => { const dataRecords = db .selectFrom("dataRecord") - .select(["id", "externalId"]) + .select(["id", "externalId", "json", "geocodeResult", "geocodePoint"]) .where("dataSourceId", "=", dataSourceId) .where("needsImport", "=", true) .stream(); @@ -52,11 +54,27 @@ const importDataRecords = async (args: object | null): Promise => { for await (const batch of batches) { try { + const existingRecords = new Map( + batch.map((r) => [ + r.externalId, + { + json: r.json as Record, + geocodeResult: r.geocodeResult as GeocodeResult | null, + geocodePoint: r.geocodePoint as Point | null, + }, + ]), + ); + const records = await adaptor.fetchByExternalId( batch.map((r) => r.externalId), ); - await importBatch(records, dataSource, columnDefsAccumulator); + await importBatch({ + batch: records, + dataSource, + columnDefsAccumulator, + existingRecords, + }); await db .updateTable("dataRecord") diff --git a/src/server/jobs/importDataSource.ts b/src/server/jobs/importDataSource.ts index d286a85a3..01d997b84 100644 --- a/src/server/jobs/importDataSource.ts +++ b/src/server/jobs/importDataSource.ts @@ -1,3 +1,4 @@ +import { isDeepStrictEqual } from "node:util"; import { DATA_SOURCE_JOB_BATCH_SIZE } from "@/constants"; import { ColumnSemanticType, ColumnType } from "@/models/DataSource"; import { getDataSourceAdaptor } from "@/server/adaptors"; @@ -14,8 +15,10 @@ import { import logger from "@/server/services/logger"; import { getPubSub } from "@/server/services/pubsub"; import { batchAsync } from "@/server/utils"; +import type { GeocodeResult } from "@/models/DataRecord"; import type { ColumnDef, ColumnMetadata } from "@/models/DataSource"; import type { DataSource } from "@/models/DataSource"; +import type { Point } from "@/models/shared"; import type { ExternalRecord } from "@/types"; const importDataSource = async (args: object | null): Promise => { @@ -58,7 +61,7 @@ const importDataSource = async (args: object | null): Promise => { const batches = batchAsync(records, DATA_SOURCE_JOB_BATCH_SIZE); for await (const batch of batches) { - await importBatch(batch, dataSource, columnDefsAccumulator); + await importBatch({ batch, dataSource, columnDefsAccumulator }); count += batch.length; if (total) { const percentComplete = Math.floor((count * 100) / total); @@ -110,25 +113,46 @@ const importDataSource = async (args: object | null): Promise => { return false; }; -export const importBatch = async ( - batch: ExternalRecord[], - dataSource: DataSource, - columnDefsAccumulator: ColumnDef[], -) => { +export const importBatch = async ({ + batch, + dataSource, + columnDefsAccumulator, + existingRecords, +}: { + batch: ExternalRecord[]; + dataSource: DataSource; + columnDefsAccumulator: ColumnDef[]; + existingRecords?: Map< + string, + { + json: Record; + geocodeResult: GeocodeResult | null; + geocodePoint: Point | null; + } + >; +}) => { const naIsNull = Boolean(dataSource.naIsNull); const updatedRecords = await Promise.all( batch.map(async (record) => { const { columnDefs, typedJson } = typeJson(record.json, naIsNull); addColumnDefs(columnDefsAccumulator, columnDefs); - const geocodeResult = await geocodeRecord( - record, - dataSource.geocodingConfig, - ); + + const existing = existingRecords?.get(record.externalId); + const jsonUnchanged = + existing && isDeepStrictEqual(typedJson, existing.json); + + let geocodeResult: GeocodeResult | null; + if (jsonUnchanged) { + geocodeResult = existing.geocodeResult; + } else { + geocodeResult = await geocodeRecord(record, dataSource.geocodingConfig); + } + return { externalId: record.externalId, json: typedJson, - geocodeResult: geocodeResult, - geocodePoint: geocodeResult?.centralPoint, + geocodeResult, + geocodePoint: geocodeResult?.centralPoint ?? null, dataSourceId: dataSource.id, }; }), @@ -232,7 +256,7 @@ const cleanNumber = (value: string): string => { return value.trim().replace(/%$/, "").replace(/,/g, "").replace(/ /g, ""); }; -const addColumnDefs = ( +export const addColumnDefs = ( columnDefsAccumulator: ColumnDef[], recordColumnDefs: ColumnDef[], ): void => { diff --git a/tests/feature/importDataRecords.test.ts b/tests/feature/importDataRecords.test.ts new file mode 100644 index 000000000..e49cb9a76 --- /dev/null +++ b/tests/feature/importDataRecords.test.ts @@ -0,0 +1,239 @@ +import { afterAll, describe, expect, test, vi } from "vitest"; +import { AreaSetCode } from "@/models/AreaSet"; +import { + DataSourceRecordType, + DataSourceType, + GeocodingType, +} from "@/models/DataSource"; +import { FilterType } from "@/models/MapView"; +import importDataRecords from "@/server/jobs/importDataRecords"; +import importDataSource from "@/server/jobs/importDataSource"; +import * as geocodeModule from "@/server/mapping/geocode"; +import { + markDataRecordsAsDirty, + streamDataRecordsByDataSource, +} from "@/server/repositories/DataRecord"; +import { + createDataSource, + deleteDataSource, +} from "@/server/repositories/DataSource"; +import { upsertOrganisation } from "@/server/repositories/Organisation"; +import { db } from "@/server/services/database"; + +const getRecords = async (dataSourceId: string) => { + const stream = streamDataRecordsByDataSource( + dataSourceId, + { type: FilterType.MULTI }, + "", + ); + const records = []; + for await (const record of stream) { + records.push({ + externalId: record.externalId, + json: record.json, + geocodeResult: record.geocodeResult, + geocodePoint: record.geocodePoint, + }); + } + records.sort((a, b) => a.externalId.localeCompare(b.externalId)); + return records; +}; + +describe("importDataRecords", () => { + const toRemove: string[] = []; + + test("skips geocoding when record JSON is unchanged", async () => { + const org = await upsertOrganisation({ + name: "Test importDataRecords Org", + }); + + const dataSource = await createDataSource({ + name: "Test importDataRecords CSV", + autoEnrich: false, + autoImport: false, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.CSV, + url: "file://tests/resources/members.csv", + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { + type: GeocodingType.Code, + column: "Postcode", + areaSetCode: AreaSetCode.PC, + }, + organisationId: org.id, + public: false, + }); + + toRemove.push(dataSource.id); + + // 1. Initial full import — geocodes all records + await importDataSource({ dataSourceId: dataSource.id }); + const recordsAfterFirstImport = await getRecords(dataSource.id); + + // Sanity check: records were geocoded + const geocodedRecords = recordsAfterFirstImport.filter( + (r) => r.geocodeResult !== null, + ); + expect(geocodedRecords.length).toBeGreaterThan(0); + + // 2. Mark all records as needing import (simulates webhook) + const allExternalIds = recordsAfterFirstImport.map((r) => r.externalId); + await markDataRecordsAsDirty(allExternalIds, dataSource.id); + + // 3. Spy on geocodeRecord to count calls + const geocodeSpy = vi.spyOn(geocodeModule, "geocodeRecord"); + + // 4. Run importDataRecords — data hasn't changed, so geocode should be skipped + await importDataRecords({ dataSourceId: dataSource.id }); + + // 5. Verify geocodeRecord was never called + expect(geocodeSpy).not.toHaveBeenCalled(); + + // 6. Verify records still have their geocode results + const recordsAfterSecondImport = await getRecords(dataSource.id); + expect(recordsAfterSecondImport).toEqual(recordsAfterFirstImport); + + geocodeSpy.mockRestore(); + }); + + test("re-geocodes when record JSON has changed", async () => { + const org = await upsertOrganisation({ + name: "Test importDataRecords Changed Org", + }); + + const dataSource = await createDataSource({ + name: "Test importDataRecords Changed CSV", + autoEnrich: false, + autoImport: false, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.CSV, + url: "file://tests/resources/members.csv", + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { + type: GeocodingType.Code, + column: "Postcode", + areaSetCode: AreaSetCode.PC, + }, + organisationId: org.id, + public: false, + }); + + toRemove.push(dataSource.id); + + // 1. Initial full import + await importDataSource({ dataSourceId: dataSource.id }); + + // 2. Manually alter a record's JSON in the DB to simulate a change + // (the CSV will return the original data, which differs from the altered DB data) + await db + .updateTable("dataRecord") + .set({ + json: { Name: "ALTERED", Postcode: "TN4 0PP", Notes: "" }, + }) + .where("dataSourceId", "=", dataSource.id) + .where("externalId", "=", "1") + .execute(); + + // 3. Mark record 1 as needing import + await markDataRecordsAsDirty(["1"], dataSource.id); + + // 4. Spy on geocodeRecord + const geocodeSpy = vi.spyOn(geocodeModule, "geocodeRecord"); + + // 5. Run importDataRecords — record 1's JSON has changed, so it should re-geocode + await importDataRecords({ dataSourceId: dataSource.id }); + + // 6. Verify geocodeRecord was called exactly once (for record 1) + expect(geocodeSpy).toHaveBeenCalledTimes(1); + + geocodeSpy.mockRestore(); + }); + + test("handles deep equality correctly when JSON key order differs", async () => { + const org = await upsertOrganisation({ + name: "Test importDataRecords Key Order Org", + }); + + const dataSource = await createDataSource({ + name: "Test importDataRecords Key Order CSV", + autoEnrich: false, + autoImport: false, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.CSV, + url: "file://tests/resources/members.csv", + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { + type: GeocodingType.Code, + column: "Postcode", + areaSetCode: AreaSetCode.PC, + }, + organisationId: org.id, + public: false, + }); + + toRemove.push(dataSource.id); + + // 1. Initial full import + await importDataSource({ dataSourceId: dataSource.id }); + const recordsAfterFirstImport = await getRecords(dataSource.id); + + // 2. Manually reorder JSON keys in the DB for record 1 + // Same values, different key order — should still be treated as unchanged + const record1 = recordsAfterFirstImport.find((r) => r.externalId === "1"); + if (!record1) throw new Error("record1 not found"); + const reorderedJson = { + Notes: record1.json.Notes, + Postcode: record1.json.Postcode, + Name: record1.json.Name, + }; + + await db + .updateTable("dataRecord") + .set({ json: reorderedJson }) + .where("dataSourceId", "=", dataSource.id) + .where("externalId", "=", "1") + .execute(); + + // 3. Mark record 1 as needing import + await markDataRecordsAsDirty(["1"], dataSource.id); + + // 4. Spy on geocodeRecord + const geocodeSpy = vi.spyOn(geocodeModule, "geocodeRecord"); + + // 5. Run importDataRecords — keys are reordered but values are the same + await importDataRecords({ dataSourceId: dataSource.id }); + + // 6. isDeepStrictEqual ignores key order, so this should NOT re-geocode + expect(geocodeSpy).not.toHaveBeenCalled(); + + // 7. Geocode results should be preserved + const recordsAfterSecondImport = await getRecords(dataSource.id); + const record1After = recordsAfterSecondImport.find( + (r) => r.externalId === "1", + ); + expect(record1After?.geocodeResult).toEqual(record1?.geocodeResult); + + geocodeSpy.mockRestore(); + }); + + afterAll(async () => { + for (const id of toRemove) { + await deleteDataSource(id); + } + }); +}, 30000); diff --git a/tests/setup.ts b/tests/setup.ts index ba10a4d08..466eaf02b 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -6,7 +6,6 @@ import { getPubSub } from "@/server/services/pubsub"; import { boss } from "@/server/services/queue"; import { getClient as getRedisClient } from "@/server/services/redis"; import { startPublicTunnel, stopPublicTunnel } from "@/server/services/urls"; -import { runWorker } from "@/server/services/worker"; export async function setup() { // Load sampleAreas.psql into the test database @@ -30,7 +29,6 @@ export async function setup() { // if it does not return an OK response, which causes some tests to fail. let server = null; try { - await runWorker(); await startPublicTunnel("http"); server = http.createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/plain" });