Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 76 additions & 33 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ impl Executor {
}

/// Creates new [`Executor`] for database at `database_path` and initializes background threads used by its components.
/// Returns executor and handles to the background threads.
/// Returns executor and handles to the background threads (with the distinction between WAL and other
/// background workers needing to be made).
pub fn with_background_workers(
database_path: impl AsRef<Path>,
catalog: Catalog,
) -> Result<(Self, Vec<BackgroundWorkerHandle>), ExecutorError> {
) -> Result<(Self, Vec<BackgroundWorkerHandle>, BackgroundWorkerHandle), ExecutorError> {
let (files, files_background_worker) = FilesManager::with_background_cleaner(
database_path.as_ref(),
consts::FILES_MANAGER_CLEANUP_INTERVAL,
Expand All @@ -92,12 +93,8 @@ impl Executor {
cache,
catalog,
};
let workers = vec![
cache_background_worker,
files_background_worker,
wal_background_worker,
];
Ok((executor, workers))
let workers = vec![cache_background_worker, files_background_worker];
Ok((executor, workers, wal_background_worker))
}

/// Parses `query` and returns iterator over results for each statement in the `query`.
Expand Down Expand Up @@ -1383,18 +1380,22 @@ mod tests {
let (catalog, temp_dir) = create_catalog();
let db_path = temp_dir.path().join("test_db");

let (executor, mut workers) = Executor::with_background_workers(db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(db_path, catalog)
.expect("with_background_workers should succeed");

// We expect three background workers (cache and files manager and wal)
assert_eq!(workers.len(), 3);
assert_eq!(workers.len(), 2);

// Shutdown and join all workers to ensure threads are started and can be stopped.
while let Some(mut handle) = workers.pop() {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");

// Basic sanity check that executor was created and holds structures
let _c = executor.catalog.read();
}
Expand Down Expand Up @@ -3801,8 +3802,9 @@ mod tests {
fn test_select_index_scan_1000_records_with_wal() {
let (catalog, temp_dir) = create_catalog();
let db_path = temp_dir.path().join("test_db");
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create table
execute_single(
Expand Down Expand Up @@ -3894,6 +3896,8 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}
wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

#[test]
Expand Down Expand Up @@ -4802,8 +4806,9 @@ mod tests {

{
// Create table, insert many records, then delete all of them
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create table with data
execute_single(
Expand Down Expand Up @@ -4836,14 +4841,18 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

{
// Restart and attempt redo
// This should NOT panic - it should gracefully handle missing pages
let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap();
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("Recovery with redo should handle deleted pages gracefully");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("Recovery with redo should handle deleted pages gracefully");

let result = execute_single(&executor, "SELECT * FROM users;");
// We call it just to trigger wal redo
Expand All @@ -4854,6 +4863,9 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}
}

Expand All @@ -4868,8 +4880,9 @@ mod tests {

{
// Create table, insert data, then drop it
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create and populate table
execute_single(
Expand Down Expand Up @@ -4903,14 +4916,18 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

{
// Restart, attempt redo, then create new table with same name
// This should NOT panic when trying to redo records for deleted pages
let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap();
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle dropped tables gracefully");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle dropped tables gracefully");

// Create new table with same name but different schema
execute_single(
Expand All @@ -4934,6 +4951,8 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}
wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}
}

Expand All @@ -4948,8 +4967,9 @@ mod tests {

{
// Create table, insert data, then truncate
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create and populate table
execute_single(
Expand Down Expand Up @@ -4982,14 +5002,18 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

{
// Restart, attempt redo, then insert new data
// This should NOT panic when trying to redo records for deallocated pages
let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap();
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle truncated tables gracefully");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle truncated tables gracefully");

// Insert new data into the truncated table
execute_single(
Expand All @@ -5011,6 +5035,9 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}
}

Expand All @@ -5024,8 +5051,9 @@ mod tests {

{
// Complex sequence of operations
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create first table
execute_single(
Expand Down Expand Up @@ -5069,14 +5097,18 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

{
// Restart and attempt redo
// This should NOT panic despite complex history of deletions
let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap();
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle complex table operations gracefully");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle complex table operations gracefully");

// Verify final state
let result = execute_single(&executor, "SELECT * FROM temp1;");
Expand All @@ -5092,6 +5124,9 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}
}

Expand All @@ -5106,8 +5141,9 @@ mod tests {

{
// Create table, insert records, truncate, then insert one new record
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("with_background_workers should succeed");

// Create table
execute_single(
Expand Down Expand Up @@ -5156,14 +5192,18 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}

{
// Restart and verify WAL redo only applies the new record
// NOT the old 4 records that existed before truncate
let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap();
let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle page reallocation correctly");
let (executor, mut workers, mut wal_worker) =
Executor::with_background_workers(&db_path, catalog)
.expect("Recovery should handle page reallocation correctly");

// Verify table has exactly 1 record (the one inserted after truncate)
let result = execute_single(&executor, "SELECT * FROM items;");
Expand All @@ -5187,6 +5227,9 @@ mod tests {
handle.shutdown().expect("shutdown should succeed");
handle.join().expect("join should succeed");
}

wal_worker.shutdown().expect("shutdown should succeed");
wal_worker.join().expect("join should succeed");
}
}
}
Loading
Loading