diff --git a/Cargo.lock b/Cargo.lock
index 7fd9053f627c..db206dd0c20e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2841,6 +2841,15 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"

+[[package]]
+name = "lz4_flex"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
+dependencies = [
+ "twox-hash",
+]
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3511,6 +3520,7 @@ dependencies = [
  "hyper",
  "itertools",
  "leaky-bucket",
+ "lz4_flex",
  "md5",
  "metrics",
  "nix 0.27.1",
diff --git a/Cargo.toml b/Cargo.toml
index 76f4ff041c09..04178987e51f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -100,6 +100,7 @@ jsonwebtoken = "9"
 lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
+lz4_flex = "0.11.1"
 md5 = "0.7.0"
 memoffset = "0.8"
 native-tls = "0.2"
diff --git a/Makefile b/Makefile
index ea782cb369a0..4de2af61b59c 100644
--- a/Makefile
+++ b/Makefile
@@ -23,12 +23,12 @@ endif
 UNAME_S := $(shell uname -s)
 ifeq ($(UNAME_S),Linux)
 	# Seccomp BPF is only available for Linux
-	PG_CONFIGURE_OPTS += --with-libseccomp
+	PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
 else ifeq ($(UNAME_S),Darwin)
 	# macOS with brew-installed openssl requires explicit paths
 	# It can be configured with OPENSSL_PREFIX variable
 	OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
-	PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
+	PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
 	PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
 	# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
 	# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index fe5bbd1c06fe..4aa5c6d0f52a 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
     Error(PagestreamErrorResponse),
     DbSize(PagestreamDbSizeResponse),
     GetSlruSegment(PagestreamGetSlruSegmentResponse),
+    GetCompressedPage(PagestreamGetPageResponse),
 }

 // Keep in sync with `pagestore_client.h`
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
                 bytes.put(&resp.page[..]);
             }

+            Self::GetCompressedPage(resp) => {
+                bytes.put_u8(105); /* tag from pagestore_client.h */
+                bytes.put_u16(resp.page.len() as u16);
+                bytes.put(&resp.page[..]);
+            }
+
             Self::Error(resp) => {
                 bytes.put_u8(Tag::Error as u8);
                 bytes.put(resp.message.as_bytes());
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
             Self::Error(_) => "Error",
             Self::DbSize(_) => "DbSize",
             Self::GetSlruSegment(_) => "GetSlruSegment",
+            Self::GetCompressedPage(_) => "GetCompressedPage",
         }
     }
 }
diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs
index aa6845b9b129..98599dc05904 100644
--- a/libs/postgres_ffi/src/lib.rs
+++ b/libs/postgres_ffi/src/lib.rs
@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool>
 }

+pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
+    dispatch_pgversion!(
+        version,
+        Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
+    )
+}
+
 pub fn generate_wal_segment(
     segno: u64,
     system_id: u64,
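Reviewer note: every hunk below leans on lz4_flex's block API, where compress() emits a raw LZ4 block with no embedded length, so decompress() must be told the uncompressed size (always BLCKSZ for pages here). A minimal standalone sketch of that contract, assuming only the lz4_flex 0.11 dependency added above:

    use lz4_flex::block::{compress, decompress};

    fn main() {
        // An easily compressible 8 KiB "page" (BLCKSZ in PostgreSQL).
        let page = vec![0u8; 8192];
        let compressed = compress(&page);
        // Raw blocks carry no length header, so the caller must supply the
        // uncompressed size on decode; the patch always passes BLCKSZ.
        let roundtrip = decompress(&compressed, page.len()).unwrap();
        assert_eq!(roundtrip, page);
        assert!(compressed.len() < page.len());
    }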
index 32f8f5111436..4edbfdc4d6bb 100644
--- a/libs/postgres_ffi/src/pg_constants_v14.rs
+++ b/libs/postgres_ffi/src/pg_constants_v14.rs
@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c */
 pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
     (bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
 }
+
+pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
+    false
+}
diff --git a/libs/postgres_ffi/src/pg_constants_v15.rs b/libs/postgres_ffi/src/pg_constants_v15.rs
index 626a23c7eaba..b419dc52411c 100644
--- a/libs/postgres_ffi/src/pg_constants_v15.rs
+++ b/libs/postgres_ffi/src/pg_constants_v15.rs
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
     (bimg_info & ANY_COMPRESS_FLAG) != 0
 }
+
+pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
+    (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
+}
diff --git a/libs/postgres_ffi/src/pg_constants_v16.rs b/libs/postgres_ffi/src/pg_constants_v16.rs
index 587be71cb3ea..52750f8ac5a2 100644
--- a/libs/postgres_ffi/src/pg_constants_v16.rs
+++ b/libs/postgres_ffi/src/pg_constants_v16.rs
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
     (bimg_info & ANY_COMPRESS_FLAG) != 0
 }
+
+pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
+    (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
+}
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 5adeaffe1af6..a01a50dc52f7 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 leaky-bucket.workspace = true
+lz4_flex.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses
diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs
index 49175b3b90b6..1b5f1d9df0ae 100644
--- a/pageserver/client/src/page_service.rs
+++ b/pageserver/client/src/page_service.rs
@@ -157,6 +157,7 @@ impl PagestreamClient {
             PagestreamBeMessage::Exists(_)
             | PagestreamBeMessage::Nblocks(_)
             | PagestreamBeMessage::DbSize(_)
+            | PagestreamBeMessage::GetCompressedPage(_)
             | PagestreamBeMessage::GetSlruSegment(_) => {
                 anyhow::bail!(
                     "unexpected be message kind in response to getpage request: {}",
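Reviewer note: the per-version stubs above are what dispatch_pgversion! selects between at runtime. Conceptually it resolves to something like the following sketch (a simplification, not the macro's actual expansion; the 0x08 flag value is my assumption taken from PostgreSQL 15's xlogrecord.h and should be double-checked):

    use anyhow::bail;

    // Assumed value of BKPIMAGE_COMPRESS_LZ4 in PG15/16 xlogrecord.h.
    const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08;

    fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
        Ok(match version {
            // PG14 can only pglz-compress full-page images; LZ4/Zstd support
            // for wal_compression arrived with PostgreSQL 15.
            14 => false,
            15 | 16 => (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0,
            _ => bail!("unsupported PostgreSQL version: {version}"),
        })
    }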
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 02a690d4e18b..4a688d36d4a5 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -40,7 +40,14 @@ use tracing::info;
 /// format, bump this!
 /// Note that TimelineMetadata uses its own version number to track
 /// backwards-compatible changes to the metadata format.
-pub const STORAGE_FORMAT_VERSION: u16 = 3;
+pub const STORAGE_FORMAT_VERSION: u16 = 4;
+
+/// Minimal storage format version with compression support
+pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
+
+/// Page image compression algorithms
+pub const NO_COMPRESSION: u8 = 0;
+pub const LZ4_COMPRESSION: u8 = 1;

 pub const DEFAULT_PG_VERSION: u32 = 15;
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index f3ceb7d3e6e5..f2b299579b44 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1155,9 +1155,18 @@ impl PageServerHandler {
             .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
             .await?;

-        Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
-            page,
-        }))
+        let compressed = lz4_flex::block::compress(&page);
+        if compressed.len() < page.len() {
+            Ok(PagestreamBeMessage::GetCompressedPage(
+                PagestreamGetPageResponse {
+                    page: Bytes::from(compressed),
+                },
+            ))
+        } else {
+            Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
+                page,
+            }))
+        }
     }

     #[instrument(skip_all, fields(shard_id))]
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 727650a5a5ec..2bb9ae19c1c7 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
 use itertools::Itertools;
+use lz4_flex;
 use pageserver_api::key::{
     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
         img: Bytes,
     ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
-        self.put(rel_block_to_key(rel, blknum), Value::Image(img));
+        let compressed = lz4_flex::block::compress(&img);
+        if compressed.len() < img.len() {
+            self.put(
+                rel_block_to_key(rel, blknum),
+                Value::CompressedImage(Bytes::from(compressed)),
+            );
+        } else {
+            self.put(rel_block_to_key(rel, blknum), Value::Image(img));
+        }
         Ok(())
     }

@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
         if let Some((_, value)) = values.last() {
             return if let Value::Image(img) = value {
                 Ok(img.clone())
+            } else if let Value::CompressedImage(img) = value {
+                let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
+                    .map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
+                Ok(Bytes::from(decompressed))
             } else {
                 // Currently, we never need to read back a WAL record that we
                 // inserted in the same "transaction". All the metadata updates
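Reviewer note: the same "store compressed only when it actually wins" policy now appears in both page_service.rs and put_rel_page_image above. A hypothetical shared helper (not part of the patch) would make the invariant explicit — a Value::CompressedImage (the variant introduced in repository.rs below) is always strictly smaller than the page it encodes:

    use bytes::Bytes;
    use pageserver::repository::Value; // path assumed for this sketch

    // Hypothetical helper, not in the patch: one place for the
    // compress-only-when-smaller decision instead of repeating it.
    fn maybe_compress(img: Bytes) -> Value {
        let compressed = lz4_flex::block::compress(&img);
        if compressed.len() < img.len() {
            Value::CompressedImage(Bytes::from(compressed))
        } else {
            Value::Image(img)
        }
    }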
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 9959d105ebd2..839e0d101eef 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
 pub enum Value {
     /// An Image value contains a full copy of the value
     Image(Bytes),
+    /// A compressed page image contains a full copy of the page
+    CompressedImage(Bytes),
     /// A WalRecord value contains a WAL record that needs to be
     /// replayed to get the full value. Replaying the WAL record
     /// might need a previous version of the value (if will_init()
@@ -22,12 +24,17 @@ impl Value {
     pub fn is_image(&self) -> bool {
-        matches!(self, Value::Image(_))
+        match self {
+            Value::Image(_) => true,
+            Value::CompressedImage(_) => true,
+            Value::WalRecord(_) => false,
+        }
     }

     pub fn will_init(&self) -> bool {
         match self {
             Value::Image(_) => true,
+            Value::CompressedImage(_) => true,
             Value::WalRecord(rec) => rec.will_init(),
         }
     }
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 0d33100eadba..4ba68bac1fa5 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -11,13 +11,16 @@
 //! len <  128: 0XXXXXXX
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::tenant::block_io::BlockCursor;
 use crate::virtual_file::VirtualFile;
+use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
+use lz4_flex;
+use postgres_ffi::BLCKSZ;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};

@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
         Ok(buf)
     }
+    /// Read a blob, transparently decompressing it if it was stored with
+    /// a compression tag (see `write_compressed_blob` below).
+    pub async fn read_compressed_blob(
+        &self,
+        offset: u64,
+        ctx: &RequestContext,
+    ) -> Result<Vec<u8>, std::io::Error> {
+        let blknum = (offset / PAGE_SZ as u64) as u32;
+        let off = (offset % PAGE_SZ as u64) as usize;
+
+        let buf = self.read_blk(blknum, ctx).await?;
+        let compression_alg = buf[off];
+        let res = self.read_blob(offset + 1, ctx).await?;
+        if compression_alg == LZ4_COMPRESSION {
+            lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
+                std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
+            })
+        } else {
+            assert_eq!(compression_alg, NO_COMPRESSION);
+            Ok(res)
+        }
+    }
+
     /// Read blob into the given buffer. Any previous contents in the buffer
     /// are overwritten.
     pub async fn read_blob_into_buf(
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         (src_buf, Ok(()))
     }

+    /// Like `write_blob`, but BLCKSZ-sized blobs are stored LZ4-compressed
+    /// whenever the compressed form is smaller than the original. Every
+    /// blob is prefixed with a one-byte compression tag.
+    pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
+        let offset = self.offset;
+
+        let len = srcbuf.len();
+
+        let mut io_buf = self.io_buf.take().expect("we always put it back below");
+        io_buf.clear();
+        let mut is_compressed = false;
+        if len < 128 {
+            // Short blob. Write a 1-byte length header
+            io_buf.put_u8(NO_COMPRESSION);
+            io_buf.put_u8(len as u8);
+        } else {
+            if len > 0x7fff_ffff {
+                return Err(Error::new(
+                    ErrorKind::Other,
+                    format!("blob too large ({} bytes)", len),
+                ));
+            }
+            // Only full pages are worth a compression attempt.
+            if len == BLCKSZ as usize {
+                let compressed = lz4_flex::block::compress(&srcbuf);
+                if compressed.len() < len {
+                    io_buf.put_u8(LZ4_COMPRESSION);
+                    let mut len_buf = (compressed.len() as u32).to_be_bytes();
+                    len_buf[0] |= 0x80;
+                    io_buf.extend_from_slice(&len_buf[..]);
+                    io_buf.extend_from_slice(&compressed[..]);
+                    is_compressed = true;
+                }
+            }
+            if !is_compressed {
+                // Write a 4-byte length header
+                io_buf.put_u8(NO_COMPRESSION);
+                let mut len_buf = (len as u32).to_be_bytes();
+                len_buf[0] |= 0x80;
+                io_buf.extend_from_slice(&len_buf[..]);
+            }
+        }
+        let (io_buf, hdr_res) = self.write_all(io_buf).await;
+        match hdr_res {
+            Ok(_) => (),
+            Err(e) => return Err(e),
+        }
+        self.io_buf = Some(io_buf);
+        if is_compressed {
+            Ok(offset)
+        } else {
+            let (_buf, res) = self.write_all(srcbuf).await;
+            res.map(|_| offset)
+        }
+    }
+
     /// Write a blob of data. Returns the offset that it was written to,
     /// which can be used to retrieve the data later.
     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
             if len < 128 {
                 // Short blob. Write a 1-byte length header
                 io_buf.put_u8(len as u8);
-                self.write_all(io_buf).await
             } else {
                 // Write a 4-byte length header
                 if len > 0x7fff_ffff {
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
                 let mut len_buf = (len as u32).to_be_bytes();
                 len_buf[0] |= 0x80;
                 io_buf.extend_from_slice(&len_buf[..]);
-                self.write_all(io_buf).await
             }
+            self.write_all(io_buf).await
         }
         .await;
         self.io_buf = Some(io_buf);
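Reviewer note: as I read write_compressed_blob, the new on-disk layout simply prepends a one-byte compression tag to the pre-existing variable-length size header, and only BLCKSZ-sized blobs are ever compression candidates. A standalone sketch of the header encoding (my paraphrase of the patch, not code from it):

    // Layout written by write_compressed_blob:
    //   [tag]     1 byte: NO_COMPRESSION (0) or LZ4_COMPRESSION (1)
    //   [length]  1 byte if < 128, else 4 bytes big-endian with the high bit set
    //   [payload] raw bytes, or an LZ4 block that decodes to exactly BLCKSZ bytes
    fn encode_blob_header(tag: u8, len: usize) -> Vec<u8> {
        assert!(len <= 0x7fff_ffff, "blob too large");
        let mut hdr = vec![tag];
        if len < 128 {
            hdr.push(len as u8);
        } else {
            let mut len_buf = (len as u32).to_be_bytes();
            len_buf[0] |= 0x80; // distinguishes the 4-byte form from the 1-byte form
            hdr.extend_from_slice(&len_buf);
        }
        hdr
    }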
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 299950cc21a7..c944fe5cfdda 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
 use pageserver_api::models::{
     LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
 };
+use postgres_ffi::BLCKSZ;
 use std::cmp::{Ordering, Reverse};
 use std::collections::hash_map::Entry;
 use std::collections::{BinaryHeap, HashMap};
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
         lsn: Lsn,
         value: Value,
     ) -> ValueReconstructSituation {
-        let state = self
+        let mut error: Option<PageReconstructError> = None;
+        let key_state = self
             .keys
             .entry(*key)
             .or_insert(Ok(VectoredValueReconstructState::default()));

-        if let Ok(state) = state {
+        let situation = if let Ok(state) = key_state {
             let key_done = match state.situation {
                 ValueReconstructSituation::Complete => unreachable!(),
                 ValueReconstructSituation::Continue => match value {
                     Value::Image(img) => {
                         state.img = Some((lsn, img));
                         true
                     }
+                    Value::CompressedImage(img) => {
+                        match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
+                            Ok(decompressed) => {
+                                state.img = Some((lsn, Bytes::from(decompressed)));
+                                true
+                            }
+                            Err(e) => {
+                                error = Some(PageReconstructError::from(anyhow::anyhow!(
+                                    "Failed to decompress blob from virtual file: {}",
+                                    e
+                                )));
+                                true
+                            }
+                        }
+                    }
                     Value::WalRecord(rec) => {
                         let reached_cache =
                             state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
             state.situation
         } else {
             ValueReconstructSituation::Complete
+        };
+        if let Some(err) = error {
+            *key_state = Err(err);
         }
+        situation
     }

     /// Returns the Lsn at which this key is cached if one exists.
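Reviewer note: update_key cannot assign `*key_state = Err(..)` inside the `if let Ok(state)` branch because `state` still borrows the entry there, so a decompression failure is latched into a local and applied after the borrow ends. The shape of the pattern in isolation (an illustrative simplification, not code from the patch):

    fn update(entry: &mut Result<Vec<u8>, String>, input: Result<u8, String>) {
        let mut error: Option<String> = None;
        if let Ok(state) = entry {
            match input {
                Ok(byte) => state.push(byte),
                Err(e) => error = Some(e), // can't `*entry = Err(e)` here
            }
        }
        // The borrow of `entry` through `state` has ended; now it is safe.
        if let Some(err) = error {
            *entry = Err(err);
        }
    }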
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index b7132ee3bfb2..bd0d92e553b5 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
 use crate::{walrecord, TEMP_FILE_SUFFIX};
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use bytes::BytesMut;
+use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::LayerAccessKind;
 use pageserver_api::shard::TenantShardId;
+use postgres_ffi::BLCKSZ;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::File;
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
                     need_image = false;
                     break;
                 }
+                Value::CompressedImage(img) => {
+                    let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
+                    reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
+                    need_image = false;
+                    break;
+                }
                 Value::WalRecord(rec) => {
                     let will_init = rec.will_init();
                     reconstruct_state.records.push((entry_lsn, rec));
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
                 Value::Image(img) => {
                     format!(" img {} bytes", img.len())
                 }
+                Value::CompressedImage(img) => {
+                    format!(" compressed img {} bytes", img.len())
+                }
                 Value::WalRecord(rec) => {
                     let wal_desc = walrecord::describe_wal_record(&rec)?;
                     format!(
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
                     let checkpoint = CheckPoint::decode(&img)?;
                     println!("     CHECKPOINT: {:?}", checkpoint);
                 }
+                Value::CompressedImage(img) => {
+                    let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
+                    let checkpoint = CheckPoint::decode(&decompressed)?;
+                    println!("     CHECKPOINT: {:?}", checkpoint);
+                }
                 Value::WalRecord(_rec) => {
                     println!("     unexpected walrecord value for checkpoint key");
                 }
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 14c79e413cbf..0ca356646a5a 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
 };
 use crate::tenant::{PageReconstructError, Timeline};
 use crate::virtual_file::{self, VirtualFile};
-use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
+use crate::{
+    COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
+};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
     // values copied from summary
     index_start_blk: u32,
     index_root_blk: u32,
+    format_version: u16,

     lsn: Lsn,

@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
         f.debug_struct("ImageLayerInner")
             .field("index_start_blk", &self.index_start_blk)
             .field("index_root_blk", &self.index_root_blk)
+            .field("format_version", &self.format_version)
             .finish()
     }
 }
@@ -408,6 +412,7 @@ impl ImageLayerInner {
         Ok(Ok(ImageLayerInner {
             index_start_blk: actual_summary.index_start_blk,
             index_root_blk: actual_summary.index_root_blk,
+            format_version: actual_summary.format_version,
             lsn,
             file,
             file_id,
@@ -436,18 +441,20 @@ impl ImageLayerInner {
             )
             .await?
         {
-            let blob = block_reader
-                .block_cursor()
-                .read_blob(
-                    offset,
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::ImageLayerValue)
-                        .build(),
-                )
-                .await
-                .with_context(|| format!("failed to read value from offset {}", offset))?;
-            let value = Bytes::from(blob);
+            let ctx = RequestContextBuilder::extend(ctx)
+                .page_content_kind(PageContentKind::ImageLayerValue)
+                .build();
+            let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
+                block_reader
+                    .block_cursor()
+                    .read_compressed_blob(offset, &ctx)
+                    .await
+            } else {
+                block_reader.block_cursor().read_blob(offset, &ctx).await
+            })
+            .with_context(|| format!("failed to read value from offset {}", offset))?;
+            let value = Bytes::from(blob);
             reconstruct_state.img = Some((self.lsn, value));
             Ok(ValueReconstructResult::Complete)
         } else {
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
     ///
     async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let (_img, res) = self.blob_writer.write_blob(img).await;
-        // TODO: re-use the buffer for `img` further upstack
-        let off = res?;
-
+        let off = self.blob_writer.write_compressed_blob(img).await?;
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
         self.tree.append(&keybuf, off)?;
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 5f1db21d493b..d1cea61bb442 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::{PageReconstructError, Timeline};
 use crate::walrecord;
 use anyhow::{anyhow, ensure, Result};
+use bytes::Bytes;
+use lz4_flex;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
+use postgres_ffi::BLCKSZ;
 use std::collections::{BinaryHeap, HashMap, HashSet};
 use std::sync::{Arc, OnceLock};
 use tracing::*;
@@ -133,6 +136,9 @@ impl InMemoryLayer {
                 Ok(Value::Image(img)) => {
                     write!(&mut desc, " img {} bytes", img.len())?;
                 }
+                Ok(Value::CompressedImage(img)) => {
+                    write!(&mut desc, " compressed img {} bytes", img.len())?;
+                }
                 Ok(Value::WalRecord(rec)) => {
                     let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
                     write!(
@@ -184,6 +190,11 @@ impl InMemoryLayer {
                     reconstruct_state.img = Some((*entry_lsn, img));
                     return Ok(ValueReconstructResult::Complete);
                 }
+                Value::CompressedImage(img) => {
+                    let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
+                    reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
+                    return Ok(ValueReconstructResult::Complete);
+                }
                 Value::WalRecord(rec) => {
                     let will_init = rec.will_init();
                     reconstruct_state.records.push((*entry_lsn, rec));
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 63a2b30d09bf..69d9522b1c22 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -471,8 +471,9 @@ impl WalIngest {
             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
             && (decoded.xl_info == pg_constants::XLOG_FPI
                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
-            // compression of WAL is not yet supported: fall back to storing the original WAL record
-            && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
+            // Only LZ4 compression of WAL is supported so far; for other
+            // compression algorithms, fall back to storing the original WAL record
+            && (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
+                postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
             // do not materialize null pages because they will most likely soon be replaced with real data
             && blk.bimg_len != 0
         {
@@ -480,7 +481,21 @@ impl WalIngest {
             let img_len = blk.bimg_len as usize;
             let img_offs = blk.bimg_offset as usize;
             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
-            image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
+            if postgres_ffi::bkpimage_is_compressed_lz4(
+                blk.bimg_info,
+                modification.tline.pg_version,
+            )? {
+                // An FPI omits its all-zero "hole": the decompressed payload
+                // is BLCKSZ minus the hole length.
+                let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
+                let decompressed = lz4_flex::block::decompress(
+                    &decoded.record[img_offs..img_offs + img_len],
+                    decompressed_img_len,
+                )
+                .map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
+                assert_eq!(decompressed.len(), decompressed_img_len);
+                image.extend_from_slice(&decompressed);
+            } else {
+                image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
+            }

             if blk.hole_length != 0 {
                 let tail = image.split_off(blk.hole_offset as usize);
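Reviewer note: for compressed FPIs the uncompressed payload is BLCKSZ minus the hole, so decompression has to happen before the pre-existing hole-fill logic splices zeros back in at hole_offset. Roughly, the combined flow looks like this sketch (a simplification of the walingest path above):

    // Reassemble a full page from an LZ4-compressed FPI whose all-zero
    // "hole" was elided by PostgreSQL.
    fn restore_fpi(compressed: &[u8], hole_offset: usize, hole_length: usize) -> anyhow::Result<Vec<u8>> {
        const BLCKSZ: usize = 8192;
        let body = lz4_flex::block::decompress(compressed, BLCKSZ - hole_length)?;
        let mut page = Vec::with_capacity(BLCKSZ);
        page.extend_from_slice(&body[..hole_offset]);
        page.resize(hole_offset + hole_length, 0); // zero-fill the hole
        page.extend_from_slice(&body[hole_offset..]);
        assert_eq!(page.len(), BLCKSZ);
        Ok(page)
    }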
diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile
index 0bcb9545a674..1b27c8112d70 100644
--- a/pgxn/neon/Makefile
+++ b/pgxn/neon/Makefile
@@ -18,7 +18,7 @@ OBJS = \
 PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
-SHLIB_LINK = -lcurl
+SHLIB_LINK = -lcurl -llz4

 EXTENSION = neon
 DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index 2889ffacae85..3285820e0ae5 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -44,6 +44,7 @@ typedef enum
 	T_NeonErrorResponse,
 	T_NeonDbSizeResponse,
 	T_NeonGetSlruSegmentResponse,
+	T_NeonGetCompressedPageResponse
 } NeonMessageTag;

 /* base struct for c-style inheritance */
@@ -144,6 +145,15 @@

 #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))

+typedef struct
+{
+	NeonMessageTag tag;
+	uint16		compressed_size;
+	char		page[FLEXIBLE_ARRAY_MEMBER];
+} NeonGetCompressedPageResponse;
+
+#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size))
+
 typedef struct
 {
 	NeonMessageTag tag;
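Reviewer note: on the wire, the new response is the tag byte (105, per models.rs above) followed by a big-endian u16 compressed length and the raw LZ4 block; pagestore_smgr.c below reads it with pq_getmsgint/pq_getmsgbytes. The same framing decoded in Rust for comparison (a sketch with error handling reduced to a Result):

    use bytes::{Buf, Bytes};

    // Parse a GetCompressedPage response: tag 105, u16 length, LZ4 block.
    fn decode_compressed_page(mut msg: Bytes) -> anyhow::Result<Bytes> {
        anyhow::ensure!(msg.get_u8() == 105, "not a GetCompressedPage response");
        let compressed_size = msg.get_u16() as usize;
        anyhow::ensure!(msg.remaining() >= compressed_size, "truncated message");
        // BLCKSZ is the only possible decoded size for a page image.
        let page = lz4_flex::block::decompress(&msg[..compressed_size], 8192)?;
        Ok(Bytes::from(page))
    }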
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 213e39632838..84c68af74c77 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -45,6 +45,10 @@
  */
 #include "postgres.h"

+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
 		case T_NeonExistsResponse:
 		case T_NeonNblocksResponse:
 		case T_NeonGetPageResponse:
+		case T_NeonGetCompressedPageResponse:
 		case T_NeonErrorResponse:
 		case T_NeonDbSizeResponse:
 		case T_NeonGetSlruSegmentResponse:
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)

 				Assert(msg_resp->tag == T_NeonGetPageResponse);

 				resp = (NeonResponse *) msg_resp;
 				break;
 			}
+		case T_NeonGetCompressedPageResponse:
+			{
+				NeonGetCompressedPageResponse *msg_resp;
+				uint16		compressed_size = pq_getmsgint(s, 2);
+
+				msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
+				msg_resp->tag = tag;
+				msg_resp->compressed_size = compressed_size;
+				memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
+				pq_getmsgend(s);
+
+				Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
+
+				resp = (NeonResponse *) msg_resp;
+				break;
+			}
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
 				appendStringInfoChar(&s, '}');
 				break;
 			}
+		case T_NeonGetCompressedPageResponse:
+			{
+				NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
+
+				appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
+				appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"", msg_resp->compressed_size);
+				appendStringInfoChar(&s, '}');
+				break;
+			}
 		case T_NeonErrorResponse:
 			{
 				NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 			lfc_write(rinfo, forkNum, blkno, buffer);
 			break;

+		case T_NeonGetCompressedPageResponse:
+			{
+#ifndef USE_LZ4
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("compression method lz4 not supported"),
+						 errdetail("This functionality requires the server to be built with lz4 support."),
+						 errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")));
+#else
+				NeonGetCompressedPageResponse *cp = (NeonGetCompressedPageResponse *) resp;
+				int			rc = LZ4_decompress_safe(cp->page,
+													 buffer,
+													 cp->compressed_size,
+													 BLCKSZ);
+
+				if (rc != BLCKSZ)
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg_internal("compressed lz4 data is corrupt")));
+				lfc_write(rinfo, forkNum, blkno, buffer);
+#endif
+				break;
+			}
 		case T_NeonErrorResponse:
 			ereport(ERROR,
 					(errcode(ERRCODE_IO_ERROR),