Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions persistence/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ impl DualCache {
}
}

/// Look up an item by uuid. Checks the active map first, then the idle map.
///
/// Checking the idle map covers the window between a `swap_and_drain` flip
/// and the subsequent `idle.clear()`, where an item may not yet be in the
/// new active map.
pub fn get(&self, uuid: &str) -> Option<Item> {
if let Some(item) = self.active_map().get(uuid) {
return Some(item.clone());
}
// Idle map is whichever map is NOT currently active.
let idle = if self.active.load(Ordering::Acquire) {
&self.map_a
} else {
&self.map_b
};
idle.get(uuid).map(|r| r.value().clone())
}

/// Number of items currently waiting in the active map.
pub fn pending_count(&self) -> usize {
self.active_map().len()
Expand Down
68 changes: 67 additions & 1 deletion persistence/src/websocket/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
cache::DualCache,
config::Config,
db::queries::{get_all_items, get_item_by_uuid, Queries},
websocket::messages::{BridgeEventEnvelope, GenericPropsRequest, Item},
websocket::messages::{BridgeEventEnvelope, GenericPropsRequest, Item, UpdateObjectRequest},
};

/// Handle a single WebSocket connection for its entire lifetime.
Expand Down Expand Up @@ -77,6 +77,72 @@ pub async fn handle_socket(
}
}
}
"update_object" | "update_object_from_external" => {
match serde_json::from_value::<UpdateObjectRequest>(envelope.payload) {
Ok(req) => {
let mut incoming_map = req.object_data
.as_object()
.cloned()
.unwrap_or_default();
// Extract special fields from the incoming partial data.
let new_parent_id = incoming_map
.remove("parent_id")
.and_then(|v| v.as_str().map(str::to_owned));
let new_scenename = incoming_map
.remove("scenename")
.and_then(|v| v.as_str().map(str::to_owned));
let new_position = incoming_map
.remove("position")
.and_then(|v| serde_json::from_value(v).ok());
let new_rotation = incoming_map
.remove("rotation")
.and_then(|v| serde_json::from_value(v).ok());

// Fetch current state: cache first (covers items not yet flushed
// to DB), then DB (covers items not in cache).
let existing = match cache.get(&req.object_uuid) {
Some(item) => Some(item),
None => match get_item_by_uuid(&session, &queries, &req.object_uuid).await {
Ok(opt) => opt,
Err(e) => {
error!("{}: DB lookup error for uuid={}: {e}", envelope.name, req.object_uuid);
continue;
}
},
};

let Some(base) = existing else {
warn!("{}: uuid={} not found in cache or DB, skipping", envelope.name, req.object_uuid);
continue;
};

// Merge: start from existing object_data, then overwrite with
// incoming fields so that untouched fields are preserved.
let mut merged_data = base
.object_data
.first()
.and_then(|v| v.as_object())
.cloned()
.unwrap_or_default();
merged_data.extend(incoming_map);

let merged = Item {
uuid: base.uuid,
object_type: req.object_type,
parent_id: new_parent_id.or(base.parent_id),
scenename: new_scenename.or(base.scenename),
position: new_position.or(base.position),
rotation: new_rotation.or(base.rotation),
object_data: vec![Value::Object(merged_data)],
};
debug!("{}: merged update for uuid={} type={}", envelope.name, merged.uuid, merged.object_type);
cache.insert(merged);
}
Err(e) => {
error!("{}: invalid payload: {e}", envelope.name);
}
}
}
"create_object" | "create_object_from_gameserver" => {
match serde_json::from_value::<GenericPropsRequest>(envelope.payload) {
Ok(req) => {
Expand Down
10 changes: 10 additions & 0 deletions persistence/src/websocket/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ pub struct GenericPropsRequest {
pub object_data: serde_json::Value,
}

/// Incoming request payload for update_object / update_object_from_external events.
/// `object_data` is **partial** — only the fields that changed are present.
/// `object_uuid` and `object_type` are always present at the same level.
#[derive(Debug, Deserialize)]
pub struct UpdateObjectRequest {
pub object_type: String,
pub object_uuid: String,
pub object_data: serde_json::Value,
}

// ─── REST request messages ─────────────────────────────────────────────────

/// Request body for PUT /items/{uuid}.
Expand Down
Loading