From 7dd02d7c7b3064db27d67901f422f0726627e30b Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 16:47:45 +0500 Subject: [PATCH 1/9] logs --- src/protocol/gossip/gossip.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 78c6514f..13ff5015 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -613,6 +613,7 @@ namespace libp2p::protocol::gossip { // Handle GRAFT: accept (add to mesh) or PRUNE with backoff. for (auto &pb_graft : pb_message.control().graft()) { + std::cerr << "CORE_P2P: RX GRAFT topic=" << pb_graft.topic_id() << " from=" << peer->peer_id_.toBase58() << std::endl; if (not peer->isGossipsub()) { return false; } @@ -661,6 +662,7 @@ namespace libp2p::protocol::gossip { // Handle PRUNE: remove from mesh and update backoff (v1.1+ honors backoff). for (auto &pb_prune : pb_message.control().prune()) { + std::cerr << "CORE_P2P: RX PRUNE topic=" << pb_prune.topic_id() << " from=" << peer->peer_id_.toBase58() << std::endl; if (not peer->isGossipsub()) { return false; } @@ -673,6 +675,9 @@ namespace libp2p::protocol::gossip { } // Handle IHAVE: select a capped subset of unknown IDs and enqueue IWANTs. + if (pb_message.control().ihave().size() > 0) { + std::cerr << "CORE_P2P: RX IHAVE count=" << pb_message.control().ihave().size() << " from=" << peer->peer_id_.toBase58() << std::endl; + } if (not handle_ihave(peer, pb_message)) { return false; } From e3dd71f6855547ac38b7e110ec5d563b01a3574a Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 16:59:07 +0500 Subject: [PATCH 2/9] logs --- src/protocol/gossip/gossip.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 13ff5015..93baf97c 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -508,6 +508,9 @@ namespace libp2p::protocol::gossip { // Handle SUBSCRIBE/UNSUBSCRIBE and opportunistic GRAFT on subscribe. for (auto &pb_subscribe : pb_message.subscriptions()) { + std::cerr << "CORE_P2P: RX SUBSCRIBE/UNSUBSCRIBE topic=" << pb_subscribe.topic_id() + << " sub=" << pb_subscribe.subscribe() + << " from=" << peer->peer_id_.toBase58() << std::endl; auto topic_hash = qtils::ByteVec(qtils::str2byte(pb_subscribe.topic_id())); auto topic_it = topics_.find(topic_hash); From f81da7d23e9571f71ecc589490e4d920b493d06d Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 17:10:43 +0500 Subject: [PATCH 3/9] logs --- src/protocol/gossip/gossip.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 93baf97c..d0e1b9d9 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #include #include @@ -542,6 +543,9 @@ namespace libp2p::protocol::gossip { // Handle PUBLISH: verify signature (if strict mode), dedupe, deliver // locally, and relay. for (auto &pb_publish : pb_message.publish()) { + std::cerr << "CORE_P2P: RX PUBLISH topic=" << pb_publish.topic() + << " size=" << pb_publish.data().size() + << " from=" << peer->peer_id_.toBase58() << std::endl; auto message = std::make_shared(); switch (config_.validation_mode) { From 9bd6a95c33ab74952ec547b79d7e12d0089bf9e1 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 17:24:01 +0500 Subject: [PATCH 4/9] logs --- src/protocol/gossip/gossip.cpp | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index d0e1b9d9..d59e0483 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -504,6 +504,9 @@ namespace libp2p::protocol::gossip { // Graylist gate: ignore peer below threshold. if (score_.below(peer->peer_id_, config_.score.graylist_threshold)) { + std::cerr << "CORE_P2P: GRAYLISTED peer=" << peer->peer_id_.toBase58() + << " score=" << score_.score(peer->peer_id_) + << " threshold=" << config_.score.graylist_threshold << std::endl; return true; } @@ -655,14 +658,20 @@ namespace libp2p::protocol::gossip { return false; } if (score_.below(peer->peer_id_, config_.score.zero)) { + std::cerr << "CORE_P2P: GRAFT_REJECT_LOW_SCORE peer=" << peer->peer_id_.toBase58() + << " score=" << score_.score(peer->peer_id_) << std::endl; return false; } return true; }(); if (accept) { + std::cerr << "CORE_P2P: GRAFT_ACCEPTED peer=" << peer->peer_id_.toBase58() + << " topic=" << pb_graft.topic_id() << std::endl; topic->mesh_peers_.emplace(peer); score_.graft(peer->peer_id_, topic_hash); } else { + std::cerr << "CORE_P2P: GRAFT_PRUNED peer=" << peer->peer_id_.toBase58() + << " topic=" << pb_graft.topic_id() << std::endl; make_prune(*topic, peer); } } @@ -969,6 +978,28 @@ namespace libp2p::protocol::gossip { // emit gossip, expire history/cache, and clear expired dont_send_ marks. void Gossip::heartbeat() { ++heartbeat_ticks_; + + // Log peer scores and mesh status during heartbeat + std::cerr << "CORE_P2P: === HEARTBEAT #" << heartbeat_ticks_ << " ===" << std::endl; + std::cerr << "CORE_P2P: Peer count=" << peers_.size() << " Topic count=" << topics_.size() << std::endl; + for (auto &[peer_id, peer] : peers_) { + auto peer_score = score_.score(peer_id); + std::cerr << "CORE_P2P: PEER_SCORE peer=" << peer_id.toBase58() + << " score=" << peer_score + << " out=" << peer->out_ + << " topics=" << peer->topics_.size() << std::endl; + } + for (auto &[topic_hash, topic] : topics_) { + std::cerr << "CORE_P2P: TOPIC topic=" << qtils::byte2str(topic_hash) + << " mesh_size=" << topic->mesh_peers_.size() + << " peers=" << topic->peers_.size() + << " publish_only=" << topic->publish_only_ << std::endl; + for (auto &peer : topic->mesh_peers_) { + std::cerr << "CORE_P2P: MESH_PEER peer=" << peer->peer_id_.toBase58() << std::endl; + } + } + std::cerr << "CORE_P2P: === END HEARTBEAT ===" << std::endl; + for (auto &topic : topics_ | std::views::values) { topic->backoff_.shift(); } From af487d873693bd4d68368356ff099a8f4cbddcd1 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 17:32:15 +0500 Subject: [PATCH 5/9] fix --- src/protocol/gossip/gossip.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index d59e0483..2cfba5fa 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -277,8 +277,13 @@ namespace libp2p::protocol::gossip { auto peer_id = stream->remotePeerId(); auto peer_it = peers_.find(peer_id); if (peer_it == peers_.end()) { - stream->reset(); - return; + // Peer not found - this can happen if the inbound stream arrives before + // the on_peer_connected event fires (race condition). Create the peer + // entry now to avoid rejecting the stream. + std::cerr << "CORE_P2P: HANDLE creating missing peer entry for " + << peer_id.toBase58() << std::endl; + peer_it = peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; + score_.connect(peer_id); } auto peer = peer_it->second; updatePeerKind(peer, stream->protocol()); From ee2f2efc3342bd121df6dc6add62af6ed2d89021 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 20 Jan 2026 17:55:59 +0500 Subject: [PATCH 6/9] logs --- src/protocol/gossip/gossip.cpp | 36 ++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 2cfba5fa..c15167f2 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -275,6 +275,8 @@ namespace libp2p::protocol::gossip { // loop. void Gossip::handle(std::shared_ptr stream) { auto peer_id = stream->remotePeerId(); + std::cerr << "CORE_P2P: HANDLE called for peer=" << peer_id.toBase58() + << " protocol=" << stream->protocol() << std::endl; auto peer_it = peers_.find(peer_id); if (peer_it == peers_.end()) { // Peer not found - this can happen if the inbound stream arrives before @@ -284,25 +286,38 @@ namespace libp2p::protocol::gossip { << peer_id.toBase58() << std::endl; peer_it = peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; score_.connect(peer_id); + } else { + std::cerr << "CORE_P2P: HANDLE peer exists for " << peer_id.toBase58() << std::endl; } auto peer = peer_it->second; updatePeerKind(peer, stream->protocol()); peer->streams_in_.emplace(stream); - coroSpawn(*io_context_, [WEAK_SELF, stream, peer]() -> Coro { + std::cerr << "CORE_P2P: HANDLE starting read loop for peer=" << peer_id.toBase58() << std::endl; + coroSpawn(*io_context_, [WEAK_SELF, stream, peer, peer_id]() -> Coro { Bytes encoded; + int msg_count = 0; while (true) { auto r = co_await readVarintMessage(stream, encoded); if (not r.has_value()) { + std::cerr << "CORE_P2P: HANDLE read failed for peer=" << peer_id.toBase58() + << " after " << msg_count << " messages, error=" << r.error().message() << std::endl; break; } + msg_count++; + std::cerr << "CORE_P2P: HANDLE read message #" << msg_count << " from peer=" << peer_id.toBase58() + << " size=" << encoded.size() << std::endl; auto self = weak_self.lock(); if (not self) { + std::cerr << "CORE_P2P: HANDLE self expired for peer=" << peer_id.toBase58() << std::endl; break; } if (not self->onMessage(peer, encoded)) { + std::cerr << "CORE_P2P: HANDLE onMessage returned false for peer=" << peer_id.toBase58() << std::endl; break; } } + std::cerr << "CORE_P2P: HANDLE read loop ended for peer=" << peer_id.toBase58() + << " total_messages=" << msg_count << std::endl; peer->streams_in_.erase(stream); }); } @@ -318,8 +333,11 @@ namespace libp2p::protocol::gossip { WEAK_LOCK(self); auto peer_id = connection->remotePeer(); auto out = connection->isInitiator(); + std::cerr << "CORE_P2P: ON_PEER_CONNECTED peer=" << peer_id.toBase58() + << " initiator=" << (out ? "true" : "false") << std::endl; auto peer_it = self->peers_.find(peer_id); if (peer_it == self->peers_.end()) { + std::cerr << "CORE_P2P: ON_PEER_CONNECTED creating new peer entry for " << peer_id.toBase58() << std::endl; peer_it = self->peers_ .emplace(peer_id, std::make_shared(peer_id, out)) @@ -329,23 +347,28 @@ namespace libp2p::protocol::gossip { auto peer = peer_it->second; // Avoid creating multiple streams concurrently if (not peer->stream_out_.has_value() and not peer->is_connecting_) { + std::cerr << "CORE_P2P: ON_PEER_CONNECTED creating outbound stream for " << peer_id.toBase58() << std::endl; peer->is_connecting_ = true; coroSpawn( - *self->io_context_, [self, connection, peer]() -> Coro { + *self->io_context_, [self, connection, peer, peer_id]() -> Coro { auto stream_result = (co_await self->host_->newStream( connection, self->protocols_)); peer->is_connecting_ = false; if (not stream_result.has_value()) { - // TODO: can't open out stream? + std::cerr << "CORE_P2P: ON_PEER_CONNECTED failed to open out stream for " + << peer_id.toBase58() << " error=" << stream_result.error().message() << std::endl; co_return; } if (auto stream = qtils::optionTake(peer->stream_out_)) { (**stream).reset(); } auto &stream = stream_result.value(); + std::cerr << "CORE_P2P: ON_PEER_CONNECTED opened out stream for " << peer_id.toBase58() + << " protocol=" << stream->protocol() << std::endl; self->updatePeerKind(peer, stream->protocol()); peer->stream_out_ = stream; if (not self->topics_.empty()) { + std::cerr << "CORE_P2P: ON_PEER_CONNECTED sending initial SUBSCRIBE to " << peer_id.toBase58() << std::endl; auto &message = self->getBatch(peer); for (auto &[topic_hash, topic] : self->topics_) { if (topic->publish_only_) { @@ -907,18 +930,23 @@ namespace libp2p::protocol::gossip { return; } peer->writing_ = true; - coroSpawn(*io_context_, [peer]() -> Coro { + auto peer_id = peer->peer_id_; + coroSpawn(*io_context_, [peer, peer_id]() -> Coro { co_await coroYield(); assert(peer->writing_); assert(peer->stream_out_.has_value()); while (auto message = qtils::optionTake(peer->batch_)) { auto pb_messages = splitBatch(*message); + std::cerr << "CORE_P2P: CHECKWRITE sending " << pb_messages.size() + << " PB messages to " << peer_id.toBase58() << std::endl; for (auto &encoded : pb_messages) { assert(not encoded.empty()); auto r = co_await writeVarintMessage(peer->stream_out_.value(), encoded); if (not r.has_value()) { + std::cerr << "CORE_P2P: CHECKWRITE send failed for " << peer_id.toBase58() + << " error=" << r.error().message() << std::endl; peer->stream_out_.reset(); break; } From 6925d6298bf2dfe93e4cded017166f66a5d5c22c Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 21 Jan 2026 10:05:13 +0500 Subject: [PATCH 7/9] refactor: replace std::cerr with SL_TRACE --- include/libp2p/protocol/gossip/gossip.hpp | 2 + src/protocol/gossip/gossip.cpp | 130 +++++++++++----------- 2 files changed, 70 insertions(+), 62 deletions(-) diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 36e7e8f9..fa05d8e8 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -348,6 +349,7 @@ namespace libp2p::protocol::gossip { message_cache_; GossipPromises gossip_promises_; Score score_; + log::Logger logger_ = log::createLogger("Gossip"); size_t heartbeat_ticks_ = 0; std::unordered_map count_received_ihave_; std::unordered_map count_sent_iwant_; diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index c15167f2..d696dda4 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -5,7 +5,6 @@ */ #include -#include #include #include #include @@ -268,6 +267,10 @@ namespace libp2p::protocol::gossip { // Protocol IDs to advertise for stream negotiation. StreamProtocols Gossip::getProtocolIds() const { + SL_TRACE(logger_, "getProtocolIds called, returning {} protocols:", protocols_.size()); + for (auto &p : protocols_) { + SL_TRACE(logger_, " - {}", p); + } return protocols_; } @@ -275,49 +278,60 @@ namespace libp2p::protocol::gossip { // loop. void Gossip::handle(std::shared_ptr stream) { auto peer_id = stream->remotePeerId(); - std::cerr << "CORE_P2P: HANDLE called for peer=" << peer_id.toBase58() - << " protocol=" << stream->protocol() << std::endl; + SL_TRACE(logger_, + "HANDLE called for peer={} protocol={}", + peer_id.toBase58(), + stream->protocol()); auto peer_it = peers_.find(peer_id); if (peer_it == peers_.end()) { // Peer not found - this can happen if the inbound stream arrives before // the on_peer_connected event fires (race condition). Create the peer // entry now to avoid rejecting the stream. - std::cerr << "CORE_P2P: HANDLE creating missing peer entry for " - << peer_id.toBase58() << std::endl; + SL_TRACE(logger_, "HANDLE creating missing peer entry for {}", peer_id.toBase58()); peer_it = peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; score_.connect(peer_id); } else { - std::cerr << "CORE_P2P: HANDLE peer exists for " << peer_id.toBase58() << std::endl; + SL_TRACE(logger_, "HANDLE peer exists for {}", peer_id.toBase58()); } auto peer = peer_it->second; updatePeerKind(peer, stream->protocol()); peer->streams_in_.emplace(stream); - std::cerr << "CORE_P2P: HANDLE starting read loop for peer=" << peer_id.toBase58() << std::endl; - coroSpawn(*io_context_, [WEAK_SELF, stream, peer, peer_id]() -> Coro { + SL_TRACE(logger_, "HANDLE starting read loop for peer={}", peer_id.toBase58()); + coroSpawn(*io_context_, [WEAK_SELF, stream, peer, peer_id, logger = logger_]() -> Coro { Bytes encoded; int msg_count = 0; while (true) { auto r = co_await readVarintMessage(stream, encoded); if (not r.has_value()) { - std::cerr << "CORE_P2P: HANDLE read failed for peer=" << peer_id.toBase58() - << " after " << msg_count << " messages, error=" << r.error().message() << std::endl; + SL_TRACE(logger, + "HANDLE read failed for peer={} after {} messages, error={}", + peer_id.toBase58(), + msg_count, + r.error().message()); break; } msg_count++; - std::cerr << "CORE_P2P: HANDLE read message #" << msg_count << " from peer=" << peer_id.toBase58() - << " size=" << encoded.size() << std::endl; + SL_TRACE(logger, + "HANDLE read message #{} from peer={} size={}", + msg_count, + peer_id.toBase58(), + encoded.size()); auto self = weak_self.lock(); if (not self) { - std::cerr << "CORE_P2P: HANDLE self expired for peer=" << peer_id.toBase58() << std::endl; + SL_TRACE(logger, "HANDLE self expired for peer={}", peer_id.toBase58()); break; } if (not self->onMessage(peer, encoded)) { - std::cerr << "CORE_P2P: HANDLE onMessage returned false for peer=" << peer_id.toBase58() << std::endl; + SL_TRACE(logger, + "HANDLE onMessage returned false for peer={}", + peer_id.toBase58()); break; } } - std::cerr << "CORE_P2P: HANDLE read loop ended for peer=" << peer_id.toBase58() - << " total_messages=" << msg_count << std::endl; + SL_TRACE(logger, + "HANDLE read loop ended for peer={} protocol={}", + peer_id.toBase58(), + stream->protocol()); peer->streams_in_.erase(stream); }); } @@ -333,11 +347,10 @@ namespace libp2p::protocol::gossip { WEAK_LOCK(self); auto peer_id = connection->remotePeer(); auto out = connection->isInitiator(); - std::cerr << "CORE_P2P: ON_PEER_CONNECTED peer=" << peer_id.toBase58() - << " initiator=" << (out ? "true" : "false") << std::endl; + SL_TRACE(self->logger_, "ON_PEER_CONNECTED peer={} initiator={}", peer_id.toBase58(), (out ? "true" : "false")); auto peer_it = self->peers_.find(peer_id); if (peer_it == self->peers_.end()) { - std::cerr << "CORE_P2P: ON_PEER_CONNECTED creating new peer entry for " << peer_id.toBase58() << std::endl; + SL_TRACE(self->logger_, "ON_PEER_CONNECTED creating new peer entry for {}", peer_id.toBase58()); peer_it = self->peers_ .emplace(peer_id, std::make_shared(peer_id, out)) @@ -347,28 +360,26 @@ namespace libp2p::protocol::gossip { auto peer = peer_it->second; // Avoid creating multiple streams concurrently if (not peer->stream_out_.has_value() and not peer->is_connecting_) { - std::cerr << "CORE_P2P: ON_PEER_CONNECTED creating outbound stream for " << peer_id.toBase58() << std::endl; + SL_TRACE(self->logger_, "ON_PEER_CONNECTED creating outbound stream for {}", peer_id.toBase58()); peer->is_connecting_ = true; coroSpawn( - *self->io_context_, [self, connection, peer, peer_id]() -> Coro { + *self->io_context_, [self, connection, peer, peer_id, logger = self->logger_]() -> Coro { auto stream_result = (co_await self->host_->newStream( connection, self->protocols_)); peer->is_connecting_ = false; if (not stream_result.has_value()) { - std::cerr << "CORE_P2P: ON_PEER_CONNECTED failed to open out stream for " - << peer_id.toBase58() << " error=" << stream_result.error().message() << std::endl; + SL_TRACE(logger, "ON_PEER_CONNECTED failed to open out stream for {} error={}", peer_id.toBase58(), stream_result.error().message()); co_return; } if (auto stream = qtils::optionTake(peer->stream_out_)) { (**stream).reset(); } auto &stream = stream_result.value(); - std::cerr << "CORE_P2P: ON_PEER_CONNECTED opened out stream for " << peer_id.toBase58() - << " protocol=" << stream->protocol() << std::endl; + SL_TRACE(logger, "ON_PEER_CONNECTED opened out stream for {} protocol={}", peer_id.toBase58(), stream->protocol()); self->updatePeerKind(peer, stream->protocol()); peer->stream_out_ = stream; if (not self->topics_.empty()) { - std::cerr << "CORE_P2P: ON_PEER_CONNECTED sending initial SUBSCRIBE to " << peer_id.toBase58() << std::endl; + SL_TRACE(logger, "ON_PEER_CONNECTED sending initial SUBSCRIBE to {}", peer_id.toBase58()); auto &message = self->getBatch(peer); for (auto &[topic_hash, topic] : self->topics_) { if (topic->publish_only_) { @@ -532,17 +543,13 @@ namespace libp2p::protocol::gossip { // Graylist gate: ignore peer below threshold. if (score_.below(peer->peer_id_, config_.score.graylist_threshold)) { - std::cerr << "CORE_P2P: GRAYLISTED peer=" << peer->peer_id_.toBase58() - << " score=" << score_.score(peer->peer_id_) - << " threshold=" << config_.score.graylist_threshold << std::endl; + SL_TRACE(logger_, "GRAYLISTED peer={} score={} threshold={}", peer->peer_id_.toBase58(), score_.score(peer->peer_id_), config_.score.graylist_threshold); return true; } // Handle SUBSCRIBE/UNSUBSCRIBE and opportunistic GRAFT on subscribe. for (auto &pb_subscribe : pb_message.subscriptions()) { - std::cerr << "CORE_P2P: RX SUBSCRIBE/UNSUBSCRIBE topic=" << pb_subscribe.topic_id() - << " sub=" << pb_subscribe.subscribe() - << " from=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, "RX SUBSCRIBE/UNSUBSCRIBE topic={} sub={} from={}", pb_subscribe.topic_id(), pb_subscribe.subscribe(), peer->peer_id_.toBase58()); auto topic_hash = qtils::ByteVec(qtils::str2byte(pb_subscribe.topic_id())); auto topic_it = topics_.find(topic_hash); @@ -574,9 +581,7 @@ namespace libp2p::protocol::gossip { // Handle PUBLISH: verify signature (if strict mode), dedupe, deliver // locally, and relay. for (auto &pb_publish : pb_message.publish()) { - std::cerr << "CORE_P2P: RX PUBLISH topic=" << pb_publish.topic() - << " size=" << pb_publish.data().size() - << " from=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, "RX PUBLISH topic={} size={} from={}", pb_publish.topic(), pb_publish.data().size(), peer->peer_id_.toBase58()); auto message = std::make_shared(); switch (config_.validation_mode) { @@ -651,7 +656,7 @@ namespace libp2p::protocol::gossip { // Handle GRAFT: accept (add to mesh) or PRUNE with backoff. for (auto &pb_graft : pb_message.control().graft()) { - std::cerr << "CORE_P2P: RX GRAFT topic=" << pb_graft.topic_id() << " from=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, "RX GRAFT topic={} from={}", pb_graft.topic_id(), peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -686,27 +691,24 @@ namespace libp2p::protocol::gossip { return false; } if (score_.below(peer->peer_id_, config_.score.zero)) { - std::cerr << "CORE_P2P: GRAFT_REJECT_LOW_SCORE peer=" << peer->peer_id_.toBase58() - << " score=" << score_.score(peer->peer_id_) << std::endl; + SL_TRACE(logger_, "GRAFT_REJECT_LOW_SCORE peer={} score={}", peer->peer_id_.toBase58(), score_.score(peer->peer_id_)); return false; } return true; }(); if (accept) { - std::cerr << "CORE_P2P: GRAFT_ACCEPTED peer=" << peer->peer_id_.toBase58() - << " topic=" << pb_graft.topic_id() << std::endl; + SL_TRACE(logger_, "GRAFT_ACCEPTED peer={} topic={}", peer->peer_id_.toBase58(), pb_graft.topic_id()); topic->mesh_peers_.emplace(peer); score_.graft(peer->peer_id_, topic_hash); } else { - std::cerr << "CORE_P2P: GRAFT_PRUNED peer=" << peer->peer_id_.toBase58() - << " topic=" << pb_graft.topic_id() << std::endl; + SL_TRACE(logger_, "GRAFT_PRUNED peer={} topic={}", peer->peer_id_.toBase58(), pb_graft.topic_id()); make_prune(*topic, peer); } } // Handle PRUNE: remove from mesh and update backoff (v1.1+ honors backoff). for (auto &pb_prune : pb_message.control().prune()) { - std::cerr << "CORE_P2P: RX PRUNE topic=" << pb_prune.topic_id() << " from=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, "RX PRUNE topic={} from={}", pb_prune.topic_id(), peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -720,7 +722,7 @@ namespace libp2p::protocol::gossip { // Handle IHAVE: select a capped subset of unknown IDs and enqueue IWANTs. if (pb_message.control().ihave().size() > 0) { - std::cerr << "CORE_P2P: RX IHAVE count=" << pb_message.control().ihave().size() << " from=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, "RX IHAVE count={} from={}", pb_message.control().ihave().size(), peer->peer_id_.toBase58()); } if (not handle_ihave(peer, pb_message)) { return false; @@ -931,22 +933,28 @@ namespace libp2p::protocol::gossip { } peer->writing_ = true; auto peer_id = peer->peer_id_; - coroSpawn(*io_context_, [peer, peer_id]() -> Coro { + coroSpawn(*io_context_, [WEAK_SELF, peer, peer_id, logger = logger_]() -> Coro { co_await coroYield(); assert(peer->writing_); assert(peer->stream_out_.has_value()); - while (auto message = qtils::optionTake(peer->batch_)) { + while (true) { + auto self = weak_self.lock(); + if (not self) { + break; + } + auto message = qtils::optionTake(peer->batch_); + if (not message) { + break; + } auto pb_messages = splitBatch(*message); - std::cerr << "CORE_P2P: CHECKWRITE sending " << pb_messages.size() - << " PB messages to " << peer_id.toBase58() << std::endl; + SL_TRACE(logger, "CHECKWRITE sending {} PB messages to {}", pb_messages.size(), peer_id.toBase58()); for (auto &encoded : pb_messages) { assert(not encoded.empty()); auto r = co_await writeVarintMessage(peer->stream_out_.value(), encoded); if (not r.has_value()) { - std::cerr << "CORE_P2P: CHECKWRITE send failed for " << peer_id.toBase58() - << " error=" << r.error().message() << std::endl; + SL_TRACE(logger, "CHECKWRITE send failed for peer={} error={}", peer_id.toBase58(), r.error().message()); peer->stream_out_.reset(); break; } @@ -1013,25 +1021,23 @@ namespace libp2p::protocol::gossip { ++heartbeat_ticks_; // Log peer scores and mesh status during heartbeat - std::cerr << "CORE_P2P: === HEARTBEAT #" << heartbeat_ticks_ << " ===" << std::endl; - std::cerr << "CORE_P2P: Peer count=" << peers_.size() << " Topic count=" << topics_.size() << std::endl; + SL_TRACE(logger_, "=== HEARTBEAT #{} ===", heartbeat_ticks_); + SL_TRACE(logger_, "Peer count={} Topic count={}", peers_.size(), topics_.size()); for (auto &[peer_id, peer] : peers_) { auto peer_score = score_.score(peer_id); - std::cerr << "CORE_P2P: PEER_SCORE peer=" << peer_id.toBase58() - << " score=" << peer_score - << " out=" << peer->out_ - << " topics=" << peer->topics_.size() << std::endl; + SL_TRACE(logger_, "PEER_SCORE peer={} score={} out={} topics={}", + peer_id.toBase58(), peer_score, peer->out_, + peer->topics_.size()); } for (auto &[topic_hash, topic] : topics_) { - std::cerr << "CORE_P2P: TOPIC topic=" << qtils::byte2str(topic_hash) - << " mesh_size=" << topic->mesh_peers_.size() - << " peers=" << topic->peers_.size() - << " publish_only=" << topic->publish_only_ << std::endl; + SL_TRACE(logger_, "TOPIC topic={} mesh_size={} peers={} publish_only={}", + qtils::byte2str(topic_hash), topic->mesh_peers_.size(), + topic->peers_.size(), topic->publish_only_); for (auto &peer : topic->mesh_peers_) { - std::cerr << "CORE_P2P: MESH_PEER peer=" << peer->peer_id_.toBase58() << std::endl; + SL_TRACE(logger_, " MESH_PEER peer={}", peer->peer_id_.toBase58()); } } - std::cerr << "CORE_P2P: === END HEARTBEAT ===" << std::endl; + SL_TRACE(logger_, "=== END HEARTBEAT ==="); for (auto &topic : topics_ | std::views::values) { topic->backoff_.shift(); From aed45f6e9774746e7f05392f8c93c14367fc0ff0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 21 Jan 2026 05:34:27 +0000 Subject: [PATCH 8/9] Remove unnecessary weak_self.lock() check from checkWrite loop Co-authored-by: kamilsa <9370151+kamilsa@users.noreply.github.com> --- src/protocol/gossip/gossip.cpp | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index d696dda4..ccd9f75e 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -933,19 +933,11 @@ namespace libp2p::protocol::gossip { } peer->writing_ = true; auto peer_id = peer->peer_id_; - coroSpawn(*io_context_, [WEAK_SELF, peer, peer_id, logger = logger_]() -> Coro { + coroSpawn(*io_context_, [peer, peer_id, logger = logger_]() -> Coro { co_await coroYield(); assert(peer->writing_); assert(peer->stream_out_.has_value()); - while (true) { - auto self = weak_self.lock(); - if (not self) { - break; - } - auto message = qtils::optionTake(peer->batch_); - if (not message) { - break; - } + while (auto message = qtils::optionTake(peer->batch_)) { auto pb_messages = splitBatch(*message); SL_TRACE(logger, "CHECKWRITE sending {} PB messages to {}", pb_messages.size(), peer_id.toBase58()); From 7eaea343a351e466e537ddcc0e749792b6ccb37b Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 21 Jan 2026 10:53:53 +0500 Subject: [PATCH 9/9] Format --- src/protocol/gossip/gossip.cpp | 206 ++++++++++++++++++++++----------- 1 file changed, 140 insertions(+), 66 deletions(-) diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index ccd9f75e..eac58175 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -267,7 +267,9 @@ namespace libp2p::protocol::gossip { // Protocol IDs to advertise for stream negotiation. StreamProtocols Gossip::getProtocolIds() const { - SL_TRACE(logger_, "getProtocolIds called, returning {} protocols:", protocols_.size()); + SL_TRACE(logger_, + "getProtocolIds called, returning {} protocols:", + protocols_.size()); for (auto &p : protocols_) { SL_TRACE(logger_, " - {}", p); } @@ -287,8 +289,11 @@ namespace libp2p::protocol::gossip { // Peer not found - this can happen if the inbound stream arrives before // the on_peer_connected event fires (race condition). Create the peer // entry now to avoid rejecting the stream. - SL_TRACE(logger_, "HANDLE creating missing peer entry for {}", peer_id.toBase58()); - peer_it = peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; + SL_TRACE(logger_, + "HANDLE creating missing peer entry for {}", + peer_id.toBase58()); + peer_it = + peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; score_.connect(peer_id); } else { SL_TRACE(logger_, "HANDLE peer exists for {}", peer_id.toBase58()); @@ -296,44 +301,50 @@ namespace libp2p::protocol::gossip { auto peer = peer_it->second; updatePeerKind(peer, stream->protocol()); peer->streams_in_.emplace(stream); - SL_TRACE(logger_, "HANDLE starting read loop for peer={}", peer_id.toBase58()); - coroSpawn(*io_context_, [WEAK_SELF, stream, peer, peer_id, logger = logger_]() -> Coro { - Bytes encoded; - int msg_count = 0; - while (true) { - auto r = co_await readVarintMessage(stream, encoded); - if (not r.has_value()) { + SL_TRACE( + logger_, "HANDLE starting read loop for peer={}", peer_id.toBase58()); + coroSpawn( + *io_context_, + [WEAK_SELF, stream, peer, peer_id, logger = logger_]() -> Coro { + Bytes encoded; + int msg_count = 0; + while (true) { + auto r = co_await readVarintMessage(stream, encoded); + if (not r.has_value()) { + SL_TRACE( + logger, + "HANDLE read failed for peer={} after {} messages, error={}", + peer_id.toBase58(), + msg_count, + r.error().message()); + break; + } + msg_count++; + SL_TRACE(logger, + "HANDLE read message #{} from peer={} size={}", + msg_count, + peer_id.toBase58(), + encoded.size()); + auto self = weak_self.lock(); + if (not self) { + SL_TRACE(logger, + "HANDLE self expired for peer={}", + peer_id.toBase58()); + break; + } + if (not self->onMessage(peer, encoded)) { + SL_TRACE(logger, + "HANDLE onMessage returned false for peer={}", + peer_id.toBase58()); + break; + } + } SL_TRACE(logger, - "HANDLE read failed for peer={} after {} messages, error={}", + "HANDLE read loop ended for peer={} protocol={}", peer_id.toBase58(), - msg_count, - r.error().message()); - break; - } - msg_count++; - SL_TRACE(logger, - "HANDLE read message #{} from peer={} size={}", - msg_count, - peer_id.toBase58(), - encoded.size()); - auto self = weak_self.lock(); - if (not self) { - SL_TRACE(logger, "HANDLE self expired for peer={}", peer_id.toBase58()); - break; - } - if (not self->onMessage(peer, encoded)) { - SL_TRACE(logger, - "HANDLE onMessage returned false for peer={}", - peer_id.toBase58()); - break; - } - } - SL_TRACE(logger, - "HANDLE read loop ended for peer={} protocol={}", - peer_id.toBase58(), - stream->protocol()); - peer->streams_in_.erase(stream); - }); + stream->protocol()); + peer->streams_in_.erase(stream); + }); } // Start event listeners, timers, and open outbound streams on new @@ -347,10 +358,15 @@ namespace libp2p::protocol::gossip { WEAK_LOCK(self); auto peer_id = connection->remotePeer(); auto out = connection->isInitiator(); - SL_TRACE(self->logger_, "ON_PEER_CONNECTED peer={} initiator={}", peer_id.toBase58(), (out ? "true" : "false")); + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED peer={} initiator={}", + peer_id.toBase58(), + (out ? "true" : "false")); auto peer_it = self->peers_.find(peer_id); if (peer_it == self->peers_.end()) { - SL_TRACE(self->logger_, "ON_PEER_CONNECTED creating new peer entry for {}", peer_id.toBase58()); + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED creating new peer entry for {}", + peer_id.toBase58()); peer_it = self->peers_ .emplace(peer_id, std::make_shared(peer_id, out)) @@ -360,26 +376,41 @@ namespace libp2p::protocol::gossip { auto peer = peer_it->second; // Avoid creating multiple streams concurrently if (not peer->stream_out_.has_value() and not peer->is_connecting_) { - SL_TRACE(self->logger_, "ON_PEER_CONNECTED creating outbound stream for {}", peer_id.toBase58()); + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED creating outbound stream for {}", + peer_id.toBase58()); peer->is_connecting_ = true; coroSpawn( - *self->io_context_, [self, connection, peer, peer_id, logger = self->logger_]() -> Coro { + *self->io_context_, + [self, connection, peer, peer_id, logger = self->logger_]() + -> Coro { auto stream_result = (co_await self->host_->newStream( connection, self->protocols_)); peer->is_connecting_ = false; if (not stream_result.has_value()) { - SL_TRACE(logger, "ON_PEER_CONNECTED failed to open out stream for {} error={}", peer_id.toBase58(), stream_result.error().message()); + SL_TRACE(logger, + "ON_PEER_CONNECTED failed to open out stream for " + "{} error={}", + peer_id.toBase58(), + stream_result.error().message()); co_return; } if (auto stream = qtils::optionTake(peer->stream_out_)) { (**stream).reset(); } auto &stream = stream_result.value(); - SL_TRACE(logger, "ON_PEER_CONNECTED opened out stream for {} protocol={}", peer_id.toBase58(), stream->protocol()); + SL_TRACE( + logger, + "ON_PEER_CONNECTED opened out stream for {} protocol={}", + peer_id.toBase58(), + stream->protocol()); self->updatePeerKind(peer, stream->protocol()); peer->stream_out_ = stream; if (not self->topics_.empty()) { - SL_TRACE(logger, "ON_PEER_CONNECTED sending initial SUBSCRIBE to {}", peer_id.toBase58()); + SL_TRACE( + logger, + "ON_PEER_CONNECTED sending initial SUBSCRIBE to {}", + peer_id.toBase58()); auto &message = self->getBatch(peer); for (auto &[topic_hash, topic] : self->topics_) { if (topic->publish_only_) { @@ -543,13 +574,21 @@ namespace libp2p::protocol::gossip { // Graylist gate: ignore peer below threshold. if (score_.below(peer->peer_id_, config_.score.graylist_threshold)) { - SL_TRACE(logger_, "GRAYLISTED peer={} score={} threshold={}", peer->peer_id_.toBase58(), score_.score(peer->peer_id_), config_.score.graylist_threshold); + SL_TRACE(logger_, + "GRAYLISTED peer={} score={} threshold={}", + peer->peer_id_.toBase58(), + score_.score(peer->peer_id_), + config_.score.graylist_threshold); return true; } // Handle SUBSCRIBE/UNSUBSCRIBE and opportunistic GRAFT on subscribe. for (auto &pb_subscribe : pb_message.subscriptions()) { - SL_TRACE(logger_, "RX SUBSCRIBE/UNSUBSCRIBE topic={} sub={} from={}", pb_subscribe.topic_id(), pb_subscribe.subscribe(), peer->peer_id_.toBase58()); + SL_TRACE(logger_, + "RX SUBSCRIBE/UNSUBSCRIBE topic={} sub={} from={}", + pb_subscribe.topic_id(), + pb_subscribe.subscribe(), + peer->peer_id_.toBase58()); auto topic_hash = qtils::ByteVec(qtils::str2byte(pb_subscribe.topic_id())); auto topic_it = topics_.find(topic_hash); @@ -581,7 +620,11 @@ namespace libp2p::protocol::gossip { // Handle PUBLISH: verify signature (if strict mode), dedupe, deliver // locally, and relay. for (auto &pb_publish : pb_message.publish()) { - SL_TRACE(logger_, "RX PUBLISH topic={} size={} from={}", pb_publish.topic(), pb_publish.data().size(), peer->peer_id_.toBase58()); + SL_TRACE(logger_, + "RX PUBLISH topic={} size={} from={}", + pb_publish.topic(), + pb_publish.data().size(), + peer->peer_id_.toBase58()); auto message = std::make_shared(); switch (config_.validation_mode) { @@ -656,7 +699,10 @@ namespace libp2p::protocol::gossip { // Handle GRAFT: accept (add to mesh) or PRUNE with backoff. for (auto &pb_graft : pb_message.control().graft()) { - SL_TRACE(logger_, "RX GRAFT topic={} from={}", pb_graft.topic_id(), peer->peer_id_.toBase58()); + SL_TRACE(logger_, + "RX GRAFT topic={} from={}", + pb_graft.topic_id(), + peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -691,24 +737,36 @@ namespace libp2p::protocol::gossip { return false; } if (score_.below(peer->peer_id_, config_.score.zero)) { - SL_TRACE(logger_, "GRAFT_REJECT_LOW_SCORE peer={} score={}", peer->peer_id_.toBase58(), score_.score(peer->peer_id_)); + SL_TRACE(logger_, + "GRAFT_REJECT_LOW_SCORE peer={} score={}", + peer->peer_id_.toBase58(), + score_.score(peer->peer_id_)); return false; } return true; }(); if (accept) { - SL_TRACE(logger_, "GRAFT_ACCEPTED peer={} topic={}", peer->peer_id_.toBase58(), pb_graft.topic_id()); + SL_TRACE(logger_, + "GRAFT_ACCEPTED peer={} topic={}", + peer->peer_id_.toBase58(), + pb_graft.topic_id()); topic->mesh_peers_.emplace(peer); score_.graft(peer->peer_id_, topic_hash); } else { - SL_TRACE(logger_, "GRAFT_PRUNED peer={} topic={}", peer->peer_id_.toBase58(), pb_graft.topic_id()); + SL_TRACE(logger_, + "GRAFT_PRUNED peer={} topic={}", + peer->peer_id_.toBase58(), + pb_graft.topic_id()); make_prune(*topic, peer); } } // Handle PRUNE: remove from mesh and update backoff (v1.1+ honors backoff). for (auto &pb_prune : pb_message.control().prune()) { - SL_TRACE(logger_, "RX PRUNE topic={} from={}", pb_prune.topic_id(), peer->peer_id_.toBase58()); + SL_TRACE(logger_, + "RX PRUNE topic={} from={}", + pb_prune.topic_id(), + peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -722,7 +780,10 @@ namespace libp2p::protocol::gossip { // Handle IHAVE: select a capped subset of unknown IDs and enqueue IWANTs. if (pb_message.control().ihave().size() > 0) { - SL_TRACE(logger_, "RX IHAVE count={} from={}", pb_message.control().ihave().size(), peer->peer_id_.toBase58()); + SL_TRACE(logger_, + "RX IHAVE count={} from={}", + pb_message.control().ihave().size(), + peer->peer_id_.toBase58()); } if (not handle_ihave(peer, pb_message)) { return false; @@ -939,14 +1000,20 @@ namespace libp2p::protocol::gossip { assert(peer->stream_out_.has_value()); while (auto message = qtils::optionTake(peer->batch_)) { auto pb_messages = splitBatch(*message); - SL_TRACE(logger, "CHECKWRITE sending {} PB messages to {}", pb_messages.size(), peer_id.toBase58()); + SL_TRACE(logger, + "CHECKWRITE sending {} PB messages to {}", + pb_messages.size(), + peer_id.toBase58()); for (auto &encoded : pb_messages) { assert(not encoded.empty()); auto r = co_await writeVarintMessage(peer->stream_out_.value(), encoded); if (not r.has_value()) { - SL_TRACE(logger, "CHECKWRITE send failed for peer={} error={}", peer_id.toBase58(), r.error().message()); + SL_TRACE(logger, + "CHECKWRITE send failed for peer={} error={}", + peer_id.toBase58(), + r.error().message()); peer->stream_out_.reset(); break; } @@ -1011,26 +1078,33 @@ namespace libp2p::protocol::gossip { // emit gossip, expire history/cache, and clear expired dont_send_ marks. void Gossip::heartbeat() { ++heartbeat_ticks_; - + // Log peer scores and mesh status during heartbeat SL_TRACE(logger_, "=== HEARTBEAT #{} ===", heartbeat_ticks_); - SL_TRACE(logger_, "Peer count={} Topic count={}", peers_.size(), topics_.size()); + SL_TRACE( + logger_, "Peer count={} Topic count={}", peers_.size(), topics_.size()); for (auto &[peer_id, peer] : peers_) { auto peer_score = score_.score(peer_id); - SL_TRACE(logger_, "PEER_SCORE peer={} score={} out={} topics={}", - peer_id.toBase58(), peer_score, peer->out_, - peer->topics_.size()); + SL_TRACE(logger_, + "PEER_SCORE peer={} score={} out={} topics={}", + peer_id.toBase58(), + peer_score, + peer->out_, + peer->topics_.size()); } for (auto &[topic_hash, topic] : topics_) { - SL_TRACE(logger_, "TOPIC topic={} mesh_size={} peers={} publish_only={}", - qtils::byte2str(topic_hash), topic->mesh_peers_.size(), - topic->peers_.size(), topic->publish_only_); + SL_TRACE(logger_, + "TOPIC topic={} mesh_size={} peers={} publish_only={}", + qtils::byte2str(topic_hash), + topic->mesh_peers_.size(), + topic->peers_.size(), + topic->publish_only_); for (auto &peer : topic->mesh_peers_) { SL_TRACE(logger_, " MESH_PEER peer={}", peer->peer_id_.toBase58()); } } SL_TRACE(logger_, "=== END HEARTBEAT ==="); - + for (auto &topic : topics_ | std::views::values) { topic->backoff_.shift(); }