-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadLoc.cpp
More file actions
346 lines (301 loc) · 15.6 KB
/
ThreadLoc.cpp
File metadata and controls
346 lines (301 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#include <future>
#include "WorkThreads.h"
#include "include/ordered_map.h"
#include "log.h"
using namespace tsl;
/**
 * @brief Solve every candidate station combination and report the best fit.
 *
 * Each combination with at least 5 triggers is passed to GeoLocation_GPU; the
 * combination whose solution has the smallest `sq` value wins. Combinations
 * with fewer than 5 triggers are not solved and are scored FLOAT_MAX.
 *
 * @param locCombinationPool    Candidate combinations; one solver call per entry.
 * @param triggerPool           Per-station data; supplies `staLocation` for each
 *                              trigger's stationID.
 * @param MinSq                 [out] Smallest `sq` found (FLOAT_MAX if no
 *                              combination was solvable).
 * @param result                [out] Solver result of the winning combination.
 * @param finalCombIdx          [out] Index of the winning combination in
 *                              locCombinationPool.
 * @param CountGeoLocationTimes [in,out] Incremented by the number of combinations
 *                              in the pool (skipped ones included).
 * @param siteMap               Currently unused; kept for interface compatibility.
 *
 * If locCombinationPool is empty the function returns immediately and leaves
 * all outputs untouched.
 */
void ProcessCombinationPool(vector<vector<TriggerInfo>>& locCombinationPool,
                            ordered_map<int, triggerAtStation>& triggerPool, double& MinSq,
                            LocSta& result, __int64& finalCombIdx,
                            unsigned long long& CountGeoLocationTimes,
                            unordered_map<int, StationInfo>& siteMap) {
  // Signed copy of the size: the OpenMP loop below uses a signed index, and an
  // empty pool must not reach either num_threads(0) or min_element on an empty range.
  const int poolSize = static_cast<int>(locCombinationPool.size());
  if (poolSize == 0) return;

  // Per-combination outputs. Each loop iteration writes only its own slot, so
  // no synchronisation is needed between iterations.
  vector<double> sqList(locCombinationPool.size(), FLOAT_MAX);
  vector<LocSta> resultList(locCombinationPool.size());

  // NOTE(review): one thread is requested per combination, as in the original
  // code; for large pools this may oversubscribe the machine — confirm intended.
#pragma omp parallel for num_threads(poolSize)
  for (int m = 0; m < poolSize; ++m) {
    vector<TriggerInfo>& oneComb = locCombinationPool[m];
    // Fewer than 5 triggers: not solved, scored as the worst possible fit.
    if (oneComb.size() < 5) {
      sqList[m] = FLOAT_MAX;
      continue;
    }

    vector<double> Loc_Time_One;
    vector<LocSta> Stations_One;
    Loc_Time_One.reserve(oneComb.size());
    Stations_One.reserve(oneComb.size());
    for (const TriggerInfo& trig : oneComb) {
      // Arrival time = whole-second part + fractional part.
      Loc_Time_One.emplace_back(trig.time.m_Sec + trig.time.m_ActPointSec);
      // NOTE(review): operator[] is a non-const lookup executed concurrently;
      // this presumes every stationID in a combination is already a key of
      // triggerPool so that no insertion happens here — confirm.
      Stations_One.emplace_back(triggerPool[trig.stationID].staLocation);
    }

    // Alternative solvers previously tried here: GeoLocation_GPU_Initial,
    // GeoLocation_OP, GeoLocation_OP_2.
    LocSta oneResult = GeoLocation_GPU(Stations_One, Loc_Time_One);
    sqList[m] = oneResult.sq;
    resultList[m] = oneResult;
  }

  CountGeoLocationTimes += locCombinationPool.size();

  // Pick the combination with the smallest sq. The outputs are overwritten
  // unconditionally, even when the caller's previous MinSq was smaller.
  const auto minIt = min_element(sqList.begin(), sqList.end());
  MinSq = *minIt;
  finalCombIdx = minIt - sqList.begin();
  result = resultList[static_cast<size_t>(finalCombIdx)];
}
/**
 * @brief Worker loop: merge newly received triggers and locate events from them.
 *
 * Runs while `keep_loading` is true or `transTriggers` still holds data.
 * Each pass:
 *  1. Under an exclusive lock on `rwMutex`, moves triggers newer than the
 *     current processing time from `transTriggers` into `allTriggers`, then
 *     sorts and de-duplicates `allTriggers`. When there is nothing to merge the
 *     thread sleeps 10 s and, every 600 s of accumulated idle time, publishes a
 *     warning over MQTT.
 *  2. Takes the earliest trigger as the base, gathers matching triggers from up
 *     to 8 stations inside the `maxBaseLineAsTOA` window, and, when at least 5
 *     stations are present, solves all combinations via ProcessCombinationPool.
 *     While the fit is worse than `LocThresholdInitial` and more than 6 stations
 *     remain, the station with the largest residual is dropped and the solve is
 *     repeated (at most twice).
 *  3. A fit below `LocThresholdInitial` consumes its triggers; it is logged,
 *     optionally written to file and posted only if it also passes the
 *     geometry, `LocThresholdFinal` and distance checks.
 *
 * @param allTriggers   Working queue of triggers, kept sorted by this function.
 * @param transTriggers Incoming triggers filled elsewhere; drained here.
 * @param siteMap       Station metadata (latitude, longitude, altitude) by ID.
 * @param siteTimeMap   Per station pair, the maximum accepted arrival-time
 *                      difference (a 0.0001 tolerance is added on use).
 * @param rwMutex       Guards the merge of transTriggers into allTriggers.
 * @param config        Reads maxBaseLineAsTOA, LocThresholdInitial,
 *                      LocThresholdFinal, checkTheta, waitTime,
 *                      CurrentProcessingTime and mode.
 * @param keep_loading  Loop keeps running while this is true.
 */
void ThreadLoc(deque<TriggerInfo>& allTriggers, deque<TriggerInfo>& transTriggers,
               unordered_map<int, StationInfo>& siteMap,
               unordered_map<int, unordered_map<int, double>>& siteTimeMap, shared_mutex& rwMutex,
               YAML::Node config, bool& keep_loading) {
  ThreadPool postThreadPool{100};
  const double maxBaseLineAsTOA = (config["maxBaseLineAsTOA"].as<double>());
  const double LocThresholdInitial = config["LocThresholdInitial"].as<double>();
  const double checkTheta = config["checkTheta"].as<double>();
  const double waitTime = config["waitTime"].as<double>();
  const double LocThresholdFinal = config["LocThresholdFinal"].as<double>();

  // In "reProcess" mode the configured start time is replaced by a
  // default-constructed GPSTime and results are also written to a text file.
  GPSTime CurrentProcessingTime(config["CurrentProcessingTime"].as<string>());
  if (config["mode"].as<string>() == "reProcess") CurrentProcessingTime = GPSTime();
  ofstream outfile_O;
  if (config["mode"].as<string>() == "reProcess") outfile_O.open("lig_txt/NewData2.txt", ios::out);

  auto start = std::chrono::high_resolution_clock::now();

  // NOTE(review): transTriggers.size() and keep_loading are read here without
  // holding rwMutex — confirm this is safe with respect to the producer thread.
  while (keep_loading || transTriggers.size()) {
    // Merge newly received data.
    if (transTriggers.size()) {
      unique_lock<shared_mutex> lock(rwMutex);
      for (const auto& trigger : transTriggers) {
        if (trigger.time > CurrentProcessingTime) allTriggers.push_back(trigger);
      }
      sort(allTriggers.begin(), allTriggers.end());  // order by time
      allTriggers.erase(unique(allTriggers.begin(), allTriggers.end()),
                        allTriggers.end());  // drop duplicates
      transTriggers.clear();
      AccumulatedIdleTime = 0;
      lock.unlock();
    } else {
      LOG_INFO("Wait for new trigger data... 10s, accumulated idle time: " << AccumulatedIdleTime
                                                                           << endl);
      std::this_thread::sleep_for(std::chrono::seconds(10));
      // Raise an alert each time another 600 s of idle time has accumulated.
      if (AccumulatedIdleTime % 600 == 0 && AccumulatedIdleTime != 0) {
        LigDataApi::sendDataViaMQTT(topic_w, "十分钟未定位闪电数据!");
        LOG_WARN(AccumulatedIdleTime << " second without locating lightning data!" << endl);
      }
      AccumulatedIdleTime += 10;
      continue;
    }

    LOG_INFO("New data merge and begin processing: "
             << CGPSTimeAlgorithm::GetTimeString(CurrentProcessingTime) << endl);
    LOG_INFO("Size of allTriggers: " << allTriggers.size() << endl);

    // Start locating.
    while (allTriggers.size()) {
      // The next trigger is further away than the window allows, so the first
      // one cannot be matched with anything: discard it.
      if (allTriggers.size() > 1) {
        if (allTriggers[1].time - allTriggers[0].time > maxBaseLineAsTOA) {
          allTriggers.erase(allTriggers.begin());
          continue;
        }
      }

      ordered_map<int, triggerAtStation> triggerPool;
      TriggerInfo& baseTrig = allTriggers[0];

      // Leave time for data still in transit over the network: if the base
      // trigger is too recent, stop and wait for the next merge.
      CurrentProcessingTime = CGPSTimeAlgorithm::AddActPointSec(baseTrig.time, maxBaseLineAsTOA);
      // NOTE(review): the 8 * 3600 offset presumably converts to UTC+8 — confirm.
      if ((CGPSTimeAlgorithm::ConvertGPSTimeToUnixTime(baseTrig.time) + 8 * 3600) >
          (time(nullptr) - waitTime))
        break;

      for (int j = 0; j < allTriggers.size(); ++j) {
        TriggerInfo& oneTrig = allTriggers[j];
        // Skip triggers whose Value has the opposite sign to the base trigger.
        if (baseTrig.Value * oneTrig.Value < 0) continue;
        int trigSiteID = oneTrig.stationID;
        double diffTime = baseTrig.time - oneTrig.time;
        // allTriggers is sorted, so once outside the window nothing later fits.
        if ((diffTime < -maxBaseLineAsTOA) || (diffTime > 0)) break;
        // The acceptance limit is relative to the base station, so the base
        // station itself normally contributes only its single base trigger.
        // (A former alternative also accepted every trigger of the base station.)
        if ((fabs(diffTime) <= (siteTimeMap[baseTrig.stationID][oneTrig.stationID] + 0.0001))) {
          // Is this station already in the pool?
          if (triggerPool.find(trigSiteID) == triggerPool.end()) {
            // Strategy: locate with the first 8 stations encountered.
            if (triggerPool.size() >= 8) continue;
            triggerAtStation oneTriggerAtStation;
            // NOTE(review): altitude / 1000.0 is presumably metres to km — confirm.
            oneTriggerAtStation.staLocation =
                (LocSta(siteMap[trigSiteID].latitude * degree2radians,
                        siteMap[trigSiteID].longitude * degree2radians,
                        siteMap[trigSiteID].altitude / 1000.0));
            oneTriggerAtStation.stationID = trigSiteID;
            oneTriggerAtStation.triggers.emplace_back(oneTrig);
            oneTriggerAtStation.triggers.back().trigIdx = j;
            triggerPool[trigSiteID] = oneTriggerAtStation;
          } else {
            auto& stationTriggers = triggerPool[trigSiteID].triggers;
            stationTriggers.emplace_back(oneTrig);
            // Record the position in allTriggers for every stored trigger, not
            // only the first one per station: trigIdx is used below to erase
            // the consumed triggers, and a stale value would remove the wrong
            // elements or index out of range.
            stationTriggers.back().trigIdx = j;
          }
        }
      }

      if ((triggerPool.size()) >= 5) {
        double MinSq = FLOAT_MAX;
        double ThresSqInitial = LocThresholdInitial;
        double ThresSqFinal = LocThresholdFinal;
        LocSta result;
        // Only meaningful after ProcessCombinationPool has run on the current
        // pool; initialised so it never holds an indeterminate value.
        __int64 finalCombIdx = 0;
        vector<vector<TriggerInfo>> locCombinationPool;

        if (MinSq > ThresSqInitial) {
          locCombinationPool =
              LigTools::getLocationPool_p(triggerPool, siteTimeMap, triggerPool.size());
          if (locCombinationPool.size() > 0)
            ProcessCombinationPool(locCombinationPool, triggerPool, MinSq, result, finalCombIdx,
                                   CountGeoLocationTimes, siteMap);
        }

        // At most 8 stations are used and at least 6 are wanted for a trusted
        // solution, hence two refinement rounds.
        for (int i = 0; i < 2; ++i) {
          if (MinSq > ThresSqInitial && triggerPool.size() > 6) {
            if (locCombinationPool.size() > 0) {
              // Residual per station: |propagation distance implied by the
              // arrival time - distance from station to the solved position|.
              map<int, double> sqMap;
              for (auto& trig : locCombinationPool[finalCombIdx]) {
                sqMap[trig.stationID] =
                    fabs((trig.time.m_Sec + trig.time.m_ActPointSec - result.occur_t) * cVeo -
                         Stadistance(siteMap[trig.stationID].latitude,
                                     siteMap[trig.stationID].longitude, result.Lat, result.Lon));
              }
              // Remove the station with the largest residual.
              auto maxElement =
                  max_element(sqMap.begin(), sqMap.end(),
                              [](const pair<int, double>& p1, const pair<int, double>& p2) {
                                return p1.second < p2.second;
                              });
              if (maxElement != sqMap.end()) triggerPool.erase(maxElement->first);
            }
            locCombinationPool =
                LigTools::getLocationPool_p(triggerPool, siteTimeMap, triggerPool.size());
            if (locCombinationPool.size() > 0)
              ProcessCombinationPool(locCombinationPool, triggerPool, MinSq, result, finalCombIdx,
                                     CountGeoLocationTimes, siteMap);
          }
        }

        if (MinSq < ThresSqInitial) {
          CountLocationPoints++;
          vector<TriggerInfo>& oneComb = locCombinationPool[finalCombIdx];
          vector<double> Loc_Time_One;
          vector<LocSta> Stations_One;
          for (int j = 0; j < oneComb.size(); ++j) {
            Loc_Time_One.emplace_back(oneComb[j].time.m_Sec + oneComb[j].time.m_ActPointSec);
            Stations_One.emplace_back(triggerPool[oneComb[j].stationID].staLocation);
          }

          // A second-stage refinement (FinalGeoLocation_GPU / GeoLocation_OP /
          // GeoLocation_OP_2) used to run here; the first-stage result is used
          // as is.
          LocSta oneResult = result;
          // Copy with Lat/Lon scaled by degree2radians for the geometry check.
          LocSta oneResult_rad = oneResult;
          oneResult_rad.Lat = oneResult.Lat * degree2radians;
          oneResult_rad.Lon = oneResult.Lon * degree2radians;

          double distanceToBase =
              Stadistance(siteMap[oneComb[0].stationID].latitude,
                          siteMap[oneComb[0].stationID].longitude, oneResult.Lat, oneResult.Lon);

          // NOTE(review): 4000.0 is presumably a distance limit in km — confirm.
          if (LigTools::check_location_structure(Stations_One, oneResult_rad, checkTheta) &&
              oneResult.sq < ThresSqFinal && (distanceToBase < 4000.0)) {
            LOG_INFO("CountGeoLocationTimes: "
                     << CountGeoLocationTimes << " CountLocationPoints: " << CountLocationPoints
                     << " Number of sites: " << Stations_One.size() << endl);
            LOG_INFO(CGPSTimeAlgorithm::GetTimeStr(oneComb[0].time)
                     << " " << oneResult.Lat << " " << oneResult.Lon << " " << oneResult.h << " "
                     << oneResult.sq << " " << distanceToBase << endl);
            // Mirror the logged result into the debug output file (opened in
            // overwrite mode, only in "reProcess" mode).
            GPSTime lig_time = oneComb[0].time;
            lig_time.set_second(oneResult.occur_t);
            if (outfile_O.is_open())
              outfile_O << CGPSTimeAlgorithm::GetTimeStr(lig_time) << " " << oneResult.Lat << " "
                        << oneResult.Lon << " " << oneResult.h << " " << oneResult.sq << " "
                        << oneComb.size() << endl;
            postThreadPool.enqueue(LigDataApi::PostLigResult, lig_time, oneResult, oneComb,
                                   siteMap);
          }

          // Positions in allTriggers of the triggers consumed by this solution.
          std::vector<int> indices;
          for (int i = 0; i < oneComb.size(); ++i) {
            indices.emplace_back(oneComb[i].trigIdx);
          }
          // Erase from the back so earlier indices stay valid.
          std::sort(indices.begin(), indices.end(), std::greater<int>());
          for (auto i : indices) {
            allTriggers.erase(allTriggers.begin() + i);
          }
          continue;
        }
      }
      // No acceptable solution with this base trigger: drop it and move on.
      allTriggers.erase(allTriggers.begin());
    }
  }

  LigDataApi::disconnect();
  if (outfile_O.is_open()) outfile_O.close();
  LOG_INFO(
      "CountLocationPoints: "
      << CountLocationPoints << " Elapsed time: "
      << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start).count()
      << " seconds.\n");
}