Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,24 @@ namespace Common {
// arrived and reconnected without draining). Without this, the stale CID-tagged
// reply would be read as the response to the current command and cause a
// mismatch, propagating through every subsequent send until the socket is cycled.
// Safe: client_access mutex is held, so no concurrent sender touches this socket;
//
// client_access mutex is held, so no concurrent sender touches this socket;
// and in the command/reply protocol daemons never push unsolicited TCP data.
//
#ifdef LOGLEVEL_DEBUG
{ std::ostringstream oss; oss << "[DEBUG] send pre-drain: name=" << this->name
<< " isconnected=" << this->socket.isconnected() << " fd=" << this->socket.getfd();
logwrite( function, oss.str() ); }
#endif
{
std::string discard;
while ( this->socket.Poll(0) > 0 ) {
if ( this->socket.Read( discard, this->term_read ) <= 0 ) break;
ssize_t rd = this->socket.Read( discard, this->term_read );
#ifdef LOGLEVEL_DEBUG
{ std::ostringstream oss; oss << "[DEBUG] drain Read=" << rd << " fd=" << this->socket.getfd();
logwrite( function, oss.str() ); }
#endif
if ( rd <= 0 ) break;
message.str(""); message << "drained stale buffered reply from " << this->name << ": \"" << discard << "\"";
logwrite( function, message.str() );
}
Expand All @@ -769,7 +780,23 @@ namespace Common {

// send the command
//
this->socket.Write( command );
ssize_t wrote = this->socket.Write( command );
#ifdef LOGLEVEL_DEBUG
{ std::ostringstream oss; oss << "[DEBUG] Write()=" << wrote << " fd=" << this->socket.getfd();
logwrite( function, oss.str() ); }
#endif

// a dead peer leaves fd=-1 which was set by the drain above, via Read()'s EOF-close (FIN) or
// Poll()'s POLLHUP-close (RST). Write() returns <=0. Reconnect and retry with the SAME
// correlation id (server-side CorrIdCache dedup makes that safe) instead of polling a dead
// descriptor for the full timeout. deaths observable before the write (FIN/RST).
//
if ( wrote <= 0 ) {
lock.unlock();
this->connect();
lock.lock();
continue; // bounded by the existing trys < retry_limit
}

// This indicates that the caller only wants to send the command,
// and doesn't want to read the reply.
Expand All @@ -778,7 +805,17 @@ namespace Common {

// Wait (poll) connected socket for incoming data...
//
#ifdef LOGLEVEL_DEBUG
auto _t0 = std::chrono::steady_clock::now();
#endif
pollret = this->socket.Poll(timeout_in);
#ifdef LOGLEVEL_DEBUG
{ long _ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _t0 ).count();
std::ostringstream oss; oss << "[DEBUG] Poll(timeout=" << timeout_in << ")=" << pollret
<< " elapsed=" << _ms << "ms fd=" << this->socket.getfd();
logwrite( function, oss.str() ); }
#endif

// got data so break out of retry loop
//
Expand Down
5 changes: 2 additions & 3 deletions utils/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,10 +965,9 @@ namespace Network {
return -1; // indicates error
}
if ( nread == 0 ) {
#ifdef LOGLEVEL_DEBUG
message << "[DEBUG] no data on socket " << this->host << ":" << this->port << " fd " << this->fd << ". closing connection";
message.str(""); message << "peer closed connection (EOF) on fd " << this->fd
<< " for " << this->host << "/" << this->port << "; closing";
logwrite( function, message.str() );
#endif
this->Close();
return 0; // not an error
}
Expand Down
Loading