diff --git a/packages/consumer/src/main.ts b/packages/consumer/src/main.ts index 68466b8..84363c1 100644 --- a/packages/consumer/src/main.ts +++ b/packages/consumer/src/main.ts @@ -83,7 +83,7 @@ while (true) { for (const item of items) { const result = exitRequested ? Result.ok('exit' as const) - : await processItem(item.name, item.revId); + : await processItem(item); if (result.isErr()) { logger.error(`packument store failed`, { diff --git a/packages/consumer/src/process.ts b/packages/consumer/src/process.ts index 3bacbd5..eca0562 100644 --- a/packages/consumer/src/process.ts +++ b/packages/consumer/src/process.ts @@ -1,14 +1,67 @@ import { processPackument } from './shared/packument'; import { processVersion } from './pkv/version'; import { processPackage } from './pkg/package'; +import { eq, notExists } from 'drizzle-orm'; +import { db } from '@npm.rest/db/server'; import { Result } from 'better-result'; import pLimit from 'p-limit'; +import { + type changeTable, + repositoryTable, + dependencyTable, + packumentTable, + specifierTable, + packageTable, + versionTable, +} from '@npm.rest/db/schema'; -export async function process(name: string, rev: string) { - const packument = await processPackument(name, rev); +export async function process(item: typeof changeTable.$inferSelect) { + if (item.deleted) { + await db.transaction(async (tx) => { + // Delete package (cascades to versions, dependencies, and publint) + await tx + .delete(packageTable) + .where(eq(packageTable.name, item.name)); + + await tx + .delete(packumentTable) + .where(eq(packumentTable.id, item.name)); + + // Clean up orphaned specifiers (no longer referenced by any dependency) + await tx + .delete(specifierTable) + .where( + notExists( + tx + .select({ id: dependencyTable.specifierId }) + .from(dependencyTable) + .where( + eq( + dependencyTable.specifierId, + specifierTable.id, + ), + ), + ), + ); + + // Clean up orphaned repositories (no longer referenced by any version) + await tx + .delete(repositoryTable) + .where( + notExists( + tx + .select({ id: versionTable.id }) + .from(versionTable) + .where(eq(versionTable.repo, repositoryTable.id)), + ), + ); + }); + } + + const packument = await processPackument(item.name, item.revId); if (packument.isErr()) return packument; - const packageId = await processPackage(packument.value, rev); + const packageId = await processPackage(packument.value, item.revId); if (packageId.isErr()) return packageId; if (packument.value.versions) { @@ -22,7 +75,7 @@ export async function process(name: string, rev: string) { packageId.value, packument.value, pkv, - rev, + item.revId, ); }); }), diff --git a/packages/consumer/test/pkv/types.test.ts b/packages/consumer/test/pkv/types.test.ts index 9364436..e148968 100644 --- a/packages/consumer/test/pkv/types.test.ts +++ b/packages/consumer/test/pkv/types.test.ts @@ -26,14 +26,6 @@ const createPackument = (name: string) => ({ }, }); -vi.mock('@npm.rest/db/server', async () => { - const { drizzle } = await import('drizzle-orm/postgres-js'); - - return { - db: drizzle.mock({}), - }; -}); - vi.mock(import('../../src/shared/logger'), async () => { const { getLogger } = await import('@logtape/logtape'); diff --git a/packages/consumer/test/process.test.ts b/packages/consumer/test/process.test.ts new file mode 100644 index 0000000..7109574 --- /dev/null +++ b/packages/consumer/test/process.test.ts @@ -0,0 +1,507 @@ +import '@npm.rest/db/mock'; +import { describe, expect, it, vi } from 'vitest'; +import { generateId } from '@npm.rest/db/id'; +import { db } from '@npm.rest/db/server'; +import { process } from '../src/process'; +import { eq } from 'drizzle-orm'; +import { + type changeTable, + dependencyTable, + packageTable, + packumentTable, + repositoryTable, + specifierTable, + versionTable, +} from '@npm.rest/db/schema'; + +vi.mock(import('../src/shared/logger'), async () => { + const { getLogger } = await import('@logtape/logtape'); + + return { + logger: getLogger('test'), + }; +}); + +vi.mock(import('../src/shared/packument'), () => ({ + processPackument: vi.fn(), +})); + +vi.mock(import('../src/pkg/package'), () => ({ + processPackage: vi.fn(), +})); + +vi.mock(import('../src/pkv/version'), () => ({ + processVersion: vi.fn(), +})); + +describe('process() deletion', () => { + it('deletes package and packument when item.deleted is true', async () => { + const packageId = generateId('pkg'); + const packageName = `test-package-${crypto.randomUUID()}`; + + await db.insert(packageTable).values({ + id: packageId, + name: packageName, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }); + + await db.insert(packumentTable).values({ + id: packageName, + revId: '1-abc', + data: { name: packageName }, + }); + + const change: typeof changeTable.$inferSelect = { + name: packageName, + revId: '2-def', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + const packages = await db + .select() + .from(packageTable) + .where(eq(packageTable.name, packageName)); + expect(packages).toHaveLength(0); + + const packuments = await db + .select() + .from(packumentTable) + .where(eq(packumentTable.id, packageName)); + expect(packuments).toHaveLength(0); + }); + + it('cascades deletion from package to versions', async () => { + const packageId = generateId('pkg'); + const versionId = generateId('pkv'); + const packageName = `test-package-${crypto.randomUUID()}`; + + await db.insert(packageTable).values({ + id: packageId, + name: packageName, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }); + + await db.insert(versionTable).values({ + id: versionId, + packageId: packageId, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + publishedAt: new Date(), + }); + + const change: typeof changeTable.$inferSelect = { + name: packageName, + revId: '2-def', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + const versions = await db + .select() + .from(versionTable) + .where(eq(versionTable.packageId, packageId)); + expect(versions).toHaveLength(0); + }); + + it('cleans up orphaned specifiers after package deletion', async () => { + const packageId = generateId('pkg'); + const versionId = generateId('pkv'); + const specifierId = generateId('spc'); + const packageName = `test-package-${crypto.randomUUID()}`; + + // Create package and version + await db.insert(packageTable).values({ + id: packageId, + name: packageName, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }); + + await db.insert(versionTable).values({ + id: versionId, + packageId: packageId, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + publishedAt: new Date(), + }); + + // Create specifier and dependency + await db.insert(specifierTable).values({ + id: specifierId, + name: 'lodash', + specifier: '^4.17.21', + type: 'range', + }); + + await db.insert(dependencyTable).values({ + versionId: versionId, + specifierId: specifierId, + type: 'prod', + optional: false, + }); + + const change: typeof changeTable.$inferSelect = { + name: packageName, + revId: '2-def', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + // Check that orphaned specifier was deleted + const specifiers = await db + .select() + .from(specifierTable) + .where(eq(specifierTable.id, specifierId)); + expect(specifiers).toHaveLength(0); + }); + + it('does not delete specifiers still referenced by other packages', async () => { + const package1Id = generateId('pkg'); + const package2Id = generateId('pkg'); + const version1Id = generateId('pkv'); + const version2Id = generateId('pkv'); + const specifierId = generateId('spc'); + const package1Name = `test-package-1-${crypto.randomUUID()}`; + const package2Name = `test-package-2-${crypto.randomUUID()}`; + + // Create two packages with versions + await db.insert(packageTable).values([ + { + id: package1Id, + name: package1Name, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }, + { + id: package2Id, + name: package2Name, + revId: '1-def', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }, + ]); + + await db.insert(versionTable).values([ + { + id: version1Id, + packageId: package1Id, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + publishedAt: new Date(), + }, + { + id: version2Id, + packageId: package2Id, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + publishedAt: new Date(), + }, + ]); + + // Create shared specifier + await db.insert(specifierTable).values({ + id: specifierId, + name: 'lodash', + specifier: '^4.17.21', + type: 'range', + }); + + // Both versions depend on the same specifier + await db.insert(dependencyTable).values([ + { + versionId: version1Id, + specifierId: specifierId, + type: 'prod', + optional: false, + }, + { + versionId: version2Id, + specifierId: specifierId, + type: 'prod', + optional: false, + }, + ]); + + // Delete first package + const change: typeof changeTable.$inferSelect = { + name: package1Name, + revId: '2-ghi', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + // Specifier should still exist because package2 still references it + const specifiers = await db + .select() + .from(specifierTable) + .where(eq(specifierTable.id, specifierId)); + expect(specifiers).toHaveLength(1); + }); + + it('cleans up orphaned repositories after package deletion', async () => { + const packageId = generateId('pkg'); + const versionId = generateId('pkv'); + const repoId = generateId('repo'); + const packageName = `test-package-${crypto.randomUUID()}`; + + // Create repository + await db.insert(repositoryTable).values({ + id: repoId, + url: 'https://github.com/test/repo', + lastFetched: new Date(), + }); + + // Create package and version referencing the repo + await db.insert(packageTable).values({ + id: packageId, + name: packageName, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }); + + await db.insert(versionTable).values({ + id: versionId, + packageId: packageId, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + repo: repoId, + publishedAt: new Date(), + }); + + const change: typeof changeTable.$inferSelect = { + name: packageName, + revId: '2-def', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + // Check that orphaned repository was deleted + const repos = await db + .select() + .from(repositoryTable) + .where(eq(repositoryTable.id, repoId)); + expect(repos).toHaveLength(0); + }); + + it('does not delete repositories still referenced by other versions', async () => { + const package1Id = generateId('pkg'); + const package2Id = generateId('pkg'); + const version1Id = generateId('pkv'); + const version2Id = generateId('pkv'); + const repoId = generateId('repo'); + const package1Name = `test-package-1-${crypto.randomUUID()}`; + const package2Name = `test-package-2-${crypto.randomUUID()}`; + + // Create shared repository + await db.insert(repositoryTable).values({ + id: repoId, + url: 'https://github.com/test/monorepo', + lastFetched: new Date(), + }); + + // Create two packages with versions + await db.insert(packageTable).values([ + { + id: package1Id, + name: package1Name, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }, + { + id: package2Id, + name: package2Name, + revId: '1-def', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }, + ]); + + // Both versions reference the same repo + await db.insert(versionTable).values([ + { + id: version1Id, + packageId: package1Id, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + repo: repoId, + publishedAt: new Date(), + }, + { + id: version2Id, + packageId: package2Id, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + repo: repoId, + publishedAt: new Date(), + }, + ]); + + // Delete first package + const change: typeof changeTable.$inferSelect = { + name: package1Name, + revId: '2-ghi', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + // Repository should still exist because package2 still references it + const repos = await db + .select() + .from(repositoryTable) + .where(eq(repositoryTable.id, repoId)); + expect(repos).toHaveLength(1); + }); + + it('performs all deletions in a transaction', async () => { + const packageId = generateId('pkg'); + const versionId = generateId('pkv'); + const specifierId = generateId('spc'); + const repoId = generateId('repo'); + const packageName = `test-package-${crypto.randomUUID()}`; + + // Create repository + await db.insert(repositoryTable).values({ + id: repoId, + url: 'https://github.com/test/repo', + lastFetched: new Date(), + }); + + // Create package and version + await db.insert(packageTable).values({ + id: packageId, + name: packageName, + revId: '1-abc', + createdAt: new Date(), + npmUpdatedAt: new Date(), + }); + + await db.insert(versionTable).values({ + id: versionId, + packageId: packageId, + version: '1.0.0', + unpackedSize: 1000, + packedSize: 500, + types: 'none', + moduleType: 'esm', + repo: repoId, + publishedAt: new Date(), + }); + + // Create specifier and dependency + await db.insert(specifierTable).values({ + id: specifierId, + name: 'lodash', + specifier: '^4.17.21', + type: 'range', + }); + + await db.insert(dependencyTable).values({ + versionId: versionId, + specifierId: specifierId, + type: 'prod', + optional: false, + }); + + await db.insert(packumentTable).values({ + id: packageName, + revId: '1-abc', + data: { name: packageName }, + }); + + const change: typeof changeTable.$inferSelect = { + name: packageName, + revId: '2-def', + state: 'pending', + deleted: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await process(change); + + // Verify everything was deleted + const packages = await db + .select() + .from(packageTable) + .where(eq(packageTable.name, packageName)); + expect(packages).toHaveLength(0); + + const versions = await db + .select() + .from(versionTable) + .where(eq(versionTable.id, versionId)); + expect(versions).toHaveLength(0); + + const specifiers = await db + .select() + .from(specifierTable) + .where(eq(specifierTable.id, specifierId)); + expect(specifiers).toHaveLength(0); + + const repos = await db + .select() + .from(repositoryTable) + .where(eq(repositoryTable.id, repoId)); + expect(repos).toHaveLength(0); + + const packuments = await db + .select() + .from(packumentTable) + .where(eq(packumentTable.id, packageName)); + expect(packuments).toHaveLength(0); + }); +}); diff --git a/packages/db/.drizzle/0007_sad_alex_power.sql b/packages/db/.drizzle/0007_sad_alex_power.sql new file mode 100644 index 0000000..309c7b7 --- /dev/null +++ b/packages/db/.drizzle/0007_sad_alex_power.sql @@ -0,0 +1,2 @@ +ALTER TYPE "core"."change_state" ADD VALUE 'skipped' BEFORE 'completed';--> statement-breakpoint +ALTER TABLE "core"."change" ADD COLUMN "deleted" boolean DEFAULT false NOT NULL; \ No newline at end of file diff --git a/packages/db/.drizzle/meta/0007_snapshot.json b/packages/db/.drizzle/meta/0007_snapshot.json new file mode 100644 index 0000000..f704b9c --- /dev/null +++ b/packages/db/.drizzle/meta/0007_snapshot.json @@ -0,0 +1,826 @@ +{ + "id": "f51ff42d-0950-4b0f-bade-929f146653ce", + "prevId": "2fd5b7bd-f7bf-4589-8031-fd6a6c57dbb2", + "version": "7", + "dialect": "postgresql", + "tables": { + "core.change": { + "name": "change", + "schema": "core", + "columns": { + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "rev_id": { + "name": "rev_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "state": { + "name": "state", + "type": "change_state", + "typeSchema": "core", + "primaryKey": false, + "notNull": true + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "change_state_idx": { + "name": "change_state_idx", + "columns": [ + { + "expression": "state", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "change_name_rev_id_pk": { + "name": "change_name_rev_id_pk", + "columns": [ + "name", + "rev_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "core.dependency": { + "name": "dependency", + "schema": "core", + "columns": { + "version_id": { + "name": "version_id", + "type": "varchar(40)", + "primaryKey": false, + "notNull": true + }, + "specifier_id": { + "name": "specifier_id", + "type": "varchar(40)", + "primaryKey": false, + "notNull": true + }, + "type": { + "name": "type", + "type": "dependency_type", + "typeSchema": "core", + "primaryKey": false, + "notNull": true + }, + "optional": { + "name": "optional", + "type": "boolean", + "primaryKey": false, + "notNull": true + }, + "alias": { + "name": "alias", + "type": "text", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "dependency_specifier_idx": { + "name": "dependency_specifier_idx", + "columns": [ + { + "expression": "specifier_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "dependency_version_id_version_id_fk": { + "name": "dependency_version_id_version_id_fk", + "tableFrom": "dependency", + "tableTo": "version", + "schemaTo": "core", + "columnsFrom": [ + "version_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "dependency_specifier_id_specifier_id_fk": { + "name": "dependency_specifier_id_specifier_id_fk", + "tableFrom": "dependency", + "tableTo": "specifier", + "schemaTo": "core", + "columnsFrom": [ + "specifier_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "dependency_version_id_specifier_id_type_pk": { + "name": "dependency_version_id_specifier_id_type_pk", + "columns": [ + "version_id", + "specifier_id", + "type" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "core.package": { + "name": "package", + "schema": "core", + "columns": { + "id": { + "name": "id", + "type": "varchar(40)", + "primaryKey": true, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "rev_id": { + "name": "rev_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "dist_tags": { + "name": "dist_tags", + "type": "jsonb", + "primaryKey": false, + "notNull": true, + "default": "'{}'::jsonb" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "npm_updated_at": { + "name": "npm_updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "package_name_unique_idx": { + "name": "package_name_unique_idx", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "package_resource_id": { + "name": "package_resource_id", + "value": "\"core\".\"package\".\"id\" LIKE 'pkg_%'" + } + }, + "isRLSEnabled": false + }, + "internal.packument": { + "name": "packument", + "schema": "internal", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "rev_id": { + "name": "rev_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "data": { + "name": "data", + "type": "jsonb", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "packument_data_gin_idx": { + "name": "packument_data_gin_idx", + "columns": [ + { + "expression": "data", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "gin", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "core.publint": { + "name": "publint", + "schema": "core", + "columns": { + "id": { + "name": "id", + "type": "varchar(41)", + "primaryKey": true, + "notNull": true + }, + "version_id": { + "name": "version_id", + "type": "varchar(40)", + "primaryKey": false, + "notNull": true + }, + "publint_version": { + "name": "publint_version", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "messages": { + "name": "messages", + "type": "jsonb", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "publint_version_unique_idx": { + "name": "publint_version_unique_idx", + "columns": [ + { + "expression": "version_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "publint_version_id_version_id_fk": { + "name": "publint_version_id_version_id_fk", + "tableFrom": "publint", + "tableTo": "version", + "schemaTo": "core", + "columnsFrom": [ + "version_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "publint_resource_id": { + "name": "publint_resource_id", + "value": "\"core\".\"publint\".\"id\" LIKE 'publ_%'" + }, + "publint_version_resource_id": { + "name": "publint_version_resource_id", + "value": "\"core\".\"publint\".\"version_id\" LIKE 'pkv_%'" + } + }, + "isRLSEnabled": false + }, + "core.repository": { + "name": "repository", + "schema": "core", + "columns": { + "id": { + "name": "id", + "type": "varchar(41)", + "primaryKey": true, + "notNull": true + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "stars": { + "name": "stars", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "forks": { + "name": "forks", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "archived": { + "name": "archived", + "type": "boolean", + "primaryKey": false, + "notNull": false + }, + "languages": { + "name": "languages", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "last_fetched": { + "name": "last_fetched", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "repository_url_unique_idx": { + "name": "repository_url_unique_idx", + "columns": [ + { + "expression": "url", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "repository_resource_id": { + "name": "repository_resource_id", + "value": "\"core\".\"repository\".\"id\" LIKE 'repo_%'" + } + }, + "isRLSEnabled": false + }, + "core.specifier": { + "name": "specifier", + "schema": "core", + "columns": { + "id": { + "name": "id", + "type": "varchar(40)", + "primaryKey": true, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "specifier": { + "name": "specifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "type": { + "name": "type", + "type": "specifier_type", + "typeSchema": "core", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "specifier_name_specifier_idx": { + "name": "specifier_name_specifier_idx", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "specifier", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + }, + "specifier_name_idx": { + "name": "specifier_name_idx", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "specifier_resource_id": { + "name": "specifier_resource_id", + "value": "\"core\".\"specifier\".\"id\" LIKE 'spc_%'" + } + }, + "isRLSEnabled": false + }, + "internal.state": { + "name": "state", + "schema": "internal", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "core.version": { + "name": "version", + "schema": "core", + "columns": { + "id": { + "name": "id", + "type": "varchar(40)", + "primaryKey": true, + "notNull": true + }, + "package_id": { + "name": "package_id", + "type": "varchar(40)", + "primaryKey": false, + "notNull": true + }, + "version": { + "name": "version", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "homepage": { + "name": "homepage", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deprecated": { + "name": "deprecated", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "license": { + "name": "license", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "unpacked_size": { + "name": "unpacked_size", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "packed_size": { + "name": "packed_size", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "types": { + "name": "types", + "type": "types_state", + "typeSchema": "core", + "primaryKey": false, + "notNull": true + }, + "module_type": { + "name": "module_type", + "type": "module_type", + "typeSchema": "core", + "primaryKey": false, + "notNull": true + }, + "keywords": { + "name": "keywords", + "type": "text[]", + "primaryKey": false, + "notNull": false + }, + "repo": { + "name": "repo", + "type": "varchar(41)", + "primaryKey": false, + "notNull": false + }, + "repo_directory": { + "name": "repo_directory", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "repo_branch": { + "name": "repo_branch", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "published_at": { + "name": "published_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "version_package_id_version_unique_idx": { + "name": "version_package_id_version_unique_idx", + "columns": [ + { + "expression": "package_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "version", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "version_package_id_package_id_fk": { + "name": "version_package_id_package_id_fk", + "tableFrom": "version", + "tableTo": "package", + "schemaTo": "core", + "columnsFrom": [ + "package_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "version_repo_repository_id_fk": { + "name": "version_repo_repository_id_fk", + "tableFrom": "version", + "tableTo": "repository", + "schemaTo": "core", + "columnsFrom": [ + "repo" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "version_resource_id": { + "name": "version_resource_id", + "value": "\"core\".\"version\".\"id\" LIKE 'pkv_%'" + }, + "version_package_resource_id": { + "name": "version_package_resource_id", + "value": "\"core\".\"version\".\"package_id\" LIKE 'pkg_%'" + } + }, + "isRLSEnabled": false + } + }, + "enums": { + "core.change_state": { + "name": "change_state", + "schema": "core", + "values": [ + "pending", + "processing", + "failed", + "skipped", + "completed" + ] + }, + "core.dependency_type": { + "name": "dependency_type", + "schema": "core", + "values": [ + "prod", + "dev", + "peer" + ] + }, + "core.module_type": { + "name": "module_type", + "schema": "core", + "values": [ + "cjs", + "esm", + "dual", + "faux", + "dts", + "unknown" + ] + }, + "core.specifier_type": { + "name": "specifier_type", + "schema": "core", + "values": [ + "git", + "tag", + "version", + "range", + "file", + "directory", + "remote" + ] + }, + "core.types_state": { + "name": "types_state", + "schema": "core", + "values": [ + "definitely-typed", + "built-in", + "none" + ] + } + }, + "schemas": { + "core": "core", + "internal": "internal" + }, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/packages/db/.drizzle/meta/_journal.json b/packages/db/.drizzle/meta/_journal.json index b3995ed..e32828e 100644 --- a/packages/db/.drizzle/meta/_journal.json +++ b/packages/db/.drizzle/meta/_journal.json @@ -50,6 +50,13 @@ "when": 1770163334699, "tag": "0006_overrated_outlaw_kid", "breakpoints": true + }, + { + "idx": 7, + "version": "7", + "when": 1770429383496, + "tag": "0007_sad_alex_power", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index fdc82e8..a4fc995 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -69,6 +69,7 @@ export const changeState = coreSchema.enum('change_state', [ 'pending', 'processing', 'failed', + 'skipped', 'completed', ]); @@ -78,6 +79,7 @@ export const changeTable = coreSchema.table( name: text().notNull(), revId: text().notNull(), state: changeState().notNull(), + deleted: boolean().notNull().default(false), createdAt: timestamp().defaultNow().notNull(), updatedAt: timestamp().defaultNow().notNull(), }, diff --git a/packages/replication/src/changes.ts b/packages/replication/src/changes.ts index 27b73d0..0d2d2a8 100644 --- a/packages/replication/src/changes.ts +++ b/packages/replication/src/changes.ts @@ -1,8 +1,8 @@ -import { packumentTable, changeTable } from '@npm.rest/db/schema'; +import { changeTable } from '@npm.rest/db/schema'; import { setTimeout } from 'node:timers/promises'; +import { and, eq, not, or } from 'drizzle-orm'; import { db } from '@npm.rest/db/server'; import { logger, seq } from './shared'; -import { eq } from 'drizzle-orm'; import { ofetch } from 'ofetch'; interface ChangeResult { @@ -34,37 +34,52 @@ export async function watchChanges() { }, }); - const changes: (typeof changeTable.$inferInsert)[] = []; + const changes = response.results.map( + (change): typeof changeTable.$inferInsert => ({ + name: change.id, + revId: change.changes[0].rev, + state: 'pending', + deleted: change.deleted, + }), + ); - for (const change of response.results) { - if (change.deleted) { - await db - .delete(packumentTable) - .where(eq(packumentTable.id, change.id)); - } else { - changes.push({ - name: change.id, - revId: change.changes[0].rev, - state: 'pending', - }); - } - } + const deletions = changes.filter((change) => change.deleted); if (changes.length) { - await db - .insert(changeTable) - .values(changes) - .onConflictDoUpdate({ - target: [changeTable.name, changeTable.revId], - set: { updatedAt: new Date() }, - }); + await db.transaction(async (tx) => { + await tx + .insert(changeTable) + .values(changes) + .onConflictDoUpdate({ + target: [changeTable.name, changeTable.revId], + set: { updatedAt: new Date() }, + }); + + for (const deletion of deletions) { + await tx + .update(changeTable) + .set({ state: 'skipped', updatedAt: new Date() }) + .where( + and( + eq(changeTable.name, deletion.name), + or( + eq(changeTable.state, 'failed'), + and( + eq(changeTable.state, 'pending'), + not(eq(changeTable.deleted, true)), + ), + ), + ), + ); + } + }); } logger.debug(`changes ${response.results.length}`, { results_len: response.results.length, last_seq: response.last_seq, change_count: changes.length, - deletion_count: response.results.length - changes.length, + deletion_count: deletions.length, }); await seq.set({ last_seq: response.last_seq });