Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/builder.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ jobs:
fetch-depth: 0 # for correct version numbers
submodules: recursive
- run: builder/build.sh -e 1 -v ${{ matrix.os }}
- uses: actions/upload-artifact@v4
with:
name: package-${{ matrix.os }}
path: builder/tmp/latest/${{ matrix.os }}/dist/*
109 changes: 106 additions & 3 deletions wforce/wforce-prometheus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void WforcePrometheus::incAllowStatusMetric(const std::string& name)
}
}

void WforcePrometheus::addReplicationSibling(const std::string& name)
void WforcePrometheus::addReplicationSibling(const std::string& name, const std::function<int()>& func)
{
std::lock_guard<std::mutex> lock(repl_mutx);
Counter* c = &(repl_sent_family->Add({{"sibling", name},
Expand All @@ -54,14 +54,27 @@ void WforcePrometheus::addReplicationSibling(const std::string& name)
c = &(repl_sent_family->Add({{"sibling", name},
{"status", "error"}}));
repl_sent_err_metrics.insert(std::make_pair(name, c));
c = &(repl_send_queue_err_family->Add({{"sibling", name}}));
repl_send_queue_err_metrics.insert(std::make_pair(name, c));
c = &(repl_rcvd_family->Add({{"sibling", name},
{"status", "ok"}}));
repl_rcvd_ok_metrics.insert(std::make_pair(name, c));
c = &(repl_rcvd_family->Add({{"sibling", name},
{"status", "error"}}));
repl_rcvd_err_metrics.insert(std::make_pair(name, c));
c = &(repl_rcvd_queue_err_family->Add({{"sibling", name}}));
repl_rcvd_err_metrics.insert(std::make_pair(name, c));
c = &(repl_connfail_family->Add({{"sibling", name}}));
repl_connfail_metrics.insert(std::make_pair(name, c));
Gauge* g = &(repl_send_queue_size_family->Add({{"sibling", name}}));
repl_send_queue_size_metrics.insert(std::make_pair(name, g));
// Add the function which returns the sibling queue size
if (func != nullptr) {
repl_send_queue_func_map.insert_or_assign(name, func);
}
else {
repl_send_queue_func_map.insert_or_assign(name, []() { return 0; });
}
}

void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name)
Expand All @@ -76,6 +89,11 @@ void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name)
repl_sent_family->Remove(c->second);
repl_sent_err_metrics.erase(name);
}
c = repl_send_queue_err_metrics.find(name);
if (c != repl_send_queue_err_metrics.end()) {
repl_send_queue_err_family->Remove(c->second);
repl_send_queue_err_metrics.erase(name);
}
c = repl_rcvd_ok_metrics.find(name);
if (c != repl_rcvd_ok_metrics.end()) {
repl_rcvd_family->Remove(c->second);
Expand All @@ -86,11 +104,25 @@ void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name)
repl_rcvd_family->Remove(c->second);
repl_rcvd_err_metrics.erase(name);
}
c = repl_rcvd_queue_err_metrics.find(name);
if (c != repl_rcvd_queue_err_metrics.end()) {
repl_rcvd_queue_err_family->Remove(c->second);
repl_rcvd_queue_err_metrics.erase(name);
}
c = repl_connfail_metrics.find(name);
if (c != repl_connfail_metrics.end()) {
repl_connfail_family->Remove(c->second);
repl_connfail_metrics.erase(name);
}
auto g = repl_send_queue_size_metrics.find(name);
if (g != repl_send_queue_size_metrics.end()) {
repl_send_queue_size_family->Remove(g->second);
repl_send_queue_size_metrics.erase(name);
}
// Remove the function which returns the sibling queue size
if (auto i = repl_send_queue_func_map.find(name); i != repl_send_queue_func_map.end()) {
repl_send_queue_func_map.erase(i);
}
}

void WforcePrometheus::removeReplicationSibling(const std::string& name)
Expand Down Expand Up @@ -128,6 +160,15 @@ void WforcePrometheus::incReplicationSent(const std::string& name, bool success)
}
}

void WforcePrometheus::incReplicationSendQueueErr(const std::string& name)
{
std::lock_guard<std::mutex> lock(repl_mutx);
auto i = repl_send_queue_err_metrics.find(name);
if (i != repl_send_queue_err_metrics.end()) {
i->second->Increment();
}
}

void WforcePrometheus::incReplicationRcvd(const std::string& name, bool success)
{
std::lock_guard<std::mutex> lock(repl_mutx);
Expand All @@ -145,6 +186,16 @@ void WforcePrometheus::incReplicationRcvd(const std::string& name, bool success)
}
}

void WforcePrometheus::incReplicationRcvdQueueErr(const std::string& name)
{
std::lock_guard<std::mutex> lock(repl_mutx);
auto i = repl_rcvd_queue_err_metrics.find(name);
if (i != repl_rcvd_queue_err_metrics.end())
{
i->second->Increment();
}
}

void WforcePrometheus::incReplicationConnFail(const std::string& name)
{
std::lock_guard<std::mutex> lock(repl_mutx);
Expand All @@ -154,6 +205,29 @@ void WforcePrometheus::incReplicationConnFail(const std::string& name)
}
}

void WforcePrometheus::setReplicationSendQueueSize(const std::string& name, int size)
{
std::lock_guard<std::mutex> lock(repl_mutx);
auto i = repl_send_queue_size_metrics.find(name);
if (i != repl_send_queue_size_metrics.end()) {
i->second->Set(size);
}
}

void WforcePrometheus::setReplicationSendQueueRetrieveFunc(const std::string& name, const std::function<int()>& func)
{
std::lock_guard<std::mutex> lock(repl_mutx);
auto i = repl_send_queue_func_map.find(name);
if (i != repl_send_queue_func_map.end()) {
if (func != nullptr) {
i->second = func;
}
else {
i->second = []() { return 0; };
}
}
}

void WforcePrometheus::incRedisBLUpdates()
{
if (redis_bl_updates != nullptr)
Expand Down Expand Up @@ -252,10 +326,10 @@ void incPrometheusAllowStatusMetric(const std::string& name)
}
}

void addPrometheusReplicationSibling(const std::string& name)
void addPrometheusReplicationSibling(const std::string& name, const std::function<int()>& func)
{
if (wforce_prom_metrics != nullptr) {
wforce_prom_metrics->addReplicationSibling(name);
wforce_prom_metrics->addReplicationSibling(name, func);
}
}

Expand All @@ -280,13 +354,27 @@ void incPrometheusReplicationSent(const std::string& name, bool success)
}
}

void incPrometheusReplicationSendQueueErr(const std::string& name)
{
if (wforce_prom_metrics != nullptr) {
wforce_prom_metrics->incReplicationSendQueueErr(name);
}
}

void incPrometheusReplicationRcvd(const std::string& name, bool success)
{
if (wforce_prom_metrics != nullptr) {
wforce_prom_metrics->incReplicationRcvd(name, success);
}
}

void incPrometheusReplicationRcvdQueueErr(const std::string& name)
{
if (wforce_prom_metrics != nullptr) {
wforce_prom_metrics->incReplicationRcvdQueueErr(name);
}
}

void incPrometheusReplicationConnFail(const std::string& name)
{
if (wforce_prom_metrics != nullptr) {
Expand Down Expand Up @@ -405,3 +493,18 @@ void setPrometheusReplRecvQueueRetrieveFunc(std::function<int()> func)
wforce_prom_metrics->setReplRecvQueueRetrieveFunc(func);
}
}

void setPrometheusReplSendQueueSize(const std::string& name, int value)
{
if (wforce_prom_metrics) {
wforce_prom_metrics->setReplicationSendQueueSize(name, value);
}
}

void setPrometheusReplSendQueueRetrieveFunc(const std::string& name, const std::function<int()>& func)
{
if (wforce_prom_metrics)
{
wforce_prom_metrics->setReplicationSendQueueRetrieveFunc(name, func);
}
}
46 changes: 42 additions & 4 deletions wforce/wforce-prometheus.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ public:
.Name(d_prefix+"_replication_sent_total")
.Help("How many replication messages were sent?")
.Register(*d_registry));
repl_send_queue_err_family = &(BuildCounter()
.Name(d_prefix+"_replication_send_queue_error_total")
.Help("How many errors trying to add replication messages to the send queue?")
.Register(*d_registry));
repl_rcvd_family = &(BuildCounter()
.Name(d_prefix+"_replication_rcvd_total")
.Help("How many replication messages were rcvd?")
.Register(*d_registry));
repl_rcvd_queue_err_family = &(BuildCounter()
.Name(d_prefix+"_replication_rcvd_queue_error_total")
.Help("How many errors trying to add replication msgs to the receive queue?")
.Register(*d_registry));
repl_connfail_family = &(BuildCounter()
.Name(d_prefix+"_replication_tcp_connfailed_total")
.Help("How many TCP connections failed?")
Expand Down Expand Up @@ -83,6 +91,10 @@ public:
.Help("How full is the replication recv worker thread queue?")
.Register(*d_registry);
repl_recv_queue_size = &(repl_recv_queue_family.Add({}));
repl_send_queue_size_family = &(BuildGauge()
.Name(d_prefix+"_repl_send_queue_size")
.Help("How full is the replication per-sibling send queue?")
.Register(*d_registry));
}
else {
throw WforceException("Could not allocate memory for Prometheus Registry");
Expand All @@ -93,11 +105,13 @@ public:
void addAllowStatusMetric(const std::string& name);
void incAllowStatusMetric(const std::string& name);

void addReplicationSibling(const std::string& name);
void addReplicationSibling(const std::string& name, const std::function<int()>& func);
void removeReplicationSibling(const std::string& name);
void removeAllReplicationSiblings();
void incReplicationSent(const std::string& name, bool success);
void incReplicationSendQueueErr(const std::string& name);
void incReplicationRcvd(const std::string& name, bool success);
void incReplicationRcvdQueueErr(const std::string& name);
void incReplicationConnFail(const std::string& name);

void incRedisBLUpdates();
Expand All @@ -120,16 +134,28 @@ public:
repl_recv_queue_size->Set(value);
}

void setReplRecvQueueRetrieveFunc(std::function<int()> func)
void setReplicationSendQueueSize(const std::string& name, int size);

void setReplRecvQueueRetrieveFunc(const std::function<int()>& func)
{
repl_queue_func = func;
}


void setReplicationSendQueueRetrieveFunc(const std::string& name, const std::function<int()>& func);

std::string serialize() override
{
// We want to retrieve the value of the worker thread queue only when metrics
// are asked for. The promethus-cpp library doesn't allow this itself
setReplRecvQueueSize(repl_queue_func());
for (auto& i : repl_send_queue_func_map) {
if (i.second != nullptr) {
setReplicationSendQueueSize(i.first, i.second());
}
}
// We need to lock the mutex to prevent any of the dynamic replication-related
// prometheus metrics changing while we serialize
std::lock_guard<std::mutex> lock(repl_mutx);
return PrometheusMetrics::serialize();
}
protected:
Expand All @@ -141,11 +167,15 @@ private:
// unlike the other metrics
std::mutex repl_mutx;
Family<Counter>* repl_sent_family;
Family<Counter>* repl_send_queue_err_family;
std::map<std::string, Counter*> repl_sent_ok_metrics;
std::map<std::string, Counter*> repl_sent_err_metrics;
std::map<std::string, Counter*> repl_send_queue_err_metrics;
Family<Counter>* repl_rcvd_family;
Family<Counter>* repl_rcvd_queue_err_family;
std::map<std::string, Counter*> repl_rcvd_ok_metrics;
std::map<std::string, Counter*> repl_rcvd_err_metrics;
std::map<std::string, Counter*> repl_rcvd_queue_err_metrics;
Family<Counter>* repl_connfail_family;
std::map<std::string, Counter*> repl_connfail_metrics;
Counter* redis_bl_updates;
Expand All @@ -163,18 +193,23 @@ private:
Gauge* wl_entries_ja3;
Gauge* wl_entries_ipja3;
Gauge* repl_recv_queue_size;
Family<Gauge>* repl_send_queue_size_family;
std::map<std::string, Gauge*> repl_send_queue_size_metrics;
std::function<int()> repl_queue_func;
std::map<std::string, std::function<int()>> repl_send_queue_func_map;
};

void initWforcePrometheusMetrics(std::shared_ptr<WforcePrometheus> wpmp);
void addPrometheusAllowStatusMetric(const std::string& name);
void incPrometheusAllowStatusMetric(const std::string& name);

void addPrometheusReplicationSibling(const std::string& name);
void addPrometheusReplicationSibling(const std::string& name, const std::function<int()>& func);
void removePrometheusReplicationSibling(const std::string& name);
void removeAllPrometheusReplicationSiblings();
void incPrometheusReplicationSent(const std::string& name, bool success);
void incPrometheusReplicationSendQueueErr(const std::string& name);
void incPrometheusReplicationRcvd(const std::string& name, bool success);
void incPrometheusReplicationRcvdQueueErr(const std::string& name);
void incPrometheusReplicationConnFail(const std::string& name);

void incPrometheusRedisBLUpdates();
Expand All @@ -194,3 +229,6 @@ void setPrometheusWLJA3Entries(int);

void setPrometheusReplRecvQueueSize(int value);
void setPrometheusReplRecvQueueRetrieveFunc(std::function<int()> func);

void setPrometheusReplSendQueueSize(const std::string& name, int value);
void setPrometheusReplSendQueueRetrieveFunc(const std::string& name, const std::function<int()>& func);
2 changes: 1 addition & 1 deletion wforce/wforce-replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void WforceReplication::parseReceivedReplicationMsg(const std::string& msg, cons
{
std::lock_guard<std::mutex> lock(d_sibling_queue_mutex);
if (d_sibling_queue.size() >= d_max_sibling_queue_size) {
errlog("parseReceivedReplicationMsg: max sibling recv queue size (%d) reached - dropping replication msg", d_max_sibling_queue_size);
incPrometheusReplicationSendQueueErr(remote.toString());
return;
}
else {
Expand Down
Loading
Loading