From 72864e46283550d408ca4a826786491e7cb36100 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 4 Jun 2025 13:52:56 -0400 Subject: [PATCH 01/13] SockMan: introduce class and implement binding to listening socket Introduce a new low-level socket managing class `SockMan`. BindListenPort() is copied from CConnMan in net.cpp and will be modernized in the next commit. Unit-test it with a new class `SocketTestingSetup` which mocks `CreateSock()` and will enable mock client I/O in future commits. `SockMan` and `SocketTestingSetup` are designed to be generic and reusbale for higher-level network protocol implementation and testing. Co-authored-by: Vasil Dimov --- src/CMakeLists.txt | 1 + src/common/sockman.cpp | 85 ++++++++++++++++++++++++++++++++++ src/common/sockman.h | 44 ++++++++++++++++++ src/test/CMakeLists.txt | 1 + src/test/sockman_tests.cpp | 30 ++++++++++++ src/test/util/setup_common.cpp | 18 +++++++ src/test/util/setup_common.h | 15 ++++++ 7 files changed, 194 insertions(+) create mode 100644 src/common/sockman.cpp create mode 100644 src/common/sockman.h create mode 100644 src/test/sockman_tests.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 96a6790e612c..0e5d17a44e7a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -149,6 +149,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL common/run_command.cpp common/settings.cpp common/signmessage.cpp + common/sockman.cpp common/system.cpp common/url.cpp compressor.cpp diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp new file mode 100644 index 000000000000..25527db8e071 --- /dev/null +++ b/src/common/sockman.cpp @@ -0,0 +1,85 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#include // IWYU pragma: keep + +#include +#include +#include +#include + +bool SockMan::BindListenPort(const CService& addrBind, bilingual_str& strError) +{ + int nOne = 1; + + // Create socket for listening for incoming connections + struct sockaddr_storage sockaddr; + socklen_t len = sizeof(sockaddr); + if (!addrBind.GetSockAddr((struct sockaddr*)&sockaddr, &len)) + { + strError = Untranslated(strprintf("Bind address family for %s not supported", addrBind.ToStringAddrPort())); + LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + return false; + } + + std::unique_ptr sock = CreateSock(addrBind.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP); + if (!sock) { + strError = Untranslated(strprintf("Couldn't open socket for incoming connections (socket returned error %s)", NetworkErrorString(WSAGetLastError()))); + LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + return false; + } + + // Allow binding if the port is still in TIME_WAIT state after + // the program was closed and restarted. + if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { + strError = Untranslated(strprintf("Error setting SO_REUSEADDR on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); + LogPrintf("%s\n", strError.original); + } + + // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option + // and enable it by default or not. Try to enable it, if possible. + if (addrBind.IsIPv6()) { +#ifdef IPV6_V6ONLY + if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { + strError = Untranslated(strprintf("Error setting IPV6_V6ONLY on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); + LogPrintf("%s\n", strError.original); + } +#endif +#ifdef WIN32 + int nProtLevel = PROTECTION_LEVEL_UNRESTRICTED; + if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (const char*)&nProtLevel, sizeof(int)) == SOCKET_ERROR) { + strError = Untranslated(strprintf("Error setting IPV6_PROTECTION_LEVEL on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); + LogPrintf("%s\n", strError.original); + } +#endif + } + + if (sock->Bind(reinterpret_cast(&sockaddr), len) == SOCKET_ERROR) { + int nErr = WSAGetLastError(); + if (nErr == WSAEADDRINUSE) + strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToStringAddrPort(), CLIENT_NAME); + else + strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToStringAddrPort(), NetworkErrorString(nErr)); + LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + return false; + } + LogPrintf("Bound to %s\n", addrBind.ToStringAddrPort()); + + // Listen for incoming connections + if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) + { + strError = strprintf(_("Listening for incoming connections failed (listen returned error %s)"), NetworkErrorString(WSAGetLastError())); + LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + return false; + } + + m_listen.emplace_back(std::move(sock)); + + return true; +} + +void SockMan::StopListening() +{ + m_listen.clear(); +} diff --git a/src/common/sockman.h b/src/common/sockman.h new file mode 100644 index 000000000000..84499cdacd9b --- /dev/null +++ b/src/common/sockman.h @@ -0,0 +1,44 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#ifndef BITCOIN_COMMON_SOCKMAN_H +#define BITCOIN_COMMON_SOCKMAN_H + +#include +#include +#include + +#include +#include + +/** + * A socket manager class which handles socket operations. + * To use this class, inherit from it and implement the pure virtual methods. + * Handled operations: + * - binding and listening on sockets + */ +class SockMan +{ +public: + /** + * Bind to a new address:port, start listening and add the listen socket to `m_listen`. + * @param[in] addrBind Where to bind. + * @param[out] strError Error string if an error occurs. + * @retval true Success. + * @retval false Failure, `strError` will be set. + */ + bool BindListenPort(const CService& addrBind, bilingual_str& strError); + + /** + * Stop listening by closing all listening sockets. + */ + void StopListening(); + + /** + * List of listening sockets. + */ + std::vector> m_listen; +}; + +#endif // BITCOIN_COMMON_SOCKMAN_H diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 6ce33621af8e..cc7490cf172e 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -96,6 +96,7 @@ add_executable(test_bitcoin sigopcount_tests.cpp skiplist_tests.cpp sock_tests.cpp + sockman_tests.cpp span_tests.cpp streams_tests.cpp sync_tests.cpp diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp new file mode 100644 index 000000000000..7650e03b9df1 --- /dev/null +++ b/src/test/sockman_tests.cpp @@ -0,0 +1,30 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include + +#include + +BOOST_FIXTURE_TEST_SUITE(sockman_tests, SocketTestingSetup) + +BOOST_AUTO_TEST_CASE(test_sockman) +{ + SockMan sockman; + + // This address won't actually get used because we stubbed CreateSock() + const std::optional addr{Lookup("0.0.0.0", 0, false)}; + BOOST_REQUIRE(addr.has_value()); + bilingual_str strError; + + // Init state + BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 0); + // Bind to mock Listening Socket + BOOST_REQUIRE(sockman.BindListenPort(addr.value(), strError)); + // We are bound and listening + BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 1); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 76a42d19ea2e..0b2d3677f2d6 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -601,6 +601,24 @@ void TestChain100Setup::MockMempoolMinFee(const CFeeRate& target_feerate) m_node.mempool->TrimToSize(0); assert(m_node.mempool->GetMinFee() == target_feerate); } + +SocketTestingSetup::SocketTestingSetup() + : m_create_sock_orig{CreateSock} +{ + CreateSock = [this](int, int, int) { + // This is a mock Listening Socket that a server can "bind" to and + // listen to for incoming connections. We won't need to access its I/O + // pipes because we don't read or write directly to it. It will return + // Connected Sockets from the queue via its Accept() method. + return std::make_unique(std::make_shared(), m_accepted_sockets); + }; +}; + +SocketTestingSetup::~SocketTestingSetup() +{ + CreateSock = m_create_sock_orig; +} + /** * @returns a real block (0000000000013b8ab2cd513b0261a14096412195a72a0c4827d229dcc7e0f7af) * with 9 txs. diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index 57bea9086b99..563048772d9e 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include // IWYU pragma: export #include @@ -270,6 +271,20 @@ std::unique_ptr MakeNoLogFileContext(const ChainType chain_type = ChainType:: return std::make_unique(chain_type, opts); } +class SocketTestingSetup : public BasicTestingSetup +{ +public: + explicit SocketTestingSetup(); + ~SocketTestingSetup(); + +private: + // Save the original value of CreateSock here and restore it when the test ends. + const decltype(CreateSock) m_create_sock_orig; + + // Queue of connected sockets returned by listening socket (represents network interface) + std::shared_ptr m_accepted_sockets{std::make_shared()}; +}; + CBlock getBlock13b8a(); // Make types usable in BOOST_CHECK_* @{ From 35324545bac8b8e1e109bf633561c3c79626844d Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 4 Jun 2025 13:56:30 -0400 Subject: [PATCH 02/13] style: modernize the style of SockMan::BindListenPort() It was copied verbatim from `CConnman::BindListenPort()` in the previous commit. Modernize its variables and style and log the error messages from the caller. Also categorize the informative messages to the "net" category because they are quite specific to the networking layer. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 82 ++++++++++++++++++++++---------------- src/common/sockman.h | 8 ++-- src/test/sockman_tests.cpp | 2 +- 3 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 25527db8e071..bd661789affd 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -9,68 +9,80 @@ #include #include -bool SockMan::BindListenPort(const CService& addrBind, bilingual_str& strError) +bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) { - int nOne = 1; - // Create socket for listening for incoming connections - struct sockaddr_storage sockaddr; - socklen_t len = sizeof(sockaddr); - if (!addrBind.GetSockAddr((struct sockaddr*)&sockaddr, &len)) - { - strError = Untranslated(strprintf("Bind address family for %s not supported", addrBind.ToStringAddrPort())); - LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + sockaddr_storage storage; + socklen_t len{sizeof(storage)}; + if (!to.GetSockAddr(reinterpret_cast(&storage), &len)) { + err_msg = Untranslated(strprintf("Bind address family for %s not supported", to.ToStringAddrPort())); return false; } - std::unique_ptr sock = CreateSock(addrBind.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP); + std::unique_ptr sock{CreateSock(to.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP)}; if (!sock) { - strError = Untranslated(strprintf("Couldn't open socket for incoming connections (socket returned error %s)", NetworkErrorString(WSAGetLastError()))); - LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + err_msg = Untranslated(strprintf("Cannot create %s listen socket: %s", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError()))); return false; } + int one{1}; + // Allow binding if the port is still in TIME_WAIT state after // the program was closed and restarted. - if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { - strError = Untranslated(strprintf("Error setting SO_REUSEADDR on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); - LogPrintf("%s\n", strError.original); + if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&one), sizeof(one)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set SO_REUSEADDR on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); } // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option // and enable it by default or not. Try to enable it, if possible. - if (addrBind.IsIPv6()) { + if (to.IsIPv6()) { #ifdef IPV6_V6ONLY - if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { - strError = Untranslated(strprintf("Error setting IPV6_V6ONLY on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); - LogPrintf("%s\n", strError.original); + if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast(&one), sizeof(one)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set IPV6_V6ONLY on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); } #endif #ifdef WIN32 - int nProtLevel = PROTECTION_LEVEL_UNRESTRICTED; - if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (const char*)&nProtLevel, sizeof(int)) == SOCKET_ERROR) { - strError = Untranslated(strprintf("Error setting IPV6_PROTECTION_LEVEL on socket: %s, continuing anyway", NetworkErrorString(WSAGetLastError()))); - LogPrintf("%s\n", strError.original); + int prot_level{PROTECTION_LEVEL_UNRESTRICTED}; + if (sock->SetSockOpt(IPPROTO_IPV6, + IPV6_PROTECTION_LEVEL, + reinterpret_cast(&prot_level), + sizeof(prot_level)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set IPV6_PROTECTION_LEVEL on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); } #endif } - if (sock->Bind(reinterpret_cast(&sockaddr), len) == SOCKET_ERROR) { - int nErr = WSAGetLastError(); - if (nErr == WSAEADDRINUSE) - strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToStringAddrPort(), CLIENT_NAME); - else - strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToStringAddrPort(), NetworkErrorString(nErr)); - LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + if (sock->Bind(reinterpret_cast(&storage), len) == SOCKET_ERROR) { + const int err{WSAGetLastError()}; + if (err == WSAEADDRINUSE) { + err_msg = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), + to.ToStringAddrPort(), + CLIENT_NAME); + } else { + err_msg = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), + to.ToStringAddrPort(), + NetworkErrorString(err)); + } return false; } - LogPrintf("Bound to %s\n", addrBind.ToStringAddrPort()); // Listen for incoming connections - if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) - { - strError = strprintf(_("Listening for incoming connections failed (listen returned error %s)"), NetworkErrorString(WSAGetLastError())); - LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); + if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) { + err_msg = strprintf(_("Cannot listen on %s: %s"), to.ToStringAddrPort(), NetworkErrorString(WSAGetLastError())); return false; } diff --git a/src/common/sockman.h b/src/common/sockman.h index 84499cdacd9b..b977d49f42be 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -23,12 +23,12 @@ class SockMan public: /** * Bind to a new address:port, start listening and add the listen socket to `m_listen`. - * @param[in] addrBind Where to bind. - * @param[out] strError Error string if an error occurs. + * @param[in] to Where to bind. + * @param[out] err_msg Error string if an error occurs. * @retval true Success. - * @retval false Failure, `strError` will be set. + * @retval false Failure, `err_msg` will be set. */ - bool BindListenPort(const CService& addrBind, bilingual_str& strError); + bool BindAndStartListening(const CService& to, bilingual_str& err_msg); /** * Stop listening by closing all listening sockets. diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp index 7650e03b9df1..664d755544b3 100644 --- a/src/test/sockman_tests.cpp +++ b/src/test/sockman_tests.cpp @@ -22,7 +22,7 @@ BOOST_AUTO_TEST_CASE(test_sockman) // Init state BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 0); // Bind to mock Listening Socket - BOOST_REQUIRE(sockman.BindListenPort(addr.value(), strError)); + BOOST_REQUIRE(sockman.BindAndStartListening(addr.value(), strError)); // We are bound and listening BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 1); } From 03cce3b91b046f128dea638d616359dffa90ed95 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 4 Jun 2025 14:14:13 -0400 Subject: [PATCH 03/13] SockMan: implement and test AcceptConnection() AcceptConnection() is mostly copied from CConmann in net.cpp and will be modernized in the following commit. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 21 +++++++++++++++++++++ src/common/sockman.h | 9 +++++++++ src/test/sockman_tests.cpp | 16 +++++++++++++--- src/test/util/setup_common.cpp | 15 +++++++++++++++ src/test/util/setup_common.h | 3 +++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index bd661789affd..2b9dac0b059f 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -91,6 +91,27 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) return true; } +std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CService& addr) +{ + struct sockaddr_storage sockaddr; + socklen_t len = sizeof(sockaddr); + auto sock = listen_sock.Accept((struct sockaddr*)&sockaddr, &len); + + if (!sock) { + const int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK) { + LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); + } + return {}; + } + + if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr, len)) { + LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n"); + } + + return sock; +} + void SockMan::StopListening() { m_listen.clear(); diff --git a/src/common/sockman.h b/src/common/sockman.h index b977d49f42be..580e5b189e02 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -17,6 +17,7 @@ * To use this class, inherit from it and implement the pure virtual methods. * Handled operations: * - binding and listening on sockets + * - accepting incoming connections */ class SockMan { @@ -30,6 +31,14 @@ class SockMan */ bool BindAndStartListening(const CService& to, bilingual_str& err_msg); + /** + * Accept a connection. + * @param[in] listen_sock Socket on which to accept the connection. + * @param[out] addr Address of the peer that was accepted. + * @return Newly created socket for the accepted connection. + */ + std::unique_ptr AcceptConnection(const Sock& listen_sock, CService& addr); + /** * Stop listening by closing all listening sockets. */ diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp index 664d755544b3..67436da7f8a1 100644 --- a/src/test/sockman_tests.cpp +++ b/src/test/sockman_tests.cpp @@ -15,16 +15,26 @@ BOOST_AUTO_TEST_CASE(test_sockman) SockMan sockman; // This address won't actually get used because we stubbed CreateSock() - const std::optional addr{Lookup("0.0.0.0", 0, false)}; - BOOST_REQUIRE(addr.has_value()); + const std::optional addr_bind{Lookup("0.0.0.0", 0, false)}; + BOOST_REQUIRE(addr_bind.has_value()); bilingual_str strError; // Init state BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 0); // Bind to mock Listening Socket - BOOST_REQUIRE(sockman.BindAndStartListening(addr.value(), strError)); + BOOST_REQUIRE(sockman.BindAndStartListening(addr_bind.value(), strError)); // We are bound and listening BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 1); + + // Pick up the phone, there's no one there + CService addr_connection; + BOOST_REQUIRE(!sockman.AcceptConnection(*sockman.m_listen.front(), addr_connection)); + + // Create a mock client and add it to the local CreateSock queue + ConnectClient(); + // Accept the connection + BOOST_REQUIRE(sockman.AcceptConnection(*sockman.m_listen.front(), addr_connection)); + BOOST_CHECK_EQUAL(addr_connection.ToStringAddrPort(), "5.5.5.5:6789"); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 0b2d3677f2d6..19f4ca56163b 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -619,6 +619,21 @@ SocketTestingSetup::~SocketTestingSetup() CreateSock = m_create_sock_orig; } +void SocketTestingSetup::ConnectClient() +{ + // I/O pipes for a mock Connected Socket we can read and write to. + std::shared_ptr connected_socket_pipes(std::make_shared()); + + // TODO: Insert a payload + + // Create the Mock Connected Socket that represents a client. + // It needs I/O pipes but its queue can remain empty + std::unique_ptr connected_socket{std::make_unique(connected_socket_pipes, std::make_shared())}; + + // Push into the queue of Accepted Sockets returned by the local CreateSock() + m_accepted_sockets->Push(std::move(connected_socket)); +} + /** * @returns a real block (0000000000013b8ab2cd513b0261a14096412195a72a0c4827d229dcc7e0f7af) * with 9 txs. diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index 563048772d9e..b7f947213201 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -277,6 +277,9 @@ class SocketTestingSetup : public BasicTestingSetup explicit SocketTestingSetup(); ~SocketTestingSetup(); + // Connect to the socket with a mock client (a DynSock) + void ConnectClient(); + private: // Save the original value of CreateSock here and restore it when the test ends. const decltype(CreateSock) m_create_sock_orig; From 2900c33f66c4bb1c7f1569a5d9ea622c24ad2a32 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 17 Sep 2024 17:29:07 +0200 Subject: [PATCH 04/13] style: modernize the style of SockMan::AcceptConnection() --- src/common/sockman.cpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 2b9dac0b059f..830d3777f41a 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -93,19 +93,23 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CService& addr) { - struct sockaddr_storage sockaddr; - socklen_t len = sizeof(sockaddr); - auto sock = listen_sock.Accept((struct sockaddr*)&sockaddr, &len); + sockaddr_storage storage; + socklen_t len{sizeof(storage)}; + + auto sock{listen_sock.Accept(reinterpret_cast(&storage), &len)}; if (!sock) { - const int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK) { - LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); + const int err{WSAGetLastError()}; + if (err != WSAEWOULDBLOCK) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Error, + "Cannot accept new connection: %s\n", + NetworkErrorString(err)); } return {}; } - if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr, len)) { + if (!addr.SetSockAddr(reinterpret_cast(&storage), len)) { LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n"); } From 7e68a72e8da22c1a24d8b70c80d0c7b45d8899cf Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 4 Jun 2025 14:19:16 -0400 Subject: [PATCH 05/13] SockMan: generate sequential Ids for each newly accepted connection Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 5 +++++ src/common/sockman.h | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 830d3777f41a..4f64910cc86d 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -116,6 +116,11 @@ std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CServic return sock; } +SockMan::Id SockMan::GetNewId() +{ + return m_next_id.fetch_add(1, std::memory_order_relaxed); +} + void SockMan::StopListening() { m_listen.clear(); diff --git a/src/common/sockman.h b/src/common/sockman.h index 580e5b189e02..633ee5638beb 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -22,6 +23,11 @@ class SockMan { public: + /** + * Each connection is assigned an unique id of this type. + */ + using Id = int64_t; + /** * Bind to a new address:port, start listening and add the listen socket to `m_listen`. * @param[in] to Where to bind. @@ -39,6 +45,11 @@ class SockMan */ std::unique_ptr AcceptConnection(const Sock& listen_sock, CService& addr); + /** + * Generate an id for a newly created connection. + */ + Id GetNewId(); + /** * Stop listening by closing all listening sockets. */ @@ -48,6 +59,13 @@ class SockMan * List of listening sockets. */ std::vector> m_listen; + +private: + + /** + * The id to assign to the next created connection. Used to generate ids of connections. + */ + std::atomic m_next_id{0}; }; #endif // BITCOIN_COMMON_SOCKMAN_H From 6e420ba127da8069e930d53a0142bb12f3abc924 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Mon, 23 Jun 2025 11:36:01 -0700 Subject: [PATCH 06/13] [move-only] Make GetBindAddress() callable from outside net.cpp --- src/net.cpp | 14 -------------- src/netbase.cpp | 13 +++++++++++++ src/netbase.h | 3 +++ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 217d9a890373..4a069438751c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -373,20 +373,6 @@ bool CConnman::CheckIncomingNonce(uint64_t nonce) return true; } -/** Get the bind address for a socket as CService. */ -static CService GetBindAddress(const Sock& sock) -{ - CService addr_bind; - struct sockaddr_storage sockaddr_bind; - socklen_t sockaddr_bind_len = sizeof(sockaddr_bind); - if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) { - addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len); - } else { - LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n"); - } - return addr_bind; -} - CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); diff --git a/src/netbase.cpp b/src/netbase.cpp index 1f3faa186434..707a11cc29bb 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -946,3 +946,16 @@ CService MaybeFlipIPv6toCJDNS(const CService& service) } return ret; } + +CService GetBindAddress(const Sock& sock) +{ + CService addr_bind; + struct sockaddr_storage sockaddr_bind; + socklen_t sockaddr_bind_len = sizeof(sockaddr_bind); + if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) { + addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len); + } else { + LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n"); + } + return addr_bind; +} diff --git a/src/netbase.h b/src/netbase.h index b2cc172e536a..49c1b24ce631 100644 --- a/src/netbase.h +++ b/src/netbase.h @@ -351,4 +351,7 @@ bool IsBadPort(uint16_t port); */ CService MaybeFlipIPv6toCJDNS(const CService& service); +/** Get the bind address for a socket as CService. */ +CService GetBindAddress(const Sock& sock); + #endif // BITCOIN_NETBASE_H From 63eda80af90a891a703815de09214345a81e6a67 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 12 Jun 2025 14:57:16 -0400 Subject: [PATCH 07/13] SockMan: start an I/O loop in a new thread and accept connections Socket handling methods are copied from CConnMan: `CConnman::GenerateWaitSockets()` goes to `SockMan::GenerateWaitSockets()`. `CConnman::ThreadSocketHandler()` and `CConnman::SocketHandler()` are combined into `SockMan::ThreadSocketHandler()`. `CConnman::SocketHandlerListening()` goes to `SockMan::SocketHandlerListening()`. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 125 +++++++++++++++++++++++++++ src/common/sockman.h | 170 +++++++++++++++++++++++++++++++++++++ src/test/sockman_tests.cpp | 65 ++++++++++++-- 3 files changed, 353 insertions(+), 7 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 4f64910cc86d..a6145e626c45 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -8,6 +8,11 @@ #include #include #include +#include + +// The set of sockets cannot be modified while waiting +// The sleep time needs to be small to avoid new sockets stalling +static constexpr auto SELECT_TIMEOUT{50ms}; bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) { @@ -91,6 +96,19 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) return true; } +void SockMan::StartSocketsThreads(const Options& options) +{ + m_thread_socket_handler = std::thread( + &util::TraceThread, options.socket_handler_thread_name, [this] { ThreadSocketHandler(); }); +} + +void SockMan::JoinSocketsThreads() +{ + if (m_thread_socket_handler.joinable()) { + m_thread_socket_handler.join(); + } +} + std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CService& addr) { sockaddr_storage storage; @@ -116,12 +134,119 @@ std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CServic return sock; } +void SockMan::NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) +{ + AssertLockNotHeld(m_connected_mutex); + + if (!sock->IsSelectable()) { + LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort()); + return; + } + + // According to the internet TCP_NODELAY is not carried into accepted sockets + // on all platforms. Set it again here just to be sure. + const int on{1}; + if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) { + LogDebug(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n", + them.ToStringAddrPort()); + } + + const Id id{GetNewId()}; + + { + LOCK(m_connected_mutex); + m_connected.emplace(id, std::make_shared(std::move(sock))); + } + + if (!EventNewConnectionAccepted(id, me, them)) { + CloseConnection(id); + } +} + SockMan::Id SockMan::GetNewId() { return m_next_id.fetch_add(1, std::memory_order_relaxed); } +bool SockMan::CloseConnection(Id id) +{ + LOCK(m_connected_mutex); + return m_connected.erase(id) > 0; +} + void SockMan::StopListening() { m_listen.clear(); } + +bool SockMan::ShouldTryToSend(Id id) const { return true; } + +bool SockMan::ShouldTryToRecv(Id id) const { return true; } + +void SockMan::ThreadSocketHandler() +{ + AssertLockNotHeld(m_connected_mutex); + + while (!interruptNet) { + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + auto io_readiness{GenerateWaitSockets()}; + if (io_readiness.events_per_sock.empty() || + // WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant. + !io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT, + io_readiness.events_per_sock)) { + interruptNet.sleep_for(SELECT_TIMEOUT); + } + + // Accept new connections from listening sockets. + SocketHandlerListening(io_readiness.events_per_sock); + } +} + +SockMan::IOReadiness SockMan::GenerateWaitSockets() +{ + AssertLockNotHeld(m_connected_mutex); + + IOReadiness io_readiness; + + for (const auto& sock : m_listen) { + io_readiness.events_per_sock.emplace(sock, Sock::Events{Sock::RECV}); + } + + auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)}; + + for (const auto& [id, sockets] : connected_snapshot) { + const bool select_recv{ShouldTryToRecv(id)}; + const bool select_send{ShouldTryToSend(id)}; + if (!select_recv && !select_send) continue; + + Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); + io_readiness.events_per_sock.emplace(sockets->sock, Sock::Events{event}); + io_readiness.ids_per_sock.emplace(sockets->sock, id); + } + + return io_readiness; +} + +void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) +{ + AssertLockNotHeld(m_connected_mutex); + + for (const auto& sock : m_listen) { + if (interruptNet) { + return; + } + const auto it = events_per_sock.find(sock); + if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) { + CService addr_accepted; + + auto sock_accepted{AcceptConnection(*sock, addr_accepted)}; + + if (sock_accepted) { + NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted); + } + } + } +} diff --git a/src/common/sockman.h b/src/common/sockman.h index 633ee5638beb..282ae3aee1d5 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -18,7 +18,10 @@ * To use this class, inherit from it and implement the pure virtual methods. * Handled operations: * - binding and listening on sockets + * - starting of necessary threads to process socket operations * - accepting incoming connections + * - closing connections + * - waiting for IO readiness on sockets and doing send/recv accordingly */ class SockMan { @@ -28,8 +31,15 @@ class SockMan */ using Id = int64_t; + virtual ~SockMan() = default; + + // + // Non-virtual functions, to be reused by children classes. + // + /** * Bind to a new address:port, start listening and add the listen socket to `m_listen`. + * Should be called before `StartSocketsThreads()`. * @param[in] to Where to bind. * @param[out] err_msg Error string if an error occurs. * @retval true Success. @@ -37,6 +47,23 @@ class SockMan */ bool BindAndStartListening(const CService& to, bilingual_str& err_msg); + /** + * Options to influence `StartSocketsThreads()`. + */ + struct Options { + std::string_view socket_handler_thread_name; + }; + + /** + * Start the necessary threads for sockets IO. + */ + void StartSocketsThreads(const Options& options); + + /** + * Join (wait for) the threads started by `StartSocketsThreads()` to exit. + */ + void JoinSocketsThreads(); + /** * Accept a connection. * @param[in] listen_sock Socket on which to accept the connection. @@ -45,16 +72,39 @@ class SockMan */ std::unique_ptr AcceptConnection(const Sock& listen_sock, CService& addr); + /** + * After a new socket with a peer has been created, configure its flags, + * make a new connection id and call `EventNewConnectionAccepted()`. + * @param[in] sock The newly created socket. + * @param[in] me Address at our end of the connection. + * @param[in] them Address of the new peer. + */ + void NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * Generate an id for a newly created connection. */ Id GetNewId(); + /** + * Destroy a given connection by closing its socket and release resources occupied by it. + * @param[in] id Connection to destroy. + * @return Whether the connection existed and its socket was closed by this call. + */ + bool CloseConnection(Id id) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * Stop listening by closing all listening sockets. */ void StopListening(); + /** + * This is signaled when network activity should cease. + */ + CThreadInterrupt interruptNet; + /** * List of listening sockets. */ @@ -62,10 +112,130 @@ class SockMan private: + // + // Pure virtual functions must be implemented by children classes. + // + + /** + * Be notified when a new connection has been accepted. + * @param[in] id Id of the newly accepted connection. + * @param[in] me The address and port at our side of the connection. + * @param[in] them The address and port at the peer's side of the connection. + * @retval true The new connection was accepted at the higher level. + * @retval false The connection was refused at the higher level, so the + * associated socket and id should be discarded by `SockMan`. + */ + virtual bool EventNewConnectionAccepted(Id id, + const CService& me, + const CService& them) = 0; + + // + // Non-pure virtual functions can be overridden by children classes or left + // alone to use the default implementation from SockMan. + // + + /** + * Can be used to temporarily pause sends on a connection. + * SockMan would only call Send() if this returns true. + * The implementation in SockMan always returns true. + * @param[in] id Connection for which to confirm or omit the next call to EventReadyToSend(). + */ + virtual bool ShouldTryToSend(Id id) const; + + /** + * SockMan would only call Recv() on a connection's socket if this returns true. + * Can be used to temporarily pause receives on a connection. + * The implementation in SockMan always returns true. + * @param[in] id Connection for which to confirm or omit the next receive. + */ + virtual bool ShouldTryToRecv(Id id) const; + + /** + * The sockets used by a connection. + */ + struct ConnectionSockets { + explicit ConnectionSockets(std::unique_ptr&& s) + : sock{std::move(s)} + { + } + + /** + * Mutex that serializes the Send() and Recv() calls on `sock`. + */ + Mutex mutex; + + /** + * Underlying socket. + * `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the + * underlying file descriptor by one thread while another thread is poll(2)-ing + * it for activity. + * @see https://github.com/bitcoin/bitcoin/issues/21744 for details. + */ + std::shared_ptr sock; + }; + + /** + * Info about which socket has which event ready and its connection id. + */ + struct IOReadiness { + /** + * Map of socket -> socket events. For example: + * socket1 -> { requested = SEND|RECV, occurred = RECV } + * socket2 -> { requested = SEND, occurred = SEND } + */ + Sock::EventsPerSock events_per_sock; + + /** + * Map of socket -> connection id (in `m_connected`). For example + * socket1 -> id=23 + * socket2 -> id=56 + */ + std::unordered_map + ids_per_sock; + }; + + /** + * Check connected and listening sockets for IO readiness and process them accordingly. + */ + void ThreadSocketHandler() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Generate a collection of sockets to check for IO readiness. + * @return Sockets to check for readiness plus an aux map to find the + * corresponding connection id given a socket. + */ + IOReadiness GenerateWaitSockets() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Accept incoming connections, one from each read-ready listening socket. + * @param[in] events_per_sock Sockets that are ready for IO. + */ + void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * The id to assign to the next created connection. Used to generate ids of connections. */ std::atomic m_next_id{0}; + + /** + * Thread that sends to and receives from sockets and accepts connections. + */ + std::thread m_thread_socket_handler; + + mutable Mutex m_connected_mutex; + + /** + * Sockets for existent connections. + * The `shared_ptr` makes it possible to create a snapshot of this by simply copying + * it (under `m_connected_mutex`). + */ + std::unordered_map> m_connected GUARDED_BY(m_connected_mutex); }; #endif // BITCOIN_COMMON_SOCKMAN_H diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp index 67436da7f8a1..8e6bb964f102 100644 --- a/src/test/sockman_tests.cpp +++ b/src/test/sockman_tests.cpp @@ -12,7 +12,39 @@ BOOST_FIXTURE_TEST_SUITE(sockman_tests, SocketTestingSetup) BOOST_AUTO_TEST_CASE(test_sockman) { - SockMan sockman; + class TestSockMan : public SockMan + { + public: + // Connections are added from the SockMan I/O thread + // but the test reads them from the main thread. + Mutex m_connections_mutex; + std::vector> m_connections; + + size_t GetConnectionsCount() EXCLUSIVE_LOCKS_REQUIRED(!m_connections_mutex) + { + LOCK(m_connections_mutex); + return m_connections.size(); + } + + std::pair GetFirstConnection() EXCLUSIVE_LOCKS_REQUIRED(!m_connections_mutex) + { + LOCK(m_connections_mutex); + return m_connections.front(); + } + + private: + virtual bool EventNewConnectionAccepted(Id id, + const CService& me, + const CService& them) override + EXCLUSIVE_LOCKS_REQUIRED(!m_connections_mutex) + { + LOCK(m_connections_mutex); + m_connections.emplace_back(id, them); + return true; + } + }; + + TestSockMan sockman; // This address won't actually get used because we stubbed CreateSock() const std::optional addr_bind{Lookup("0.0.0.0", 0, false)}; @@ -26,15 +58,34 @@ BOOST_AUTO_TEST_CASE(test_sockman) // We are bound and listening BOOST_REQUIRE_EQUAL(sockman.m_listen.size(), 1); - // Pick up the phone, there's no one there - CService addr_connection; - BOOST_REQUIRE(!sockman.AcceptConnection(*sockman.m_listen.front(), addr_connection)); + // Name the SockMan I/O thread + SockMan::Options options{"test_sockman"}; + // Start the I/O loop + sockman.StartSocketsThreads(options); + + // No connections yet + BOOST_CHECK_EQUAL(sockman.GetConnectionsCount(), 0); // Create a mock client and add it to the local CreateSock queue ConnectClient(); - // Accept the connection - BOOST_REQUIRE(sockman.AcceptConnection(*sockman.m_listen.front(), addr_connection)); - BOOST_CHECK_EQUAL(addr_connection.ToStringAddrPort(), "5.5.5.5:6789"); + + // Wait up to a minute to find and connect the client in the I/O loop + int attempts{6000}; + while (sockman.GetConnectionsCount() < 1) { + std::this_thread::sleep_for(10ms); + BOOST_REQUIRE(--attempts > 0); + } + + // Inspect the connection + auto client{sockman.GetFirstConnection()}; + BOOST_CHECK_EQUAL(client.second.ToStringAddrPort(), "5.5.5.5:6789"); + + // Close connection + BOOST_REQUIRE(sockman.CloseConnection(client.first)); + // Stop the I/O loop and shutdown + sockman.interruptNet(); + sockman.JoinSocketsThreads(); + sockman.StopListening(); } BOOST_AUTO_TEST_SUITE_END() From 7d4d4b6fad63c78404d2ec184e119c97377eda3a Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 12 Jun 2025 15:35:05 -0400 Subject: [PATCH 08/13] SockMan: handle connected sockets: read data from socket `CConnman::SocketHandlerConnected()` copied to `SockMan::SocketHandlerConnected()`. Testing this requires adding a new feature to the SocketTestingSetup, inserting a "request" payload into the mock client that connects to us. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 66 ++++++++++++++++++++++++++++++++++ src/common/sockman.h | 37 +++++++++++++++++++ src/test/sockman_tests.cpp | 34 ++++++++++++++++-- src/test/util/setup_common.cpp | 5 +-- src/test/util/setup_common.h | 4 +-- 5 files changed, 140 insertions(+), 6 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index a6145e626c45..198cfc0e86db 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -200,6 +200,9 @@ void SockMan::ThreadSocketHandler() interruptNet.sleep_for(SELECT_TIMEOUT); } + // Service (send/receive) each of the already connected sockets. + SocketHandlerConnected(io_readiness); + // Accept new connections from listening sockets. SocketHandlerListening(io_readiness.events_per_sock); } @@ -230,6 +233,55 @@ SockMan::IOReadiness SockMan::GenerateWaitSockets() return io_readiness; } +void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness) +{ + AssertLockNotHeld(m_connected_mutex); + + for (const auto& [sock, events] : io_readiness.events_per_sock) { + if (interruptNet) { + return; + } + + auto it{io_readiness.ids_per_sock.find(sock)}; + if (it == io_readiness.ids_per_sock.end()) { + continue; + } + const Id id{it->second}; + + bool send_ready = events.occurred & Sock::SEND; // Sock::SEND could only be set if ShouldTryToSend() has returned true in GenerateWaitSockets(). + bool recv_ready = events.occurred & Sock::RECV; // Sock::RECV could only be set if ShouldTryToRecv() has returned true in GenerateWaitSockets(). + bool err_ready = events.occurred & Sock::ERR; + + if (send_ready) { + // TODO: send data + } + + if (recv_ready || err_ready) { + uint8_t buf[0x10000]; // typical socket buffer is 8K-64K + + auto sockets{GetConnectionSockets(id)}; + if (!sockets) { + continue; + } + + const ssize_t nrecv{WITH_LOCK( + sockets->mutex, + return sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)}; + + if (nrecv < 0) { // In all cases (including -1 and 0) EventIOLoopCompletedForOne() should be executed after this, don't change the code to skip it. + const int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) { + EventGotPermanentReadError(id, NetworkErrorString(err)); + } + } else if (nrecv == 0) { + EventGotEOF(id); + } else { + EventGotData(id, {buf, static_cast(nrecv)}); + } + } + } +} + void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) { AssertLockNotHeld(m_connected_mutex); @@ -250,3 +302,17 @@ void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) } } } + +std::shared_ptr SockMan::GetConnectionSockets(Id id) const +{ + LOCK(m_connected_mutex); + + auto it{m_connected.find(id)}; + if (it == m_connected.end()) { + // There is no socket in case we've already disconnected, or in test cases without + // real connections. + return {}; + } + + return it->second; +} diff --git a/src/common/sockman.h b/src/common/sockman.h index 282ae3aee1d5..14eed5bf3f12 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -129,6 +129,28 @@ class SockMan const CService& me, const CService& them) = 0; + /** + * Called when new data has been received. + * @param[in] id Connection for which the data arrived. + * @param[in] data Received data. + */ + virtual void EventGotData(Id id, std::span data) = 0; + + /** + * Called when the remote peer has sent an EOF on the socket. This is a graceful + * close of their writing side, we can still send and they will receive, if it + * makes sense at the application level. + * @param[in] id Connection whose socket got EOF. + */ + virtual void EventGotEOF(Id id) = 0; + + /** + * Called when we get an irrecoverable error trying to read from a socket. + * @param[in] id Connection whose socket got an error. + * @param[in] errmsg Message describing the error. + */ + virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) = 0; + // // Non-pure virtual functions can be overridden by children classes or left // alone to use the default implementation from SockMan. @@ -211,6 +233,13 @@ class SockMan IOReadiness GenerateWaitSockets() EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** + * Do the read/write for connected sockets that are ready for IO. + * @param[in] io_readiness Which sockets are ready and their connection ids. + */ + void SocketHandlerConnected(const IOReadiness& io_readiness) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * Accept incoming connections, one from each read-ready listening socket. * @param[in] events_per_sock Sockets that are ready for IO. @@ -218,6 +247,14 @@ class SockMan void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** + * Retrieve an entry from m_connected. + * @param[in] id Connection id to search for. + * @return ConnectionSockets for the given connection id or empty shared_ptr if not found. + */ + std::shared_ptr GetConnectionSockets(Id id) const + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * The id to assign to the next created connection. Used to generate ids of connections. */ diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp index 8e6bb964f102..5d613966a004 100644 --- a/src/test/sockman_tests.cpp +++ b/src/test/sockman_tests.cpp @@ -20,6 +20,11 @@ BOOST_AUTO_TEST_CASE(test_sockman) Mutex m_connections_mutex; std::vector> m_connections; + // Received data is written here by the SockMan I/O thread + // and tested by the main thread. + Mutex m_received_mutex; + std::unordered_map> m_received; + size_t GetConnectionsCount() EXCLUSIVE_LOCKS_REQUIRED(!m_connections_mutex) { LOCK(m_connections_mutex); @@ -32,6 +37,12 @@ BOOST_AUTO_TEST_CASE(test_sockman) return m_connections.front(); } + std::vector GetReceivedData(Id id) EXCLUSIVE_LOCKS_REQUIRED(!m_received_mutex) + { + LOCK(m_received_mutex); + return m_received[id]; + } + private: virtual bool EventNewConnectionAccepted(Id id, const CService& me, @@ -42,6 +53,16 @@ BOOST_AUTO_TEST_CASE(test_sockman) m_connections.emplace_back(id, them); return true; } + + // When we receive data just store it in a member variable for testing. + virtual void EventGotData(Id id, std::span data) override + EXCLUSIVE_LOCKS_REQUIRED(!m_received_mutex) + { + LOCK(m_received_mutex); + m_received[id].assign(data.begin(), data.end()); + }; + virtual void EventGotEOF(Id id) override {}; + virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) override {}; }; TestSockMan sockman; @@ -66,8 +87,10 @@ BOOST_AUTO_TEST_CASE(test_sockman) // No connections yet BOOST_CHECK_EQUAL(sockman.GetConnectionsCount(), 0); - // Create a mock client and add it to the local CreateSock queue - ConnectClient(); + // Create a mock client with a data payload to send + // and add it to the local CreateSock queue + const std::vector request = {'b', 'i', 't', 's'}; + ConnectClient(request); // Wait up to a minute to find and connect the client in the I/O loop int attempts{6000}; @@ -80,6 +103,13 @@ BOOST_AUTO_TEST_CASE(test_sockman) auto client{sockman.GetFirstConnection()}; BOOST_CHECK_EQUAL(client.second.ToStringAddrPort(), "5.5.5.5:6789"); + // Wait up to a minute to read the data from the connection + attempts = 6000; + while (!std::ranges::equal(sockman.GetReceivedData(client.first), request)) { + std::this_thread::sleep_for(10ms); + BOOST_REQUIRE(--attempts > 0); + } + // Close connection BOOST_REQUIRE(sockman.CloseConnection(client.first)); // Stop the I/O loop and shutdown diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 19f4ca56163b..bf182422001c 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -619,12 +619,13 @@ SocketTestingSetup::~SocketTestingSetup() CreateSock = m_create_sock_orig; } -void SocketTestingSetup::ConnectClient() +void SocketTestingSetup::ConnectClient(const std::vector& data) { // I/O pipes for a mock Connected Socket we can read and write to. std::shared_ptr connected_socket_pipes(std::make_shared()); - // TODO: Insert a payload + // Insert the payload + connected_socket_pipes->recv.PushBytes(data.data(), data.size()); // Create the Mock Connected Socket that represents a client. // It needs I/O pipes but its queue can remain empty diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index b7f947213201..a7e64d88aa1c 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -277,8 +277,8 @@ class SocketTestingSetup : public BasicTestingSetup explicit SocketTestingSetup(); ~SocketTestingSetup(); - // Connect to the socket with a mock client (a DynSock) - void ConnectClient(); + // Connect to the socket with a mock client (a DynSock) and send pre-loaded data. + void ConnectClient(const std::vector& data); private: // Save the original value of CreateSock here and restore it when the test ends. From 6ef3e992808872bd920c31222e700a0a483389c0 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 13 Jun 2025 10:17:59 -0400 Subject: [PATCH 09/13] SockMan: handle connected sockets: write data to socket Sockets-touching bits from `CConnman::SocketSendData()` copied to `SockMan::SendBytes()`. Testing this requires adding a new feature to the SocketTestingSetup, returning the DynSock I/O pipes from the mock socket so the received data can be checked. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 48 +++++++++++++++++++++++++++++++++- src/common/sockman.h | 27 +++++++++++++++++++ src/test/sockman_tests.cpp | 33 ++++++++++++++++++++++- src/test/util/setup_common.cpp | 4 ++- src/test/util/setup_common.h | 3 ++- 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 198cfc0e86db..9b4cea1c160a 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -174,6 +174,46 @@ bool SockMan::CloseConnection(Id id) return m_connected.erase(id) > 0; } +ssize_t SockMan::SendBytes(Id id, + std::span data, + bool will_send_more, + std::string& errmsg) const +{ + AssertLockNotHeld(m_connected_mutex); + + if (data.empty()) { + return 0; + } + + auto sockets{GetConnectionSockets(id)}; + if (!sockets) { + // Bail out immediately and just leave things in the caller's send queue. + return 0; + } + + int flags{MSG_NOSIGNAL | MSG_DONTWAIT}; +#ifdef MSG_MORE + if (will_send_more) { + flags |= MSG_MORE; + } +#endif + + const ssize_t sent{WITH_LOCK( + sockets->mutex, + return sockets->sock->Send(reinterpret_cast(data.data()), data.size(), flags);)}; + + if (sent >= 0) { + return sent; + } + + const int err{WSAGetLastError()}; + if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) { + return 0; + } + errmsg = NetworkErrorString(err); + return -1; +} + void SockMan::StopListening() { m_listen.clear(); @@ -253,7 +293,13 @@ void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness) bool err_ready = events.occurred & Sock::ERR; if (send_ready) { - // TODO: send data + bool cancel_recv; + + EventReadyToSend(id, cancel_recv); + + if (cancel_recv) { + recv_ready = false; + } } if (recv_ready || err_ready) { diff --git a/src/common/sockman.h b/src/common/sockman.h index 14eed5bf3f12..9c094ff9354d 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -95,6 +95,23 @@ class SockMan bool CloseConnection(Id id) EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** + * Try to send some data over the given connection. + * @param[in] id Identifier of the connection. + * @param[in] data The data to send, it might happen that only a prefix of this is sent. + * @param[in] will_send_more Used as an optimization if the caller knows that they will + * be sending more data soon after this call. + * @param[out] errmsg If <0 is returned then this will contain a human readable message + * explaining the error. + * @retval >=0 The number of bytes actually sent. + * @retval <0 A permanent error has occurred. + */ + ssize_t SendBytes(Id id, + std::span data, + bool will_send_more, + std::string& errmsg) const + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * Stop listening by closing all listening sockets. */ @@ -129,6 +146,16 @@ class SockMan const CService& me, const CService& them) = 0; + /** + * Called when the socket is ready to send data and `ShouldTryToSend()` has + * returned true. This is where the higher level code serializes its messages + * and calls `SockMan::SendBytes()`. + * @param[in] id Id of the connection whose socket is ready to send. + * @param[out] cancel_recv Should always be set upon return and if it is true, + * then the next attempt to receive data from that connection will be omitted. + */ + virtual void EventReadyToSend(Id id, bool& cancel_recv) = 0; + /** * Called when new data has been received. * @param[in] id Connection for which the data arrived. diff --git a/src/test/sockman_tests.cpp b/src/test/sockman_tests.cpp index 5d613966a004..fe55b110c72e 100644 --- a/src/test/sockman_tests.cpp +++ b/src/test/sockman_tests.cpp @@ -24,6 +24,7 @@ BOOST_AUTO_TEST_CASE(test_sockman) // and tested by the main thread. Mutex m_received_mutex; std::unordered_map> m_received; + std::vector m_respond{'o', 'k'}; size_t GetConnectionsCount() EXCLUSIVE_LOCKS_REQUIRED(!m_connections_mutex) { @@ -63,6 +64,20 @@ BOOST_AUTO_TEST_CASE(test_sockman) }; virtual void EventGotEOF(Id id) override {}; virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) override {}; + + // As soon as we can send data to the connected socket, send the preloaded response. + // Data is sent by the SockMan I/O thread and read by the main test thread, + // but the Mutex in SockMan::ConnectionSockets guards this. + virtual void EventReadyToSend(Id id, bool& cancel_recv) override + { + cancel_recv = false; + if (m_respond.size() > 0) { + std::string errmsg; + ssize_t sent = SendBytes(id, m_respond, /*will_send_more=*/false, errmsg); + // Remove sent bytes until entire response is sent. + m_respond.erase(m_respond.begin(), m_respond.begin() + sent); + } + } }; TestSockMan sockman; @@ -90,7 +105,7 @@ BOOST_AUTO_TEST_CASE(test_sockman) // Create a mock client with a data payload to send // and add it to the local CreateSock queue const std::vector request = {'b', 'i', 't', 's'}; - ConnectClient(request); + auto pipes{ConnectClient(request)}; // Wait up to a minute to find and connect the client in the I/O loop int attempts{6000}; @@ -110,6 +125,22 @@ BOOST_AUTO_TEST_CASE(test_sockman) BOOST_REQUIRE(--attempts > 0); } + // Wait up to a minute to write our response data back to the connection + attempts = 6000; + size_t expected_response_size = sockman.m_respond.size(); + std::vector actually_received(expected_response_size); + while (!std::ranges::equal(actually_received, sockman.m_respond)) { + // Read the data received by the mock socket + ssize_t bytes_read = pipes->send.GetBytes((void *)actually_received.data(), expected_response_size); + // We may need to wait a few loop iterations in the socket thread + // but once we can write, we can expect all the data at once. + if (bytes_read >= 0) { + BOOST_CHECK_EQUAL(bytes_read, expected_response_size); + } + std::this_thread::sleep_for(10ms); + BOOST_REQUIRE(--attempts > 0); + } + // Close connection BOOST_REQUIRE(sockman.CloseConnection(client.first)); // Stop the I/O loop and shutdown diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index bf182422001c..511b47fc0949 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -619,7 +619,7 @@ SocketTestingSetup::~SocketTestingSetup() CreateSock = m_create_sock_orig; } -void SocketTestingSetup::ConnectClient(const std::vector& data) +std::shared_ptr SocketTestingSetup::ConnectClient(const std::vector& data) { // I/O pipes for a mock Connected Socket we can read and write to. std::shared_ptr connected_socket_pipes(std::make_shared()); @@ -633,6 +633,8 @@ void SocketTestingSetup::ConnectClient(const std::vector& data) // Push into the queue of Accepted Sockets returned by the local CreateSock() m_accepted_sockets->Push(std::move(connected_socket)); + + return connected_socket_pipes; } /** diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index a7e64d88aa1c..d8d7fe07e2c8 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -278,7 +278,8 @@ class SocketTestingSetup : public BasicTestingSetup ~SocketTestingSetup(); // Connect to the socket with a mock client (a DynSock) and send pre-loaded data. - void ConnectClient(const std::vector& data); + // Returns the I/O pipes from the mock client so we can read response datasent to it. + std::shared_ptr ConnectClient(const std::vector& data); private: // Save the original value of CreateSock here and restore it when the test ends. From 598bee6bd590757565d2564ae86cf46b5eea4399 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 13 Jun 2025 10:43:34 -0400 Subject: [PATCH 10/13] SockMan: dispatch cyclical events from I/O loop Copy from some parts of `CConnman::SocketHandlerConnected()` and `CConnman::ThreadSocketHandler()` to: `EventIOLoopCompletedForOne(id)` and `EventIOLoopCompletedForAll()`. Co-authored-by: Vasil Dimov --- src/common/sockman.cpp | 8 ++++++++ src/common/sockman.h | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index 9b4cea1c160a..c6ed415a54df 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -223,11 +223,17 @@ bool SockMan::ShouldTryToSend(Id id) const { return true; } bool SockMan::ShouldTryToRecv(Id id) const { return true; } +void SockMan::EventIOLoopCompletedForOne(Id id) {} + +void SockMan::EventIOLoopCompletedForAll() {} + void SockMan::ThreadSocketHandler() { AssertLockNotHeld(m_connected_mutex); while (!interruptNet) { + EventIOLoopCompletedForAll(); + // Check for the readiness of the already connected sockets and the // listening sockets in one call ("readiness" as in poll(2) or // select(2)). If none are ready, wait for a short while and return @@ -325,6 +331,8 @@ void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness) EventGotData(id, {buf, static_cast(nrecv)}); } } + + EventIOLoopCompletedForOne(id); } } diff --git a/src/common/sockman.h b/src/common/sockman.h index 9c094ff9354d..5187ed3f0519 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -199,6 +199,23 @@ class SockMan */ virtual bool ShouldTryToRecv(Id id) const; + /** + * SockMan has completed the current send+recv iteration for a given connection. + * It will do another send+recv for this connection after processing all other connections. + * Can be used to execute periodic tasks for a given connection. + * The implementation in SockMan does nothing. + * @param[in] id Connection for which send+recv has been done. + */ + virtual void EventIOLoopCompletedForOne(Id id); + + /** + * SockMan has completed send+recv for all connections. + * Can be used to execute periodic tasks for all connections, like closing + * connections due to higher level logic. + * The implementation in SockMan does nothing. + */ + virtual void EventIOLoopCompletedForAll(); + /** * The sockets used by a connection. */ From 5acab497eb956dffde1d900139005d7ca1b2b956 Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Mon, 15 Jul 2024 13:30:39 +0200 Subject: [PATCH 11/13] Add sv2 SETUP_CONNECTION messages Co-Authored-By: Christopher Coverdale --- src/sv2/messages.h | 153 +++++++++++++++++++++++++++++++- src/test/CMakeLists.txt | 1 + src/test/sv2_messages_tests.cpp | 100 +++++++++++++++++++++ 3 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 src/test/sv2_messages_tests.cpp diff --git a/src/sv2/messages.h b/src/sv2/messages.h index 9475f5bbdedc..9e4c5fc1dd00 100644 --- a/src/sv2/messages.h +++ b/src/sv2/messages.h @@ -6,11 +6,22 @@ #define BITCOIN_SV2_MESSAGES_H #include // for CSerializedNetMsg and CNetMessage +#include +#include +#include +#include