-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadLoadData.cpp
More file actions
60 lines (51 loc) · 2.52 KB
/
ThreadLoadData.cpp
File metadata and controls
60 lines (51 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include "WorkThreads.h"
void threadLoadData(deque<TriggerInfo>& transTriggers, LigDataApi& LigDataApi,
shared_mutex& rwMutex, YAML::Node& config, bool& keep_loading) {
if (config["mode"].as<string>() == "reProcess") {
GPSTime till_time(config["reProcess"]["startTime"].as<string>());
GPSTime start_time(config["reProcess"]["startTime"].as<string>());
GPSTime end_time(config["reProcess"]["endTime"].as<string>());
while (1) {
// ��ȡ���µ�����
till_time += Duration("00000000T002000");
vector<TriggerInfo> cache = LigDataApi.GetHistoricalTriggerDataUntill(till_time, 20);
cache.erase(remove_if(cache.begin(), cache.end(),
[&](const TriggerInfo& trigger) {
return trigger.time < start_time || trigger.time > end_time;
}),
cache.end());
// �ϲ�����
int init_size = transTriggers.size();
unique_lock<shared_mutex> lock(rwMutex);
transTriggers.insert(transTriggers.end(), cache.begin(), cache.end());
sort(transTriggers.begin(), transTriggers.end()); // ����ʱ������
transTriggers.erase(unique(transTriggers.begin(), transTriggers.end()),
transTriggers.end()); // ȥ��
lock.unlock();
LOG_INFO("cache size " << cache.size() << endl);
LOG_INFO("add " << transTriggers.size() - init_size << " new triggers" << endl);
LOG_INFO("current transTriggers size: " << transTriggers.size() << endl);
if (till_time >= end_time) {
keep_loading = false;
break;
}
}
} else {
while (1) {
// ��ȡ���µ�����
vector<TriggerInfo> cache = LigDataApi.GetRealTimeTriggerData();
// �ϲ�����
int init_size = transTriggers.size();
unique_lock<shared_mutex> lock(rwMutex);
transTriggers.insert(transTriggers.end(), cache.begin(), cache.end());
sort(transTriggers.begin(), transTriggers.end()); // ����ʱ������
transTriggers.erase(unique(transTriggers.begin(), transTriggers.end()),
transTriggers.end()); // ȥ��
lock.unlock();
LOG_INFO("cache size " << cache.size() << endl);
LOG_INFO("add " << transTriggers.size() - init_size << " new triggers" << endl);
LOG_INFO("current transTriggers size: " << transTriggers.size() << endl);
std::this_thread::sleep_for(std::chrono::seconds(60));
}
}
}