Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions src/db/libsql_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/// - PL/pgSQL functions -> SQLite triggers
pub const SCHEMA: &str = include_str!("../../migrations/libsql_schema.sql");

const V10_WASM_TOOLS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
const V12_WASM_TOOLS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
version TEXT NOT NULL DEFAULT '1.0.0',
Expand All @@ -37,14 +37,14 @@ const V10_WASM_TOOLS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
UNIQUE (user_id, name, version)"#;

const V10_WASM_TOOLS_COPY_COLUMNS: &str = r#"id, user_id, name, version, wit_version, description, wasm_binary, binary_hash,
const V12_WASM_TOOLS_COPY_COLUMNS: &str = r#"id, user_id, name, version, wit_version, description, wasm_binary, binary_hash,
parameters_schema, source_url, trust_level, status, created_at, updated_at"#;

const V10_WASM_TOOLS_POST_REBUILD_SQL: &str = r#"CREATE INDEX IF NOT EXISTS idx_wasm_tools_user ON wasm_tools(user_id);
const V12_WASM_TOOLS_POST_REBUILD_SQL: &str = r#"CREATE INDEX IF NOT EXISTS idx_wasm_tools_user ON wasm_tools(user_id);
CREATE INDEX IF NOT EXISTS idx_wasm_tools_status ON wasm_tools(status);
CREATE INDEX IF NOT EXISTS idx_wasm_tools_trust ON wasm_tools(trust_level);"#;

const V10_WASM_CHANNELS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
const V12_WASM_CHANNELS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
version TEXT NOT NULL DEFAULT '0.1.0',
Expand All @@ -58,13 +58,20 @@ const V10_WASM_CHANNELS_COLUMNS: &str = r#" id TEXT PRIMARY KEY,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
UNIQUE (user_id, name)"#;

const V10_WASM_CHANNELS_COPY_COLUMNS: &str = r#"id, user_id, name, version, wit_version, description, wasm_binary, binary_hash,
const V12_WASM_CHANNELS_COPY_COLUMNS: &str = r#"id, user_id, name, version, wit_version, description, wasm_binary, binary_hash,
capabilities_json, status, created_at, updated_at"#;

/// Incremental migrations applied after the base schema.
///
/// Each entry is `(version, name, sql)`. Migrations are idempotent: the
/// `_migrations` table tracks which versions have been applied.
///
/// `INCREMENTAL_MIGRATIONS` intentionally jumps from versions 9 -> 12 for the
/// libSQL backend. The PostgreSQL `wasm_versioning` (V10) and
/// `conversation_unique_indexes` (V11) migrations are already baked into
/// `libsql_schema.sql` rather than applied incrementally, so the gap is
/// backend-specific and should not be "filled in" when editing
/// `INCREMENTAL_MIGRATIONS` or adding later versions.
pub const INCREMENTAL_MIGRATIONS: &[(i64, &str, &str)] = &[
(
9,
Expand Down Expand Up @@ -129,18 +136,18 @@ END;
"#,
),
(
10,
12,
"wasm_wit_default_0_3_0",
// Update existing databases that still default newly inserted wasm tool
Comment thread
leynos marked this conversation as resolved.
// and channel rows to the historical 0.1.0 WIT version. This rebuilds
// the affected tables because SQLite cannot ALTER COLUMN defaults.
//
// `legacy_alter_table=ON` is required so child foreign keys keep pointing
// at `wasm_tools` while we rename the old table out of the way.
"-- generated by v10_wasm_wit_default_migration_sql()",
"-- generated by v12_wasm_wit_default_migration_sql()",
),
(
12,
13,
"job_token_budget",
// Add token budget tracking columns to agent_jobs.
// SQLite supports ALTER TABLE ADD COLUMN, so no table rebuild needed.
Expand All @@ -150,9 +157,9 @@ ALTER TABLE agent_jobs ADD COLUMN total_tokens_used INTEGER NOT NULL DEFAULT 0;
"#,
),
(
13,
14,
"drop_redundant_wasm_tools_name_index",
include_str!("../../migrations/V13__drop_redundant_wasm_tools_name_index.sql"),
include_str!("../../migrations/V14__drop_redundant_wasm_tools_name_index.sql"),
),
];

Expand Down Expand Up @@ -187,11 +194,11 @@ pub async fn run_incremental(conn: &libsql::Connection) -> Result<(), crate::err

tracing::info!(version, name, "libSQL: applying incremental migration");

// V10 contains its own `BEGIN IMMEDIATE`/`COMMIT` block and sets
// V12 contains its own `BEGIN IMMEDIATE`/`COMMIT` block and sets
// PRAGMAs that must execute outside a transaction, so bypass the
// outer transaction wrapper.
if version == 10 {
let sql = v10_wasm_wit_default_migration_sql();
if version == 12 {
let sql = v12_wasm_wit_default_migration_sql();
apply_non_transactional_migration(conn, version, name, &sql).await?;
tracing::info!(version, name, "libSQL: migration applied successfully");
continue;
Expand Down Expand Up @@ -302,23 +309,23 @@ fn append_table_rebuild_sql(
sql.push_str("\n\n");
}

fn v10_wasm_wit_default_migration_sql() -> String {
fn v12_wasm_wit_default_migration_sql() -> String {
let mut sql = String::from(
"PRAGMA legacy_alter_table=ON;\nPRAGMA foreign_keys=OFF;\nBEGIN IMMEDIATE;\n\n",
);

append_table_rebuild_sql(
&mut sql,
"wasm_tools",
V10_WASM_TOOLS_COLUMNS,
V10_WASM_TOOLS_COPY_COLUMNS,
Some(V10_WASM_TOOLS_POST_REBUILD_SQL),
V12_WASM_TOOLS_COLUMNS,
V12_WASM_TOOLS_COPY_COLUMNS,
Some(V12_WASM_TOOLS_POST_REBUILD_SQL),
);
append_table_rebuild_sql(
&mut sql,
"wasm_channels",
V10_WASM_CHANNELS_COLUMNS,
V10_WASM_CHANNELS_COPY_COLUMNS,
V12_WASM_CHANNELS_COLUMNS,
V12_WASM_CHANNELS_COPY_COLUMNS,
None,
);

Expand All @@ -328,7 +335,7 @@ fn v10_wasm_wit_default_migration_sql() -> String {

#[cfg(test)]
mod tests {
use super::{INCREMENTAL_MIGRATIONS, SCHEMA, v10_wasm_wit_default_migration_sql};
use super::{INCREMENTAL_MIGRATIONS, SCHEMA, v12_wasm_wit_default_migration_sql};

#[test]
fn schema_uses_current_wit_defaults_for_new_wasm_records() {
Expand All @@ -348,22 +355,22 @@ mod tests {
fn incremental_migrations_upgrade_existing_wasm_wit_defaults_to_0_3_0() {
let (_, _, sql_marker) = INCREMENTAL_MIGRATIONS
.iter()
.find(|(version, _, _)| *version == 10)
.expect("expected a V10 libSQL migration for stale wasm wit_version defaults");
.find(|(version, _, _)| *version == 12)
.expect("expected a V12 libSQL migration for stale wasm wit_version defaults");
assert_eq!(
*sql_marker, "-- generated by v10_wasm_wit_default_migration_sql()",
"expected V10 migration entry to use the shared SQL builder"
*sql_marker, "-- generated by v12_wasm_wit_default_migration_sql()",
"expected V12 migration entry to use the shared SQL builder"
);

let sql = v10_wasm_wit_default_migration_sql();
let sql = v12_wasm_wit_default_migration_sql();

assert!(
sql.contains("0.3.0"),
"expected V10 libSQL migration to set wasm wit_version defaults to 0.3.0"
"expected V12 libSQL migration to set wasm wit_version defaults to 0.3.0"
);
assert!(
!sql.contains("wit_version TEXT NOT NULL DEFAULT '0.1.0'"),
"expected V10 libSQL migration to remove stale 0.1.0 wit_version defaults"
"expected V12 libSQL migration to remove stale 0.1.0 wit_version defaults"
);
assert!(
!sql.contains("INSERT INTO _migrations"),
Expand Down
134 changes: 85 additions & 49 deletions tests/db_integration/libsql_wit_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,7 @@ CREATE TABLE IF NOT EXISTS wasm_channels (
);
"#;

#[tokio::test]
async fn libsql_run_migrations_upgrades_legacy_wasm_wit_defaults() {
let dir = tempfile::tempdir().expect("temp dir");
let db_path = dir.path().join("legacy-wit-defaults.db");
let backend = LibSqlBackend::new_local(&db_path).await.expect("backend");

let conn = backend.connect().await.expect("connect");
conn.execute_batch(LEGACY_WASM_WIT_SCHEMA)
.await
.expect("seed legacy schema");

backend.run_migrations().await.expect("run migrations");

let conn = backend.connect().await.expect("connect after migration");
async fn insert_test_wasm_tool(conn: &libsql::Connection) {
conn.execute(
r#"
INSERT INTO wasm_tools (
Expand All @@ -83,7 +70,9 @@ async fn libsql_run_migrations_upgrades_legacy_wasm_wit_defaults() {
)
.await
.expect("insert wasm tool without wit_version");
}

async fn insert_test_wasm_channel(conn: &libsql::Connection) {
conn.execute(
r#"
INSERT INTO wasm_channels (
Expand All @@ -102,49 +91,96 @@ async fn libsql_run_migrations_upgrades_legacy_wasm_wit_defaults() {
)
.await
.expect("insert wasm channel without wit_version");
}

let mut tool_rows = conn
.query(
"SELECT wit_version FROM wasm_tools WHERE id = ?1",
libsql::params!["tool-1"],
)
async fn assert_wit_version(conn: &libsql::Connection, table: &str, id: &str, expected: &str) {
let query = format!("SELECT wit_version FROM {table} WHERE id = ?1");
let mut rows = conn
.query(&query, libsql::params![id])
.await
.expect("query tool");
let tool_row = tool_rows
.unwrap_or_else(|error| panic!("query {table} wit_version: {error}"));
let row = rows
.next()
.await
.expect("tool rows")
.expect("tool row");
let tool_wit_version: String = tool_row.get(0).expect("tool wit_version");
assert_eq!(tool_wit_version, "0.3.0");

let mut channel_rows = conn
.query(
"SELECT wit_version FROM wasm_channels WHERE id = ?1",
libsql::params!["channel-1"],
)
.await
.expect("query channel");
let channel_row = channel_rows
.next()
.await
.expect("channel rows")
.expect("channel row");
let channel_wit_version: String = channel_row.get(0).expect("channel wit_version");
assert_eq!(channel_wit_version, "0.3.0");
.unwrap_or_else(|error| panic!("read {table} wit_version row: {error}"))
.unwrap_or_else(|| panic!("missing {table} row for id {id}"));
let actual: String = row
.get(0)
.unwrap_or_else(|error| panic!("read {table} wit_version value: {error}"));
assert_eq!(actual, expected);
}

let mut migration_rows = conn
async fn assert_migration_absent(conn: &libsql::Connection, version: i64) {
let mut old_version_rows = conn
.query(
Comment thread
leynos marked this conversation as resolved.
"SELECT name FROM _migrations WHERE version = ?1",
libsql::params![10],
libsql::params![version],
)
.await
.expect("query migration marker");
let migration_row = migration_rows
.next()
.unwrap_or_else(|error| panic!("query migration marker for version {version}: {error}"));
assert!(
old_version_rows
.next()
.await
.unwrap_or_else(|error| panic!("read migration row for version {version}: {error}"))
.is_none(),
"unexpected _migrations row for version {version}; migration numbering regressed"
);
}

async fn assert_migration_entries(conn: &libsql::Connection, entries: &[(i64, &str)]) {
for (version, expected_name) in entries {
let mut migration_rows = conn
.query(
"SELECT name FROM _migrations WHERE version = ?1",
libsql::params![*version],
)
.await
.unwrap_or_else(|error| {
panic!("query migration marker for version {version}: {error}")
});
let migration_row = migration_rows
.next()
.await
.unwrap_or_else(|error| panic!("read migration row for version {version}: {error}"))
.unwrap_or_else(|| {
panic!(
"expected _migrations row for version {version}; migration sequences diverged"
)
});
let migration_name: String = migration_row
.get(0)
.unwrap_or_else(|error| panic!("read migration name for version {version}: {error}"));
assert_eq!(migration_name, *expected_name);
}
}

#[tokio::test]
async fn libsql_run_migrations_upgrades_legacy_wasm_wit_defaults() {
let dir = tempfile::tempdir().expect("temp dir");
let db_path = dir.path().join("legacy-wit-defaults.db");
let backend = LibSqlBackend::new_local(&db_path).await.expect("backend");

let conn = backend.connect().await.expect("connect");
conn.execute_batch(LEGACY_WASM_WIT_SCHEMA)
.await
.expect("migration rows")
.expect("migration row");
let migration_name: String = migration_row.get(0).expect("migration name");
assert_eq!(migration_name, "wasm_wit_default_0_3_0");
.expect("seed legacy schema");

backend.run_migrations().await.expect("run migrations");

let conn = backend.connect().await.expect("connect after migration");
insert_test_wasm_tool(&conn).await;
insert_test_wasm_channel(&conn).await;
assert_wit_version(&conn, "wasm_tools", "tool-1", "0.3.0").await;
assert_wit_version(&conn, "wasm_channels", "channel-1", "0.3.0").await;
assert_migration_absent(&conn, 10).await;
assert_migration_entries(
&conn,
&[
(12_i64, "wasm_wit_default_0_3_0"),
(13_i64, "job_token_budget"),
(14_i64, "drop_redundant_wasm_tools_name_index"),
],
)
.await;
}
Loading