-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathScheduler.cc
More file actions
132 lines (110 loc) · 3.77 KB
/
Scheduler.cc
File metadata and controls
132 lines (110 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include <iostream>
#include <vector>
#include <map>
#include <functional>
#include <unordered_map>
#include <atomic>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
int64_t getCurrMs() {
auto now = std::chrono::system_clock::now();
std::chrono::milliseconds nowMs =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
return nowMs.count();
}
class Scheduler {
public:
Scheduler() {
stopped_ = false;
std::thread loop([this](){
this->scheduleLoop();
});
loop.detach();
}
~Scheduler() {
std::unique_lock<std::mutex> lock(mu_);
stopped_ = true;
}
int64_t schedule(std::function<void()> task, int64_t delayInSec) {
std::unique_lock<std::mutex> lock(mu_);
int64_t taskId = idGen_++;
int64_t targetTs = getCurrMs() + delayInSec * 1000;
Task newTask(taskId, task, targetTs);
tasks_[taskId] = newTask;
taskBuckets_[targetTs].push_back(taskId);
runCv_.notify_one();
return taskId;
}
void cancel(int64_t taskId) {
std::unique_lock<std::mutex> lock(mu_);
tasks_.erase(taskId);
}
private:
struct Task {
Task() = default;
Task(int64_t id, std::function<void()> r, int64_t t):
taskId(id), run(r), targetTs(t) {}
Task(const Task& other) {
taskId = other.taskId;
run = other.run;
targetTs = other.targetTs;
}
Task& operator=(const Task& other) {
taskId = other.taskId;
run = other.run;
targetTs = other.targetTs;
return *this;
}
int64_t taskId;
std::function<void()> run;
int64_t targetTs;
};
void scheduleLoop() {
std::unique_lock<std::mutex> lock(mu_);
while (!stopped_) {
if (taskBuckets_.empty()) {
runCv_.wait_for(lock, std::chrono::milliseconds(1000));
continue;
}
// check if we have tasks need to be executed
int64_t nextTs = taskBuckets_.begin()->first;
int64_t delay = nextTs - getCurrMs();
if (delay < 0) {
auto& taskIds = taskBuckets_.begin()->second;
for (auto taskId: taskIds) {
// the task maybe canceled
if (tasks_.find(taskId) == tasks_.end()) {
continue;
}
std::thread t(tasks_[taskId].run);
t.detach(); // run in back ground
tasks_.erase(taskId);
}
taskBuckets_.erase(nextTs);
} else {
runCv_.wait_for(lock, std::chrono::milliseconds(delay));
}
}
}
private:
std::mutex mu_;
std::condition_variable runCv_;
bool stopped_;
int64_t idGen_;
std::unordered_map<int64_t, Task> tasks_;
std::map<int64_t, std::vector<int64_t>> taskBuckets_;
};
int main() {
Scheduler tasks;
std::cout << "Now time is: " << getCurrMs() << std::endl;
auto task1 = tasks.schedule([](){
std::cout << "Now time is: " << getCurrMs() << std::endl;
}, 2);
auto task2 = tasks.schedule([](){
std::cout << "Now time is: " << getCurrMs() << std::endl;
}, 1);
tasks.cancel(task2);
std::this_thread::sleep_for(std::chrono::seconds(10));
}