Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-migration-prephase-snapshot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tailor-platform/sdk": patch
---

Fix `tailor deploy` so an intermediate migration's data script can still read fields that a later migration removes. Each migration's pre/post phase now submits the schema state reconstructed up to that migration (initial baseline + diffs through N), instead of the FINAL post-all-migrations schema. Previously, removals declared in later migrations leaked into earlier migrations' pre-phase and caused `field 'X' not found` failures at script execution time.
1 change: 1 addition & 0 deletions packages/sdk/src/cli/commands/deploy/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function emptyResults(): PlanResults {
workspaceId: "ws",
application: {} as PlanResults["tailorDB"]["context"]["application"],
tailorDBInputs: [],
executorUsedTypes: new Set<string>(),
config: {} as PlanResults["tailorDB"]["context"]["config"],
noSchemaCheck: false,
},
Expand Down
200 changes: 147 additions & 53 deletions packages/sdk/src/cli/commands/deploy/tailordb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import {
createSnapshotType,
getLatestMigrationNumber,
isSnapshotFieldRefOperand,
type SchemaSnapshot,
type SnapshotFieldConfig,
type TailorDBSnapshotType,
type SnapshotRecordPermission,
Expand Down Expand Up @@ -95,7 +96,6 @@ import type {
RemoteSchemaVerificationResult,
} from "@/cli/commands/tailordb/migrate/types";
import type { LoadedConfig } from "@/cli/shared/config-loader";
import type { Executor } from "@/types/executor.generated";
import type { GqlOperations, TailorDBServiceConfig } from "@/types/tailordb.generated";
import type { SetMetadataRequestSchema } from "@tailor-proto/tailor/v1/metadata_pb";

Expand Down Expand Up @@ -457,6 +457,7 @@ export async function applyTailorDB(
// Reset tracking state for this migration run
processedTypes.reset();
deletedResources.reset();
migrationSnapshotCache.reset();

// Step 1: Create/update services once at the beginning (services don't need per-migration handling)
await executeServicesCreation(client, changeSet);
Expand All @@ -477,15 +478,27 @@ export async function applyTailorDB(

for (const migration of pendingMigrations) {
// Pre-migration phase: Create/update types with breaking fields as optional
await executeSingleMigrationPrePhase(client, changeSet, migration);
await executeSingleMigrationPrePhase(
client,
changeSet,
migration,
migrationContext.tailorDBInputs,
migrationContext.executorUsedTypes,
);

// Script execution (only if migrate.ts exists for this migration)
if (migration.hasScript && migrationCtx) {
await executeMigrations(migrationCtx, [migration]);
}

// Post-migration phase: Apply final types (required: true) and deletions
await executeSingleMigrationPostPhase(client, changeSet, migration);
await executeSingleMigrationPostPhase(
client,
changeSet,
migration,
migrationContext.tailorDBInputs,
migrationContext.executorUsedTypes,
);

// Update migration label only after all phases complete successfully
await updateMigrationLabel(
Expand Down Expand Up @@ -678,17 +691,75 @@ const processedTypes = {
},
};

/**
* Snapshot cache for per-migration schema lookups during a single apply run.
*
* Only the initial baseline `0000/schema.json` is stored on disk; later migrations
* ship `diff.json` only. To get the schema state AFTER migration N we replay the
* initial snapshot through all diffs up to N via `reconstructSnapshotFromMigrations`.
* Results are memoized per (namespace, migration number) for the apply run.
*/
const migrationSnapshotCache = {
cache: new Map<string, SchemaSnapshot>(),
reset() {
this.cache.clear();
},
load(migration: PendingMigration): SchemaSnapshot {
const key = `${migration.namespace}/${migration.number}`;
let snapshot = this.cache.get(key);
if (!snapshot) {
const reconstructed = reconstructSnapshotFromMigrations(
migration.migrationsDir,
migration.number,
);
if (!reconstructed) {
throw new Error(
`Cannot reconstruct snapshot for ${migration.namespace} migration ${migration.number}: no migrations found in ${migration.migrationsDir}`,
);
}
snapshot = reconstructed;
this.cache.set(key, snapshot);
}
return snapshot;
},
};

/**
* Build the TailorDBType manifest for `typeName` from migration N's snapshot.
* @param migration - The pending migration whose snapshot to consult
* @param typeName - The type name to look up in the snapshot
* @param tailorDBInputs - Deploy inputs, used to resolve namespace gqlOperations
* @param executorUsedTypes - Types used by executors (drives publishRecordEvents default)
* @returns The manifest, or undefined if `typeName` is not in that snapshot.
*/
function buildSnapshotTypeManifest(
migration: PendingMigration,
typeName: string,
tailorDBInputs: ReadonlyArray<TailorDBDeployInput>,
executorUsedTypes: ReadonlySet<string>,
): MessageInitShape<typeof TailorDBTypeSchema> | undefined {
const snapshot = migrationSnapshotCache.load(migration);
const snapshotType = snapshot.types[typeName];
if (!snapshotType) return undefined;
const input = tailorDBInputs.find((i) => i.namespace === migration.namespace);
return generateTailorDBTypeManifest(snapshotType, executorUsedTypes, input?.config.gqlOperations);
}
Comment on lines +694 to +746
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Execute pre-migration phase for a single migration
* @param {OperatorClient} client - Operator client instance
* @param {TailorDBChangeSet} changeSet - TailorDB change set
* @param {PendingMigration} migration - Single pending migration
* @param tailorDBInputs - Deploy inputs, used to resolve namespace gqlOperations for the snapshot
* @param executorUsedTypes - Types used by executors (drives publishRecordEvents default)
* @returns {Promise<void>} Promise that resolves when pre-migration phase completes
*/
async function executeSingleMigrationPrePhase(
client: OperatorClient,
changeSet: TailorDBChangeSet,
migration: PendingMigration,
tailorDBInputs: ReadonlyArray<TailorDBDeployInput>,
executorUsedTypes: ReadonlySet<string>,
): Promise<void> {
// Build pre-migration changes map for this single migration. Includes both
// breaking changes (required-add, unique-add, enum value removal) and the
Expand All @@ -697,27 +768,25 @@ async function executeSingleMigrationPrePhase(
const affectedTypes = getAffectedTypeNames(migration);
const createdBeforeMigration = new Set(processedTypes.created);

// Types - create/update only types affected by this migration
await Promise.all([
// Create types that are affected by this migration and haven't been created yet
...changeSet.type.creates
.filter((create) => {
const typeName = create.request.tailordbType?.name;
return typeName && affectedTypes.has(typeName) && !createdBeforeMigration.has(typeName);
})
.map((create) => {
const typeName = create.request.tailordbType?.name;
const snapshotType = typeName
? buildSnapshotTypeManifest(migration, typeName, tailorDBInputs, executorUsedTypes)
: undefined;
if (!snapshotType) return undefined;
if (typeName) processedTypes.created.add(typeName);

const typeChanges = typeName ? preMigrationChanges.get(typeName) : undefined;

if (!typeChanges || typeChanges.size === 0) {
return client.createTailorDBType(create.request);
}

// Clone request to avoid modifying the original changeSet
const clonedRequest = structuredClone(create.request);
if (clonedRequest.tailordbType?.schema?.fields) {
clonedRequest.tailordbType = snapshotType;

const typeChanges = typeName ? preMigrationChanges.get(typeName) : undefined;
if (typeChanges && typeChanges.size > 0 && clonedRequest.tailordbType?.schema?.fields) {
applyPreMigrationFieldAdjustments(clonedRequest.tailordbType.schema.fields, typeChanges);
}

Expand All @@ -731,27 +800,22 @@ async function executeSingleMigrationPrePhase(
})
.map((create) => {
const typeName = create.request.tailordbType?.name;
const snapshotType = typeName
? buildSnapshotTypeManifest(migration, typeName, tailorDBInputs, executorUsedTypes)
: undefined;
if (!snapshotType) return undefined;
if (typeName) processedTypes.updated.add(typeName);

const clonedTypeRequest = structuredClone(snapshotType);
const typeChanges = typeName ? preMigrationChanges.get(typeName) : undefined;

if (!typeChanges || typeChanges.size === 0) {
return client.updateTailorDBType({
workspaceId: create.request.workspaceId,
namespaceName: create.request.namespaceName,
tailordbType: create.request.tailordbType,
});
}

const clonedRequest = structuredClone(create.request);
if (clonedRequest.tailordbType?.schema?.fields) {
applyPreMigrationFieldAdjustments(clonedRequest.tailordbType.schema.fields, typeChanges);
if (typeChanges && typeChanges.size > 0 && clonedTypeRequest.schema?.fields) {
applyPreMigrationFieldAdjustments(clonedTypeRequest.schema.fields, typeChanges);
}

return client.updateTailorDBType({
workspaceId: create.request.workspaceId,
namespaceName: create.request.namespaceName,
tailordbType: clonedRequest.tailordbType,
tailordbType: clonedTypeRequest,
});
}),
// Update types that are affected by this migration
Expand All @@ -762,17 +826,17 @@ async function executeSingleMigrationPrePhase(
})
.map((update) => {
const typeName = update.request.tailordbType?.name;
const snapshotType = typeName
? buildSnapshotTypeManifest(migration, typeName, tailorDBInputs, executorUsedTypes)
: undefined;
if (!snapshotType) return undefined;
if (typeName) processedTypes.updated.add(typeName);

const typeChanges = typeName ? preMigrationChanges.get(typeName) : undefined;

if (!typeChanges || typeChanges.size === 0) {
return client.updateTailorDBType(update.request);
}

// Clone request to avoid modifying the original changeSet
const clonedRequest = structuredClone(update.request);
if (clonedRequest.tailordbType?.schema?.fields) {
clonedRequest.tailordbType = snapshotType;

const typeChanges = typeName ? preMigrationChanges.get(typeName) : undefined;
if (typeChanges && typeChanges.size > 0 && clonedRequest.tailordbType?.schema?.fields) {
applyPreMigrationFieldAdjustments(clonedRequest.tailordbType.schema.fields, typeChanges);
}

Expand Down Expand Up @@ -839,43 +903,65 @@ const deletedResources = {
* @param {OperatorClient} client - Operator client instance
* @param {TailorDBChangeSet} changeSet - TailorDB change set
* @param {PendingMigration} migration - Single pending migration
* @param tailorDBInputs - Deploy inputs, used to resolve namespace gqlOperations for the snapshot
* @param executorUsedTypes - Types used by executors (drives publishRecordEvents default)
* @returns {Promise<void>} Promise that resolves when post-migration phase completes
*/
async function executeSingleMigrationPostPhase(
client: OperatorClient,
changeSet: TailorDBChangeSet,
migration: PendingMigration,
tailorDBInputs: ReadonlyArray<TailorDBDeployInput>,
executorUsedTypes: ReadonlySet<string>,
): Promise<void> {
// Re-use the pre-migration changes map to know which types were touched in
// this migration (so we send the post-phase final-schema update for them).
const preMigrationChanges = buildPreMigrationChangesMap([migration]);
const affectedTypes = getAffectedTypeNames(migration);
const deletedTypeNames = getDeletedTypeNames(migration);

// Types - apply final schema values for types affected by this migration
// Pre-migration used cloned requests, so the original changeSet still has correct values
// Types - apply schema as of migration N (= snapshot[N]) with all breaking
// changes enforced. The prePhase sent the same schema with breaking fields
// relaxed; here we send it again without relaxation so required/unique/etc.
// take effect after the data script has reconciled records.
try {
await Promise.all([
// For newly created types that had pre-migration adjustments in this migration, send update with final values
// For newly created types that had pre-migration adjustments in this migration, send update with snapshot[N] values
...changeSet.type.creates
.filter((create) => {
const typeName = create.request.tailordbType?.name;
return typeName && affectedTypes.has(typeName) && preMigrationChanges.has(typeName);
})
.map((create) =>
client.updateTailorDBType({
.map((create) => {
const typeName = create.request.tailordbType?.name;
const snapshotType = typeName
? buildSnapshotTypeManifest(migration, typeName, tailorDBInputs, executorUsedTypes)
: undefined;
if (!snapshotType) return undefined;
return client.updateTailorDBType({
workspaceId: create.request.workspaceId,
namespaceName: create.request.namespaceName,
tailordbType: create.request.tailordbType,
}),
),
// For updated types affected by this migration, send update with final values
tailordbType: snapshotType,
});
}),
// For updated types affected by this migration, send update with snapshot[N] values
...changeSet.type.updates
.filter((update) => {
const typeName = update.request.tailordbType?.name;
return typeName && affectedTypes.has(typeName) && preMigrationChanges.has(typeName);
})
.map((update) => client.updateTailorDBType(update.request)),
.map((update) => {
const typeName = update.request.tailordbType?.name;
const snapshotType = typeName
? buildSnapshotTypeManifest(migration, typeName, tailorDBInputs, executorUsedTypes)
: undefined;
if (!snapshotType) return undefined;
return client.updateTailorDBType({
workspaceId: update.request.workspaceId,
namespaceName: update.request.namespaceName,
tailordbType: snapshotType,
});
}),
]);
} catch (error) {
handleOptionalToRequiredError(error, [
Expand Down Expand Up @@ -973,6 +1059,12 @@ export async function planTailorDB(context: PlanContext) {
const executors = forRemoval
? []
: Object.values((await application.executorService?.loadExecutors()) ?? {});
const executorUsedTypes = new Set<string>();
for (const executor of executors) {
if (executor.trigger.kind === "tailordb") {
executorUsedTypes.add(executor.trigger.typeName);
}
}

const {
changeSet: serviceChangeSet,
Expand All @@ -982,7 +1074,15 @@ export async function planTailorDB(context: PlanContext) {
} = await planServices(client, workspaceId, application.name, application.id, tailordbs);
const deletedServices = serviceChangeSet.deletes.map((del) => del.name);
const [typeChangeSet, gqlPermissionChangeSet] = await Promise.all([
planTypes(client, workspaceId, tailordbs, executors, deletedServices, undefined, forceApplyAll),
planTypes(
client,
workspaceId,
tailordbs,
executorUsedTypes,
deletedServices,
undefined,
forceApplyAll,
),
planGqlPermissions(client, workspaceId, tailordbs, deletedServices, forceApplyAll),
]);

Expand All @@ -999,6 +1099,7 @@ export async function planTailorDB(context: PlanContext) {
workspaceId,
application,
tailorDBInputs: tailordbs,
executorUsedTypes,
config,
noSchemaCheck: noSchemaCheck ?? false,
},
Expand Down Expand Up @@ -1266,7 +1367,7 @@ async function planTypes(
client: OperatorClient,
workspaceId: string,
tailordbs: ReadonlyArray<TailorDBDeployInput>,
executors: ReadonlyArray<Executor>,
executorUsedTypes: ReadonlySet<string>,
deletedServices: ReadonlyArray<string>,
filteredTypesByNamespace?: Map<string, Record<string, TailorDBSnapshotType>>,
forceApplyAll = false,
Expand All @@ -1292,13 +1393,6 @@ async function planTypes(
});
};

const executorUsedTypes = new Set<string>();
for (const executor of executors) {
if (executor.trigger.kind === "tailordb") {
executorUsedTypes.add(executor.trigger.typeName);
}
}

// Validate that types used by executors don't have publishEvents explicitly set to false
for (const tailordb of tailordbs) {
const types = filteredTypesByNamespace?.get(tailordb.namespace) ?? tailordb.types;
Expand Down
Loading
Loading