diff --git a/pyeyesweb/analysis_primitives/mse_dominance.py b/pyeyesweb/analysis_primitives/mse_dominance.py
new file mode 100644
index 0000000..4eb2651
--- /dev/null
+++ b/pyeyesweb/analysis_primitives/mse_dominance.py
@@ -0,0 +1,265 @@
+"""Multi-scale entropy analysis module for dominance detection in ensemble performances.
+
+This module implements the multi-scale entropy (MSE) algorithm for analyzing
+dominance and leadership in social creative activities. The method quantifies
+the complexity of movement dynamics across multiple time scales to identify
+leadership patterns in musical ensembles.
+
+The multi-scale entropy algorithm includes:
+1. Coarse-graining procedure for multi-scale signal representation
+2. Sample entropy calculation for irregularity quantification
+3. Complexity index computation across scales
+4. Dominance analysis based on complexity differences
+
+Typical use cases include:
+1. Leadership detection in string quartet performances
+2. Dominance analysis in social creative interactions
+3. Group coordination pattern analysis
+4. Movement complexity characterization
+5. Real-time ensemble performance monitoring
+
+References
+----------
+Glowinski, D., Coletta, P., Volpe, G., Camurri, A., Chiorri, C., & Schenone, A. (2010).
+Multi-scale entropy analysis of dominance in social creative activities.
+In Proceedings of the 18th ACM International Conference on Multimedia (pp. 1035-1038).
+
+Costa, M., Goldberger, A. L., & Peng, C.-K. (2005).
+Multiscale entropy analysis of biological signals.
+Physical Review E, 71(2), 021906.
+"""
+
+import numpy as np
+from pyeyesweb.data_models.sliding_window import SlidingWindow
+
+
+class MultiScaleEntropyDominance:
+    """Real-time multi-scale entropy analyzer for dominance detection.
+
+    This class implements the multi-scale entropy algorithm to analyze dominance
+    in ensemble performances by computing complexity indices of movement dynamics
+    across multiple time scales.
+
+    The algorithm follows the methodology described in:
+    Glowinski et al. (2010). Multi-scale entropy analysis of dominance in
+    social creative activities. ACM Multimedia, 1035-1038.
+    """
+
+    def __init__(self):
+        """Initialize the multi-scale entropy analyzer with default parameters."""
+        # Algorithm parameters as per the reference papers
+        self.m = 2  # Embedding dimension for sample entropy
+        self.r = 0.15  # Tolerance (15% of the standard deviation)
+        self.max_scale = 6  # Maximum scale factor for coarse-graining
+        self.min_points = 500  # Minimum points required in each coarse-grained series
+
+    def _coarse_grain(self, data: np.ndarray, scale: int) -> np.ndarray:
+        """Apply the coarse-graining procedure to a time series.
+
+        Implements the coarse-graining equation from Costa et al. (2005):
+
+            y_j^(τ) = (1/τ) * Σ_{i=(j-1)τ+1}^{jτ} x_i,  for j = 1, ..., floor(N/τ)
+
+        i.e. the series is split into non-overlapping blocks of length τ and
+        each block is replaced by its mean.
+
+        Parameters
+        ----------
+        data : np.ndarray
+            Input time series data (1D array).
+        scale : int
+            Scale factor for coarse-graining (τ in the equation).
+
+        Returns
+        -------
+        np.ndarray
+            Coarse-grained time series; empty if the input is missing, the
+            scale is invalid, or the series is shorter than one block.
+        """
+        if data is None or scale is None or scale < 1:
+            return np.array([], dtype=float)
+
+        x = np.asarray(data, dtype=float).ravel()
+
+        if x.size == 0:
+            return np.array([], dtype=float)
+
+        if scale == 1:
+            return x
+
+        N = x.shape[0]
+        if N < scale:
+            return np.array([], dtype=float)
+
+        # Number of complete blocks of length `scale`
+        num_points = N // scale
+
+        # Trim trailing samples that do not fill a complete block
+        trimmed = x[:num_points * scale]
+
+        # Reshape and average: each block becomes one coarse-grained point
+        coarse = trimmed.reshape(num_points, scale).mean(axis=1)
+
+        return coarse
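+
+    # Worked example of the coarse-graining above (illustrative values, not
+    # taken from the reference papers): for x = [2, 4, 6, 8, 10, 12] and
+    # scale τ = 3, the non-overlapping blocks are (2, 4, 6) and (8, 10, 12),
+    # so the coarse-grained series is y = [4.0, 10.0]. At τ = 4 only one
+    # complete block (2, 4, 6, 8) fits, the trailing samples 10 and 12 are
+    # discarded, and the result is y = [5.0].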
+
+    def _sample_entropy(self, data: np.ndarray) -> float:
+        """Calculate sample entropy (SampEn) for a time series.
+
+        SampEn(m, r) = -ln(A / B), where B and A are the normalized counts of
+        template pairs of length m and m + 1, respectively, whose Chebyshev
+        distance is within the tolerance r. The series is z-scored first, so
+        r is expressed as a fraction of the standard deviation.
+
+        Parameters
+        ----------
+        data : np.ndarray
+            Input time series data (1D array).
+
+        Returns
+        -------
+        float
+            Sample entropy value, or 0.0 if the series is too short,
+            (near-)constant, or produces no template matches.
+        """
+        x = np.asarray(data, dtype=float).reshape(-1)
+        N = x.shape[0]
+        m = int(self.m)
+        r = float(self.r)
+
+        if N <= m + 10:
+            return 0.0
+
+        mu = float(np.mean(x))
+        sd = float(np.std(x))
+        if sd < 1e-10:
+            return 0.0
+
+        # Z-score normalization so that r is relative to the standard deviation
+        u = (x - mu) / sd
+
+        # Build N - m templates of length m and, to keep the two counts
+        # comparable, the same number of templates of length m + 1
+        # (range(N - m), not range(N - m - 1), which would drop the last
+        # valid template).
+        templates_m = np.array([u[i:i + m] for i in range(N - m)], dtype=float)
+        templates_m1 = np.array([u[i:i + m + 1] for i in range(N - m)], dtype=float)
+
+        n_m = templates_m.shape[0]
+        n_m1 = templates_m1.shape[0]
+
+        if n_m <= 1 or n_m1 <= 1:
+            return 0.0
+
+        B_matches = 0  # template pairs matching at length m
+        A_matches = 0  # template pairs matching at length m + 1
+
+        for i in range(n_m):
+            # Chebyshev distance from template i to every template; the
+            # subtraction removes the self-match at distance zero.
+            dist = np.max(np.abs(templates_m[i] - templates_m), axis=1)
+            B_matches += int(np.sum(dist < r) - 1)
+
+        for i in range(n_m1):
+            dist = np.max(np.abs(templates_m1[i] - templates_m1), axis=1)
+            A_matches += int(np.sum(dist < r) - 1)
+
+        if B_matches <= 0 or A_matches <= 0:
+            return 0.0
+
+        B = B_matches / (n_m * (n_m - 1))
+        A = A_matches / (n_m1 * (n_m1 - 1))
+
+        return float(-np.log(A / B))
+
+    def _calculate_complexity_index(self, data: np.ndarray) -> float:
+        """Calculate the complexity index by integrating SampEn across scales.
+
+        Parameters
+        ----------
+        data : np.ndarray
+            Input time series data.
+
+        Returns
+        -------
+        float
+            Complexity index (trapezoidal integral of the SampEn curve over
+            the scale axis), or 0.0 if no scale has enough data.
+        """
+        sampen_values = []
+
+        for scale in range(1, int(self.max_scale) + 1):
+            coarse = self._coarse_grain(data, scale)
+
+            # Stop once the coarse-grained series becomes too short for a
+            # reliable SampEn estimate; larger scales are only shorter.
+            if coarse.shape[0] < int(self.min_points):
+                break
+
+            sampen_values.append(self._sample_entropy(coarse))
+
+        if len(sampen_values) > 1:
+            scales = np.arange(1, len(sampen_values) + 1, dtype=float)
+            values = np.asarray(sampen_values, dtype=float)
+            # np.trapz was removed in NumPy 2.0 (renamed np.trapezoid);
+            # resolve whichever is available.
+            trapezoid = getattr(np, "trapezoid", None)
+            if trapezoid is None:
+                trapezoid = np.trapz
+            return float(trapezoid(values, x=scales))
+
+        if len(sampen_values) == 1:
+            return float(sampen_values[0])
+
+        return 0.0
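+
+    # Worked example of the integration above (hypothetical SampEn values,
+    # not measured data): if the sample entropies at scales 1, 2, 3 are
+    # [1.2, 1.0, 0.8], the trapezoidal rule over the unit-spaced scale axis
+    # gives (1.2 + 1.0)/2 + (1.0 + 0.8)/2 = 2.0 as the complexity index.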
+ """ + if not signals.is_full(): + return {} + + data, _ = signals.to_array() + n_samples, n_features = data.shape + + if n_samples < int(self.min_points): + return {} + + complexity_indices = [] + for i in range(n_features): + ci = self._calculate_complexity_index(data[:, i]) + complexity_indices.append(ci) + + result = {} + + for method in methods: + if method == 'complexity_index': + values = np.array(complexity_indices, dtype=float) + result['complexity_index'] = float(values[0]) if len(values) == 1 else values.tolist() + + elif method == 'dominance_score': + cis = np.array(complexity_indices, dtype=float) + if cis.size > 0: + max_ci = float(np.max(cis)) + if max_ci > 0: + scores = (1.0 - (cis / max_ci)) + else: + scores = np.zeros_like(cis) + result['dominance_score'] = float(scores[0]) if len(scores) == 1 else scores.tolist() + + elif method == 'leader_identification': + if complexity_indices: + leader_idx = np.argmin(complexity_indices) + result['leader'] = int(leader_idx) + result['leader_complexity'] = float(complexity_indices[leader_idx]) + + else: + continue + + return result + + def __call__(self, sliding_window: SlidingWindow, methods: list) -> dict: + """Compute dominance analysis metrics. + + Parameters + ---------- + sliding_window : SlidingWindow + Buffer containing multivariate data to analyze. + methods : list of str + List of analysis methods to compute. + + Returns + ------- + dict + Dictionary containing dominance analysis metrics. + """ + return self.compute_analysis(sliding_window, methods) \ No newline at end of file