diff --git a/Cargo.lock b/Cargo.lock index d3045bf1..95629001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,6 +2058,16 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -6327,6 +6337,7 @@ dependencies = [ "dhat", "figment", "flate2", + "fs2", "futures", "futures-util", "getrandom 0.4.2", diff --git a/apps/skit/Cargo.toml b/apps/skit/Cargo.toml index f8577d5d..f729679a 100644 --- a/apps/skit/Cargo.toml +++ b/apps/skit/Cargo.toml @@ -148,6 +148,7 @@ rmcp = { version = "1.5", features = [ ], optional = true } # Used by the MCP module to extract HTTP request parts from rmcp's request context. http = { version = "1", optional = true } +fs2 = "0.4.3" [features] default = ["script", "compositor", "gpu", "moq"] diff --git a/apps/skit/src/auth/stores/file.rs b/apps/skit/src/auth/stores/file.rs index 9ab6e737..74d01e90 100644 --- a/apps/skit/src/auth/stores/file.rs +++ b/apps/skit/src/auth/stores/file.rs @@ -80,6 +80,38 @@ fn lock_write(lock: &RwLock) -> std::sync::RwLockWriteGuard<'_, T> { lock.write().unwrap_or_else(std::sync::PoisonError::into_inner) } +/// RAII guard for an advisory file lock (`flock`). +/// +/// The lock is released when the guard is dropped (file close). +struct FileLockGuard { + _file: std::fs::File, +} + +/// Acquire an exclusive advisory file lock on a companion `.lock` file. +/// +/// Provides cross-process serialization of read-modify-write operations +/// so the CLI and server cannot clobber each other's writes when running +/// concurrently (e.g. during `rotate-key`). 
+async fn acquire_file_lock(data_path: &Path) -> Result<FileLockGuard, AuthStoreError> { + use fs2::FileExt; + + let lock_path = lock_path_for(data_path); + tokio::task::spawn_blocking(move || -> Result<FileLockGuard, AuthStoreError> { + let file = + std::fs::OpenOptions::new().create(true).truncate(true).write(true).open(&lock_path)?; + file.lock_exclusive()?; + Ok(FileLockGuard { _file: file }) + }) + .await + .map_err(|e| AuthStoreError::Io(std::io::Error::other(e)))? +} + +fn lock_path_for(data_path: &Path) -> PathBuf { + let mut s = data_path.as_os_str().to_os_string(); + s.push(".lock"); + PathBuf::from(s) +} + /// File-based key provider with rotation support. /// /// Stores private signing key in `auth.jwk` (0600) and public verification keys in `jwks.json`. @@ -223,6 +255,9 @@ impl FileKeyProvider { } /// Write file with secure permissions (0600). + /// + /// Ensures crash durability by calling `sync_all()` on the file + /// before the atomic rename and on the parent directory afterwards. pub(crate) async fn write_secure(path: &Path, content: &str) -> Result<(), AuthStoreError> { use tokio::io::AsyncWriteExt; @@ -240,7 +275,7 @@ file.write_all(content.as_bytes()).await?; file.flush().await?; - drop(file); + file.sync_all().await?; } #[cfg(not(unix))] @@ -250,7 +285,7 @@ file.write_all(content.as_bytes()).await?; file.flush().await?; - drop(file); + file.sync_all().await?; } // Atomic rename (same directory). @@ -259,6 +294,14 @@ let _ = tokio::fs::remove_file(&temp_path).await; return Err(e.into()); } + + // Fsync parent directory so the new directory entry is durable. 
+ if let Some(parent) = path.parent() { + if let Ok(dir) = tokio::fs::File::open(parent).await { + let _ = dir.sync_all().await; + } + } + Ok(()) } @@ -325,6 +368,10 @@ impl KeyProvider for FileKeyProvider { let private_path = self.state_dir.join(PRIVATE_JWK_FILENAME); let jwks_path = self.state_dir.join(PUBLIC_JWKS_FILENAME); + // Cross-process file lock so the CLI and server cannot + // clobber each other's key writes during concurrent rotation. + let _flock = acquire_file_lock(&jwks_path).await?; + let (private_jwk, new_signing_key, public_key_bytes) = generate_new_private_key()?; // Re-read JWKS from disk (not in-memory) to pick up any @@ -360,6 +407,10 @@ impl KeyProvider for FileKeyProvider { let private_path = self.state_dir.join(PRIVATE_JWK_FILENAME); let jwks_path = self.state_dir.join(PUBLIC_JWKS_FILENAME); + // Cross-process file lock so reload doesn't read a + // partially-written file during a concurrent rotation. + let _flock = acquire_file_lock(&jwks_path).await?; + Self::verify_permissions(&private_path)?; let content = tokio::fs::read_to_string(&private_path).await?; let private: PrivateJwk = serde_json::from_str(&content)?; @@ -532,6 +583,28 @@ impl FileRevocationStore { Ok(()) } + + /// Re-read revocations from disk into the in-memory cache. + /// + /// Callers must hold the file lock when using this as part of a + /// read-modify-write cycle. 
+ async fn reload_from_disk(&self) -> Result<(), AuthStoreError> { + let path = self.state_dir.join("revoked.json"); + if path.exists() { + FileKeyProvider::verify_permissions(&path)?; + let data = tokio::fs::read_to_string(&path).await?; + let revoked: RevokedOnDisk = serde_json::from_str(&data)?; + let mut map = match revoked { + RevokedOnDisk::Map(map) => map, + RevokedOnDisk::Set(set) => set.into_iter().map(|h| (h, 0)).collect(), + }; + Self::prune_expired_locked(&mut map); + let count = map.len(); + *lock_write(&self.revoked) = map; + debug!(count, "Loaded revocations from disk"); + } + Ok(()) + } } #[derive(Deserialize)] @@ -548,30 +621,27 @@ impl RevocationStore for FileRevocationStore { lock_read(&self.revoked).contains_key(token_hash) } + // Hold a single write lock for both the insert and the prune + // so no reader can observe a partially-updated map. + #[allow(clippy::significant_drop_tightening)] async fn revoke(&self, token_hash: &str, exp: u64) -> Result<(), AuthStoreError> { - lock_write(&self.revoked).insert(token_hash.to_string(), exp); - Self::prune_expired_locked(&mut lock_write(&self.revoked)); + let data_path = self.state_dir.join("revoked.json"); + let _flock = acquire_file_lock(&data_path).await?; + self.reload_from_disk().await?; + { + let mut guard = lock_write(&self.revoked); + guard.insert(token_hash.to_string(), exp); + Self::prune_expired_locked(&mut guard); + } self.persist().await?; debug!(token_hash = %token_hash, "Token revoked"); Ok(()) } async fn reload(&self) -> Result<(), AuthStoreError> { - let path = self.state_dir.join("revoked.json"); - if path.exists() { - FileKeyProvider::verify_permissions(&path)?; - let data = tokio::fs::read_to_string(&path).await?; - let revoked: RevokedOnDisk = serde_json::from_str(&data)?; - let mut map = match revoked { - RevokedOnDisk::Map(map) => map, - RevokedOnDisk::Set(set) => set.into_iter().map(|h| (h, 0)).collect(), - }; - Self::prune_expired_locked(&mut map); - let count = map.len(); - 
*lock_write(&self.revoked) = map; - debug!(count, "Loaded revocations from disk"); - } - Ok(()) + let data_path = self.state_dir.join("revoked.json"); + let _flock = acquire_file_lock(&data_path).await?; + self.reload_from_disk().await } } @@ -628,12 +698,37 @@ impl FileTokenMetadataStore { Ok(()) } + + /// Re-read token metadata from disk into the in-memory cache. + /// + /// Callers must hold the file lock when using this as part of a + /// read-modify-write cycle. + async fn reload_from_disk(&self) -> Result<(), AuthStoreError> { + let path = self.state_dir.join("tokens.json"); + if !path.exists() { + return Ok(()); + } + FileKeyProvider::verify_permissions(&path)?; + let data = tokio::fs::read_to_string(&path).await?; + let tokens: Vec<TokenMetadata> = serde_json::from_str(&data)?; + let count = tokens.len(); + let mut new_tokens: HashMap<String, TokenMetadata> = HashMap::with_capacity(count); + for token in tokens { + new_tokens.insert(token.jti.clone(), token); + } + *lock_write(&self.tokens) = new_tokens; + debug!(count, "Reloaded token metadata from disk"); + Ok(()) + } } #[async_trait] impl TokenMetadataStore for FileTokenMetadataStore { async fn store(&self, meta: TokenMetadata) -> Result<(), AuthStoreError> { let jti = meta.jti.clone(); + let data_path = self.state_dir.join("tokens.json"); + let _flock = acquire_file_lock(&data_path).await?; + self.reload_from_disk().await?; lock_write(&self.tokens).insert(jti.clone(), meta); self.persist().await?; debug!(jti = %jti, "Stored token metadata"); @@ -649,6 +744,9 @@ } async fn mark_revoked(&self, jti: &str) -> Result<(), AuthStoreError> { + let data_path = self.state_dir.join("tokens.json"); + let _flock = acquire_file_lock(&data_path).await?; + self.reload_from_disk().await?; { let mut tokens = lock_write(&self.tokens); if let Some(token) = tokens.get_mut(jti) { @@ -666,22 +764,9 @@ } async fn reload(&self) -> Result<(), AuthStoreError> { - 
let path = self.state_dir.join("tokens.json"); - if !path.exists() { - warn!("tokens.json not found during reload — keeping existing in-memory cache"); - return Ok(()); - } - FileKeyProvider::verify_permissions(&path)?; - let data = tokio::fs::read_to_string(&path).await?; - let tokens: Vec<TokenMetadata> = serde_json::from_str(&data)?; - let count = tokens.len(); - let mut new_tokens: HashMap<String, TokenMetadata> = HashMap::with_capacity(count); - for token in tokens { - new_tokens.insert(token.jti.clone(), token); - } - *lock_write(&self.tokens) = new_tokens; - debug!(count, "Reloaded token metadata from disk"); - Ok(()) + let data_path = self.state_dir.join("tokens.json"); + let _flock = acquire_file_lock(&data_path).await?; + self.reload_from_disk().await } } @@ -1014,4 +1099,263 @@ mod tests { let retrieved = store.get("test-jti").await.unwrap().unwrap(); assert!(retrieved.revoked); } + + // --- Tests for #345: revoke() single lock acquisition --- + + #[tokio::test(flavor = "multi_thread")] + async fn test_revoke_does_not_deadlock() { + // Regression: revoke() previously acquired the write lock twice + // (once for insert, once for prune_expired), which could panic on + // re-entrant locking or deadlock with concurrent readers. + let temp_dir = TempDir::new().unwrap(); + let store = Arc::new(FileRevocationStore::new(temp_dir.path()).await.unwrap()); + + // Rapid sequential revocations must not deadlock. + for i in 0..20 { + store.revoke(&format!("hash-{i}"), 0).await.unwrap(); + } + + // All 20 hashes must be present. 
+ for i in 0..20 { + let s = store.clone(); + let hash = format!("hash-{i}"); + let revoked = tokio::task::spawn_blocking(move || s.is_revoked(&hash)).await.unwrap(); + assert!(revoked, "hash-{i} should be revoked"); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_revoke_prunes_expired_in_same_lock() { + let temp_dir = TempDir::new().unwrap(); + let store = Arc::new(FileRevocationStore::new(temp_dir.path()).await.unwrap()); + + // Insert a revocation that has already expired (exp = 1). + store.revoke("expired-hash", 1).await.unwrap(); + + // Insert a live revocation — prune should remove the expired one + // in the same lock acquisition. + store.revoke("live-hash", 0).await.unwrap(); + + let s = store.clone(); + let expired = + tokio::task::spawn_blocking(move || s.is_revoked("expired-hash")).await.unwrap(); + assert!(!expired, "expired revocation should have been pruned"); + + let s = store.clone(); + let live = tokio::task::spawn_blocking(move || s.is_revoked("live-hash")).await.unwrap(); + assert!(live, "live revocation should remain"); + } + + // --- Tests for #331: fsync after write_secure --- + + #[tokio::test(flavor = "multi_thread")] + async fn test_write_secure_fsync_durability() { + // Verify write_secure produces durable files: the data is readable + // immediately after the call returns (sync_all was called). + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("durable.json"); + let content = r#"{"test": "fsync"}"#; + + FileKeyProvider::write_secure(&path, content).await.unwrap(); + + // Re-read to confirm content survived the fsync + rename. + let read_back = tokio::fs::read_to_string(&path).await.unwrap(); + assert_eq!(read_back, content); + + // Verify permissions are 0600. 
+ #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777; + assert_eq!(mode, 0o600, "write_secure should set 0600 permissions"); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_write_secure_overwrites_atomically() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("atomic.json"); + + FileKeyProvider::write_secure(&path, "first").await.unwrap(); + FileKeyProvider::write_secure(&path, "second").await.unwrap(); + + let content = tokio::fs::read_to_string(&path).await.unwrap(); + assert_eq!(content, "second"); + + // No leftover temp files. + let entries: Vec<_> = std::fs::read_dir(temp_dir.path()) + .unwrap() + .filter_map(std::result::Result::ok) + .filter(|e| e.file_name().to_string_lossy().contains("tmp-")) + .collect(); + assert!(entries.is_empty(), "temp files should be cleaned up"); + } + + // --- Tests for #329: flock-based cross-process file locking --- + + #[tokio::test(flavor = "multi_thread")] + async fn test_concurrent_token_stores_no_lost_updates() { + // Simulate two independent store instances (like CLI + server) + // writing tokens concurrently. With file locking, no writes + // should be lost. + let temp_dir = TempDir::new().unwrap(); + let store_a = Arc::new(FileTokenMetadataStore::new(temp_dir.path()).await.unwrap()); + let store_b = Arc::new(FileTokenMetadataStore::new(temp_dir.path()).await.unwrap()); + + let make_meta = |id: &str| TokenMetadata { + jti: id.to_string(), + token_hash: format!("hash-{id}"), + token_type: TokenType::Api, + role: Some("admin".to_string()), + label: Some(format!("token-{id}")), + created_at: 1000, + exp: 2000, + revoked: false, + created_by: "test".to_string(), + }; + + // Store tokens concurrently from both instances. 
+ let sa = store_a.clone(); + let sb = store_b.clone(); + let (r1, r2) = tokio::join!( + tokio::spawn(async move { + for i in 0..5 { + sa.store(make_meta(&format!("a-{i}"))).await.unwrap(); + } + }), + tokio::spawn(async move { + for i in 0..5 { + sb.store(make_meta(&format!("b-{i}"))).await.unwrap(); + } + }), + ); + r1.unwrap(); + r2.unwrap(); + + // Read the on-disk state with a fresh store to verify nothing was lost. + let verifier = FileTokenMetadataStore::new(temp_dir.path()).await.unwrap(); + let all = verifier.list().await.unwrap(); + assert_eq!(all.len(), 10, "all 10 tokens must survive concurrent writes"); + + for i in 0..5 { + assert!(verifier.exists(&format!("a-{i}")).await, "a-{i} missing"); + assert!(verifier.exists(&format!("b-{i}")).await, "b-{i} missing"); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_concurrent_revocations_no_lost_updates() { + let temp_dir = TempDir::new().unwrap(); + let store_a = Arc::new(FileRevocationStore::new(temp_dir.path()).await.unwrap()); + let store_b = Arc::new(FileRevocationStore::new(temp_dir.path()).await.unwrap()); + + let sa = store_a.clone(); + let sb = store_b.clone(); + let (r1, r2) = tokio::join!( + tokio::spawn(async move { + for i in 0..5 { + sa.revoke(&format!("a-{i}"), 0).await.unwrap(); + } + }), + tokio::spawn(async move { + for i in 0..5 { + sb.revoke(&format!("b-{i}"), 0).await.unwrap(); + } + }), + ); + r1.unwrap(); + r2.unwrap(); + + // Verify with a fresh store instance. 
+ let verifier = Arc::new(FileRevocationStore::new(temp_dir.path()).await.unwrap()); + for i in 0..5 { + let v = verifier.clone(); + let hash = format!("a-{i}"); + assert!( + tokio::task::spawn_blocking(move || v.is_revoked(&hash)).await.unwrap(), + "a-{i} missing" + ); + let v = verifier.clone(); + let hash = format!("b-{i}"); + assert!( + tokio::task::spawn_blocking(move || v.is_revoked(&hash)).await.unwrap(), + "b-{i} missing" + ); + } + } + + // --- Tests for rotate() cross-process file locking --- + + #[tokio::test(flavor = "multi_thread")] + async fn test_concurrent_rotations_no_lost_keys() { + // Simulate two independent KeyProvider instances (CLI + server) + // rotating concurrently. With flock, all generated keys must + // appear in the final JWKS. + let temp_dir = TempDir::new().unwrap(); + let provider_a = Arc::new(FileKeyProvider::load_or_init(temp_dir.path()).await.unwrap()); + let provider_b = Arc::new(FileKeyProvider::load_or_init(temp_dir.path()).await.unwrap()); + + let pa = provider_a.clone(); + let pb = provider_b.clone(); + let (r1, r2) = tokio::join!( + tokio::spawn(async move { + for _ in 0..3 { + pa.rotate().await.unwrap(); + } + }), + tokio::spawn(async move { + for _ in 0..3 { + pb.rotate().await.unwrap(); + } + }), + ); + r1.unwrap(); + r2.unwrap(); + + // Re-read from disk with a fresh provider to verify. + let verifier = FileKeyProvider::load_or_init(temp_dir.path()).await.unwrap(); + let jwks = verifier.jwks(); + // 1 initial key + 6 rotations = 7 keys total. + assert_eq!( + jwks.keys.len(), + 7, + "all rotated keys must survive concurrent writes (found {})", + jwks.keys.len() + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_file_lock_is_exclusive() { + // Verify that two concurrent lock acquisitions are serialized. 
+ let temp_dir = TempDir::new().unwrap(); + let lock_target = temp_dir.path().join("test.json"); + + let counter = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let max_concurrent = Arc::new(std::sync::atomic::AtomicU32::new(0)); + + let mut handles = Vec::new(); + for _ in 0..10 { + let path = lock_target.clone(); + let ctr = counter.clone(); + let max = max_concurrent.clone(); + handles.push(tokio::spawn(async move { + let _guard = acquire_file_lock(&path).await.unwrap(); + let cur = ctr.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1; + max.fetch_max(cur, std::sync::atomic::Ordering::SeqCst); + // Hold the lock briefly. + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + ctr.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + })); + } + + for h in handles { + h.await.unwrap(); + } + + assert_eq!( + max_concurrent.load(std::sync::atomic::Ordering::SeqCst), + 1, + "file lock must be exclusive — at most 1 holder at a time" + ); + } }