Skip to content

Commit

Permalink
Limit buffer size in BufferedInputStream::SetBufferSize
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Jun 26, 2024
1 parent 508bdaa commit d025449
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
9 changes: 9 additions & 0 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,20 @@ class BufferedInputStream::Impl : public BufferedBase {
return raw_pos_ - bytes_buffered_;
}

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
}
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
new_buffer_size = std::min(new_buffer_size,
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
}
return ResizeBuffer(new_buffer_size);
}

Expand Down Expand Up @@ -433,6 +440,8 @@ class BufferedInputStream::Impl : public BufferedBase {
private:
std::shared_ptr<InputStream> raw_;
int64_t raw_read_total_;
// a bound on the maximum number of bytes to read from the raw input stream.
// The default -1 indicates that it is unbounded
int64_t raw_read_bound_;

// Number of remaining bytes in the buffer, to be reduced on each read from
Expand Down
30 changes: 28 additions & 2 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
local_pool_ = MemoryPool::CreateDefault();
}

void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool(),
bool fill_raw_read_bound = false) {
test_data_ = kExample1;

ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
Expand All @@ -338,7 +339,14 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {

ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
raw_ = file_in;
ASSERT_OK_AND_ASSIGN(buffered_, BufferedInputStream::Create(buffer_size, pool, raw_));
if (!fill_raw_read_bound) {
ASSERT_OK_AND_ASSIGN(buffered_,
BufferedInputStream::Create(buffer_size, pool, raw_));
} else {
ASSERT_OK_AND_ASSIGN(auto file_size, file_in->GetSize());
ASSERT_OK_AND_ASSIGN(
buffered_, BufferedInputStream::Create(buffer_size, pool, raw_, file_size));
}
}

protected:
Expand Down Expand Up @@ -472,6 +480,24 @@ TEST_F(TestBufferedInputStream, SetBufferSize) {
ASSERT_OK(buffered_->SetBufferSize(5));
}

// GH-43060: Internal buffer should not greater than the
// bytes could buffer.
TEST_F(TestBufferedInputStream, BufferSizeLimit) {
{
MakeExample1(/*buffer_size=*/100000, default_memory_pool(),
/*fill_raw_read_bound=*/true);
// buffer_sizes should be limited to the size of the data
EXPECT_EQ(test_data_.size(), buffered_->buffer_size());
}

{
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*fill_raw_read_bound=*/true);
ASSERT_OK(buffered_->Read(10));
ASSERT_OK(buffered_->SetBufferSize(/*new_buffer_size=*/100000));
EXPECT_EQ(test_data_.size() - 10, buffered_->buffer_size());
}
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down

0 comments on commit d025449

Please sign in to comment.