Skip to content

Commit 9567327

Browse files
committed
drain connections when more messages are expected
The needs_drain() condition prior to this commit checks the connection state is some kind of idle or otherwise "in sync". However, even when the connection state is Idle, there can be expected messages in the protocol queue. If a client disconnects abruptly at just the right time, needs_drain() would not return true, but the protocol queue is not empty and expected messages from the server are not drained. In this scenario, the connection is checked back in in a tainted state and subsequent usage can produce unexpected results (ProtocolOutOfSync or off-by-1 results mixups). This updates needs_drain() to also account for prepared_statements.has_more_messages() in addition to the "in sync" check, thus triggering a drain when there are more messages expected from the server (and tracked in the protocol queue).
1 parent 6704336 commit 9567327

File tree

2 files changed

+85
-2
lines changed

2 files changed

+85
-2
lines changed

pgdog/src/backend/pool/test/mod.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,3 +653,59 @@ async fn test_lsn_monitor() {
653653

654654
pool.shutdown();
655655
}
656+
657+
/// If a client disconnects while the backend still has protocol messages
658+
/// queued for it, but the high-level stats state is Idle, the connection must
659+
/// still be treated as needing a drain. Otherwise the connection is checked
660+
/// back into the pool with a non-empty protocol queue and will be "tainted"
661+
/// for the next client.
662+
#[tokio::test]
663+
async fn test_abrupt_disconnect_with_pending_protocol_state_requires_drain() {
664+
crate::logger();
665+
666+
let pool = pool();
667+
let mut guard = pool.get(&Request::default()).await.unwrap();
668+
669+
guard
670+
.send(&vec![ProtocolMessage::from(Query::new("SELECT 1"))].into())
671+
.await
672+
.unwrap();
673+
674+
// Consume only the RowDescription ('T'), leaving the remainder of the
675+
// response (D, C, Z) unread. At this point the protocol state still
676+
// expects a ReadyForQuery, so has_more_messages() is true.
677+
let msg = guard.read().await.unwrap();
678+
assert_eq!(msg.code(), 'T');
679+
680+
assert!(
681+
guard.prepared_statements().state().has_more_messages(),
682+
"protocol state should have pending messages after partial read"
683+
);
684+
guard.stats_mut().state(State::Idle);
685+
assert_eq!(guard.stats().state, State::Idle);
686+
687+
// Client disconnects, guard is dropped, runs Guard::cleanup. needs_drain()
688+
// must report that there are pending protocol messages it's expecting,
689+
// otherwise the connection is checked back into the pool with a non-empty
690+
// protocol queue.
691+
drop(guard);
692+
693+
// Re-acquire the same connection from the pool. If this
694+
// assertion ever fails, the connection has been returned to the pool
695+
// in a tainted state.
696+
let mut guard = pool.get(&Request::default()).await.unwrap();
697+
assert!(
698+
!guard.prepared_statements().state().has_more_messages(),
699+
"connection should not have pending protocol messages after cleanup"
700+
);
701+
702+
// Run a fresh query and assert we get a valid result back; if the
703+
// connection had been returned to the pool without a drain, this is
704+
// where stray CommandComplete/ReadyForQuery messages from the previous
705+
// query would surface and break the protocol. Each query would receive the
706+
// prior query's results (off by 1).
707+
let rows: Vec<i32> = guard.fetch_all("SELECT 2").await.unwrap();
708+
assert_eq!(rows, vec![2]);
709+
let rows: Vec<i32> = guard.fetch_all("SELECT 3").await.unwrap();
710+
assert_eq!(rows, vec![3]);
711+
}

pgdog/src/backend/server.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ impl Server {
562562
)
563563
}
564564

565-
/// Server is done executing all queries and isz
565+
/// Server is done executing all queries and is
566566
/// not inside a transaction.
567567
pub fn can_check_in(&self) -> bool {
568568
self.stats().state == State::Idle
@@ -602,7 +602,17 @@ impl Server {
602602

603603
/// Connection was left with an unfinished query.
604604
pub fn needs_drain(&self) -> bool {
605-
!self.in_sync()
605+
// We need to drain whenever the protocol state indicates that there
606+
// are still tracked messages expected from the server *or* the high‑
607+
// level connection state is not one of the "in sync" states.
608+
//
609+
// This is intentionally more conservative than just checking
610+
// `in_sync()`: if a client disappears after we've enqueued expected
611+
// backend responses (via `PreparedStatements::handle`) but before we
612+
// see a new ReadyForQuery, the stats state can still be Idle while the
613+
// protocol queue has pending items. In that case we must treat the
614+
// connection as needing a drain before it is safe to reuse.
615+
self.has_more_messages() || !self.in_sync()
606616
}
607617

608618
/// Close the connection, don't do any recovery.
@@ -1771,6 +1781,23 @@ pub mod test {
17711781
}
17721782
}
17731783

1784+
#[test]
1785+
fn test_needs_drain_when_protocol_queue_not_empty_even_if_idle() {
1786+
use crate::state::State;
1787+
1788+
let mut server = Server::default();
1789+
assert_eq!(server.stats().state, State::Idle);
1790+
assert!(server.in_sync());
1791+
assert!(!server.has_more_messages());
1792+
assert!(!server.needs_drain());
1793+
1794+
server.prepared_statements_mut().state_mut().add('Z'); // expect a ReadyForQuery at some point
1795+
1796+
assert_eq!(server.stats().state, State::Idle);
1797+
assert!(server.prepared_statements().state().has_more_messages());
1798+
assert!(server.needs_drain());
1799+
}
1800+
17741801
#[tokio::test]
17751802
async fn test_partial_state() -> Result<(), Box<dyn std::error::Error>> {
17761803
crate::logger();

0 commit comments

Comments
 (0)