From 248558dee85849fd95fece7f8e0a730c14eb0660 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 28 Oct 2024 18:18:37 +0100 Subject: [PATCH] safekeeper: refactor `WalAcceptor` to be event-driven (#9462) ## Problem The `WalAcceptor` main loop currently uses two nested loops to consume inbound messages. This makes it hard to slot in periodic events like metrics collection. It also duplicates the event processing code, and assumes all messages in steady state are AppendRequests (other messages types may be dropped if following an AppendRequest). ## Summary of changes Refactor the `WalAcceptor` loop to be event driven. --- safekeeper/src/receive_wal.rs | 118 ++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 56 deletions(-) diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 3dbf72298fd6..f97e127a1724 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -21,18 +21,15 @@ use postgres_backend::QueryError; use pq_proto::BeMessage; use serde::Deserialize; use serde::Serialize; +use std::future; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; -use tokio::sync::mpsc::channel; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::Receiver; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::task; use tokio::task::JoinHandle; -use tokio::time::Duration; -use tokio::time::Instant; +use tokio::time::{Duration, MissedTickBehavior}; use tracing::*; use utils::id::TenantTimelineId; use utils::lsn::Lsn; @@ -444,9 +441,9 @@ async fn network_write( } } -// Send keepalive messages to walproposer, to make sure it receives updates -// even when it writes a steady stream of messages. -const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); +/// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to +/// walproposer, even when it's writing a steady stream of messages. +const FLUSH_INTERVAL: Duration = Duration::from_secs(1); /// Encapsulates a task which takes messages from msg_rx, processes and pushes /// replies to reply_tx. @@ -494,67 +491,76 @@ impl WalAcceptor { async fn run(&mut self) -> anyhow::Result<()> { let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id); - // After this timestamp we will stop processing AppendRequests and send a response - // to the walproposer. walproposer sends at least one AppendRequest per second, - // we will send keepalives by replying to these requests once per second. - let mut next_keepalive = Instant::now(); + // Periodically flush the WAL. + let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); + flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + flush_ticker.tick().await; // skip the initial, immediate tick - while let Some(mut next_msg) = self.msg_rx.recv().await { - // Update walreceiver state in shmem for reporting. - if let ProposerAcceptorMessage::Elected(_) = &next_msg { - walreceiver_guard.get().status = WalReceiverStatus::Streaming; - } + // Tracks unflushed appends. + let mut dirty = false; - let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) { - // Loop through AppendRequests while available to write as many WAL records as - // possible without fsyncing. - // - // Make sure the WAL is flushed before returning, see: - // https://github.com/neondatabase/neon/issues/9259 - // - // Note: this will need to be rewritten if we want to read non-AppendRequest messages here. - // Otherwise, we might end up in a situation where we read a message, but don't - // process it. - while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg { - let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - - if let Some(reply) = self.tli.process_msg(&noflush_msg).await? { - if self.reply_tx.send(reply).await.is_err() { - break; // disconnected, flush WAL and return on next send/recv - } + loop { + let reply = tokio::select! { + // Process inbound message. + msg = self.msg_rx.recv() => { + // If disconnected, break to flush WAL and return. + let Some(mut msg) = msg else { + break; + }; + + // Update walreceiver state in shmem for reporting. + if let ProposerAcceptorMessage::Elected(_) = &msg { + walreceiver_guard.get().status = WalReceiverStatus::Streaming; } - // get out of this loop if keepalive time is reached - if Instant::now() >= next_keepalive { - break; + // Don't flush the WAL on every append, only periodically via flush_ticker. + // This batches multiple appends per fsync. If the channel is empty after + // sending the reply, we'll schedule an immediate flush. + if let ProposerAcceptorMessage::AppendRequest(append_request) = msg { + msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); + dirty = true; } - // continue pulling AppendRequests if available - match self.msg_rx.try_recv() { - Ok(msg) => next_msg = msg, - Err(TryRecvError::Empty) => break, - // on disconnect, flush WAL and return on next send/recv - Err(TryRecvError::Disconnected) => break, - }; + self.tli.process_msg(&msg).await? } - // flush all written WAL to the disk - self.tli - .process_msg(&ProposerAcceptorMessage::FlushWAL) - .await? - } else { - // process message other than AppendRequest - self.tli.process_msg(&next_msg).await? + // While receiving AppendRequests, flush the WAL periodically and respond with an + // AppendResponse to let walproposer know we're still alive. + _ = flush_ticker.tick(), if dirty => { + dirty = false; + self.tli + .process_msg(&ProposerAcceptorMessage::FlushWAL) + .await? + } + + // If there are no pending messages, flush the WAL immediately. + // + // TODO: this should be done via flush_ticker.reset_immediately(), but that's always + // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866. + _ = future::ready(()), if dirty && self.msg_rx.is_empty() => { + dirty = false; + flush_ticker.reset(); + self.tli + .process_msg(&ProposerAcceptorMessage::FlushWAL) + .await? + } }; - if let Some(reply) = reply_msg { + // Send reply, if any. + if let Some(reply) = reply { if self.reply_tx.send(reply).await.is_err() { - return Ok(()); // chan closed, streaming terminated + break; // disconnected, break to flush WAL and return } - // reset keepalive time - next_keepalive = Instant::now() + KEEPALIVE_INTERVAL; } } + + // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259. + if dirty { + self.tli + .process_msg(&ProposerAcceptorMessage::FlushWAL) + .await?; + } + Ok(()) } }