# app.py — Fitness Emotion Analytics API (FastAPI)
# NOTE(review): removed GitHub page chrome and the copy-pasted line-number
# gutter that were scraped along with the source; they were not part of the file.
from fastapi import FastAPI, WebSocket, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import serial
import json
import asyncio
import httpx
import numpy as np
from datetime import datetime
import cv2
from typing import List, Dict, Optional
import os
from pydantic import BaseModel
# Import your emotion recognition model
from models.model import create_model
from utils.face_detector import FaceDetector
# Application instance; title/description show up in the auto-generated OpenAPI docs.
app = FastAPI(title="Fitness Emotion Analytics API",
              description="Integrating emotion recognition with Arduino vitals data")

# Enable CORS for React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Update with your frontend URL in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Data models
class VitalsData(BaseModel):
    """One reading from the Arduino pulse/SpO2 sensor."""
    heart_rate: float  # beats per minute; 0 when the Arduino JSON lacked the field
    spo2: float  # blood-oxygen saturation (percent); 0 when absent
    timestamp: str  # ISO-8601 string from datetime.now().isoformat()
class EmotionData(BaseModel):
    """One emotion prediction from the webcam classifier."""
    emotion: str  # one of: angry, disgust, fear, happy, neutral, sad, surprise
    confidence: float  # softmax probability of the predicted class
    timestamp: str  # ISO-8601 time the prediction was made
class CombinedData(BaseModel):
    """Vitals and emotion joined with derived analytics, as sent to clients."""
    vitals: VitalsData
    emotion: EmotionData
    alignment_score: Optional[float] = None  # 0..1 emotion/vitals agreement heuristic
    recommendations: Optional[List[str]] = None  # human-readable workout tips
# Global variables (module-level shared state; mutated by the setup functions
# and the background reader — no locking, all access is from the event loop)
arduino_connected = False  # True once a serial connection succeeded
arduino_port = None  # serial.Serial handle, or None before connection
emotion_model = None  # loaded torch model, set by setup_emotion_model()
face_detector = None  # FaceDetector instance, set by setup_emotion_model()
connected_clients = set()  # live WebSocket connections receiving broadcasts
latest_vitals = None  # most recent VitalsData from the Arduino
latest_emotion = None  # most recent EmotionData from the webcam
# Initialize Arduino connection
def setup_arduino(port="/dev/ttyACM0", baud_rate=9600):
    """Open the serial link to the Arduino vitals board.

    On success sets the `arduino_port` / `arduino_connected` globals and
    returns True. On failure resets BOTH globals and returns False so the
    reconnect loop in read_arduino_data() sees a consistent state.
    """
    global arduino_connected, arduino_port
    try:
        arduino_port = serial.Serial(port, baud_rate, timeout=1)
        arduino_connected = True
        print(f"✅ Connected to Arduino on {port}")
        return True
    except Exception as e:
        # Bug fix: previously a failed (re)connect left arduino_connected
        # stuck at True after an earlier success and kept the dead handle in
        # arduino_port, so the reader kept polling a broken port forever.
        arduino_connected = False
        arduino_port = None
        print(f"❌ Failed to connect to Arduino: {e}")
        return False
# Initialize emotion recognition model
def setup_emotion_model(model_path="models/trained_models/gpu_final_model_full.pth", model_name="efficientnet-b2"):
    """Load the trained emotion classifier and the face detector.

    Populates the module-level `emotion_model` and `face_detector` globals.
    Returns True when everything loaded, False otherwise.
    """
    global emotion_model, face_detector
    try:
        import torch

        # Prefer the GPU when one is available.
        run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        emotion_model = create_model(num_classes=7, model_name=model_name)
        emotion_model.load_state_dict(torch.load(model_path, map_location=run_device))
        emotion_model.to(run_device)
        # Inference mode: freezes dropout / batch-norm statistics.
        emotion_model.eval()

        face_detector = FaceDetector()
        print(f"✅ Emotion recognition model loaded successfully")
        return True
    except Exception as e:
        print(f"❌ Failed to load emotion model: {e}")
        return False
# Calculate alignment score between emotion and vitals
def calculate_alignment_score(emotion: str, heart_rate: float, spo2: float) -> float:
    """Return a 0..1 score for how well vitals match the detected emotion.

    Simplified heuristic: weighted blend of (a) how close the heart rate
    sits to the midpoint of the emotion's expected range, (b) how healthy
    the SpO2 reading is, and (c) a fixed per-emotion intensity weight.
    Unknown emotions fall back to a (60, 100) bpm range and 0.5 intensity.
    """
    # Rough arousal level associated with each emotion class.
    intensity_by_emotion = {
        "angry": 0.8,
        "disgust": 0.6,
        "fear": 0.7,
        "happy": 0.5,
        "neutral": 0.3,
        "sad": 0.2,
        "surprise": 0.6
    }
    # Expected resting-ish heart-rate band per emotion (simplified).
    hr_band_by_emotion = {
        "angry": (80, 110),
        "disgust": (70, 90),
        "fear": (80, 100),
        "happy": (70, 90),
        "neutral": (60, 80),
        "sad": (60, 75),
        "surprise": (70, 95)
    }

    # Distance from band midpoint, normalized by half the band width,
    # clipped so anything outside the band scores 0.
    lo, hi = hr_band_by_emotion.get(emotion, (60, 100))
    hr_component = 1.0 - min(abs(heart_rate - (lo + hi) / 2) / ((hi - lo) / 2), 1.0)

    # SpO2 should always be high: 90% maps to 0, 100% (and above) to 1.
    spo2_component = min(1.0, max(0, (spo2 - 90) / 10))

    # Weighted average of the three components, clamped into [0, 1].
    blended = 0.5 * hr_component + 0.3 * spo2_component + 0.2 * intensity_by_emotion.get(emotion, 0.5)
    return min(1.0, max(0, blended))
# Generate recommendations based on emotion and vitals
def generate_recommendations(emotion: str, heart_rate: float, spo2: float, alignment_score: float) -> List[str]:
    """Build a list of workout tips from the current emotion and vitals.

    Safety warnings (low SpO2, very high heart rate) come first, then at
    most one emotion-specific tip, then a misalignment note. Always returns
    at least one entry: a "maintain pace" default when nothing else fired.
    """
    tips: List[str] = []

    # Safety checks on vital signs come first.
    if spo2 < 92:
        tips.append("⚠️ Blood oxygen levels are lower than optimal. Consider reducing workout intensity.")
    if heart_rate > 160:
        tips.append("⚠️ Heart rate is very elevated. Take a short break to recover.")

    # Emotion-specific guidance — first matching rule wins.
    if emotion == "angry" and heart_rate > 100:
        tips.append("Try focused breathing exercises to channel your energy more effectively.")
    elif emotion == "sad" and heart_rate < 70:
        tips.append("Consider upbeat music or a group exercise to elevate your mood.")
    elif emotion == "happy" and alignment_score > 0.8:
        tips.append("Great alignment! This is an optimal state for challenging yourself.")

    # Flag a poor emotion/vitals match.
    if alignment_score < 0.4:
        tips.append("Your emotional and physical states seem misaligned. Consider adjusting your workout intensity.")

    # Fall back to a default when nothing above applied.
    return tips or ["Maintain current pace. All indicators look good."]
# Background task to read from Arduino
async def read_arduino_data():
    """Poll the Arduino serial port forever and broadcast new vitals.

    Expects one JSON object per line (keys "heart_rate" and "spo2";
    missing keys default to 0). On any serial error it waits 5 seconds
    and calls setup_arduino() to attempt a reconnect.

    NOTE(review): serial reads here are blocking calls inside the event
    loop — acceptable with timeout=1 but worth moving to a thread executor
    if latency matters.
    """
    global arduino_port, latest_vitals
    while True:
        if arduino_connected and arduino_port:
            try:
                if arduino_port.in_waiting > 0:
                    line = arduino_port.readline().decode('utf-8').strip()
                    try:
                        data = json.loads(line)
                        timestamp = datetime.now().isoformat()
                        # Create vitals data object (missing fields become 0)
                        latest_vitals = VitalsData(
                            heart_rate=data.get("heart_rate", 0),
                            spo2=data.get("spo2", 0),
                            timestamp=timestamp
                        )
                        # Broadcast to all connected clients
                        await broadcast_combined_data()
                    except json.JSONDecodeError:
                        # Malformed line: log and keep polling
                        print(f"❌ Invalid JSON from Arduino: {line}")
            except Exception as e:
                print(f"❌ Error reading from Arduino: {e}")
                await asyncio.sleep(5)
                # Try to reconnect
                setup_arduino()
        await asyncio.sleep(0.1)  # Small delay to prevent CPU overload
# Process webcam frame for emotion detection
async def process_emotion(frame):
    """Run face detection and emotion classification on one webcam frame.

    Updates the `latest_emotion` global and returns it. Returns None when
    the model/detector are not loaded, no face is found, or inference fails.
    """
    global emotion_model, face_detector, latest_emotion
    if emotion_model is None or face_detector is None:
        return None
    try:
        import torch
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Detect face in frame (second return value is unused metadata)
        face_img, _ = face_detector.detect_face(frame)
        if face_img is not None:
            # Preprocess face for model input and move it to the model's device
            face_tensor = face_detector.preprocess_face(face_img)
            face_tensor = face_tensor.to(device)

            # Predict emotion.
            # Bug fix: the original called the undefined name `model`,
            # raising NameError on every frame with a detected face; the
            # loaded global is `emotion_model`.
            with torch.no_grad():
                outputs = emotion_model(face_tensor)
                probabilities = torch.softmax(outputs, dim=1)[0]

            # Get predicted class and its probability
            predicted_class = torch.argmax(probabilities).item()
            probability = probabilities[predicted_class].item()

            # Class order must match the training label order — TODO confirm
            emotion_classes = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
            emotion = emotion_classes[predicted_class]
            timestamp = datetime.now().isoformat()

            # Publish the prediction for broadcasters / REST readers
            latest_emotion = EmotionData(
                emotion=emotion,
                confidence=probability,
                timestamp=timestamp
            )
            return latest_emotion
    except Exception as e:
        print(f"❌ Error processing emotion: {e}")
        return None
# Broadcast combined data to all connected clients
async def broadcast_combined_data():
    """Push the latest vitals+emotion snapshot to every WebSocket client.

    No-op until both a vitals reading and an emotion prediction exist.
    Clients whose send fails are dropped from `connected_clients`.
    """
    global latest_vitals, latest_emotion, connected_clients
    if latest_vitals and latest_emotion:
        # Calculate alignment score
        alignment_score = calculate_alignment_score(
            latest_emotion.emotion,
            latest_vitals.heart_rate,
            latest_vitals.spo2
        )
        # Generate recommendations
        recommendations = generate_recommendations(
            latest_emotion.emotion,
            latest_vitals.heart_rate,
            latest_vitals.spo2,
            alignment_score
        )
        # Create combined payload and serialize once for all clients
        combined_data = CombinedData(
            vitals=latest_vitals,
            emotion=latest_emotion,
            alignment_score=alignment_score,
            recommendations=recommendations
        )
        json_data = combined_data.json()

        # Bug fix: the original removed dead sockets from the set while
        # iterating that same set, which raises "Set changed size during
        # iteration" on the first failed send. Iterate a snapshot and
        # discard failures afterwards.
        dead_clients = []
        for websocket in list(connected_clients):
            try:
                await websocket.send_text(json_data)
            except Exception as e:
                print(f"❌ Error sending to client: {e}")
                dead_clients.append(websocket)
        for websocket in dead_clients:
            connected_clients.discard(websocket)
# WebSocket endpoint for real-time data
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream combined vitals/emotion data to one client in real time.

    Sends an initial snapshot when data is already available, then keeps
    the socket open; later updates arrive via broadcast_combined_data().
    """
    await websocket.accept()
    connected_clients.add(websocket)
    try:
        # Send initial data if available
        if latest_vitals and latest_emotion:
            alignment_score = calculate_alignment_score(
                latest_emotion.emotion,
                latest_vitals.heart_rate,
                latest_vitals.spo2
            )
            recommendations = generate_recommendations(
                latest_emotion.emotion,
                latest_vitals.heart_rate,
                latest_vitals.spo2,
                alignment_score
            )
            combined_data = CombinedData(
                vitals=latest_vitals,
                emotion=latest_emotion,
                alignment_score=alignment_score,
                recommendations=recommendations
            )
            await websocket.send_text(combined_data.json())
        # Keep connection alive and handle incoming messages
        while True:
            data = await websocket.receive_text()
            # Handle client messages if needed
    except Exception as e:
        print(f"❌ WebSocket error: {e}")
    finally:
        # Bug fix: use discard() — the broadcast loop may already have
        # dropped this socket, and remove() would raise KeyError here.
        connected_clients.discard(websocket)
# REST endpoint to get latest combined data
@app.get("/api/data", response_model=CombinedData)
async def get_latest_data():
    """Return the most recent combined vitals + emotion snapshot.

    Responds 404 until both an Arduino reading and an emotion prediction
    have been observed.
    """
    # Guard clause: nothing to report yet.
    if not (latest_vitals and latest_emotion):
        raise HTTPException(status_code=404, detail="No data available yet")

    score = calculate_alignment_score(
        latest_emotion.emotion,
        latest_vitals.heart_rate,
        latest_vitals.spo2
    )
    tips = generate_recommendations(
        latest_emotion.emotion,
        latest_vitals.heart_rate,
        latest_vitals.spo2,
        score
    )
    return CombinedData(
        vitals=latest_vitals,
        emotion=latest_emotion,
        alignment_score=score,
        recommendations=tips
    )
# REST endpoint to get Arduino connection status
@app.get("/api/status")
async def get_status():
    """Report health of the hardware link, the model, and client count."""
    now = datetime.now().isoformat()
    status = {
        "arduino_connected": arduino_connected,
        "emotion_model_loaded": emotion_model is not None,
        "clients_connected": len(connected_clients),
        "timestamp": now,
    }
    return status
# Startup event
@app.on_event("startup")
async def startup_event():
    """Connect hardware, load the model, and start the serial reader task."""
    # Initialize Arduino connection
    setup_arduino()
    # Initialize emotion recognition model
    setup_emotion_model()
    # Start background task to read Arduino data.
    # (Fix: removed an unused `BackgroundTasks()` instance — that class only
    # works as a request-handler parameter, and the object was never used;
    # asyncio.create_task is the right tool for a long-lived loop.)
    asyncio.create_task(read_arduino_data())
    print("✅ API server started successfully")
# Shutdown event
@app.on_event("shutdown")
async def shutdown_event():
    """Release the Arduino serial port when the server stops."""
    global arduino_port
    if arduino_port is not None:
        arduino_port.close()
    print("✅ API server shutdown successfully")
if __name__ == "__main__":
    # Development entry point; in production run via the uvicorn CLI / a process manager.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)