While reading through the code, I noticed the following in `WorkerState::stream_loop`:
msg_result = tokio::time::timeout(
self.cfg.idle_wakeup_interval,
read_backend_message_into(stream, &mut read_buf),
) => {
match msg_result {
Ok(res) => res?,
Err(_) => {
let applied = self.progress.load_applied();
last_applied = applied;
self.send_feedback(stream, applied, false).await?;
last_status_sent = Instant::now();
continue;
}
}
}
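I believe this can desynchronize the message stream on the socket, because `read_backend_message_into` isn't cancellation safe. If the timeout fires after one of its `read_exact` calls has already consumed part of a message, the future is dropped and those bytes are lost; the next call then starts reading in the middle of a message and misinterprets payload bytes as a header: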
let mut hdr = [0u8; 5];
rd.read_exact(&mut hdr).await?;
let tag = hdr[0];
let len = i32::from_be_bytes([hdr[1], hdr[2], hdr[3], hdr[4]]);
if len < 4 {
return Err(PgWireError::Protocol(format!(
"invalid backend message length: {len}"
)));
}
let payload_len = (len - 4) as usize;
if payload_len > MAX_MESSAGE_SIZE {
return Err(PgWireError::Protocol(format!(
"backend message too large: {payload_len} bytes (max {MAX_MESSAGE_SIZE})"
)));
}
buf.clear();
buf.resize(payload_len, 0);
rd.read_exact(&mut buf[..]).await?;
Ok(BackendMessage {
tag,
payload: buf.split().freeze(),
})
In summary: while reading through the code, I noticed the timeout-wrapped read shown above in `WorkerState::stream_loop`.
I believe it can desynchronize the message stream on the socket, because `read_backend_message_into` isn't cancellation safe.