-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMatching.py
More file actions
228 lines (193 loc) · 9.3 KB
/
Matching.py
File metadata and controls
228 lines (193 loc) · 9.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import numpy as np
import torch
from scipy.optimize import linear_sum_assignment
from KalmanFilter import KalmanFilter
from EmbeddingSimilarity import EmbeddingSimilarity
from memory_profiler import profile
class Matcher():
def __init__(self, w_iou=0.5, w_similarity=0.5, longevity=20):
"""
Initializes the Matcher object with specified parameters.
Args:
w_iou (float): Weight for Intersection over Union in the cost function.
w_similarity (float): Weight for visual similarity in the cost function.
longevity (int): The number of frames a track will persist without being detected.
Attributes:
current_frames_info (list): Stores current frame detections.
current_frames (None): Placeholder for the current frame image.
track (list): Stores active tracks.
kalman_filters (dict): Stores Kalman filters for each track.
associations (dict): Stores associations between detections and tracks.
embedding_similarity (EmbeddingSimilarity): Embedding similarity calculator.
w_iou (float): Weight for IoU.
w_similarity (float): Weight for similarity.
longevity (int): Longevity value for each track.
nb_track (int): Number of active tracks.
set_id (set): Set of unique track IDs.
"""
self.current_frames_info = []
self.current_frames = None
self.track = []
self.kalman_filters = {}
self.associations = {}
self.embedding_similarity = EmbeddingSimilarity()
self.w_iou = w_iou
self.w_similarity = w_similarity
self.longevity = longevity
self.nb_track = 0
self.set_id = set()
def set_currentframes(self, current_frames_info, current_frames):
"""
Sets the current frame information and image.
Args:
current_frames_info (list): Information about detections in the current frame.
current_frames (tensor): Image tensor of the current frame.
"""
self.current_frames_info = current_frames_info
self.current_frames = current_frames
def compute_iou(self, boxA, boxB):
"""
Computes the Intersection over Union (IoU) between two bounding boxes.
Args:
boxA (list): Coordinates of the first bounding box.
boxB (list): Coordinates of the second bounding box.
Returns:
float: The IoU between boxA and boxB.
"""
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])
interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
# Compute the area of both bounding boxes
boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
iou = interArea / float(boxAArea + boxBArea - interArea)
return iou
def create_kalman_filter(self):
"""
Creates and returns a new KalmanFilter object with predefined parameters.
Returns:
KalmanFilter: A new Kalman filter instance.
"""
dt = 0.1
u_x, u_y = 1, 1
std_acc = 1
x_std_meas, y_std_meas = 0.1, 0.1
return KalmanFilter(dt, u_x, u_y, std_acc, x_std_meas, y_std_meas)
def convert_bb_to_centroid(self, bb_left, bb_top, bb_width, bb_height):
"""
Converts bounding box coordinates to centroid coordinates.
Args:
bb_left, bb_top, bb_width, bb_height (int): Bounding box coordinates.
Returns:
ndarray: Centroid coordinates.
"""
return np.array([bb_left + bb_width/2, bb_top + bb_height/2])
def convert_centroid_to_bb(self, centroid, width, height):
"""
Converts centroid coordinates back to bounding box coordinates.
Args:
centroid (ndarray): Centroid coordinates.
width, height (int): Width and height of the bounding box.
Returns:
ndarray: Bounding box coordinates.
"""
return np.array([centroid[0] - width/2, centroid[1] - height/2, centroid[0] + width/2, centroid[1] + height/2])
def hungarian_similarity_matrix(self):
"""
Creates a cost matrix for all detections and tracks, and applies the Hungarian algorithm.
Returns:
ndarray, ndarray, ndarray: Cost matrix, row indices, and column indices from the Hungarian algorithm.
"""
detections = self.current_frames_info
tracks = self.track
num_detections = len(detections)
num_tracks = len(tracks)
high_cost = 1e5
# max_size = max(num_detections, num_tracks)
cost_matrix = np.full((num_detections, num_tracks), high_cost)
embedding_batch_detection = self.embedding_similarity.compute_batch_embedding(self.current_frames, detections)
embedding_batch_track = self.embedding_similarity.compute_batch_embedding(self.current_frames, tracks)
for d, detection in enumerate(detections):
d_x1, d_y1 = int(detection["bb_left"]), int(detection["bb_top"])
d_w1, d_h1 = int(detection["bb_width"]), int(detection["bb_height"])
blockA = [d_x1, d_y1, d_x1 + d_w1, d_y1 + d_h1]
emb1 = embedding_batch_detection[d].unsqueeze(0)
for t, track in enumerate(tracks):
t_x2, t_y2 = int(track["bb_left"]), int(track["bb_top"])
t_w2, t_h2 = int(track["bb_width"]), int(track["bb_height"])
blockB = self.kalman_filters[track["id"]].predict()
blockB = self.convert_centroid_to_bb(blockB, t_w2, t_h2)
# Compute visual similarity between the box
emb2 = embedding_batch_track[t].unsqueeze(0)
similarity = self.embedding_similarity.compute_similarity(embedding1=emb1, embedding2=emb2)
iou = self.compute_iou(blockA, blockB)
cost_matrix[d, t] = 1 - (self.w_iou * iou + self.w_similarity * similarity)
del embedding_batch_detection
del embedding_batch_track
# Application de l'algorithme hongrois
row_ind, col_ind = linear_sum_assignment(cost_matrix)
return cost_matrix, row_ind, col_ind
def associate_detections_to_tracks(self):
"""
Associates detections with existing tracks or creates new tracks.
"""
cost_matrix, row_ind, col_ind = self.hungarian_similarity_matrix()
keep_track = []
self.associations = {}
for d, t in zip(row_ind, col_ind):
# Mise à jour des tracks existants
self.associations[d] = t
self.current_frames_info[d]["id"] = self.track[t]["id"]
keep_track.append(self.track[t])
# Update Kalman filter
center = self.convert_bb_to_centroid(self.current_frames_info[d]["bb_left"],
self.current_frames_info[d]["bb_top"],
self.current_frames_info[d]["bb_width"],
self.current_frames_info[d]["bb_height"])
if self.track[t]["id"] in self.kalman_filters:
self.kalman_filters[self.track[t]["id"]].update(center)
else:
self.kalman_filters[self.track[t]["id"]] = self.create_kalman_filter()
#Remove tracks with no matching detections
for track in self.track:
if track["id"] not in [t["id"] for t in keep_track]:
track["longevity"] -= 1
if track["longevity"] <= 0:
del self.kalman_filters[track["id"]]
self.set_id.remove(track["id"])
else:
keep_track.append(track)
#Create new tracks for unmatched detections
for frames in self.current_frames_info:
if frames["id"] == -1:
frame_id = max(self.set_id, default=0) + 1
frames["id"] = frame_id
frames["longevity"] = self.longevity
self.kalman_filters[frames["id"]] = self.create_kalman_filter()
keep_track.append(frames)
self.set_id.add(frame_id)
self.nb_track += 1
self.track = keep_track
def find_matching_id(self, init=False):
"""
Finds matching IDs for the current frame detections.
Args:
init (bool): If True, initializes tracks with the current frame detections.
Returns:
list: Updated current frame information with associated track IDs.
"""
if init:
for num_line, line in enumerate(self.current_frames_info):
x, y = int(line["bb_left"]), int(line["bb_top"])
w, h = int(line["bb_width"]), int(line["bb_height"])
line["id"] = int(num_line)
line["longevity"] = self.longevity
self.kalman_filters[num_line] = self.create_kalman_filter()
self.set_id.add(int(num_line))
self.track = self.current_frames_info
self.nb_track = len(self.track)
else:
self.associate_detections_to_tracks()
return self.current_frames_info, self.track