From 2a0f2b1a4080acb3388dc40f9cfe57dc8f8eb0b5 Mon Sep 17 00:00:00 2001 From: melodykke <1553649937@qq.com> Date: Wed, 11 Mar 2026 17:50:58 +0800 Subject: [PATCH 1/2] fix(esp32): use runtime node_id from NVS in outgoing packets --- firmware/esp32-csi-node/main/csi_collector.c | 6 ++++-- firmware/esp32-csi-node/main/edge_processing.c | 14 ++++---------- firmware/esp32-csi-node/main/wasm_runtime.c | 8 +++----- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/firmware/esp32-csi-node/main/csi_collector.c b/firmware/esp32-csi-node/main/csi_collector.c index 69eb2982..ddeab9dd 100644 --- a/firmware/esp32-csi-node/main/csi_collector.c +++ b/firmware/esp32-csi-node/main/csi_collector.c @@ -14,6 +14,7 @@ #include "csi_collector.h" #include "stream_sender.h" #include "edge_processing.h" +#include "nvs_config.h" #include #include "esp_log.h" @@ -22,6 +23,7 @@ #include "sdkconfig.h" static const char *TAG = "csi_collector"; +extern nvs_config_t g_nvs_config; static uint32_t s_sequence = 0; static uint32_t s_cb_count = 0; @@ -104,7 +106,7 @@ size_t csi_serialize_frame(const wifi_csi_info_t *info, uint8_t *buf, size_t buf memcpy(&buf[0], &magic, 4); /* Node ID */ - buf[4] = (uint8_t)CONFIG_CSI_NODE_ID; + buf[4] = g_nvs_config.node_id; /* Number of antennas */ buf[5] = n_antennas; @@ -221,7 +223,7 @@ void csi_collector_init(void) ESP_ERROR_CHECK(esp_wifi_set_csi(true)); ESP_LOGI(TAG, "CSI collection initialized (node_id=%d, channel=%d)", - CONFIG_CSI_NODE_ID, CONFIG_CSI_WIFI_CHANNEL); + g_nvs_config.node_id, CONFIG_CSI_WIFI_CHANNEL); } /* ---- ADR-029: Channel hopping ---- */ diff --git a/firmware/esp32-csi-node/main/edge_processing.c b/firmware/esp32-csi-node/main/edge_processing.c index a14c4bd3..d677bea5 100644 --- a/firmware/esp32-csi-node/main/edge_processing.c +++ b/firmware/esp32-csi-node/main/edge_processing.c @@ -20,6 +20,7 @@ #include "edge_processing.h" #include "wasm_runtime.h" #include "stream_sender.h" +#include "nvs_config.h" #include #include @@ -30,6 +31,7 @@ #include "sdkconfig.h" static const char *TAG = "edge_proc"; +extern nvs_config_t g_nvs_config; /* ====================================================================== * SPSC Ring Buffer (lock-free, single-producer single-consumer) @@ -421,11 +423,7 @@ static void send_compressed_frame(const uint8_t *iq_data, uint16_t iq_len, uint32_t magic = EDGE_COMPRESSED_MAGIC; memcpy(&pkt[0], &magic, 4); -#ifdef CONFIG_CSI_NODE_ID - pkt[4] = (uint8_t)CONFIG_CSI_NODE_ID; -#else - pkt[4] = 0; -#endif + pkt[4] = g_nvs_config.node_id; pkt[5] = channel; memcpy(&pkt[6], &iq_len, 2); memcpy(&pkt[8], &comp_len, 2); @@ -543,11 +541,7 @@ static void send_vitals_packet(void) memset(&pkt, 0, sizeof(pkt)); pkt.magic = EDGE_VITALS_MAGIC; -#ifdef CONFIG_CSI_NODE_ID - pkt.node_id = (uint8_t)CONFIG_CSI_NODE_ID; -#else - pkt.node_id = 0; -#endif + pkt.node_id = g_nvs_config.node_id; pkt.flags = 0; if (s_presence_detected) pkt.flags |= 0x01; diff --git a/firmware/esp32-csi-node/main/wasm_runtime.c b/firmware/esp32-csi-node/main/wasm_runtime.c index f4e667c3..2bbbadd1 100644 --- a/firmware/esp32-csi-node/main/wasm_runtime.c +++ b/firmware/esp32-csi-node/main/wasm_runtime.c @@ -17,6 +17,7 @@ #include "rvf_parser.h" #include "stream_sender.h" +#include "nvs_config.h" #include #include @@ -32,6 +33,7 @@ #include "m3_env.h" static const char *TAG = "wasm_rt"; +extern nvs_config_t g_nvs_config; /* ====================================================================== * Module Slot @@ -380,11 +382,7 @@ static void send_wasm_output(uint8_t slot_id) memset(&pkt, 0, sizeof(pkt)); pkt.magic = WASM_OUTPUT_MAGIC; -#ifdef CONFIG_CSI_NODE_ID - pkt.node_id = (uint8_t)CONFIG_CSI_NODE_ID; -#else - pkt.node_id = 0; -#endif + pkt.node_id = g_nvs_config.node_id; pkt.module_id = slot_id; pkt.event_count = n_filtered; From d71c7fbeff42a38deb8fc14246adea0bc7e72eef Mon Sep 17 00:00:00 2001 From: melodykke <1553649937@qq.com> Date: Sat, 14 Mar 2026 23:16:40 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E7=A9=BA?= =?UTF-8?q?=E9=97=B4=E5=B8=83=E5=B1=80=E9=85=8D=E7=BD=AE=E4=B8=8E=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E6=97=B6=E8=8A=82=E7=82=B9=E6=A0=87=E8=AF=86=E8=81=94?= =?UTF-8?q?=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 为 Rust sensing server 增加空间布局配置入口,支持加载节点位置与语义区域定义,并扩展空间融合解释输出。 - 在 Docker 镜像中打包 config 目录,确保部署后可直接读取 spatial-layout.json。 - 修正 ESP32 显示界面中的节点编号来源,改为使用运行时 NVS 配置而非编译期常量。 --- config/spatial-layout.json | 40 + docker/Dockerfile.rust | 1 + firmware/esp32-csi-node/main/display_ui.c | 8 +- .../wifi-densepose-sensing-server/src/main.rs | 882 ++++++++++++++++-- 4 files changed, 841 insertions(+), 90 deletions(-) create mode 100644 config/spatial-layout.json diff --git a/config/spatial-layout.json b/config/spatial-layout.json new file mode 100644 index 00000000..176d11d7 --- /dev/null +++ b/config/spatial-layout.json @@ -0,0 +1,40 @@ +{ + "nodes": [ + { + "node_id": 1, + "label": "node-1", + "position": [0.65, 0.6, 0.9] + }, + { + "node_id": 2, + "label": "node-2", + "position": [-0.7, 0.5, 1.23] + } + ], + "zones": [ + { + "zone_id": "center", + "label": "center", + "center": [0.0, 0.0, 1.0], + "radius": 0.7 + }, + { + "zone_id": "entrance", + "label": "entrance", + "center": [1.0, -0.2, 1.0], + "radius": 0.8 + }, + { + "zone_id": "left-closet", + "label": "left closet", + "center": [-1.5, -0.5, 1.0], + "radius": 0.8 + }, + { + "zone_id": "behind-closet", + "label": "behind closet", + "center": [0.0, -1.5, 1.0], + "radius": 0.55 + } + ] +} diff --git a/docker/Dockerfile.rust b/docker/Dockerfile.rust index 73cc58a1..fc6a1bd2 100644 --- a/docker/Dockerfile.rust +++ b/docker/Dockerfile.rust @@ -32,6 +32,7 @@ COPY --from=builder /build/target/release/sensing-server /app/sensing-server # Copy UI assets COPY ui/ /app/ui/ +COPY config/ /app/config/ # HTTP API EXPOSE 3000 diff --git a/firmware/esp32-csi-node/main/display_ui.c b/firmware/esp32-csi-node/main/display_ui.c index 810e17e8..feb49e12 100644 --- a/firmware/esp32-csi-node/main/display_ui.c +++ b/firmware/esp32-csi-node/main/display_ui.c @@ -7,6 +7,7 @@ */ #include "display_ui.h" +#include "nvs_config.h" #include "sdkconfig.h" #if CONFIG_DISPLAY_ENABLE @@ -20,6 +21,7 @@ #include "edge_processing.h" static const char *TAG = "disp_ui"; +extern nvs_config_t g_nvs_config; /* ---- Theme colors ---- */ #define COLOR_BG lv_color_make(0x0A, 0x0A, 0x0F) @@ -347,11 +349,7 @@ void display_ui_update(void) { char buf[48]; -#ifdef CONFIG_CSI_NODE_ID - snprintf(buf, sizeof(buf), "Node: %d", CONFIG_CSI_NODE_ID); -#else - snprintf(buf, sizeof(buf), "Node: --"); -#endif + snprintf(buf, sizeof(buf), "Node: %d", g_nvs_config.node_id); lv_label_set_text(s_sys_node, buf); snprintf(buf, sizeof(buf), "Heap: %lu KB free", diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs index 7497c95a..b8591b25 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs @@ -16,7 +16,7 @@ mod vital_signs; // Training pipeline modules (exposed via lib.rs) use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -85,6 +85,10 @@ struct Args { #[arg(long, default_value = "auto")] source: String, + /// Spatial layout JSON describing node positions and semantic zones. + #[arg(long, value_name = "PATH", env = "SPATIAL_CONFIG")] + spatial_config: Option, + /// Run vital sign detection benchmark (1000 frames) and exit #[arg(long)] benchmark: bool, @@ -212,6 +216,15 @@ struct SensingUpdate { /// Estimated person count from CSI feature heuristics (1-3 for single ESP32). #[serde(skip_serializing_if = "Option::is_none")] estimated_persons: Option, + /// Spatially fused zone scores derived from node layout and per-node evidence. + #[serde(skip_serializing_if = "Option::is_none")] + spatial_zones: Option>, + /// Cross-node agreement and conflict metrics for explainable fusion. + #[serde(skip_serializing_if = "Option::is_none")] + fusion_consensus: Option, + /// Human-readable and structured explanation of the current fusion decision. + #[serde(skip_serializing_if = "Option::is_none")] + fusion_explanation: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -247,6 +260,45 @@ struct SignalField { values: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SpatialZoneScore { + zone_id: String, + center: [f64; 3], + presence_score: f64, + motion_score: f64, + vital_score: f64, + confidence: f64, + supporting_nodes: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct NodeContribution { + node_id: u8, + position: [f64; 3], + quality_score: f64, + presence_score: f64, + motion_score: f64, + vital_score: f64, + strongest_zone: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct FusionConsensus { + presence_agreement: f64, + motion_agreement: f64, + spatial_confidence: f64, + conflict_score: f64, + dominant_zone: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct FusionExplanation { + summary: String, + dominant_zone: Option, + evidence: Vec, + node_contributions: Vec, +} + /// WiFi-derived pose keypoint (17 COCO keypoints) #[derive(Debug, Clone, Serialize, Deserialize)] struct PoseKeypoint { @@ -275,6 +327,80 @@ struct BoundingBox { height: f64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SpatialNodeConfig { + node_id: u8, + label: Option, + position: [f64; 3], +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SpatialZoneConfig { + zone_id: String, + center: [f64; 3], + radius: Option, + label: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SpatialLayoutConfig { + nodes: Vec, + zones: Vec, +} + +#[derive(Debug, Clone)] +struct Esp32NodeSnapshot { + node: NodeInfo, + features: FeatureInfo, + classification: ClassificationInfo, + signal_field: SignalField, + vital_signs: VitalSigns, + estimated_persons: usize, + last_seen: std::time::Instant, +} + +struct Esp32NodeState { + frame_history: VecDeque>, + smoothed_person_score: f64, + smoothed_motion: f64, + current_motion_level: String, + debounce_counter: u32, + debounce_candidate: String, + baseline_motion: f64, + baseline_frames: u64, + smoothed_hr: f64, + smoothed_br: f64, + smoothed_hr_conf: f64, + smoothed_br_conf: f64, + hr_buffer: VecDeque, + br_buffer: VecDeque, + vital_detector: VitalSignDetector, + latest_snapshot: Option, +} + +impl Esp32NodeState { + fn new(vital_sample_rate: f64) -> Self { + Self { + frame_history: VecDeque::new(), + smoothed_person_score: 0.0, + smoothed_motion: 0.0, + current_motion_level: "absent".to_string(), + debounce_counter: 0, + debounce_candidate: "absent".to_string(), + baseline_motion: 0.0, + baseline_frames: 0, + smoothed_hr: 0.0, + smoothed_br: 0.0, + smoothed_hr_conf: 0.0, + smoothed_br_conf: 0.0, + hr_buffer: VecDeque::with_capacity(8), + br_buffer: VecDeque::with_capacity(8), + vital_detector: VitalSignDetector::new(vital_sample_rate), + latest_snapshot: None, + } + } +} + /// Shared application state struct AppStateInner { latest_update: Option, @@ -360,11 +486,16 @@ struct AppStateInner { // ── Adaptive classifier (environment-tuned) ────────────────────────── /// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime). adaptive_model: Option, + /// Latest independent state for each ESP32 node. + esp32_nodes: HashMap, + /// Configured physical layout for nodes and semantic room zones. + spatial_layout: SpatialLayoutConfig, } /// Number of frames retained in `frame_history` for temporal analysis. /// At 500 ms ticks this covers ~50 seconds; at 100 ms ticks ~10 seconds. const FRAME_HISTORY_CAPACITY: usize = 100; +const ESP32_NODE_STALE_MS: u64 = 3000; type SharedState = Arc>; @@ -943,12 +1074,15 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo, /// If an adaptive model is loaded, override the classification with the /// model's prediction. Uses the full 15-feature vector for higher accuracy. -fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) { - if let Some(ref model) = state.adaptive_model { +fn adaptive_override( + model: Option<&adaptive_classifier::AdaptiveModel>, + frame_history: &VecDeque>, + features: &FeatureInfo, + classification: &mut ClassificationInfo, +) { + if let Some(model) = model { // Get current frame amplitudes from the latest history entry. - let amps = state.frame_history.back() - .map(|v| v.as_slice()) - .unwrap_or(&[]); + let amps = frame_history.back().map(|v| v.as_slice()).unwrap_or(&[]); let feat_arr = adaptive_classifier::features_from_runtime( &serde_json::json!({ "variance": features.variance, @@ -1057,6 +1191,537 @@ fn trimmed_mean(buf: &VecDeque) -> f64 { } } +fn smooth_and_classify_node(state: &mut Esp32NodeState, raw: &mut ClassificationInfo, raw_motion: f64) { + state.baseline_frames += 1; + if state.baseline_frames < BASELINE_WARMUP { + state.baseline_motion = state.baseline_motion * 0.9 + raw_motion * 0.1; + } else if raw_motion < state.smoothed_motion + 0.05 { + state.baseline_motion = state.baseline_motion * (1.0 - BASELINE_EMA_ALPHA) + + raw_motion * BASELINE_EMA_ALPHA; + } + + let adjusted = (raw_motion - state.baseline_motion * 0.7).max(0.0); + state.smoothed_motion = state.smoothed_motion * (1.0 - MOTION_EMA_ALPHA) + + adjusted * MOTION_EMA_ALPHA; + let sm = state.smoothed_motion; + let candidate = raw_classify(sm); + + if candidate == state.current_motion_level { + state.debounce_counter = 0; + state.debounce_candidate = candidate; + } else if candidate == state.debounce_candidate { + state.debounce_counter += 1; + if state.debounce_counter >= DEBOUNCE_FRAMES { + state.current_motion_level = candidate; + state.debounce_counter = 0; + } + } else { + state.debounce_candidate = candidate; + state.debounce_counter = 1; + } + + raw.motion_level = state.current_motion_level.clone(); + raw.presence = sm > 0.03; + raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0); +} + +fn smooth_vitals_node(state: &mut Esp32NodeState, raw: &VitalSigns) -> VitalSigns { + let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0); + let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0); + + let hr_ok = state.smoothed_hr < 1.0 || (raw_hr - state.smoothed_hr).abs() < HR_MAX_JUMP; + let br_ok = state.smoothed_br < 1.0 || (raw_br - state.smoothed_br).abs() < BR_MAX_JUMP; + + if hr_ok && raw_hr > 0.0 { + state.hr_buffer.push_back(raw_hr); + if state.hr_buffer.len() > VITAL_MEDIAN_WINDOW { state.hr_buffer.pop_front(); } + } + if br_ok && raw_br > 0.0 { + state.br_buffer.push_back(raw_br); + if state.br_buffer.len() > VITAL_MEDIAN_WINDOW { state.br_buffer.pop_front(); } + } + + let trimmed_hr = trimmed_mean(&state.hr_buffer); + let trimmed_br = trimmed_mean(&state.br_buffer); + + if trimmed_hr > 0.0 { + if state.smoothed_hr < 1.0 { + state.smoothed_hr = trimmed_hr; + } else if (trimmed_hr - state.smoothed_hr).abs() > HR_DEAD_BAND { + state.smoothed_hr = state.smoothed_hr * (1.0 - VITAL_EMA_ALPHA) + + trimmed_hr * VITAL_EMA_ALPHA; + } + } + if trimmed_br > 0.0 { + if state.smoothed_br < 1.0 { + state.smoothed_br = trimmed_br; + } else if (trimmed_br - state.smoothed_br).abs() > BR_DEAD_BAND { + state.smoothed_br = state.smoothed_br * (1.0 - VITAL_EMA_ALPHA) + + trimmed_br * VITAL_EMA_ALPHA; + } + } + + state.smoothed_hr_conf = state.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08; + state.smoothed_br_conf = state.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08; + + VitalSigns { + breathing_rate_bpm: if state.smoothed_br > 1.0 { Some(state.smoothed_br) } else { None }, + heart_rate_bpm: if state.smoothed_hr > 1.0 { Some(state.smoothed_hr) } else { None }, + breathing_confidence: state.smoothed_br_conf, + heartbeat_confidence: state.smoothed_hr_conf, + signal_quality: raw.signal_quality, + } +} + +fn motion_level_rank(level: &str) -> u8 { + match level { + "active" => 3, + "present_moving" => 2, + "present_still" => 1, + _ => 0, + } +} + +fn motion_level_score(level: &str) -> f64 { + motion_level_rank(level) as f64 / 3.0 +} + +fn default_spatial_layout() -> SpatialLayoutConfig { + SpatialLayoutConfig { + nodes: vec![ + SpatialNodeConfig { + node_id: 1, + label: Some("left-sensor".to_string()), + position: [-0.75, 0.0, 1.2], + }, + SpatialNodeConfig { + node_id: 2, + label: Some("right-sensor".to_string()), + position: [0.75, 0.0, 1.2], + }, + ], + zones: vec![ + SpatialZoneConfig { + zone_id: "left".to_string(), + center: [-1.0, 0.0, 1.2], + radius: Some(0.85), + label: Some("left zone".to_string()), + }, + SpatialZoneConfig { + zone_id: "center".to_string(), + center: [0.0, 0.0, 1.4], + radius: Some(0.85), + label: Some("center zone".to_string()), + }, + SpatialZoneConfig { + zone_id: "right".to_string(), + center: [1.0, 0.0, 1.2], + radius: Some(0.85), + label: Some("right zone".to_string()), + }, + ], + } +} + +fn load_spatial_layout(path: Option<&PathBuf>) -> SpatialLayoutConfig { + let default = default_spatial_layout(); + let Some(path) = path else { + return default; + }; + + match std::fs::read_to_string(path) { + Ok(raw) => match serde_json::from_str::(&raw) { + Ok(mut cfg) => { + if cfg.zones.is_empty() { + cfg.zones = default.zones; + } + if cfg.nodes.is_empty() { + cfg.nodes = default.nodes; + } + cfg + } + Err(err) => { + warn!("Failed to parse spatial config {}: {}. Falling back to defaults.", path.display(), err); + default + } + }, + Err(err) => { + warn!("Failed to read spatial config {}: {}. Falling back to defaults.", path.display(), err); + default + } + } +} + +fn node_position(layout: &SpatialLayoutConfig, node_id: u8) -> [f64; 3] { + layout + .nodes + .iter() + .find(|node| node.node_id == node_id) + .map(|node| node.position) + .unwrap_or_else(|| { + if node_id == 1 { + [-0.75, 0.0, 1.2] + } else if node_id == 2 { + [0.75, 0.0, 1.2] + } else { + [2.0, 0.0, 1.5] + } + }) +} + +fn node_quality(snapshot: &Esp32NodeSnapshot) -> f64 { + let class_conf = snapshot.classification.confidence; + let signal_q = snapshot.vital_signs.signal_quality.clamp(0.0, 1.0); + let vital_q = snapshot + .vital_signs + .breathing_confidence + .max(snapshot.vital_signs.heartbeat_confidence) + .clamp(0.0, 1.0); + (class_conf * 0.5 + signal_q * 0.3 + vital_q * 0.2).clamp(0.0, 1.0) +} + +fn spatial_weight(node_pos: [f64; 3], zone_center: [f64; 3], radius: f64) -> f64 { + let dx = node_pos[0] - zone_center[0]; + let dz = node_pos[2] - zone_center[2]; + let dist2 = dx * dx + dz * dz; + let sigma2 = radius.max(0.2) * radius.max(0.2); + (-dist2 / (2.0 * sigma2)).exp().clamp(0.0, 1.0) +} + +fn build_spatial_semantics( + layout: &SpatialLayoutConfig, + snapshots: &[&Esp32NodeSnapshot], + classification: &ClassificationInfo, +) -> (Vec, FusionConsensus, FusionExplanation) { + let zones_template = &layout.zones; + let mut zone_scores = Vec::with_capacity(zones_template.len()); + let mut node_contributions = Vec::with_capacity(snapshots.len()); + let mut node_zone_scores: Vec<(u8, Vec<(String, f64)>)> = Vec::with_capacity(snapshots.len()); + + for zone in zones_template { + let zone_id = zone.zone_id.as_str(); + let center = zone.center; + let radius = zone.radius.unwrap_or(0.85); + let mut presence_sum = 0.0; + let mut motion_sum = 0.0; + let mut vital_sum = 0.0; + let mut quality_sum = 0.0; + let mut supporting_nodes = Vec::new(); + + for snap in snapshots { + let quality = node_quality(snap); + let weight = spatial_weight(snap.node.position, center, radius); + let presence_term = if snap.classification.presence { + quality * snap.classification.confidence * weight + } else { + 0.0 + }; + let motion_term = quality * motion_level_score(&snap.classification.motion_level) * weight; + let vital_term = quality + * snap + .vital_signs + .breathing_confidence + .max(snap.vital_signs.heartbeat_confidence) + * weight; + + if presence_term > 0.12 || motion_term > 0.15 { + supporting_nodes.push(snap.node.node_id); + } + + presence_sum += presence_term; + motion_sum += motion_term; + vital_sum += vital_term; + quality_sum += quality * weight; + } + + let denom = snapshots.len().max(1) as f64; + let presence_score = (presence_sum / denom).clamp(0.0, 1.0); + let motion_score = (motion_sum / denom).clamp(0.0, 1.0); + let vital_score = (vital_sum / denom).clamp(0.0, 1.0); + let confidence = ((quality_sum / denom) * 0.6 + presence_score * 0.25 + motion_score * 0.15) + .clamp(0.0, 1.0); + + zone_scores.push(SpatialZoneScore { + zone_id: zone_id.to_string(), + center, + presence_score, + motion_score, + vital_score, + confidence, + supporting_nodes, + }); + } + + for snap in snapshots { + let quality = node_quality(snap); + let presence_score = if snap.classification.presence { + (quality * snap.classification.confidence).clamp(0.0, 1.0) + } else { + 0.0 + }; + let motion_score = (quality * motion_level_score(&snap.classification.motion_level)).clamp(0.0, 1.0); + let vital_score = (quality + * snap + .vital_signs + .breathing_confidence + .max(snap.vital_signs.heartbeat_confidence)) + .clamp(0.0, 1.0); + + let mut per_zone = Vec::with_capacity(zones_template.len()); + for zone in zones_template { + let weight = spatial_weight(snap.node.position, zone.center, zone.radius.unwrap_or(0.85)); + let combined = (presence_score * 0.5 + motion_score * 0.35 + vital_score * 0.15) * weight; + per_zone.push((zone.zone_id.clone(), combined.clamp(0.0, 1.0))); + } + per_zone.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + let strongest_zone = per_zone + .first() + .map(|z| z.0.clone()) + .unwrap_or_else(|| "center".to_string()); + + node_contributions.push(NodeContribution { + node_id: snap.node.node_id, + position: snap.node.position, + quality_score: quality, + presence_score, + motion_score, + vital_score, + strongest_zone: strongest_zone.clone(), + }); + node_zone_scores.push((snap.node.node_id, per_zone)); + } + + zone_scores.sort_by(|a, b| { + (b.presence_score + b.motion_score * 0.8 + b.vital_score * 0.4) + .partial_cmp(&(a.presence_score + a.motion_score * 0.8 + a.vital_score * 0.4)) + .unwrap_or(std::cmp::Ordering::Equal) + }); + let dominant_zone = zone_scores.first().map(|z| z.zone_id.clone()); + + let presence_votes = snapshots.iter().filter(|snap| snap.classification.presence).count(); + let presence_agreement = (presence_votes.max(snapshots.len() - presence_votes) as f64 + / snapshots.len().max(1) as f64) + .clamp(0.0, 1.0); + + let motion_mean = snapshots + .iter() + .map(|snap| motion_level_score(&snap.classification.motion_level)) + .sum::() + / snapshots.len().max(1) as f64; + let motion_variance = snapshots + .iter() + .map(|snap| { + let delta = motion_level_score(&snap.classification.motion_level) - motion_mean; + delta * delta + }) + .sum::() + / snapshots.len().max(1) as f64; + let motion_agreement = (1.0 - motion_variance.sqrt()).clamp(0.0, 1.0); + + let spatial_confidence = zone_scores + .first() + .map(|zone| zone.confidence) + .unwrap_or(0.0) + .clamp(0.0, 1.0); + let conflict_score = (1.0 - presence_agreement * 0.55 - motion_agreement * 0.45) + .clamp(0.0, 1.0); + + let consensus = FusionConsensus { + presence_agreement, + motion_agreement, + spatial_confidence, + conflict_score, + dominant_zone: dominant_zone.clone(), + }; + + let mut evidence = Vec::new(); + if let Some(zone) = dominant_zone.clone() { + if let Some(zone_info) = zone_scores.iter().find(|z| z.zone_id == zone) { + evidence.push(format!( + "主导区域为{},presence={:.2} motion={:.2} confidence={:.2}", + zone, zone_info.presence_score, zone_info.motion_score, zone_info.confidence + )); + } + } + evidence.push(format!( + "跨节点一致性: presence={:.2} motion={:.2} conflict={:.2}", + presence_agreement, motion_agreement, conflict_score + )); + for contrib in &node_contributions { + evidence.push(format!( + "节点{} 位于({:.2},{:.2},{:.2}),质量={:.2},主要支持{}", + contrib.node_id, + contrib.position[0], + contrib.position[1], + contrib.position[2], + contrib.quality_score, + contrib.strongest_zone + )); + } + + let summary = match dominant_zone.clone() { + Some(zone) if classification.presence => format!( + "空间融合判断房间{}侧证据最强,{}个节点参与,当前状态为{}。", + zone, + snapshots.len(), + classification.motion_level + ), + Some(zone) => format!( + "当前整体无稳定存在证据,但{}侧仍有弱空间响应,建议继续观察。", + zone + ), + None => "当前没有足够的空间融合证据。".to_string(), + }; + + let explanation = FusionExplanation { + summary, + dominant_zone, + evidence, + node_contributions, + }; + + (zone_scores, consensus, explanation) +} + +fn fuse_esp32_update(state: &AppStateInner, tick: u64) -> Option { + let now = std::time::Instant::now(); + let stale_after = Duration::from_millis(ESP32_NODE_STALE_MS); + let mut snapshots: Vec<&Esp32NodeSnapshot> = state + .esp32_nodes + .values() + .filter_map(|node| node.latest_snapshot.as_ref()) + .filter(|snap| now.duration_since(snap.last_seen) <= stale_after) + .collect(); + + if snapshots.is_empty() { + return None; + } + + snapshots.sort_by_key(|snap| snap.node.node_id); + let node_count = snapshots.len() as f64; + let nodes: Vec = snapshots.iter().map(|snap| snap.node.clone()).collect(); + + let features = FeatureInfo { + mean_rssi: snapshots.iter().map(|snap| snap.features.mean_rssi).sum::() / node_count, + variance: snapshots.iter().map(|snap| snap.features.variance).sum::() / node_count, + motion_band_power: snapshots.iter().map(|snap| snap.features.motion_band_power).fold(0.0, f64::max), + breathing_band_power: snapshots.iter().map(|snap| snap.features.breathing_band_power).fold(0.0, f64::max), + dominant_freq_hz: snapshots + .iter() + .max_by(|a, b| a.features.breathing_band_power.partial_cmp(&b.features.breathing_band_power).unwrap_or(std::cmp::Ordering::Equal)) + .map(|snap| snap.features.dominant_freq_hz) + .unwrap_or(0.0), + change_points: snapshots.iter().map(|snap| snap.features.change_points).sum(), + spectral_power: snapshots.iter().map(|snap| snap.features.spectral_power).sum::() / node_count, + }; + + let classification = { + let best = snapshots + .iter() + .max_by(|a, b| { + motion_level_rank(&a.classification.motion_level) + .cmp(&motion_level_rank(&b.classification.motion_level)) + .then_with(|| { + a.classification + .confidence + .partial_cmp(&b.classification.confidence) + .unwrap_or(std::cmp::Ordering::Equal) + }) + }) + .unwrap(); + ClassificationInfo { + motion_level: best.classification.motion_level.clone(), + presence: snapshots.iter().any(|snap| snap.classification.presence), + confidence: snapshots + .iter() + .map(|snap| snap.classification.confidence) + .fold(0.0, f64::max), + } + }; + + let signal_field = { + let template = &snapshots[0].signal_field; + let mut values = vec![0.0; template.values.len()]; + for snap in &snapshots { + for (idx, val) in snap.signal_field.values.iter().enumerate() { + values[idx] += *val; + } + } + for val in &mut values { + *val = (*val / node_count).clamp(0.0, 1.0); + } + SignalField { + grid_size: template.grid_size, + values, + } + }; + + let best_br = snapshots + .iter() + .filter_map(|snap| snap.vital_signs.breathing_rate_bpm.map(|rate| (rate, snap.vital_signs.breathing_confidence))) + .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); + let best_hr = snapshots + .iter() + .filter_map(|snap| snap.vital_signs.heart_rate_bpm.map(|rate| (rate, snap.vital_signs.heartbeat_confidence))) + .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); + let avg_signal_quality = snapshots + .iter() + .map(|snap| snap.vital_signs.signal_quality) + .sum::() + / node_count; + + let vital_signs = Some(VitalSigns { + breathing_rate_bpm: best_br.map(|v| v.0), + heart_rate_bpm: best_hr.map(|v| v.0), + breathing_confidence: best_br.map(|v| v.1).unwrap_or(0.0), + heartbeat_confidence: best_hr.map(|v| v.1).unwrap_or(0.0), + signal_quality: avg_signal_quality, + }); + + // The current ESP32 spatial-fusion workflow does not provide reliable multi-person + // separation yet. Clamp the aggregate result to a single person while tuning zones. + let estimated_persons = if classification.presence { Some(1) } else { None }; + + let (spatial_zones, fusion_consensus, fusion_explanation) = + build_spatial_semantics(&state.spatial_layout, &snapshots, &classification); + + let mut update = SensingUpdate { + msg_type: "sensing_update".to_string(), + timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0, + source: "esp32".to_string(), + tick, + nodes, + features, + classification, + signal_field, + vital_signs, + enhanced_motion: Some(serde_json::json!({ + "active_node_ids": snapshots.iter().map(|snap| snap.node.node_id).collect::>(), + "active_nodes": snapshots.len(), + "spatial_mode": "semantic-weighted-fusion", + })), + enhanced_breathing: None, + posture: None, + signal_quality_score: None, + quality_verdict: None, + bssid_count: None, + pose_keypoints: None, + model_status: None, + persons: None, + estimated_persons, + spatial_zones: Some(spatial_zones), + fusion_consensus: Some(fusion_consensus), + fusion_explanation: Some(fusion_explanation), + }; + + let persons = derive_pose_from_sensing(&update); + if !persons.is_empty() { + update.persons = Some(persons); + } + + Some(update) +} + // ── Windows WiFi RSSI collector ────────────────────────────────────────────── /// Parse `netsh wlan show interfaces` output for RSSI and signal quality @@ -1199,7 +1864,12 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = extract_features_from_frame(&frame, &s_write_pre.frame_history, sample_rate_hz); smooth_and_classify(&mut s_write_pre, &mut classification, raw_motion); - adaptive_override(&s_write_pre, &features, &mut classification); + adaptive_override( + s_write_pre.adaptive_model.as_ref(), + &s_write_pre.frame_history, + &features, + &mut classification, + ); drop(s_write_pre); // ── Step 5: Build enhanced fields from pipeline result ─────── @@ -1285,6 +1955,9 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + spatial_zones: None, + fusion_consensus: None, + fusion_explanation: None, }; // Populate persons from the sensing update. @@ -1352,7 +2025,12 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz); smooth_and_classify(&mut s, &mut classification, raw_motion); - adaptive_override(&s, &features, &mut classification); + adaptive_override( + s.adaptive_model.as_ref(), + &s.frame_history, + &features, + &mut classification, + ); s.source = format!("wifi:{ssid}"); s.rssi_history.push_back(rssi_dbm); @@ -1415,6 +2093,9 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + spatial_zones: None, + fusion_consensus: None, + fusion_explanation: None, }; let persons = derive_pose_from_sensing(&update); @@ -2790,89 +3471,92 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { let mut s = state.write().await; s.source = "esp32".to_string(); - - // Append current amplitudes to history before extracting features so - // that temporal analysis includes the most recent frame. - s.frame_history.push_back(frame.amplitudes.clone()); - if s.frame_history.len() > FRAME_HISTORY_CAPACITY { - s.frame_history.pop_front(); - } - - let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come - let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = - extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz); - smooth_and_classify(&mut s, &mut classification, raw_motion); - adaptive_override(&s, &features, &mut classification); - - // Update RSSI history - s.rssi_history.push_back(features.mean_rssi); + let model = s.adaptive_model.clone(); + let node_pos = node_position(&s.spatial_layout, frame.node_id); + s.tick += 1; + let tick = s.tick; + let mean_rssi = frame.rssi as f64; + s.rssi_history.push_back(mean_rssi); if s.rssi_history.len() > 60 { s.rssi_history.pop_front(); } - s.tick += 1; - let tick = s.tick; + { + let node_state = s + .esp32_nodes + .entry(frame.node_id) + .or_insert_with(|| Esp32NodeState::new(2.0)); + + // Maintain a per-node history so different ESP32 streams do not + // contaminate one another's temporal features and vitals filters. + node_state.frame_history.push_back(frame.amplitudes.clone()); + if node_state.frame_history.len() > FRAME_HISTORY_CAPACITY { + node_state.frame_history.pop_front(); + } - let motion_score = if classification.motion_level == "active" { 0.8 } - else if classification.motion_level == "present_still" { 0.3 } - else { 0.05 }; - - let raw_vitals = s.vital_detector.process_frame( - &frame.amplitudes, - &frame.phases, - ); - let vitals = smooth_vitals(&mut s, &raw_vitals); - s.latest_vitals = vitals.clone(); - - // Multi-person estimation with temporal smoothing. - let raw_score = compute_person_score(&features); - s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; - let est_persons = if classification.presence { - score_to_person_count(s.smoothed_person_score) - } else { - 0 - }; - - let mut update = SensingUpdate { - msg_type: "sensing_update".to_string(), - timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0, - source: "esp32".to_string(), - tick, - nodes: vec![NodeInfo { + let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come + let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = + extract_features_from_frame(&frame, &node_state.frame_history, sample_rate_hz); + smooth_and_classify_node(node_state, &mut classification, raw_motion); + adaptive_override( + model.as_ref(), + &node_state.frame_history, + &features, + &mut classification, + ); + + let motion_score = if classification.motion_level == "active" { 0.8 } + else if classification.motion_level == "present_still" { 0.3 } + else { 0.05 }; + + let raw_vitals = node_state.vital_detector.process_frame( + &frame.amplitudes, + &frame.phases, + ); + let vitals = smooth_vitals_node(node_state, &raw_vitals); + + let raw_score = compute_person_score(&features); + node_state.smoothed_person_score = node_state.smoothed_person_score * 0.85 + raw_score * 0.15; + let est_persons = if classification.presence { + score_to_person_count(node_state.smoothed_person_score) + } else { + 0 + }; + + let signal_field = generate_signal_field( + features.mean_rssi, motion_score, breathing_rate_hz, + features.variance.min(1.0), &sub_variances, + ); + let node_info = NodeInfo { node_id: frame.node_id, rssi_dbm: features.mean_rssi, - position: [2.0, 0.0, 1.5], + position: node_pos, amplitude: frame.amplitudes.iter().take(56).cloned().collect(), subcarrier_count: frame.n_subcarriers as usize, - }], - features: features.clone(), - classification, - signal_field: generate_signal_field( - features.mean_rssi, motion_score, breathing_rate_hz, - features.variance.min(1.0), &sub_variances, - ), - vital_signs: Some(vitals), - enhanced_motion: None, - enhanced_breathing: None, - posture: None, - signal_quality_score: None, - quality_verdict: None, - bssid_count: None, - pose_keypoints: None, - model_status: None, - persons: None, - estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, - }; - - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + }; + node_state.latest_snapshot = Some(Esp32NodeSnapshot { + node: node_info, + features: features.clone(), + classification, + signal_field, + vital_signs: vitals, + estimated_persons: est_persons, + last_seen: std::time::Instant::now(), + }); } - if let Ok(json) = serde_json::to_string(&update) { - let _ = s.tx.send(json); + if let Some(update) = fuse_esp32_update(&s, tick) { + if let Some(vitals) = update.vital_signs.clone() { + s.latest_vitals = vitals; + } + if update.classification.presence { + s.total_detections += 1; + } + if let Ok(json) = serde_json::to_string(&update) { + let _ = s.tx.send(json); + } + s.latest_update = Some(update); } - s.latest_update = Some(update); } } Err(e) => { @@ -2908,7 +3592,12 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz); smooth_and_classify(&mut s, &mut classification, raw_motion); - adaptive_override(&s, &features, &mut classification); + adaptive_override( + s.adaptive_model.as_ref(), + &s.frame_history, + &features, + &mut classification, + ); s.rssi_history.push_back(features.mean_rssi); if s.rssi_history.len() > 60 { @@ -2946,7 +3635,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { nodes: vec![NodeInfo { node_id: 1, rssi_dbm: features.mean_rssi, - position: [2.0, 0.0, 1.5], + position: node_position(&s.spatial_layout, frame.node_id), amplitude: frame_amplitudes, subcarrier_count: frame_n_sub as usize, }], @@ -2977,6 +3666,9 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { }, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + spatial_zones: None, + fusion_consensus: None, + fusion_explanation: None, }; // Populate persons from the sensing update. @@ -3002,11 +3694,19 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { loop { interval.tick().await; - let s = state.read().await; - if let Some(ref update) = s.latest_update { + let mut s = state.write().await; + if let Some(update) = fuse_esp32_update(&s, s.tick) { + s.latest_update = Some(update.clone()); + if let Some(vitals) = update.vital_signs.clone() { + s.latest_vitals = vitals; + } + if s.tx.receiver_count() > 0 { + if let Ok(json) = serde_json::to_string(&update) { + let _ = s.tx.send(json); + } + } + } else if let Some(ref update) = s.latest_update { if s.tx.receiver_count() > 0 { - // Re-broadcast the latest sensing_update so pose WS clients - // always get data even when ESP32 pauses between frames. if let Ok(json) = serde_json::to_string(update) { let _ = s.tx.send(json); } @@ -3557,7 +4257,17 @@ async fn main() { // Discover model and recording files on startup let initial_models = scan_model_files(); let initial_recordings = scan_recording_files(); + let spatial_layout = load_spatial_layout(args.spatial_config.as_ref()); info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len()); + info!( + "Spatial layout: {} nodes, {} zones{}", + spatial_layout.nodes.len(), + spatial_layout.zones.len(), + args.spatial_config + .as_ref() + .map(|p| format!(" ({})", p.display())) + .unwrap_or_else(|| " (defaults)".to_string()) + ); let (tx, _) = broadcast::channel::(256); let state: SharedState = Arc::new(RwLock::new(AppStateInner { @@ -3608,6 +4318,8 @@ async fn main() { m.trained_frames, m.training_accuracy * 100.0); m }), + esp32_nodes: HashMap::new(), + spatial_layout, })); // Start background tasks based on source