From 0b3a0fe70a767bda06ba208fd3033cd361841865 Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 2 Jun 2026 23:28:36 -0700 Subject: [PATCH] fix: DaemonClient::send() reconnects and retries on dead-socket write --- common/common.cpp | 43 ++++++++++++++++++++++++++++++++++++++++--- utils/network.cpp | 5 ++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/common/common.cpp b/common/common.cpp index 5d7c08f6..b1652582 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -747,13 +747,24 @@ namespace Common { // arrived and reconnected without draining). Without this, the stale CID-tagged // reply would be read as the response to the current command and cause a // mismatch, propagating through every subsequent send until the socket is cycled. - // Safe: client_access mutex is held, so no concurrent sender touches this socket; + // + // client_access mutex is held, so no concurrent sender touches this socket; // and in the command/reply protocol daemons never push unsolicited TCP data. // +#ifdef LOGLEVEL_DEBUG + { std::ostringstream oss; oss << "[DEBUG] send pre-drain: name=" << this->name + << " isconnected=" << this->socket.isconnected() << " fd=" << this->socket.getfd(); + logwrite( function, oss.str() ); } +#endif { std::string discard; while ( this->socket.Poll(0) > 0 ) { - if ( this->socket.Read( discard, this->term_read ) <= 0 ) break; + ssize_t rd = this->socket.Read( discard, this->term_read ); +#ifdef LOGLEVEL_DEBUG + { std::ostringstream oss; oss << "[DEBUG] drain Read=" << rd << " fd=" << this->socket.getfd(); + logwrite( function, oss.str() ); } +#endif + if ( rd <= 0 ) break; message.str(""); message << "drained stale buffered reply from " << this->name << ": \"" << discard << "\""; logwrite( function, message.str() ); } @@ -769,7 +780,23 @@ namespace Common { // send the command // - this->socket.Write( command ); + ssize_t wrote = this->socket.Write( command ); +#ifdef LOGLEVEL_DEBUG + { std::ostringstream oss; oss << "[DEBUG] Write()=" << wrote << " fd=" << this->socket.getfd(); + logwrite( function, oss.str() ); } +#endif + + // a dead peer leaves fd=-1 which was set by the drain above, via Read()'s EOF-close (FIN) or + // Poll()'s POLLHUP-close (RST). Write() returns <=0. Reconnect and retry with the SAME + // correlation id (server-side CorrIdCache dedup makes that safe) instead of polling a dead + // descriptor for the full timeout. deaths observable before the write (FIN/RST). + // + if ( wrote <= 0 ) { + lock.unlock(); + this->connect(); + lock.lock(); + continue; // bounded by the existing trys < retry_limit + } // This indicates that the caller only wants to send the command, // and doesn't want to read the reply. @@ -778,7 +805,17 @@ namespace Common { // Wait (poll) connected socket for incoming data... // +#ifdef LOGLEVEL_DEBUG + auto _t0 = std::chrono::steady_clock::now(); +#endif pollret = this->socket.Poll(timeout_in); +#ifdef LOGLEVEL_DEBUG + { long _ms = std::chrono::duration_cast( + std::chrono::steady_clock::now() - _t0 ).count(); + std::ostringstream oss; oss << "[DEBUG] Poll(timeout=" << timeout_in << ")=" << pollret + << " elapsed=" << _ms << "ms fd=" << this->socket.getfd(); + logwrite( function, oss.str() ); } +#endif // got data so break out of retry loop // diff --git a/utils/network.cpp b/utils/network.cpp index eec4bc8b..bdcf1c14 100644 --- a/utils/network.cpp +++ b/utils/network.cpp @@ -965,10 +965,9 @@ namespace Network { return -1; // indicates error } if ( nread == 0 ) { -#ifdef LOGLEVEL_DEBUG - message << "[DEBUG] no data on socket " << this->host << ":" << this->port << " fd " << this->fd << ". closing connection"; + message.str(""); message << "peer closed connection (EOF) on fd " << this->fd + << " for " << this->host << "/" << this->port << "; closing"; logwrite( function, message.str() ); -#endif this->Close(); return 0; // not an error }