Commit

resolve comment
mapleFU committed Mar 20, 2024
1 parent f3a6f29 commit 3043ec9
Showing 2 changed files with 40 additions and 29 deletions.
15 changes: 8 additions & 7 deletions cpp/src/arrow/io/compressed.cc
@@ -19,6 +19,7 @@

#include <algorithm>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
@@ -233,7 +234,7 @@ class CompressedInputStream::Impl {
: pool_(pool),
raw_(raw),
is_open_(true),
supports_zero_copy_from_raw_(raw->supports_zero_copy()),
supports_zero_copy_from_raw_(raw_->supports_zero_copy()),
compressed_pos_(0),
decompressed_pos_(0),
fresh_decompressor_(false),
@@ -280,9 +281,6 @@ class CompressedInputStream::Impl {
RETURN_NOT_OK(
compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false));
}
// Set compressed_ to nullptr to avoid `compressed_for_non_zero_copy_` being
// referenced twice, which would make it "immutable".
compressed_ = nullptr;
ARROW_ASSIGN_OR_RAISE(
int64_t read_size,
raw_->Read(kChunkSize,
@@ -312,7 +310,8 @@
ARROW_ASSIGN_OR_RAISE(decompressed_,
AllocateResizableBuffer(decompress_size, pool_));
} else {
RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/false));
// Shrink the buffer if it's already large enough.
RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/true));
}
decompressed_pos_ = 0;

@@ -328,7 +327,9 @@
fresh_decompressor_ = false;
}
if (result.bytes_written > 0 || !result.need_more_output || input_len == 0) {
RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
// Not shrinking to fit here because we're likely to reuse the buffer.
RETURN_NOT_OK(
decompressed_->Resize(result.bytes_written, /*shrink_to_fit=*/false));
break;
}
DCHECK_EQ(result.bytes_written, 0);
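
For context (not part of this diff): both Resize calls above rely on `ResizableBuffer::Resize(new_size, shrink_to_fit)`, where `shrink_to_fit` defaults to true. A minimal sketch of the reuse pattern, using only the public Arrow buffer API (the function name and sizes are illustrative):

#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ReuseDecompressionBuffer(arrow::MemoryPool* pool) {
  // Allocate once with a generous capacity.
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::ResizableBuffer> buf,
                        arrow::AllocateResizableBuffer(/*size=*/1 << 20, pool));
  // shrink_to_fit=false: adjust the logical size but keep the capacity,
  // so the next decompression round can reuse the allocation.
  ARROW_RETURN_NOT_OK(buf->Resize(/*new_size=*/1024, /*shrink_to_fit=*/false));
  // shrink_to_fit=true (the default): may reallocate and return memory
  // to the pool when the new size is smaller.
  ARROW_RETURN_NOT_OK(buf->Resize(/*new_size=*/1024, /*shrink_to_fit=*/true));
  return arrow::Status::OK();
}
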
@@ -441,7 +442,7 @@ Result<std::shared_ptr<CompressedInputStream>> CompressedInputStream::Make(
Codec* codec, const std::shared_ptr<InputStream>& raw, MemoryPool* pool) {
// CAUTION: codec is not owned
std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
res->impl_.reset(new Impl(pool, std::move(raw)));
res->impl_.reset(new Impl(pool, raw));
RETURN_NOT_OK(res->impl_->Init(codec));
return res;
}
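
For context (not part of this diff), a minimal sketch of how `CompressedInputStream::Make` is typically used, assuming an LZ4_FRAME codec and an in-memory `BufferReader` as the raw stream:

#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/memory.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/compression.h"

arrow::Status ReadCompressed(const std::shared_ptr<arrow::Buffer>& compressed) {
  // The codec choice here is only an example.
  ARROW_ASSIGN_OR_RAISE(auto codec,
                        arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
  auto raw = std::make_shared<arrow::io::BufferReader>(compressed);
  // As the CAUTION above notes, the codec is not owned by the stream and must
  // outlive it.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        arrow::io::CompressedInputStream::Make(codec.get(), raw));
  // Read decompressed bytes in fixed-size chunks until EOF.
  while (true) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> chunk, input->Read(4096));
    if (chunk->size() == 0) break;
    // ... consume chunk ...
  }
  return arrow::Status::OK();
}
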
54 changes: 32 additions & 22 deletions cpp/src/arrow/io/compressed_benchmark.cc
@@ -93,7 +93,7 @@ class NonZeroCopyBufferReader final : public InputStream {
::arrow::io::BufferReader reader_;
};

template <typename BufReader, Compression::type COMPRESSION>
template <typename BufReader, Compression::type COMPRESSION, bool ReadIntoBuffer>
static void CompressionInputBenchmark(::benchmark::State& state) {
std::vector<uint8_t> data = MakeCompressibleData(/*data_size=*/state.range(0));
int64_t per_read_bytes = state.range(1);
@@ -108,28 +108,50 @@ static void CompressionInputBenchmark(::benchmark::State& state) {
for (auto _ : state) {
state.PauseTiming();
auto reader = std::make_shared<BufReader>(buf);
[[maybe_unused]] auto read_buffer =
::arrow::AllocateBuffer(per_read_bytes).ValueOrDie();
state.ResumeTiming();
// Include `CompressedInputStream::Make` in the timed region.
auto input_stream =
::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie();
auto read_buffer = ::arrow::AllocateBuffer(per_read_bytes).ValueOrDie();
state.ResumeTiming();
int64_t remaining_size = data.size();
while (remaining_size > 0) {
auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data());
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie();
if constexpr (ReadIntoBuffer) {
auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data());
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie();
} else {
auto value = input_stream->Read(per_read_bytes);
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie()->size();
}
}
}
state.SetBytesProcessed(length * state.iterations());
}
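
For reference (not part of this diff), the new `ReadIntoBuffer` template parameter selects between the two `InputStream::Read` overloads exercised above. A minimal sketch of the distinction, assuming `stream` is any `arrow::io::InputStream` and `per_read_bytes` is a chunk size:

#include <cstdint>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ReadBothWays(arrow::io::InputStream* stream, int64_t per_read_bytes) {
  // ReadIntoBuffer = true: read into a caller-owned buffer and get back the
  // number of bytes actually read.
  ARROW_ASSIGN_OR_RAISE(auto scratch, arrow::AllocateBuffer(per_read_bytes));
  ARROW_ASSIGN_OR_RAISE(int64_t n_read,
                        stream->Read(per_read_bytes, scratch->mutable_data()));
  // ReadIntoBuffer = false: let the stream allocate (or zero-copy slice) a
  // Buffer holding the result.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> owned,
                        stream->Read(per_read_bytes));
  (void)n_read;
  (void)owned;
  return arrow::Status::OK();
}
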

template <Compression::type COMPRESSION>
static void CompressionInputZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) {
CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION,
/*ReadIntoBuffer=*/true>(state);
}

template <Compression::type COMPRESSION>
static void CompressionInputNonZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) {
CompressionInputBenchmark<NonZeroCopyBufferReader, COMPRESSION,
/*ReadIntoBuffer=*/true>(state);
}

template <Compression::type COMPRESSION>
static void CompressionInputZeroCopyBenchmark(::benchmark::State& state) {
CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION>(state);
CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION,
/*ReadIntoBuffer=*/false>(state);
}

template <Compression::type COMPRESSION>
static void CompressionInputNonZeroCopyBenchmark(::benchmark::State& state) {
CompressionInputBenchmark<NonZeroCopyBufferReader, COMPRESSION>(state);
CompressionInputBenchmark<NonZeroCopyBufferReader, COMPRESSION,
/*ReadIntoBuffer=*/false>(state);
}

static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
@@ -142,21 +164,9 @@ static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
->Args({1024 * 1024, 1024 * 1024});
}

#ifdef ARROW_WITH_ZLIB
BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::GZIP)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::GZIP)
->Apply(CompressedInputArguments);
#endif

#ifdef ARROW_WITH_ZSTD
BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::ZSTD)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::ZSTD)
->Apply(CompressedInputArguments);
#endif

#ifdef ARROW_WITH_LZ4
// Benchmark LZ4 because it's lightweight, which keeps the benchmark focused on
// the overhead of the compressed input stream.
BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME)
