Skip to content

Commit

Permalink
safekeeper: flush WAL on transaction commit
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 8, 2024
1 parent bc3c0eb commit 97a9cd3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
9 changes: 8 additions & 1 deletion libs/postgres_ffi/src/xlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::bindings::{
};
use super::wal_generator::LogicalMessageGenerator;
use super::PG_MAJORVERSION;
use crate::pg_constants;
use crate::pg_constants::{self, XLOG_XACT_COMMIT, XLOG_XACT_COMMIT_PREPARED};
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
Expand Down Expand Up @@ -296,6 +296,13 @@ impl XLogRecord {
pub fn is_xlog_switch_record(&self) -> bool {
self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
}

// Is this record a transaction commit?
pub fn is_xact_commit(&self) -> bool {
self.xl_rmid == pg_constants::RM_XACT_ID
&& (self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT
|| self.xl_info & pg_constants::XLOG_XACT_OPMASK == XLOG_XACT_COMMIT_PREPARED)
}
}

impl XLogPageHeaderData {
Expand Down
16 changes: 14 additions & 2 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::future::BoxFuture;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
use postgres_ffi::{dispatch_pgversion, XLogRecord, XLogSegNo, PG_TLI, XLOG_SIZE_OF_XLOG_RECORD};
use remote_storage::RemotePath;
use std::cmp::{max, min};
use std::future::Future;
Expand Down Expand Up @@ -431,19 +431,31 @@ impl Storage for PhysicalStorage {
self.decoder = WalStreamDecoder::new(startpos, pg_version);
}
self.decoder.feed_bytes(buf);
let mut xact_commit = false;
loop {
match self.decoder.poll_decode()? {
None => break, // no full record yet
Some((lsn, _rec)) => {
Some((lsn, rec)) => {
self.write_record_lsn = lsn;
if lsn <= self.flush_lsn {
debug_assert!(lsn > self.flush_lsn, "flush LSN regressed");
self.flush_record_lsn = lsn;
}
// TODO: the decoder already has the record header, make it return it.
let header = XLogRecord::from_slice(&rec[0..XLOG_SIZE_OF_XLOG_RECORD])
.expect("invalid record header");
xact_commit = xact_commit || header.is_xact_commit();
}
}
}

// If a transaction committed, flush the WAL. This will emit an AppendResponse to the
// compute. Otherwise, with pipelined ingestion, the txn may have to wait until the next
// periodic flush in 1 second, causing commit latency.
if xact_commit {
self.flush_wal().await?;
}

Ok(())
}

Expand Down

0 comments on commit 97a9cd3

Please sign in to comment.