diff --git a/Cargo.lock b/Cargo.lock index 58c2f62e..25232719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1322,6 +1322,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -6275,6 +6289,7 @@ dependencies = [ "clap-markdown", "clap_complete", "ctrlc", + "dashmap", "dialoguer", "etcetera 0.10.0", "figment", @@ -6320,6 +6335,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "dashmap", "docker_credential", "futures", "gag", diff --git a/Cargo.toml b/Cargo.toml index b5a74f56..9f3ed6ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ clap = { workspace = true, features = ["derive", "env", "help", "color", "sugges clap_complete = { workspace = true } clap-markdown = { workspace = true } ctrlc = { workspace = true } +dashmap = { workspace = true } dialoguer = { workspace = true, features = ["editor", "password", "zeroize", "fuzzy-select", "fuzzy-matcher"] } etcetera = { workspace = true } figment = { workspace = true, features = ["json", "env", "yaml", "toml"] } @@ -84,6 +85,7 @@ bytes = { version = "1", default-features = false } chrono = { version = "0.4.42", default-features = false, features = ["clock", "alloc"] } clap-markdown = { version = "0.1.5", default-features = false } ctrlc = { version = "3.4.7", default-features = false } +dashmap = "6.1.0" dialoguer = { version = "0.11.0", default-features = false, features = ["editor", "password", "zeroize", "fuzzy-select", "fuzzy-matcher"] } docker_credential = { version = "1.3.1", default-features = false } etcetera = { version = "0.10.0", default-features = false } diff --git a/crates/wash-runtime/Cargo.toml b/crates/wash-runtime/Cargo.toml index 23cae648..d86c99ea 100644 --- a/crates/wash-runtime/Cargo.toml +++ b/crates/wash-runtime/Cargo.toml @@ -27,6 +27,7 @@ async-nats = { workspace = true, features = ["aws-lc-rs"] } async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +dashmap = { workspace = true } futures = { workspace = true } hostname = { workspace = true } hyper = { workspace = true, features = ["server", "http1"] } diff --git a/crates/wash-runtime/src/engine/workload.rs b/crates/wash-runtime/src/engine/workload.rs index 82b0fd7e..7d129d94 100644 --- a/crates/wash-runtime/src/engine/workload.rs +++ b/crates/wash-runtime/src/engine/workload.rs @@ -9,6 +9,7 @@ use std::{ }; use anyhow::{Context as _, bail, ensure}; +use dashmap::DashMap; use tokio::{sync::RwLock, task::JoinHandle, time::timeout}; use tracing::{debug, info, trace, warn}; use wasmtime::component::{ @@ -388,7 +389,7 @@ pub struct ResolvedWorkload { namespace: Arc, /// All components in the workload. This is behind a `RwLock` to support mutable /// access to the component linkers. - components: Arc, WorkloadComponent>>>, + components: Arc, WorkloadComponent>>, /// The HTTP handler for outgoing HTTP requests http_handler: Arc, /// An optional service component that runs once to completion or for the duration of the workload @@ -453,7 +454,7 @@ impl ResolvedWorkload { } } - pub fn components(&self) -> Arc, WorkloadComponent>>> { + pub fn components(&self) -> Arc, WorkloadComponent>> { self.components.clone() } @@ -466,7 +467,8 @@ impl ResolvedWorkload { let mut interface_map: HashMap> = HashMap::new(); // Determine available component exports to link to the rest of the workload - for c in self.components.read().await.values() { + for item in self.components.iter() { + let c = item.value(); let exported_instances = c.component_exports()?; for (name, item) in exported_instances { // TODO(#11): It's probably a good idea to skip registering wasi@0.2 interfaces @@ -518,8 +520,10 @@ impl ResolvedWorkload { let mut dependencies: HashMap, HashSet>> = HashMap::new(); { - let components = self.components.read().await; - for (component_id, component) in components.iter() { + let components = self.components.clone(); + for item in components.iter() { + let component_id = item.key(); + let component = item.value(); let mut deps = HashSet::new(); let ty = component.metadata.component.component_type(); for (import_name, import_item) in ty.imports(component.metadata.component.engine()) @@ -551,10 +555,8 @@ impl ResolvedWorkload { for component_id in sorted_component_ids { // In order to have mutable access to both the workload component and components that need // to be instantiated as "plugins" during linking, we remove and re-add the component to the list. - let mut workload_component = { + let (_, mut workload_component) = { self.components - .write() - .await .remove(&component_id) .context("component not found during import resolution")? }; @@ -565,8 +567,6 @@ impl ResolvedWorkload { .resolve_component_imports(&component, linker, interface_map) .await; self.components - .write() - .await .insert(workload_component.metadata.id.clone(), workload_component); // Propagate any errors encountered during import resolution res?; @@ -605,8 +605,8 @@ impl ResolvedWorkload { match import_item { ComponentItem::ComponentInstance(import_instance_ty) => { trace!(name = import_name, "processing component instance import"); - let mut all_components = self.components.write().await; - let (plugin_component, instance_idx) = { + let all_components = self.components.clone(); + let (mut plugin_component, instance_idx) = { let Some(exporter_component) = interface_map.get(import_name) else { // TODO: error because unsatisfied import, if there's no available // export then it's an unresolvable workload @@ -878,12 +878,12 @@ impl ResolvedWorkload { /// Returns the number of components in this workload. /// Does not include the service component if one is defined. pub async fn component_count(&self) -> usize { - self.components.read().await.len() + self.components.len() } /// Helper to create a new wasmtime Store for a given component in the workload. pub async fn new_store(&self, component_id: &str) -> anyhow::Result> { - let components = self.components.read().await; + let components = self.components.clone(); let component = components .get(component_id) .context("component ID not found in workload")?; @@ -895,7 +895,7 @@ impl ResolvedWorkload { &self, metadata: &WorkloadMetadata, ) -> anyhow::Result> { - let components = self.components.read().await; + let components = self.components.clone(); // TODO: Consider stderr/stdout buffering + logging let mut wasi_ctx_builder = WasiCtxBuilder::new(); @@ -915,7 +915,7 @@ impl ResolvedWorkload { // Mount all possible volume mounts in the workload since components share a WasiCtx for (host_path, mount) in &components .iter() - .flat_map(|(_id, workload_component)| workload_component.metadata.volume_mounts.clone()) + .flat_map(|workload_component_item| workload_component_item.value().metadata.volume_mounts.clone()) .collect::>() { let dir = tokio::fs::canonicalize(host_path).await?; @@ -944,8 +944,8 @@ impl ResolvedWorkload { &self, component_id: &str, ) -> anyhow::Result> { - let mut components = self.components.write().await; - let component = components + let components = self.components.clone(); + let mut component = components .get_mut(component_id) .context("component ID not found in workload")?; let wasmtime_component = component.metadata.component.clone(); @@ -967,7 +967,8 @@ impl ResolvedWorkload { "unbinding all plugins from workload" ); - for component in self.components.read().await.values() { + for item in self.components.iter() { + let component = item.value(); if let Some(plugins) = component.plugins() { for (plugin_id, plugin) in plugins.iter() { trace!( @@ -1362,7 +1363,7 @@ impl UnresolvedWorkload { id: self.id.clone(), name: self.name.clone(), namespace: self.namespace.clone(), - components: Arc::new(RwLock::new(self.components)), + components: Arc::new(DashMap::from_iter(self.components)), service: self.service, host_interfaces: self.host_interfaces, http_handler: http_handler.clone(), diff --git a/crates/wash-runtime/src/host/http.rs b/crates/wash-runtime/src/host/http.rs index 44a0e0cb..8dc89213 100644 --- a/crates/wash-runtime/src/host/http.rs +++ b/crates/wash-runtime/src/host/http.rs @@ -25,6 +25,7 @@ use std::{ sync::Arc, }; +use dashmap::DashMap; use crate::engine::ctx::Ctx; use crate::engine::workload::ResolvedWorkload; use crate::wit::WitInterface; @@ -290,7 +291,7 @@ impl HostHandler for NullServer { /// A map from host header to resolved workload handles and their associated component id pub type WorkloadHandles = - Arc, String)>>>; + Arc, String)>>; /// HTTP server plugin that handles incoming HTTP requests for WebAssembly components. /// @@ -425,7 +426,7 @@ impl HostHandler for HttpServer { .await?; let instance_pre = resolved_handle.instantiate_pre(component_id).await?; - self.workload_handles.write().await.insert( + self.workload_handles.insert( resolved_handle.id().to_string(), ( resolved_handle.clone(), @@ -440,7 +441,7 @@ impl HostHandler for HttpServer { async fn on_workload_unbind(&self, workload_id: &str) -> anyhow::Result<()> { self.router.on_workload_unbind(workload_id).await?; - self.workload_handles.write().await.remove(workload_id); + self.workload_handles.remove(workload_id); Ok(()) } @@ -563,9 +564,9 @@ async fn handle_http_request( // Look up workload handle for this host, with wildcard fallback let workload_handle = { - let handles = workload_handles.read().await; + let handles = workload_handles; debug!(host = %workload_id, "looking up workload handle for host header"); - handles.get(&workload_id).cloned() + handles.get(&workload_id).map(|item| item.value().clone()) }; let response = match workload_handle { diff --git a/crates/wash-runtime/src/host/mod.rs b/crates/wash-runtime/src/host/mod.rs index a918f51a..67c3f175 100644 --- a/crates/wash-runtime/src/host/mod.rs +++ b/crates/wash-runtime/src/host/mod.rs @@ -45,6 +45,7 @@ use std::future::Future; use std::sync::Arc; use std::time::Duration; +use dashmap::DashMap; use anyhow::{Context, bail}; use names::{Generator, Name}; use tokio::sync::RwLock; @@ -178,7 +179,7 @@ impl From<&HostWorkload> for WorkloadState { pub struct Host { engine: Engine, /// Workloads mapped from ID to the workload and its current state - workloads: Arc>>, + workloads: Arc>, /// Plugins in a map from their ID to the plugin itself plugins: HashMap<&'static str, Arc>, /// Host metadata @@ -422,10 +423,11 @@ impl HostApi for Host { // Count components and providers from workloads let (workload_count, component_count) = { - let workloads = self.workloads.read().await; + let workloads = self.workloads.clone(); let workload_count: u64 = workloads.len() as u64; let mut component_count: u64 = 0; - for workload in workloads.values() { + for item in workloads.iter() { + let workload = item.value(); if let HostWorkload::Running(workload) = workload { component_count += workload.component_count().await as u64; } @@ -470,8 +472,6 @@ impl HostApi for Host { ) -> anyhow::Result { // Store the workload with initial state self.workloads - .write() - .await .insert(request.workload_id.clone(), HostWorkload::Starting); let service_present = request.workload.service.is_some(); @@ -495,8 +495,6 @@ impl HostApi for Host { // Update the workload state to `Running` self.workloads - .write() - .await .entry(request.workload_id.clone()) .and_modify(|workload| { *workload = HostWorkload::Running(Box::new(resolved_workload)); @@ -515,8 +513,8 @@ impl HostApi for Host { &self, request: WorkloadStatusRequest, ) -> anyhow::Result { - if let Some(workload) = self.workloads.read().await.get(&request.workload_id) { - let workload_state = workload.into(); + if let Some(workload) = self.workloads.get(&request.workload_id) { + let workload_state = workload.value().into(); Ok(WorkloadStatusResponse { workload_status: WorkloadStatus { workload_id: request.workload_id, @@ -535,14 +533,12 @@ impl HostApi for Host { ) -> anyhow::Result { let has_workload = self .workloads - .read() - .await .contains_key(&request.workload_id); let (workload_state, message) = if has_workload { // Update state to stopping let resolved_workload = { - let mut workloads = self.workloads.write().await; + let workloads = self.workloads.clone(); trace!( workload_id = request.workload_id, "updating workload state to stopping" @@ -582,7 +578,7 @@ impl HostApi for Host { // Remove the workload from the active workloads map // This will drop the workload and clean up wasmtime resources - self.workloads.write().await.remove(&request.workload_id); + self.workloads.remove(&request.workload_id); debug!( workload_id = request.workload_id, diff --git a/crates/wash-runtime/src/plugin/wasi_blobstore.rs b/crates/wash-runtime/src/plugin/wasi_blobstore.rs index f6282d9a..ef17f7d7 100644 --- a/crates/wash-runtime/src/plugin/wasi_blobstore.rs +++ b/crates/wash-runtime/src/plugin/wasi_blobstore.rs @@ -11,6 +11,7 @@ use std::{ const WASI_BLOBSTORE_ID: &str = "wasi-blobstore"; use tokio::sync::RwLock; +use dashmap::DashMap; use wasmtime::component::{HasSelf, Resource}; use wasmtime_wasi::p2::{ InputStream, OutputStream, @@ -86,7 +87,7 @@ pub struct StreamObjectNamesHandle { #[derive(Clone, Default)] pub struct WasiBlobstore { /// Storage for all containers, keyed by store context ID - storage: Arc>>>, + storage: Arc>>, /// The maximum size for objects stored in the blobstore max_object_size: usize, } @@ -94,7 +95,7 @@ pub struct WasiBlobstore { impl WasiBlobstore { pub fn new(max_object_size: Option) -> Self { Self { - storage: Arc::new(RwLock::new(HashMap::new())), + storage: Arc::new(DashMap::new()), max_object_size: max_object_size.unwrap_or(1_000_000), // 1mb limit by default } } @@ -117,8 +118,8 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let storage = plugin.storage.clone(); + let mut workload_storage = storage.entry(self.id.clone()).or_default(); if workload_storage.contains_key(&name) { return Ok(Err(format!("container '{name}' already exists"))); @@ -143,11 +144,13 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let storage = plugin.storage.clone(); + let does_contain_name = storage + .get(&self.id) + .map(|workload_storage| workload_storage.value().contains_key(&name)) + .unwrap_or(false); - if !workload_storage.contains_key(&name) { + if !does_contain_name { return Ok(Err(format!("container '{name}' does not exist"))); } @@ -163,8 +166,8 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let storage = plugin.storage.clone(); + let mut workload_storage = storage.entry(self.id.clone()).or_default(); workload_storage.remove(&name); Ok(Ok(())) @@ -178,11 +181,9 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let does_contain_name = plugin.storage.get(&self.id).map(|workload_storage| workload_storage.value().contains_key(&name)).unwrap_or(false); - Ok(Ok(workload_storage.contains_key(&name))) + Ok(Ok(does_contain_name)) } async fn copy_object( @@ -194,8 +195,7 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); // Get source object data (clone to avoid borrow conflicts) let src_object_data = { @@ -253,8 +253,7 @@ impl bindings::wasi::blobstore::blobstore::Host for Ctx { }; // Then delete the source - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); if let Some(src_container) = workload_storage.get_mut(&src.container) { src_container.objects.remove(&src.object); @@ -284,16 +283,15 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); - - match workload_storage.get(container_name) { - Some(container_data) => Ok(Ok(ContainerMetadata { - name: container_data.name.clone(), - created_at: container_data.created_at, - })), - None => Ok(Err(format!("container '{container_name}' does not exist"))), + match plugin.storage.get(&self.id) { + Some(item) => match item.value().get(container_name){ + Some(container_data) => Ok(Ok(ContainerMetadata { + name: container_data.name.clone(), + created_at: container_data.created_at, + })), + None => Ok(Err(format!("container '{container_name}' does not exist"))), + } + None => return Ok(Err(format!("container '{container_name}' does not exist"))), } } @@ -320,39 +318,47 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); - - match workload_storage.get(container_name) { - Some(container_data) => match container_data.objects.get(&name) { - Some(object_data) => { - let start_idx = start.min(object_data.data.len() as u64) as usize; - let end_idx = end.min(object_data.data.len() as u64) as usize; - let data_slice = object_data.data[start_idx..end_idx].to_vec(); - - tracing::debug!( - container = container_name, - object = name, - original_size = object_data.data.len(), - slice_size = data_slice.len(), - start_idx = start_idx, - end_idx = end_idx, - "Retrieved object data slice" - ); - - let resource = self.table.push(data_slice)?; - Ok(Ok(resource)) + match plugin.storage.get(&self.id) { + Some(workload_storage) => { + match workload_storage.value().get(container_name) { + Some(container_data) => match container_data.objects.get(&name) { + Some(object_data) => { + let start_idx = start.min(object_data.data.len() as u64) as usize; + let end_idx = end.min(object_data.data.len() as u64) as usize; + let data_slice = object_data.data[start_idx..end_idx].to_vec(); + + tracing::debug!( + container = container_name, + object = name, + original_size = object_data.data.len(), + slice_size = data_slice.len(), + start_idx = start_idx, + end_idx = end_idx, + "Retrieved object data slice" + ); + + let resource = self.table.push(data_slice)?; + Ok(Ok(resource)) + } + None => { + tracing::warn!( + container = container_name, + object = name, + "Object does not exist in container" + ); + Ok(Err(format!("object '{name}' does not exist"))) + } + }, + None => { + tracing::warn!( + container = container_name, + workload_id = self.id, + "Container does not exist for workload" + ); + Ok(Err(format!("container '{container_name}' does not exist"))) + } } - None => { - tracing::warn!( - container = container_name, - object = name, - "Object does not exist in container" - ); - Ok(Err(format!("object '{name}' does not exist"))) - } - }, + } None => { tracing::warn!( container = container_name, @@ -385,11 +391,9 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { }; // Verify the container exists - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let does_contain_name = plugin.storage.get(&self.id).map(|workload_storage| workload_storage.value().contains_key(&name)).unwrap_or(false); - if !workload_storage.contains_key(&container_name) { + if !does_contain_name { tracing::warn!( container = container_name, workload_id = self.id, @@ -397,7 +401,6 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { ); return Ok(Err(format!("container '{container_name}' does not exist"))); } - drop(storage); // Store the container and object names - actual writing happens in finish() let outgoing_handle = self.table.get_mut(&data)?; @@ -423,11 +426,12 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(format!("container '{container_name}' does not exist"))), + }; - match workload_storage.get(container_name) { + match workload_storage.value().get(container_name) { Some(container_data) => { let objects: Vec = container_data.objects.keys().cloned().collect(); let handle = StreamObjectNamesHandle { @@ -454,8 +458,7 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); match workload_storage.get_mut(container_name) { Some(container_data) => { @@ -477,8 +480,7 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); match workload_storage.get_mut(container_name) { Some(container_data) => { @@ -502,11 +504,12 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id){ + Some(workload_storage) => workload_storage, + None => return Ok(Err(format!("container '{container_name}' does not exist"))), + }; - match workload_storage.get(container_name) { + match workload_storage.value().get(container_name) { Some(container_data) => Ok(Ok(container_data.objects.contains_key(&name))), None => Ok(Err(format!("container '{container_name}' does not exist"))), } @@ -523,11 +526,12 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(format!("container '{container_name}' does not exist"))), + }; - match workload_storage.get(container_name) { + match workload_storage.value().get(container_name) { Some(container_data) => match container_data.objects.get(&name) { Some(object_data) => Ok(Ok(ObjectMetadata { name: object_data.name.clone(), @@ -551,10 +555,9 @@ impl bindings::wasi::blobstore::container::HostContainer for Ctx { return Ok(Err("blobstore plugin not available".to_string())); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(container_name) { + match workload_storage.value_mut().get_mut(container_name) { Some(container_data) => { container_data.objects.clear(); Ok(Ok(())) @@ -766,10 +769,9 @@ impl bindings::wasi::blobstore::types::HostOutgoingValue for Ctx { "Retrieved data from pipe in finish()" ); - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(container_name) { + match workload_storage.value_mut().get_mut(container_name) { Some(container_data) => { let object_data = ObjectData { name: object_name.clone(), @@ -938,8 +940,7 @@ impl HostPlugin for WasiBlobstore { ); // Initialize storage for this component (note: actual storage is per-store-context, this is just a placeholder) - let mut storage = self.storage.write().await; - storage.insert(id.to_string(), HashMap::new()); + self.storage.insert(id.to_string(), HashMap::new()); tracing::debug!("WasiBlobstore plugin bound to workload '{id}'"); @@ -952,8 +953,7 @@ impl HostPlugin for WasiBlobstore { _interfaces: std::collections::HashSet, ) -> anyhow::Result<()> { // Clean up storage for this workload - let mut storage = self.storage.write().await; - storage.remove(workload_id); + self.storage.remove(workload_id); tracing::debug!("WasiBlobstore plugin unbound from workload '{workload_id}'"); @@ -968,7 +968,7 @@ mod tests { #[test] fn test_wasi_blobstore_creation() { let blobstore = WasiBlobstore::new(None); - assert!(blobstore.storage.try_read().is_ok()); + assert!(blobstore.storage.is_empty()); } #[test] @@ -1011,14 +1011,12 @@ mod tests { // Test write access { - let mut storage = blobstore.storage.write().await; - storage.insert("workload1".to_string(), HashMap::new()); + blobstore.storage.insert("workload1".to_string(), HashMap::new()); } // Test read access { - let storage = blobstore.storage.read().await; - assert!(storage.contains_key("workload1")); + assert!(blobstore.storage.contains_key("workload1")); } } } diff --git a/crates/wash-runtime/src/plugin/wasi_keyvalue.rs b/crates/wash-runtime/src/plugin/wasi_keyvalue.rs index b6e008bb..fc63fc3b 100644 --- a/crates/wash-runtime/src/plugin/wasi_keyvalue.rs +++ b/crates/wash-runtime/src/plugin/wasi_keyvalue.rs @@ -9,7 +9,7 @@ use std::{ }; const WASI_KEYVALUE_ID: &str = "wasi-keyvalue"; -use tokio::sync::RwLock; +use dashmap::DashMap; use wasmtime::component::{HasSelf, Resource}; use crate::{ @@ -45,13 +45,13 @@ pub type BucketHandle = String; #[derive(Clone, Default)] pub struct WasiKeyvalue { /// Storage for all buckets, keyed by workload ID, then bucket name - storage: Arc>>>, + storage: Arc>>, } impl WasiKeyvalue { pub fn new() -> Self { Self { - storage: Arc::new(RwLock::new(HashMap::new())), + storage: Arc::new(DashMap::new()), } } @@ -75,17 +75,16 @@ impl bindings::wasi::keyvalue::store::Host for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); // Create bucket if it doesn't exist - if !workload_storage.contains_key(&identifier) { + if !workload_storage.value().contains_key(&identifier) { let bucket_data = BucketData { name: identifier.clone(), data: HashMap::new(), created_at: WasiKeyvalue::get_timestamp(), }; - workload_storage.insert(identifier.clone(), bucket_data); + workload_storage.value_mut().insert(identifier.clone(), bucket_data); } let resource = self.table.push(identifier)?; @@ -108,11 +107,14 @@ impl bindings::wasi::keyvalue::store::HostBucket for Ctx { ))); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(StoreError::Other(format!( + "bucket '{bucket_name}' does not exist" + )))), + }; - match workload_storage.get(bucket_name) { + match workload_storage.value().get(bucket_name) { Some(bucket_data) => { let value = bucket_data.data.get(&key).cloned(); Ok(Ok(value)) @@ -137,10 +139,9 @@ impl bindings::wasi::keyvalue::store::HostBucket for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(bucket_name) { + match workload_storage.value_mut().get_mut(bucket_name) { Some(bucket_data) => { bucket_data.data.insert(key, value); Ok(Ok(())) @@ -164,10 +165,9 @@ impl bindings::wasi::keyvalue::store::HostBucket for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(bucket_name) { + match workload_storage.value_mut().get_mut(bucket_name) { Some(bucket_data) => { bucket_data.data.remove(&key); Ok(Ok(())) @@ -191,11 +191,14 @@ impl bindings::wasi::keyvalue::store::HostBucket for Ctx { ))); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(StoreError::Other(format!( + "bucket '{bucket_name}' does not exist" + )))), + }; - match workload_storage.get(bucket_name) { + match workload_storage.value().get(bucket_name) { Some(bucket_data) => Ok(Ok(bucket_data.data.contains_key(&key))), None => Ok(Err(StoreError::Other(format!( "bucket '{bucket_name}' does not exist" @@ -216,11 +219,14 @@ impl bindings::wasi::keyvalue::store::HostBucket for Ctx { ))); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(StoreError::Other(format!( + "bucket '{bucket_name}' does not exist" + )))), + }; - match workload_storage.get(bucket_name) { + match workload_storage.value().get(bucket_name) { Some(bucket_data) => { let mut keys: Vec = bucket_data.data.keys().cloned().collect(); keys.sort(); // Ensure consistent ordering @@ -278,10 +284,9 @@ impl bindings::wasi::keyvalue::atomics::Host for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(bucket_name) { + match workload_storage.value_mut().get_mut(bucket_name) { Some(bucket_data) => { // Get current value, treating missing key as 0 let current_bytes = bucket_data.data.get(&key); @@ -328,11 +333,14 @@ impl bindings::wasi::keyvalue::batch::Host for Ctx { ))); }; - let storage = plugin.storage.read().await; - let empty_map = HashMap::new(); - let workload_storage = storage.get(&self.id).unwrap_or(&empty_map); + let workload_storage = match plugin.storage.get(&self.id) { + Some(workload_storage) => workload_storage, + None => return Ok(Err(StoreError::Other(format!( + "bucket '{bucket_name}' does not exist" + )))), + }; - match workload_storage.get(bucket_name) { + match workload_storage.value().get(bucket_name) { Some(bucket_data) => { let results: Vec)>> = keys .into_iter() @@ -365,10 +373,9 @@ impl bindings::wasi::keyvalue::batch::Host for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(bucket_name) { + match workload_storage.value_mut().get_mut(bucket_name) { Some(bucket_data) => { for (key, value) in key_values { bucket_data.data.insert(key, value); @@ -394,10 +401,9 @@ impl bindings::wasi::keyvalue::batch::Host for Ctx { ))); }; - let mut storage = plugin.storage.write().await; - let workload_storage = storage.entry(self.id.clone()).or_default(); + let mut workload_storage = plugin.storage.entry(self.id.clone()).or_default(); - match workload_storage.get_mut(bucket_name) { + match workload_storage.value_mut().get_mut(bucket_name) { Some(bucket_data) => { for key in keys { bucket_data.data.remove(&key); @@ -461,8 +467,7 @@ impl HostPlugin for WasiKeyvalue { ); // Initialize storage for this workload - let mut storage = self.storage.write().await; - storage.insert(id.to_string(), HashMap::new()); + self.storage.insert(id.to_string(), HashMap::new()); tracing::debug!("WasiKeyvalue plugin bound to workload '{id}'"); @@ -475,8 +480,7 @@ impl HostPlugin for WasiKeyvalue { _interfaces: std::collections::HashSet, ) -> anyhow::Result<()> { // Clean up storage for this workload - let mut storage = self.storage.write().await; - storage.remove(workload_id); + self.storage.remove(workload_id); tracing::debug!("WasiKeyvalue plugin unbound from workload '{workload_id}'"); @@ -491,7 +495,7 @@ mod tests { #[test] fn test_wasi_keyvalue_creation() { let keyvalue = WasiKeyvalue::new(); - assert!(keyvalue.storage.try_read().is_ok()); + assert!(keyvalue.storage.is_empty()); } #[test] @@ -519,14 +523,12 @@ mod tests { // Test write access { - let mut storage = keyvalue.storage.write().await; - storage.insert("workload1".to_string(), HashMap::new()); + keyvalue.storage.insert("workload1".to_string(), HashMap::new()); } // Test read access { - let storage = keyvalue.storage.read().await; - assert!(storage.contains_key("workload1")); + assert!(keyvalue.storage.contains_key("workload1")); } } diff --git a/crates/wash-runtime/src/washlet/plugins/wasi_config.rs b/crates/wash-runtime/src/washlet/plugins/wasi_config.rs index 1570a5e0..de67d1c3 100644 --- a/crates/wash-runtime/src/washlet/plugins/wasi_config.rs +++ b/crates/wash-runtime/src/washlet/plugins/wasi_config.rs @@ -5,7 +5,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tokio::sync::RwLock; +use dashmap::DashMap; use wasmtime::component::HasSelf; const PLUGIN_WASI_CONFIG_ID: &str = "wasi-config"; @@ -32,7 +32,7 @@ use bindings::wasi::config::store::{Error as ConfigError, Host}; #[derive(Clone, Default)] pub struct WasiConfig { /// A map of configuration from workload id to key-value pairs - config: Arc>>>, + config: Arc>>, } impl Host for Ctx { @@ -40,8 +40,7 @@ impl Host for Ctx { let Some(plugin) = self.get_plugin::(PLUGIN_WASI_CONFIG_ID) else { return Ok(Ok(None)); }; - let config_guard = plugin.config.read().await; - config_guard + plugin.config .get(&self.component_id.to_string()) .and_then(|map| map.get(&key).cloned()) .map_or(Ok(Ok(None)), |v| Ok(Ok(Some(v)))) @@ -51,8 +50,7 @@ impl Host for Ctx { let Some(plugin) = self.get_plugin::(PLUGIN_WASI_CONFIG_ID) else { return Ok(Ok(vec![])); }; - let config_guard = plugin.config.read().await; - let entries = config_guard + let entries = plugin.config .get(&self.component_id.to_string()) .map(|map| map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) .unwrap_or_default(); @@ -97,7 +95,7 @@ impl HostPlugin for WasiConfig { // Store the configuration for lookups later // This mirrors wasi:cli/env on wasi:config/store - self.config.write().await.insert( + self.config.insert( component_handle.id().to_string(), component_handle.local_resources().environment.clone(), ); diff --git a/crates/wash-runtime/src/washlet/plugins/wasi_logging.rs b/crates/wash-runtime/src/washlet/plugins/wasi_logging.rs index 97506f6b..df4d4cc2 100644 --- a/crates/wash-runtime/src/washlet/plugins/wasi_logging.rs +++ b/crates/wash-runtime/src/washlet/plugins/wasi_logging.rs @@ -5,7 +5,7 @@ //! components to log messages at various levels (trace, debug, info, warn, //! error, critical). -use std::collections::{HashMap, HashSet}; +use std::collections::{HashSet}; use std::sync::Arc; use crate::engine::ctx::Ctx; @@ -24,10 +24,10 @@ mod bindings { } use bindings::wasi::logging::logging::Level; -use tokio::sync::RwLock; +use dashmap::DashMap; use wasmtime::component::HasSelf; -type ComponentMap = Arc>>; +type ComponentMap = Arc>; #[derive(Default)] pub struct TracingLogging { @@ -46,15 +46,15 @@ impl bindings::wasi::logging::logging::Host for Ctx { bail!("TracingLogging plugin not found in context"); }; - let workloads = plugin.components.read().await; - let Some(ComponentInfo { - workload_name, - workload_namespace, - component_id, - }) = workloads.get(&self.component_id.to_string()) + let Some(item) = plugin.components.get(&self.component_id.to_string()) else { bail!("Component not found in TracingLogging plugin"); }; + let ComponentInfo { + workload_name, + workload_namespace, + component_id, + } = item.value(); match level { Level::Trace => { tracing::trace!( @@ -153,7 +153,7 @@ impl HostPlugin for TracingLogging { |ctx| ctx, )?; - self.components.write().await.insert( + self.components.insert( component.id().to_string(), ComponentInfo { workload_name: component.workload_name().to_string(), diff --git a/crates/wash-runtime/tests/fixtures/cron-service.wasm b/crates/wash-runtime/tests/fixtures/cron-service.wasm index 240c6928..96f235eb 100644 Binary files a/crates/wash-runtime/tests/fixtures/cron-service.wasm and b/crates/wash-runtime/tests/fixtures/cron-service.wasm differ diff --git a/crates/wash-runtime/tests/fixtures/cron_component.wasm b/crates/wash-runtime/tests/fixtures/cron_component.wasm index b02a60c4..36646d68 100644 Binary files a/crates/wash-runtime/tests/fixtures/cron_component.wasm and b/crates/wash-runtime/tests/fixtures/cron_component.wasm differ diff --git a/crates/wash-runtime/tests/fixtures/http_blobstore.wasm b/crates/wash-runtime/tests/fixtures/http_blobstore.wasm index f5411f48..085bece4 100644 Binary files a/crates/wash-runtime/tests/fixtures/http_blobstore.wasm and b/crates/wash-runtime/tests/fixtures/http_blobstore.wasm differ diff --git a/crates/wash-runtime/tests/fixtures/http_counter.wasm b/crates/wash-runtime/tests/fixtures/http_counter.wasm index 01e950ea..658797ac 100644 Binary files a/crates/wash-runtime/tests/fixtures/http_counter.wasm and b/crates/wash-runtime/tests/fixtures/http_counter.wasm differ diff --git a/crates/wash/src/cli/mod.rs b/crates/wash/src/cli/mod.rs index 272037ef..7b67ab05 100644 --- a/crates/wash/src/cli/mod.rs +++ b/crates/wash/src/cli/mod.rs @@ -7,6 +7,7 @@ use std::{ sync::Arc, }; +use dashmap::DashMap; use anyhow::{Context as _, bail, ensure}; use bytes::Bytes; use etcetera::{ @@ -14,7 +15,6 @@ use etcetera::{ app_strategy::{Windows, Xdg}, choose_app_strategy, }; -use tokio::sync::RwLock; use serde_json::json; use tracing::{debug, error, info, instrument, trace}; @@ -613,7 +613,7 @@ impl CliContext { pub async fn call_hooks( &self, hook_type: HookType, - runtime_context: Arc>>, + runtime_context: Arc>, ) { let hooks = self.plugin_manager.get_hooks(hook_type).await; for hook in hooks { diff --git a/crates/wash/src/plugin/mod.rs b/crates/wash/src/plugin/mod.rs index 918e993d..bc53b59d 100644 --- a/crates/wash/src/plugin/mod.rs +++ b/crates/wash/src/plugin/mod.rs @@ -20,6 +20,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; +use dashmap::DashMap; use tokio::sync::RwLock; use tracing::{debug, error, info, instrument}; use wash_runtime::{ @@ -51,7 +52,7 @@ pub struct PluginManager { /// All registered plugins plugins: Arc>>>, /// A map of configuration from workload id to key-value pairs - runtime_config: Arc>>>, + runtime_config: Arc>>, /// Whether to skip confirmation prompts for host exec operations skip_confirmation: bool, } @@ -371,7 +372,7 @@ impl PluginComponent { pub async fn call_hook( &self, hook: HookType, - runner_context: Arc>>, + runner_context: Arc>, ) -> anyhow::Result { let mut store = self.workload.new_store(&self.id).await?; let component = self.workload.instantiate_pre(&self.id).await?; @@ -409,7 +410,7 @@ impl PluginComponent { pub async fn call_run( &self, command: &bindings::wasmcloud::wash::types::Command, - runner_context: Arc>>, + runner_context: Arc>, ) -> anyhow::Result { let mut store = self.workload.new_store(&self.id).await?; let component = self.workload.instantiate_pre(&self.id).await?; diff --git a/crates/wash/src/plugin/runner.rs b/crates/wash/src/plugin/runner.rs index 3a13da22..f5277f0e 100644 --- a/crates/wash/src/plugin/runner.rs +++ b/crates/wash/src/plugin/runner.rs @@ -3,8 +3,9 @@ use anyhow::{Context as _, Result}; use dialoguer::{Confirm, theme::ColorfulTheme}; +use dashmap::DashMap; use std::{collections::HashMap, env, sync::Arc}; -use tokio::{process::Command, sync::RwLock}; +use tokio::{process::Command}; use tracing::{debug, warn}; use wash_runtime::engine::ctx::Ctx; use wasmtime::component::Resource; @@ -17,14 +18,14 @@ pub struct Runner { version: String, /// The metadata of the plugin pub metadata: Metadata, - pub context: Arc>>, + pub context: Arc>, pub(crate) skip_confirmation: bool, } impl Runner { pub fn new( metadata: Metadata, - context: Arc>>, + context: Arc>, skip_confirmation: bool, ) -> Self { Self { @@ -48,7 +49,7 @@ impl Default for ProjectConfig { } } -pub type Context = Arc>>; +pub type Context = Arc>; pub type PluginConfig = HashMap; impl crate::plugin::bindings::wasmcloud::wash::types::Host for Ctx {} @@ -62,7 +63,7 @@ impl crate::plugin::bindings::wasmcloud::wash::types::HostContext for Ctx { return None; } }; - context.read().await.get(&key).cloned() + context.get(&key).map(|item| item.value().clone()) } async fn set(&mut self, ctx: Resource, key: String, value: String) -> Option { @@ -74,7 +75,7 @@ impl crate::plugin::bindings::wasmcloud::wash::types::HostContext for Ctx { } }; - context.write().await.insert(key, value) + context.insert(key, value) } async fn delete(&mut self, ctx: Resource, key: String) -> Option { @@ -86,7 +87,7 @@ impl crate::plugin::bindings::wasmcloud::wash::types::HostContext for Ctx { } }; - context.write().await.remove(&key) + context.remove(&key).map(|(_, value)| value) } async fn list(&mut self, ctx: Resource) -> Vec { @@ -98,7 +99,7 @@ impl crate::plugin::bindings::wasmcloud::wash::types::HostContext for Ctx { } }; - context.read().await.keys().cloned().collect() + context.iter().map(|item| item.key().clone()).collect() } async fn drop(&mut self, ctx: Resource) -> wasmtime::Result<()> { @@ -141,9 +142,8 @@ impl crate::plugin::bindings::wasmcloud::wash::types::HostRunner for Ctx { }; let runtime_config = { - let all_config = plugin.runtime_config.read().await; - match all_config.get(&self.id) { - Some(config) => config.clone(), + match plugin.runtime_config.get(&self.id) { + Some(config) => config.value().clone(), None => { tracing::warn!(component_id = %self.id, "no plugin config found, returning default"); PluginConfig::default()