From 397fa65710ce7b2e9d2db50dfc713a0e7f847ab9 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 24 Feb 2025 11:40:31 +0200 Subject: [PATCH] chore: more comments around registered buffers functionality --- util/fibers/uring_file.h | 3 +++ util/fibers/uring_proactor.cc | 37 +++++++++++++++++++++++++---------- util/fibers/uring_proactor.h | 22 +++++++++++++-------- util/fibers/uring_types.h | 19 ++++++++++++++++++ 4 files changed, 63 insertions(+), 18 deletions(-) create mode 100644 util/fibers/uring_types.h diff --git a/util/fibers/uring_file.h b/util/fibers/uring_file.h index 5eae7c04..2a3ded7f 100644 --- a/util/fibers/uring_file.h +++ b/util/fibers/uring_file.h @@ -69,6 +69,9 @@ class LinuxFile { // Async versions of Read void ReadFixedAsync(io::MutableBytes dest, off_t offset, unsigned buf_index, AsyncCb cb); void ReadAsync(io::MutableBytes dest, off_t offset, AsyncCb cb); + + // io_uring fixed-buffer version: src must lie entirely within the fixed buffer that + // is specified by buf_index. See io_uring_prep_write_fixed(3) for more details. 
void WriteFixedAsync(io::Bytes src, off_t offset, unsigned buf_index, AsyncCb cb); void WriteAsync(io::Bytes src, off_t offset, AsyncCb cb); diff --git a/util/fibers/uring_proactor.cc b/util/fibers/uring_proactor.cc index 716eebf1..14d58d21 100644 --- a/util/fibers/uring_proactor.cc +++ b/util/fibers/uring_proactor.cc @@ -58,7 +58,7 @@ constexpr uint64_t kUserDataCbIndex = 1024; constexpr uint16_t kMsgRingSubmitTag = 1; constexpr uint16_t kTimeoutSubmitTag = 2; constexpr uint16_t kCqeBatchLen = 128; - +constexpr size_t kAlign = 4096; } // namespace struct UringProactor::BufRingGroup { @@ -170,7 +170,7 @@ UringProactor::~UringProactor() { CHECK(is_stopped_); if (thread_id_ != -1U) { if (buf_pool_.backing) { - munmap(buf_pool_.backing, buf_pool_.segments.Size() * UringBuf::kAlign); + munmap(buf_pool_.backing, buf_pool_.segments.Size() * kAlign); io_uring_unregister_buffers(&ring_); } @@ -423,7 +423,7 @@ SubmitEntry UringProactor::GetSubmitEntry(CbType cb, uint32_t submit_tag) { } unsigned UringProactor::RegisterBuffers(size_t size) { - size = (size + UringBuf::kAlign - 1) / UringBuf::kAlign * UringBuf::kAlign; + size = (size + kAlign - 1) / kAlign * kAlign; // Use mmap to create the backing void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); @@ -441,7 +441,7 @@ unsigned UringProactor::RegisterBuffers(size_t size) { } buf_pool_.backing = reinterpret_cast(ptr); - buf_pool_.segments.Grow(size / UringBuf::kAlign); + buf_pool_.segments.Grow(size / kAlign); return 0; } @@ -449,10 +449,10 @@ unsigned UringProactor::RegisterBuffers(size_t size) { std::optional UringProactor::RequestBuffer(size_t size) { if (buf_pool_.backing) { // We keep track not of bytes, but 4kb segments and round up - size_t segment_cnt = (size + UringBuf::kAlign - 1) / UringBuf::kAlign; + size_t segment_cnt = (size + kAlign - 1) / kAlign; if (auto offset = buf_pool_.segments.Request(segment_cnt)) { - uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign; 
- return UringBuf{{ptr, segment_cnt * UringBuf::kAlign}, 0}; + uint8_t* ptr = buf_pool_.backing + *offset * kAlign; + return UringBuf{{ptr, segment_cnt * kAlign}, 0}; } } return std::nullopt; @@ -461,7 +461,24 @@ std::optional UringProactor::RequestBuffer(size_t size) { void UringProactor::ReturnBuffer(UringBuf buf) { DCHECK(buf.buf_idx); - size_t segments = (buf.bytes.data() - buf_pool_.backing) / UringBuf::kAlign; + size_t segments = (buf.bytes.data() - buf_pool_.backing) / kAlign; + buf_pool_.segments.Return(segments); +} + +optional UringProactor::RequestRegisteredSlice(size_t size) { + if (buf_pool_.backing) { + // We keep track not of bytes, but 4kb segments and round up + size_t segment_cnt = (size + kAlign - 1) / kAlign; + if (auto offset = buf_pool_.segments.Request(segment_cnt)) { + uint8_t* ptr = buf_pool_.backing + *offset * kAlign; + return RegisteredSlice{{ptr, segment_cnt * kAlign}, 0}; + } + } + return std::nullopt; +} + +void UringProactor::ReturnRegisteredSlice(RegisteredSlice buf) { + size_t segments = (buf.bytes.data() - buf_pool_.backing) / kAlign; buf_pool_.segments.Return(segments); } @@ -616,8 +633,8 @@ uint16_t UringProactor::EnqueueMultishotCompletion(uint16_t group_id, IoResult r return buf_group.HandleCompletion(bid, tail_id, res); } -auto UringProactor::PullMultiShotCompletion(uint16_t group_id, - uint16_t* head_id) -> CompletionResult { +auto UringProactor::PullMultiShotCompletion(uint16_t group_id, uint16_t* head_id) + -> CompletionResult { DCHECK_LT(group_id, bufring_groups_.size()); DCHECK_NE(*head_id, kMultiShotUndef); diff --git a/util/fibers/uring_proactor.h b/util/fibers/uring_proactor.h index 07e34b1e..7a957327 100644 --- a/util/fibers/uring_proactor.h +++ b/util/fibers/uring_proactor.h @@ -1,4 +1,4 @@ -// Copyright 2022, Roman Gershman. All rights reserved. +// Copyright 2025, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. 
// @@ -10,6 +10,7 @@ #include "base/segment_pool.h" #include "util/fibers/proactor_base.h" #include "util/fibers/submit_entry.h" +#include "util/fibers/uring_types.h" namespace util { namespace fb2 { @@ -19,13 +20,14 @@ class Scheduler; } // Aligned buffer that is optionally part of registered buffer (io_uring_register_buffers) -struct UringBuf { - static constexpr size_t kAlign = 4096; - +// Deprecated in favor of RegisteredSlice. +struct UringBufDeprecated { io::MutableBytes bytes; // buf, nbytes std::optional buf_idx; // buf_idx }; +using UringBuf = UringBufDeprecated; + class UringProactor : public ProactorBase { UringProactor(const UringProactor&) = delete; void operator=(const UringProactor&) = delete; @@ -108,8 +110,12 @@ class UringProactor : public ProactorBase { // Request buffer of given size from the buffer pool registered with RegisterBuffers. // Returns none if there's no space left in the pool. // Must be returned with ReturnBuffer. - std::optional RequestBuffer(size_t size); - void ReturnBuffer(UringBuf buf); + std::optional RequestBuffer(size_t size); + void ReturnBuffer(UringBufDeprecated buf); + + std::optional RequestRegisteredSlice(size_t size); + void ReturnRegisteredSlice(RegisteredSlice buf); + // Registers an iouring buffer ring (see io_uring_register_buf_ring(3)). // Available from kernel 5.19. nentries must be less than 2^15 and should be power of 2. @@ -226,8 +232,8 @@ class UringProactor : public ProactorBase { // Keeps track of requested buffers struct { uint8_t* backing = nullptr; - base::SegmentPool segments{}; - } buf_pool_{}; + base::SegmentPool segments; + } buf_pool_; int32_t next_free_ce_ = -1; uint32_t pending_cb_cnt_ = 0; diff --git a/util/fibers/uring_types.h b/util/fibers/uring_types.h new file mode 100644 index 00000000..597ef204 --- /dev/null +++ b/util/fibers/uring_types.h @@ -0,0 +1,19 @@ +// Copyright 2025, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#pragma once + +#include "io/io.h" + +namespace util { +namespace fb2 { + +// A slice that is a sub-region of a registered buffer (io_uring_register_buffers) +struct RegisteredSlice { + io::MutableBytes bytes; // buf, nbytes + unsigned buf_idx; // registered buffer id +}; + +} // namespace fb2 +} // namespace util