Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: more comments around registered buffers functionality #383

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions util/fibers/uring_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class LinuxFile {
// Async versions of Read
void ReadFixedAsync(io::MutableBytes dest, off_t offset, unsigned buf_index, AsyncCb cb);
void ReadAsync(io::MutableBytes dest, off_t offset, AsyncCb cb);

// io_uring fixed version - src must lie entirely within the registered (fixed) buffer that
// is specified by buf_index. See io_uring_prep_write_fixed(3) for more details.
void WriteFixedAsync(io::Bytes src, off_t offset, unsigned buf_index, AsyncCb cb);
void WriteAsync(io::Bytes src, off_t offset, AsyncCb cb);

Expand Down
37 changes: 27 additions & 10 deletions util/fibers/uring_proactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ constexpr uint64_t kUserDataCbIndex = 1024;
constexpr uint16_t kMsgRingSubmitTag = 1;
constexpr uint16_t kTimeoutSubmitTag = 2;
constexpr uint16_t kCqeBatchLen = 128;

constexpr size_t kAlign = 4096;
} // namespace

struct UringProactor::BufRingGroup {
Expand Down Expand Up @@ -170,7 +170,7 @@ UringProactor::~UringProactor() {
CHECK(is_stopped_);
if (thread_id_ != -1U) {
if (buf_pool_.backing) {
munmap(buf_pool_.backing, buf_pool_.segments.Size() * UringBuf::kAlign);
munmap(buf_pool_.backing, buf_pool_.segments.Size() * kAlign);
io_uring_unregister_buffers(&ring_);
}

Expand Down Expand Up @@ -423,7 +423,7 @@ SubmitEntry UringProactor::GetSubmitEntry(CbType cb, uint32_t submit_tag) {
}

unsigned UringProactor::RegisterBuffers(size_t size) {
size = (size + UringBuf::kAlign - 1) / UringBuf::kAlign * UringBuf::kAlign;
size = (size + kAlign - 1) / kAlign * kAlign;

// Use mmap to create the backing
void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
Expand All @@ -441,18 +441,18 @@ unsigned UringProactor::RegisterBuffers(size_t size) {
}

buf_pool_.backing = reinterpret_cast<uint8_t*>(ptr);
buf_pool_.segments.Grow(size / UringBuf::kAlign);
buf_pool_.segments.Grow(size / kAlign);

return 0;
}

std::optional<UringBuf> UringProactor::RequestBuffer(size_t size) {
if (buf_pool_.backing) {
// We keep track not of bytes, but 4kb segments and round up
size_t segment_cnt = (size + UringBuf::kAlign - 1) / UringBuf::kAlign;
size_t segment_cnt = (size + kAlign - 1) / kAlign;
if (auto offset = buf_pool_.segments.Request(segment_cnt)) {
uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign;
return UringBuf{{ptr, segment_cnt * UringBuf::kAlign}, 0};
uint8_t* ptr = buf_pool_.backing + *offset * kAlign;
return UringBuf{{ptr, segment_cnt * kAlign}, 0};
}
}
return std::nullopt;
Expand All @@ -461,7 +461,24 @@ std::optional<UringBuf> UringProactor::RequestBuffer(size_t size) {
void UringProactor::ReturnBuffer(UringBuf buf) {
DCHECK(buf.buf_idx);

size_t segments = (buf.bytes.data() - buf_pool_.backing) / UringBuf::kAlign;
size_t segments = (buf.bytes.data() - buf_pool_.backing) / kAlign;
buf_pool_.segments.Return(segments);
}

// Requests a slice of at least `size` bytes from the pool backing the registered buffers.
// The returned slice length is `size` rounded up to a multiple of kAlign, and its buf_idx
// is always 0. Returns std::nullopt when no backing memory is set up or when the segment
// pool does not hand out the requested number of segments.
optional<RegisteredSlice> UringProactor::RequestRegisteredSlice(size_t size) {
  if (!buf_pool_.backing) {
    return std::nullopt;
  }

  // The pool is tracked in kAlign-sized segments rather than bytes, so round up.
  size_t num_segments = (size + kAlign - 1) / kAlign;
  auto first_segment = buf_pool_.segments.Request(num_segments);
  if (!first_segment) {
    return std::nullopt;
  }

  uint8_t* slice_start = buf_pool_.backing + *first_segment * kAlign;
  return RegisteredSlice{{slice_start, num_segments * kAlign}, 0};
}

// Gives a slice back to the segment pool. The slice is identified by the segment index
// of its start address relative to the pool's backing memory.
// NOTE(review): presumably `buf` must come from RequestRegisteredSlice - nothing here
// verifies that the address lies inside the backing region.
void UringProactor::ReturnRegisteredSlice(RegisteredSlice buf) {
  const auto* slice_start = buf.bytes.data();
  size_t first_segment = (slice_start - buf_pool_.backing) / kAlign;
  buf_pool_.segments.Return(first_segment);
}

Expand Down Expand Up @@ -616,8 +633,8 @@ uint16_t UringProactor::EnqueueMultishotCompletion(uint16_t group_id, IoResult r
return buf_group.HandleCompletion(bid, tail_id, res);
}

auto UringProactor::PullMultiShotCompletion(uint16_t group_id,
uint16_t* head_id) -> CompletionResult {
auto UringProactor::PullMultiShotCompletion(uint16_t group_id, uint16_t* head_id)
-> CompletionResult {
DCHECK_LT(group_id, bufring_groups_.size());
DCHECK_NE(*head_id, kMultiShotUndef);

Expand Down
22 changes: 14 additions & 8 deletions util/fibers/uring_proactor.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// Copyright 2025, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//

Expand All @@ -10,6 +10,7 @@
#include "base/segment_pool.h"
#include "util/fibers/proactor_base.h"
#include "util/fibers/submit_entry.h"
#include "util/fibers/uring_types.h"

namespace util {
namespace fb2 {
Expand All @@ -19,13 +20,14 @@ class Scheduler;
}

// Aligned buffer that is optionally part of a registered buffer (io_uring_register_buffers)
struct UringBuf {
static constexpr size_t kAlign = 4096;

// Deprecated in favor of RegisteredSlice.
struct UringBufDeprecated {
io::MutableBytes bytes; // buf, nbytes
std::optional<unsigned> buf_idx; // buf_idx
};

using UringBuf = UringBufDeprecated;

class UringProactor : public ProactorBase {
UringProactor(const UringProactor&) = delete;
void operator=(const UringProactor&) = delete;
Expand Down Expand Up @@ -108,8 +110,12 @@ class UringProactor : public ProactorBase {
// Request buffer of given size from the buffer pool registered with RegisterBuffers.
// Returns none if there's no space left in the pool.
// Must be returned with ReturnBuffer.
std::optional<UringBuf> RequestBuffer(size_t size);
void ReturnBuffer(UringBuf buf);
std::optional<UringBufDeprecated> RequestBuffer(size_t size);
void ReturnBuffer(UringBufDeprecated buf);

std::optional<RegisteredSlice> RequestRegisteredSlice(size_t size);
void ReturnRegisteredSlice(RegisteredSlice buf);


// Registers an io_uring buffer ring (see io_uring_register_buf_ring(3)).
// Available from kernel 5.19. nentries must be less than 2^15 and should be a power of 2.
Expand Down Expand Up @@ -226,8 +232,8 @@ class UringProactor : public ProactorBase {
// Keeps track of requested buffers
struct {
uint8_t* backing = nullptr;
base::SegmentPool segments{};
} buf_pool_{};
base::SegmentPool segments;
} buf_pool_;

int32_t next_free_ce_ = -1;
uint32_t pending_cb_cnt_ = 0;
Expand Down
19 changes: 19 additions & 0 deletions util/fibers/uring_types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2025, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include "io/io.h"

namespace util {
namespace fb2 {

// A slice that is a sub-region of a registered buffer (see io_uring_register_buffers(3)).
// Plain aggregate with no invariants enforced here; callers brace-initialize it in the
// field order declared below.
struct RegisteredSlice {
io::MutableBytes bytes; // start address and length in bytes of the slice (buf, nbytes)
unsigned buf_idx; // id of the registered buffer that this slice belongs to
};

} // namespace fb2
} // namespace util
Loading