Skip to content
This repository was archived by the owner on Mar 24, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ clap = { workspace = true, features = ["derive", "env", "help", "color", "sugges
clap_complete = { workspace = true }
clap-markdown = { workspace = true }
ctrlc = { workspace = true }
dashmap = { workspace = true }
dialoguer = { workspace = true, features = ["editor", "password", "zeroize", "fuzzy-select", "fuzzy-matcher"] }
etcetera = { workspace = true }
figment = { workspace = true, features = ["json", "env", "yaml", "toml"] }
Expand Down Expand Up @@ -84,6 +85,7 @@ bytes = { version = "1", default-features = false }
chrono = { version = "0.4.42", default-features = false, features = ["clock", "alloc"] }
clap-markdown = { version = "0.1.5", default-features = false }
ctrlc = { version = "3.4.7", default-features = false }
dashmap = "6.1.0"
dialoguer = { version = "0.11.0", default-features = false, features = ["editor", "password", "zeroize", "fuzzy-select", "fuzzy-matcher"] }
docker_credential = { version = "1.3.1", default-features = false }
etcetera = { version = "0.10.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/wash-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async-nats = { workspace = true, features = ["aws-lc-rs"] }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
hostname = { workspace = true }
hyper = { workspace = true, features = ["server", "http1"] }
Expand Down
41 changes: 21 additions & 20 deletions crates/wash-runtime/src/engine/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use anyhow::{Context as _, bail, ensure};
use dashmap::DashMap;
use tokio::{sync::RwLock, task::JoinHandle, time::timeout};
use tracing::{debug, info, trace, warn};
use wasmtime::component::{
Expand Down Expand Up @@ -388,7 +389,7 @@ pub struct ResolvedWorkload {
namespace: Arc<str>,
/// All components in the workload. This is behind a `RwLock` to support mutable
/// access to the component linkers.
components: Arc<RwLock<HashMap<Arc<str>, WorkloadComponent>>>,
components: Arc<DashMap<Arc<str>, WorkloadComponent>>,
/// The HTTP handler for outgoing HTTP requests
http_handler: Arc<dyn crate::host::http::HostHandler>,
/// An optional service component that runs once to completion or for the duration of the workload
Expand Down Expand Up @@ -453,7 +454,7 @@ impl ResolvedWorkload {
}
}

pub fn components(&self) -> Arc<RwLock<HashMap<Arc<str>, WorkloadComponent>>> {
pub fn components(&self) -> Arc<DashMap<Arc<str>, WorkloadComponent>> {
self.components.clone()
}

Expand All @@ -466,7 +467,8 @@ impl ResolvedWorkload {
let mut interface_map: HashMap<String, Arc<str>> = HashMap::new();

// Determine available component exports to link to the rest of the workload
for c in self.components.read().await.values() {
for item in self.components.iter() {
let c = item.value();
let exported_instances = c.component_exports()?;
for (name, item) in exported_instances {
// TODO(#11): It's probably a good idea to skip registering wasi@0.2 interfaces
Expand Down Expand Up @@ -518,8 +520,10 @@ impl ResolvedWorkload {
let mut dependencies: HashMap<Arc<str>, HashSet<Arc<str>>> = HashMap::new();

{
let components = self.components.read().await;
for (component_id, component) in components.iter() {
let components = self.components.clone();
for item in components.iter() {
let component_id = item.key();
let component = item.value();
let mut deps = HashSet::new();
let ty = component.metadata.component.component_type();
for (import_name, import_item) in ty.imports(component.metadata.component.engine())
Expand Down Expand Up @@ -551,10 +555,8 @@ impl ResolvedWorkload {
for component_id in sorted_component_ids {
// In order to have mutable access to both the workload component and components that need
// to be instantiated as "plugins" during linking, we remove and re-add the component to the list.
let mut workload_component = {
let (_, mut workload_component) = {
self.components
.write()
.await
.remove(&component_id)
.context("component not found during import resolution")?
};
Expand All @@ -565,8 +567,6 @@ impl ResolvedWorkload {
.resolve_component_imports(&component, linker, interface_map)
.await;
self.components
.write()
.await
.insert(workload_component.metadata.id.clone(), workload_component);
// Propagate any errors encountered during import resolution
res?;
Expand Down Expand Up @@ -605,8 +605,8 @@ impl ResolvedWorkload {
match import_item {
ComponentItem::ComponentInstance(import_instance_ty) => {
trace!(name = import_name, "processing component instance import");
let mut all_components = self.components.write().await;
let (plugin_component, instance_idx) = {
let all_components = self.components.clone();
let (mut plugin_component, instance_idx) = {
let Some(exporter_component) = interface_map.get(import_name) else {
// TODO: error because unsatisfied import, if there's no available
// export then it's an unresolvable workload
Expand Down Expand Up @@ -878,12 +878,12 @@ impl ResolvedWorkload {
/// Returns the number of components in this workload.
/// Does not include the service component if one is defined.
pub async fn component_count(&self) -> usize {
self.components.read().await.len()
self.components.len()
}

/// Helper to create a new wasmtime Store for a given component in the workload.
pub async fn new_store(&self, component_id: &str) -> anyhow::Result<wasmtime::Store<Ctx>> {
let components = self.components.read().await;
let components = self.components.clone();
let component = components
.get(component_id)
.context("component ID not found in workload")?;
Expand All @@ -895,7 +895,7 @@ impl ResolvedWorkload {
&self,
metadata: &WorkloadMetadata,
) -> anyhow::Result<wasmtime::Store<Ctx>> {
let components = self.components.read().await;
let components = self.components.clone();

// TODO: Consider stderr/stdout buffering + logging
let mut wasi_ctx_builder = WasiCtxBuilder::new();
Expand All @@ -915,7 +915,7 @@ impl ResolvedWorkload {
// Mount all possible volume mounts in the workload since components share a WasiCtx
for (host_path, mount) in &components
.iter()
.flat_map(|(_id, workload_component)| workload_component.metadata.volume_mounts.clone())
.flat_map(|workload_component_item| workload_component_item.value().metadata.volume_mounts.clone())
.collect::<Vec<_>>()
{
let dir = tokio::fs::canonicalize(host_path).await?;
Expand Down Expand Up @@ -944,8 +944,8 @@ impl ResolvedWorkload {
&self,
component_id: &str,
) -> anyhow::Result<wasmtime::component::InstancePre<Ctx>> {
let mut components = self.components.write().await;
let component = components
let components = self.components.clone();
let mut component = components
.get_mut(component_id)
.context("component ID not found in workload")?;
let wasmtime_component = component.metadata.component.clone();
Expand All @@ -967,7 +967,8 @@ impl ResolvedWorkload {
"unbinding all plugins from workload"
);

for component in self.components.read().await.values() {
for item in self.components.iter() {
let component = item.value();
if let Some(plugins) = component.plugins() {
for (plugin_id, plugin) in plugins.iter() {
trace!(
Expand Down Expand Up @@ -1362,7 +1363,7 @@ impl UnresolvedWorkload {
id: self.id.clone(),
name: self.name.clone(),
namespace: self.namespace.clone(),
components: Arc::new(RwLock::new(self.components)),
components: Arc::new(DashMap::from_iter(self.components)),
service: self.service,
host_interfaces: self.host_interfaces,
http_handler: http_handler.clone(),
Expand Down
11 changes: 6 additions & 5 deletions crates/wash-runtime/src/host/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
sync::Arc,
};

use dashmap::DashMap;
use crate::engine::ctx::Ctx;
use crate::engine::workload::ResolvedWorkload;
use crate::wit::WitInterface;
Expand Down Expand Up @@ -290,7 +291,7 @@ impl HostHandler for NullServer {

/// A map from host header to resolved workload handles and their associated component id
pub type WorkloadHandles =
Arc<RwLock<HashMap<String, (ResolvedWorkload, InstancePre<Ctx>, String)>>>;
Arc<DashMap<String, (ResolvedWorkload, InstancePre<Ctx>, String)>>;

/// HTTP server plugin that handles incoming HTTP requests for WebAssembly components.
///
Expand Down Expand Up @@ -425,7 +426,7 @@ impl<T: Router> HostHandler for HttpServer<T> {
.await?;
let instance_pre = resolved_handle.instantiate_pre(component_id).await?;

self.workload_handles.write().await.insert(
self.workload_handles.insert(
resolved_handle.id().to_string(),
(
resolved_handle.clone(),
Expand All @@ -440,7 +441,7 @@ impl<T: Router> HostHandler for HttpServer<T> {
async fn on_workload_unbind(&self, workload_id: &str) -> anyhow::Result<()> {
self.router.on_workload_unbind(workload_id).await?;

self.workload_handles.write().await.remove(workload_id);
self.workload_handles.remove(workload_id);

Ok(())
}
Expand Down Expand Up @@ -563,9 +564,9 @@ async fn handle_http_request<T: Router>(

// Look up workload handle for this host, with wildcard fallback
let workload_handle = {
let handles = workload_handles.read().await;
let handles = workload_handles;
debug!(host = %workload_id, "looking up workload handle for host header");
handles.get(&workload_id).cloned()
handles.get(&workload_id).map(|item| item.value().clone())
};

let response = match workload_handle {
Expand Down
22 changes: 9 additions & 13 deletions crates/wash-runtime/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use anyhow::{Context, bail};
use names::{Generator, Name};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -178,7 +179,7 @@ impl From<&HostWorkload> for WorkloadState {
pub struct Host {
engine: Engine,
/// Workloads mapped from ID to the workload and its current state
workloads: Arc<RwLock<HashMap<String, HostWorkload>>>,
workloads: Arc<DashMap<String, HostWorkload>>,
/// Plugins in a map from their ID to the plugin itself
plugins: HashMap<&'static str, Arc<dyn HostPlugin>>,
/// Host metadata
Expand Down Expand Up @@ -422,10 +423,11 @@ impl HostApi for Host {

// Count components and providers from workloads
let (workload_count, component_count) = {
let workloads = self.workloads.read().await;
let workloads = self.workloads.clone();
let workload_count: u64 = workloads.len() as u64;
let mut component_count: u64 = 0;
for workload in workloads.values() {
for item in workloads.iter() {
let workload = item.value();
if let HostWorkload::Running(workload) = workload {
component_count += workload.component_count().await as u64;
}
Expand Down Expand Up @@ -470,8 +472,6 @@ impl HostApi for Host {
) -> anyhow::Result<WorkloadStartResponse> {
// Store the workload with initial state
self.workloads
.write()
.await
.insert(request.workload_id.clone(), HostWorkload::Starting);

let service_present = request.workload.service.is_some();
Expand All @@ -495,8 +495,6 @@ impl HostApi for Host {

// Update the workload state to `Running`
self.workloads
.write()
.await
.entry(request.workload_id.clone())
.and_modify(|workload| {
*workload = HostWorkload::Running(Box::new(resolved_workload));
Expand All @@ -515,8 +513,8 @@ impl HostApi for Host {
&self,
request: WorkloadStatusRequest,
) -> anyhow::Result<WorkloadStatusResponse> {
if let Some(workload) = self.workloads.read().await.get(&request.workload_id) {
let workload_state = workload.into();
if let Some(workload) = self.workloads.get(&request.workload_id) {
let workload_state = workload.value().into();
Ok(WorkloadStatusResponse {
workload_status: WorkloadStatus {
workload_id: request.workload_id,
Expand All @@ -535,14 +533,12 @@ impl HostApi for Host {
) -> anyhow::Result<WorkloadStopResponse> {
let has_workload = self
.workloads
.read()
.await
.contains_key(&request.workload_id);

let (workload_state, message) = if has_workload {
// Update state to stopping
let resolved_workload = {
let mut workloads = self.workloads.write().await;
let workloads = self.workloads.clone();
trace!(
workload_id = request.workload_id,
"updating workload state to stopping"
Expand Down Expand Up @@ -582,7 +578,7 @@ impl HostApi for Host {

// Remove the workload from the active workloads map
// This will drop the workload and clean up wasmtime resources
self.workloads.write().await.remove(&request.workload_id);
self.workloads.remove(&request.workload_id);

debug!(
workload_id = request.workload_id,
Expand Down
Loading