From 96fe007d6509b3dde9f9373fc9af4540991016ca Mon Sep 17 00:00:00 2001 From: Neil Cook Date: Tue, 4 Nov 2025 22:52:05 +0000 Subject: [PATCH 1/4] wforce: Add new metrics to track send and recv queue errors and size --- wforce/wforce-prometheus.cc | 109 +++++++++++++++++++++++++++++++++++- wforce/wforce-prometheus.hh | 46 +++++++++++++-- 2 files changed, 148 insertions(+), 7 deletions(-) diff --git a/wforce/wforce-prometheus.cc b/wforce/wforce-prometheus.cc index dfbc9783..8c01b0c4 100644 --- a/wforce/wforce-prometheus.cc +++ b/wforce/wforce-prometheus.cc @@ -45,7 +45,7 @@ void WforcePrometheus::incAllowStatusMetric(const std::string& name) } } -void WforcePrometheus::addReplicationSibling(const std::string& name) +void WforcePrometheus::addReplicationSibling(const std::string& name, const std::function& func) { std::lock_guard lock(repl_mutx); Counter* c = &(repl_sent_family->Add({{"sibling", name}, @@ -54,14 +54,27 @@ void WforcePrometheus::addReplicationSibling(const std::string& name) c = &(repl_sent_family->Add({{"sibling", name}, {"status", "error"}})); repl_sent_err_metrics.insert(std::make_pair(name, c)); + c = &(repl_send_queue_err_family->Add({{"sibling", name}})); + repl_send_queue_err_metrics.insert(std::make_pair(name, c)); c = &(repl_rcvd_family->Add({{"sibling", name}, {"status", "ok"}})); repl_rcvd_ok_metrics.insert(std::make_pair(name, c)); c = &(repl_rcvd_family->Add({{"sibling", name}, {"status", "error"}})); repl_rcvd_err_metrics.insert(std::make_pair(name, c)); + c = &(repl_rcvd_queue_err_family->Add({{"sibling", name}})); + repl_rcvd_queue_err_metrics.insert(std::make_pair(name, c)); c = &(repl_connfail_family->Add({{"sibling", name}})); repl_connfail_metrics.insert(std::make_pair(name, c)); + Gauge* g = &(repl_send_queue_size_family->Add({{"sibling", name}})); + repl_send_queue_size_metrics.insert(std::make_pair(name, g)); + // Add the function which returns the sibling queue size + if (func != nullptr) { + 
repl_send_queue_func_map.insert_or_assign(name, func); + } + else { + repl_send_queue_func_map.insert_or_assign(name, []() { return 0; }); + } } void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name) @@ -76,6 +89,11 @@ void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name) repl_sent_family->Remove(c->second); repl_sent_err_metrics.erase(name); } + c = repl_send_queue_err_metrics.find(name); + if (c != repl_send_queue_err_metrics.end()) { + repl_send_queue_err_family->Remove(c->second); + repl_send_queue_err_metrics.erase(name); + } c = repl_rcvd_ok_metrics.find(name); if (c != repl_rcvd_ok_metrics.end()) { repl_rcvd_family->Remove(c->second); @@ -86,11 +104,25 @@ void WforcePrometheus::removeReplicationSiblingNoLock(const std::string& name) repl_rcvd_family->Remove(c->second); repl_rcvd_err_metrics.erase(name); } + c = repl_rcvd_queue_err_metrics.find(name); + if (c != repl_rcvd_queue_err_metrics.end()) { + repl_rcvd_queue_err_family->Remove(c->second); + repl_rcvd_queue_err_metrics.erase(name); + } c = repl_connfail_metrics.find(name); if (c != repl_connfail_metrics.end()) { repl_connfail_family->Remove(c->second); repl_connfail_metrics.erase(name); } + auto g = repl_send_queue_size_metrics.find(name); + if (g != repl_send_queue_size_metrics.end()) { + repl_send_queue_size_family->Remove(g->second); + repl_send_queue_size_metrics.erase(name); + } + // Remove the function which returns the sibling queue size + if (auto i = repl_send_queue_func_map.find(name); i != repl_send_queue_func_map.end()) { + repl_send_queue_func_map.erase(i); + } } void WforcePrometheus::removeReplicationSibling(const std::string& name) @@ -128,6 +160,15 @@ void WforcePrometheus::incReplicationSent(const std::string& name, bool success) } } +void WforcePrometheus::incReplicationSendQueueErr(const std::string& name) +{ + std::lock_guard lock(repl_mutx); + auto i = repl_send_queue_err_metrics.find(name); + if (i != 
repl_send_queue_err_metrics.end()) { + i->second->Increment(); + } +} + void WforcePrometheus::incReplicationRcvd(const std::string& name, bool success) { std::lock_guard lock(repl_mutx); @@ -145,6 +186,16 @@ void WforcePrometheus::incReplicationRcvd(const std::string& name, bool success) } } +void WforcePrometheus::incReplicationRcvdQueueErr(const std::string& name) +{ + std::lock_guard lock(repl_mutx); + auto i = repl_rcvd_queue_err_metrics.find(name); + if (i != repl_rcvd_queue_err_metrics.end()) + { + i->second->Increment(); + } +} + void WforcePrometheus::incReplicationConnFail(const std::string& name) { std::lock_guard lock(repl_mutx); @@ -154,6 +205,29 @@ void WforcePrometheus::incReplicationConnFail(const std::string& name) } } +void WforcePrometheus::setReplicationSendQueueSize(const std::string& name, int size) +{ + std::lock_guard lock(repl_mutx); + auto i = repl_send_queue_size_metrics.find(name); + if (i != repl_send_queue_size_metrics.end()) { + i->second->Set(size); + } +} + +void WforcePrometheus::setReplicationSendQueueRetrieveFunc(const std::string& name, const std::function& func) +{ + std::lock_guard lock(repl_mutx); + auto i = repl_send_queue_func_map.find(name); + if (i != repl_send_queue_func_map.end()) { + if (func != nullptr) { + i->second = func; + } + else { + i->second = []() { return 0; }; + } + } +} + void WforcePrometheus::incRedisBLUpdates() { if (redis_bl_updates != nullptr) @@ -252,10 +326,10 @@ void incPrometheusAllowStatusMetric(const std::string& name) } } -void addPrometheusReplicationSibling(const std::string& name) +void addPrometheusReplicationSibling(const std::string& name, const std::function& func) { if (wforce_prom_metrics != nullptr) { - wforce_prom_metrics->addReplicationSibling(name); + wforce_prom_metrics->addReplicationSibling(name, func); } } @@ -280,6 +354,13 @@ void incPrometheusReplicationSent(const std::string& name, bool success) } } +void incPrometheusReplicationSendQueueErr(const std::string& name) +{ + if 
(wforce_prom_metrics != nullptr) { + wforce_prom_metrics->incReplicationSendQueueErr(name); + } +} + void incPrometheusReplicationRcvd(const std::string& name, bool success) { if (wforce_prom_metrics != nullptr) { @@ -287,6 +368,13 @@ void incPrometheusReplicationRcvd(const std::string& name, bool success) } } +void incPrometheusReplicationRcvdQueueErr(const std::string& name) +{ + if (wforce_prom_metrics != nullptr) { + wforce_prom_metrics->incReplicationRcvdQueueErr(name); + } +} + void incPrometheusReplicationConnFail(const std::string& name) { if (wforce_prom_metrics != nullptr) { @@ -405,3 +493,18 @@ void setPrometheusReplRecvQueueRetrieveFunc(std::function func) wforce_prom_metrics->setReplRecvQueueRetrieveFunc(func); } } + +void setPrometheusReplSendQueueSize(const std::string& name, int value) +{ + if (wforce_prom_metrics) { + wforce_prom_metrics->setReplicationSendQueueSize(name, value); + } +} + +void setPrometheusReplSendQueueRetrieveFunc(const std::string& name, const std::function& func) +{ + if (wforce_prom_metrics) + { + wforce_prom_metrics->setReplicationSendQueueRetrieveFunc(name, func); + } +} \ No newline at end of file diff --git a/wforce/wforce-prometheus.hh b/wforce/wforce-prometheus.hh index 56ac5699..57a1152f 100644 --- a/wforce/wforce-prometheus.hh +++ b/wforce/wforce-prometheus.hh @@ -40,10 +40,18 @@ public: .Name(d_prefix+"_replication_sent_total") .Help("How many replication messages were sent?") .Register(*d_registry)); + repl_send_queue_err_family = &(BuildCounter() + .Name(d_prefix+"_replication_send_queue_error_total") + .Help("How many errors trying to add replication messages to the send queue?") + .Register(*d_registry)); repl_rcvd_family = &(BuildCounter() .Name(d_prefix+"_replication_rcvd_total") .Help("How many replication messages were rcvd?") .Register(*d_registry)); + repl_rcvd_queue_err_family = &(BuildCounter() + .Name(d_prefix+"_replication_rcvd_queue_error_total") + .Help("How many errors trying to add replication msgs 
to the receive queue?") + .Register(*d_registry)); repl_connfail_family = &(BuildCounter() .Name(d_prefix+"_replication_tcp_connfailed_total") .Help("How many TCP connections failed?") @@ -83,6 +91,10 @@ public: .Help("How full is the replication recv worker thread queue?") .Register(*d_registry); repl_recv_queue_size = &(repl_recv_queue_family.Add({})); + repl_send_queue_size_family = &(BuildGauge() + .Name(d_prefix+"_repl_send_queue_size") + .Help("How full is the replication per-sibling send queue?") + .Register(*d_registry)); } else { throw WforceException("Could not allocate memory for Prometheus Registry"); @@ -93,11 +105,13 @@ public: void addAllowStatusMetric(const std::string& name); void incAllowStatusMetric(const std::string& name); - void addReplicationSibling(const std::string& name); + void addReplicationSibling(const std::string& name, const std::function& func); void removeReplicationSibling(const std::string& name); void removeAllReplicationSiblings(); void incReplicationSent(const std::string& name, bool success); + void incReplicationSendQueueErr(const std::string& name); void incReplicationRcvd(const std::string& name, bool success); + void incReplicationRcvdQueueErr(const std::string& name); void incReplicationConnFail(const std::string& name); void incRedisBLUpdates(); @@ -120,16 +134,28 @@ public: repl_recv_queue_size->Set(value); } - void setReplRecvQueueRetrieveFunc(std::function func) + void setReplicationSendQueueSize(const std::string& name, int size); + + void setReplRecvQueueRetrieveFunc(const std::function& func) { repl_queue_func = func; } - + + void setReplicationSendQueueRetrieveFunc(const std::string& name, const std::function& func); + std::string serialize() override { // We want to retrieve the value of the worker thread queue only when metrics // are asked for. 
The promethus-cpp library doesn't allow this itself setReplRecvQueueSize(repl_queue_func()); + for (auto& i : repl_send_queue_func_map) { + if (i.second != nullptr) { + setReplicationSendQueueSize(i.first, i.second()); + } + } + // We need to lock the mutex to prevent any of the dynamic replication-related + // prometheus metrics changing while we serialize + std::lock_guard lock(repl_mutx); return PrometheusMetrics::serialize(); } protected: @@ -141,11 +167,15 @@ private: // unlike the other metrics std::mutex repl_mutx; Family* repl_sent_family; + Family* repl_send_queue_err_family; std::map repl_sent_ok_metrics; std::map repl_sent_err_metrics; + std::map repl_send_queue_err_metrics; Family* repl_rcvd_family; + Family* repl_rcvd_queue_err_family; std::map repl_rcvd_ok_metrics; std::map repl_rcvd_err_metrics; + std::map repl_rcvd_queue_err_metrics; Family* repl_connfail_family; std::map repl_connfail_metrics; Counter* redis_bl_updates; @@ -163,18 +193,23 @@ private: Gauge* wl_entries_ja3; Gauge* wl_entries_ipja3; Gauge* repl_recv_queue_size; + Family* repl_send_queue_size_family; + std::map repl_send_queue_size_metrics; std::function repl_queue_func; + std::map> repl_send_queue_func_map; }; void initWforcePrometheusMetrics(std::shared_ptr wpmp); void addPrometheusAllowStatusMetric(const std::string& name); void incPrometheusAllowStatusMetric(const std::string& name); -void addPrometheusReplicationSibling(const std::string& name); +void addPrometheusReplicationSibling(const std::string& name, const std::function& func); void removePrometheusReplicationSibling(const std::string& name); void removeAllPrometheusReplicationSiblings(); void incPrometheusReplicationSent(const std::string& name, bool success); +void incPrometheusReplicationSendQueueErr(const std::string& name); void incPrometheusReplicationRcvd(const std::string& name, bool success); +void incPrometheusReplicationRcvdQueueErr(const std::string& name); void incPrometheusReplicationConnFail(const 
std::string& name); void incPrometheusRedisBLUpdates(); @@ -194,3 +229,6 @@ void setPrometheusWLJA3Entries(int); void setPrometheusReplRecvQueueSize(int value); void setPrometheusReplRecvQueueRetrieveFunc(std::function func); + +void setPrometheusReplSendQueueSize(const std::string& name, int value); +void setPrometheusReplSendQueueRetrieveFunc(const std::string& name, const std::function& func); From 5f6a263c646120510a28567782d85e9323099157 Mon Sep 17 00:00:00 2001 From: Neil Cook Date: Tue, 4 Nov 2025 22:52:50 +0000 Subject: [PATCH 2/4] wforce: Don't log when recv replication queue is full, just increment a metric --- wforce/wforce-replication.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wforce/wforce-replication.cc b/wforce/wforce-replication.cc index db39ab9a..6dfb8989 100644 --- a/wforce/wforce-replication.cc +++ b/wforce/wforce-replication.cc @@ -170,7 +170,7 @@ void WforceReplication::parseReceivedReplicationMsg(const std::string& msg, cons { std::lock_guard lock(d_sibling_queue_mutex); if (d_sibling_queue.size() >= d_max_sibling_queue_size) { - errlog("parseReceivedReplicationMsg: max sibling recv queue size (%d) reached - dropping replication msg", d_max_sibling_queue_size); + incPrometheusReplicationRcvdQueueErr(remote.toString()); return; } else { From 8af16530839260286ef5d6aaf6369bde476d9535 Mon Sep 17 00:00:00 2001 From: Neil Cook Date: Tue, 4 Nov 2025 22:54:22 +0000 Subject: [PATCH 3/4] wforce: Setup new prometheus metrics for send queue size and set them instead of logging when queue size too big --- wforce/wforce-sibling.cc | 26 ++++++++++++++++++++------ wforce/wforce-sibling.hh | 7 +++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/wforce/wforce-sibling.cc b/wforce/wforce-sibling.cc index 9c514772..41e03368 100644 --- a/wforce/wforce-sibling.cc +++ b/wforce/wforce-sibling.cc @@ -132,6 +132,7 @@ Sibling::Sibling(const ComboAddress& ca, } }); } + d_name = rem.toStringWithPort(); } Sibling::~Sibling() @@ 
-208,8 +209,9 @@ void Sibling::queueMsg(const std::string& msg) { { std::lock_guard lock(queue_mutx); - if (queue.size() >= max_queue_size) { - errlog("Sibling::queueMsg: max sibling send queue size (%d) reached - dropping replication msg", max_queue_size); + auto queue_size = queue.size(); + if (queue_size >= max_queue_size) { + incPrometheusReplicationSendQueueErr(d_name); return; } else { @@ -447,10 +449,16 @@ bool addSibling(std::shared_ptr sibling, GlobalStateHolder([sibling]() -> int + { + return sibling->getSendQueueSize(); + }); + // This is for sending when we know the port - addPrometheusReplicationSibling(sibling->rem.toStringWithPort()); + addPrometheusReplicationSibling(sibling->rem.toStringWithPort(), get_queue_size_func); // This is for receiving when the port may be ephemeral - addPrometheusReplicationSibling(sibling->rem.toString()); + addPrometheusReplicationSibling(sibling->rem.toString(), get_queue_size_func); siblings.modify([sibling](vector>& v) { v.push_back(sibling); }); @@ -554,14 +562,20 @@ bool setSiblingsWithKey(const std::vector(ca, proto, raw_key, sibling_connect_timeout, sibling_queue_size, send_sdb, send_wlbl); sibling->checkIgnoreSelf(g_sibling_listen_addr); v.push_back(sibling); + auto get_queue_size_func = std::function([sibling]() -> int + { + return static_cast(sibling->getSendQueueSize()); + }); + setPrometheusReplSendQueueRetrieveFunc(ca.toStringWithPort(), get_queue_size_func); + setPrometheusReplSendQueueRetrieveFunc(ca.toString(), get_queue_size_func); } siblings.setState(v); return true; diff --git a/wforce/wforce-sibling.hh b/wforce/wforce-sibling.hh index e0162d1c..438da7d6 100644 --- a/wforce/wforce-sibling.hh +++ b/wforce/wforce-sibling.hh @@ -55,6 +55,7 @@ struct Sibling { Sibling(const Sibling&) = delete; + std::string d_name; ComboAddress rem; std::mutex mutx; std::unique_ptr sockp; @@ -89,6 +90,12 @@ struct Sibling { void connectSibling(); + int getSendQueueSize() + { + std::lock_guard lock(queue_mutx); + return queue.size(); 
+ } + static Protocol stringToProtocol(const std::string& s) { if (s.compare("tcp") == 0) From 7eff70f93dd494440d7482abca73f8e0a3d92739 Mon Sep 17 00:00:00 2001 From: Neil Cook Date: Tue, 4 Nov 2025 23:20:47 +0000 Subject: [PATCH 4/4] ci: Upload artifacts in GitHub CI --- .github/workflows/builder.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/builder.yml b/.github/workflows/builder.yml index 512e0cf4..3cc1a8ff 100644 --- a/.github/workflows/builder.yml +++ b/.github/workflows/builder.yml @@ -26,3 +26,7 @@ jobs: fetch-depth: 0 # for correct version numbers submodules: recursive - run: builder/build.sh -e 1 -v ${{ matrix.os }} + - uses: actions/upload-artifact@v4 + with: + name: package-${{ matrix.os }} + path: builder/tmp/latest/${{ matrix.os }}/dist/* \ No newline at end of file