Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/server/adaptors/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,21 @@ export class CSVAdaptor implements DataSourceAdaptor {
return null;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
fetchByExternalId(externalIds: string[]): Promise<ExternalRecord[]> {
throw new Error("Method not implemented.");
/**
 * Fetches the CSV rows whose external ID appears in `externalIds`.
 *
 * A row's external ID is its 1-based position among the non-empty records
 * yielded by the parser, rendered as a string ("1", "2", ...). Records with
 * no keys are skipped and do not consume a position.
 *
 * @param externalIds - Row positions to fetch, as strings. Duplicates are
 *   collapsed; IDs that match no row are silently omitted from the result.
 * @returns The matching records, in file order.
 */
async fetchByExternalId(externalIds: string[]): Promise<ExternalRecord[]> {
  const wantedIds = new Set(externalIds);

  // Nothing was requested, so avoid opening and parsing the file at all.
  if (wantedIds.size === 0) {
    return [];
  }

  const results: ExternalRecord[] = [];
  const content = await this.createReadStream();
  const parser = content.pipe(parse({ columns: true }));

  // Counts only non-empty records, so the numbering is stable regardless of
  // blank rows in the file.
  let row = 1;
  for await (const record of parser) {
    if (Object.keys(record).length === 0) {
      continue;
    }
    const externalId = String(row);
    if (wantedIds.has(externalId)) {
      results.push({ externalId, json: record });
    }
    row++;
  }

  return results;
}

removeDevWebhooks(): Promise<void> {
Expand Down
22 changes: 20 additions & 2 deletions src/server/jobs/importDataRecords.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { db } from "@/server/services/database";
import logger from "@/server/services/logger";
import { batchAsync } from "../utils";
import { importBatch, inferColumnSemanticTypes } from "./importDataSource";
import type { GeocodeResult } from "@/models/DataRecord";
import type { Point } from "@/models/shared";

const importDataRecords = async (args: object | null): Promise<boolean> => {
if (!args || !("dataSourceId" in args)) {
Expand All @@ -17,7 +19,7 @@ const importDataRecords = async (args: object | null): Promise<boolean> => {

const dataRecords = db
.selectFrom("dataRecord")
.select(["id", "externalId"])
.select(["id", "externalId", "json", "geocodeResult", "geocodePoint"])
.where("dataSourceId", "=", dataSourceId)
Comment on lines 20 to 23
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

geocodePoint is selected from the DB and copied into existingRecords, but nothing ever reads it: the skip logic only consults json and geocodeResult. This adds extra DB payload (including a PostGIS geography) with no effect. Either drop geocodePoint from the select, or actually use it when jsonUnchanged to preserve the existing point explicitly.

Copilot uses AI. Check for mistakes.
.where("needsImport", "=", true)
.stream();
Expand Down Expand Up @@ -52,11 +54,27 @@ const importDataRecords = async (args: object | null): Promise<boolean> => {

for await (const batch of batches) {
try {
const existingRecords = new Map(
batch.map((r) => [
r.externalId,
{
json: r.json as Record<string, unknown>,
geocodeResult: r.geocodeResult as GeocodeResult | null,
geocodePoint: r.geocodePoint as Point | null,
},
]),
);

const records = await adaptor.fetchByExternalId(
batch.map((r) => r.externalId),
);

await importBatch(records, dataSource, columnDefsAccumulator);
await importBatch({
batch: records,
dataSource,
columnDefsAccumulator,
existingRecords,
});

await db
.updateTable("dataRecord")
Expand Down
50 changes: 37 additions & 13 deletions src/server/jobs/importDataSource.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isDeepStrictEqual } from "node:util";
import { DATA_SOURCE_JOB_BATCH_SIZE } from "@/constants";
import { ColumnSemanticType, ColumnType } from "@/models/DataSource";
import { getDataSourceAdaptor } from "@/server/adaptors";
Expand All @@ -14,8 +15,10 @@ import {
import logger from "@/server/services/logger";
import { getPubSub } from "@/server/services/pubsub";
import { batchAsync } from "@/server/utils";
import type { GeocodeResult } from "@/models/DataRecord";
import type { ColumnDef, ColumnMetadata } from "@/models/DataSource";
import type { DataSource } from "@/models/DataSource";
import type { Point } from "@/models/shared";
import type { ExternalRecord } from "@/types";

const importDataSource = async (args: object | null): Promise<boolean> => {
Expand Down Expand Up @@ -58,7 +61,7 @@ const importDataSource = async (args: object | null): Promise<boolean> => {
const batches = batchAsync(records, DATA_SOURCE_JOB_BATCH_SIZE);

for await (const batch of batches) {
await importBatch(batch, dataSource, columnDefsAccumulator);
await importBatch({ batch, dataSource, columnDefsAccumulator });
count += batch.length;
if (total) {
const percentComplete = Math.floor((count * 100) / total);
Expand Down Expand Up @@ -110,25 +113,46 @@ const importDataSource = async (args: object | null): Promise<boolean> => {
return false;
};

export const importBatch = async (
batch: ExternalRecord[],
dataSource: DataSource,
columnDefsAccumulator: ColumnDef[],
) => {
export const importBatch = async ({
batch,
dataSource,
columnDefsAccumulator,
existingRecords,
}: {
batch: ExternalRecord[];
dataSource: DataSource;
columnDefsAccumulator: ColumnDef[];
existingRecords?: Map<
string,
{
json: Record<string, unknown>;
geocodeResult: GeocodeResult | null;
geocodePoint: Point | null;
}
Comment on lines +128 to +131
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existingRecords value type includes geocodePoint, but importBatch() never reads it (and always derives geocodePoint from geocodeResult). Consider removing geocodePoint from this type unless you plan to use it, to avoid dead fields and make the intent clearer.

Copilot uses AI. Check for mistakes.
>;
}) => {
const naIsNull = Boolean(dataSource.naIsNull);
const updatedRecords = await Promise.all(
batch.map(async (record) => {
const { columnDefs, typedJson } = typeJson(record.json, naIsNull);
addColumnDefs(columnDefsAccumulator, columnDefs);
const geocodeResult = await geocodeRecord(
record,
dataSource.geocodingConfig,
);

const existing = existingRecords?.get(record.externalId);
const jsonUnchanged =
existing && isDeepStrictEqual(typedJson, existing.json);

let geocodeResult: GeocodeResult | null;
if (jsonUnchanged) {
geocodeResult = existing.geocodeResult;
} else {
geocodeResult = await geocodeRecord(record, dataSource.geocodingConfig);
}
Comment on lines +140 to +149
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing can be undefined here, and it is read inside the if (jsonUnchanged) branch (existing.geocodeResult). This is safe at runtime, because jsonUnchanged is only truthy when existing is defined, and TypeScript 4.4+ narrows existing through the aliased const condition; older compilers, however, report a strict-mode error. Consider folding the check into a single condition (e.g. if (existing && isDeepStrictEqual(...))) so it is obvious at the point of use that existing is defined when its fields are read.

Copilot uses AI. Check for mistakes.

return {
externalId: record.externalId,
json: typedJson,
geocodeResult: geocodeResult,
geocodePoint: geocodeResult?.centralPoint,
geocodeResult,
geocodePoint: geocodeResult?.centralPoint ?? null,
dataSourceId: dataSource.id,
};
}),
Expand Down Expand Up @@ -232,7 +256,7 @@ const cleanNumber = (value: string): string => {
return value.trim().replace(/%$/, "").replace(/,/g, "").replace(/ /g, "");
};

const addColumnDefs = (
export const addColumnDefs = (
columnDefsAccumulator: ColumnDef[],
recordColumnDefs: ColumnDef[],
): void => {
Expand Down
Loading
Loading