Skip to content

Commit

Permalink
Count decompressed bytes; naming nits
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Mar 27, 2024
1 parent e50c3fc commit 4c97871
Showing 1 changed file with 56 additions and 43 deletions.
99 changes: 56 additions & 43 deletions cpp/src/arrow/io/compressed_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

namespace arrow::io {

using ::arrow::Compression;

std::vector<uint8_t> MakeCompressibleData(int data_size) {
// XXX This isn't a real-world corpus so doesn't really represent the
// comparative qualities of the algorithms
Expand Down Expand Up @@ -93,72 +95,83 @@ class NonZeroCopyBufferReader final : public InputStream {
::arrow::io::BufferReader reader_;
};

template <typename BufReader, Compression::type COMPRESSION, bool ReadIntoBuffer>
static void CompressionInputBenchmark(::benchmark::State& state) {
std::vector<uint8_t> data =
MakeCompressibleData(/*data_size=*/static_cast<int>(state.range(0)));
int64_t per_read_bytes = state.range(1);
auto codec = ::arrow::util::Codec::Create(COMPRESSION).ValueOrDie();
enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee };

template <typename BufReader, BufferReadMode Mode>
static void CompressedInputStreamBenchmark(::benchmark::State& state,
Compression::type compression) {
const int64_t input_size = state.range(0);
const int64_t batch_size = state.range(1);

const std::vector<uint8_t> data = MakeCompressibleData(input_size);
auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie();
int64_t max_compress_len =
codec->MaxCompressedLen(static_cast<int64_t>(data.size()), data.data());
std::shared_ptr<::arrow::ResizableBuffer> buf =
::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie();
int64_t length = codec
->Compress(static_cast<int64_t>(data.size()), data.data(),
max_compress_len, buf->mutable_data())
.ValueOrDie();
ABORT_NOT_OK(buf->Resize(length));
const int64_t compressed_length =
codec
->Compress(static_cast<int64_t>(data.size()), data.data(), max_compress_len,
buf->mutable_data())
.ValueOrDie();
ABORT_NOT_OK(buf->Resize(compressed_length));
for (auto _ : state) {
state.PauseTiming();
auto reader = std::make_shared<BufReader>(buf);
[[maybe_unused]] auto read_buffer =
::arrow::AllocateBuffer(per_read_bytes).ValueOrDie();
[[maybe_unused]] std::unique_ptr<Buffer> read_buffer;
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie();
}
state.ResumeTiming();
// Put `CompressedInputStream::Make` in timing.
auto input_stream =
::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie();
auto remaining_size = static_cast<int64_t>(data.size());
auto remaining_size = input_size;
while (remaining_size > 0) {
if constexpr (ReadIntoBuffer) {
auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data());
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
auto value = input_stream->Read(batch_size, read_buffer->mutable_data());
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie();
} else {
auto value = input_stream->Read(per_read_bytes);
auto value = input_stream->Read(batch_size);
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie()->size();
}
}
}
state.SetBytesProcessed(length * state.iterations());
state.SetBytesProcessed(input_size * state.iterations());
}

template <Compression::type COMPRESSION>
static void CompressionInputZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) {
CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION,
/*ReadIntoBuffer=*/true>(state);
template <Compression::type kCompression>
static void CompressedInputStreamZeroCopyBufferProvidedByCaller(
::benchmark::State& state) {
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
BufferReadMode::ProvidedByCaller>(state, kCompression);
}

template <Compression::type COMPRESSION>
static void CompressionInputNonZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) {
CompressionInputBenchmark<NonZeroCopyBufferReader, COMPRESSION,
/*ReadIntoBuffer=*/true>(state);
template <Compression::type kCompression>
static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller(
::benchmark::State& state) {
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
BufferReadMode::ProvidedByCaller>(state, kCompression);
}

template <Compression::type COMPRESSION>
static void CompressionInputZeroCopyBenchmarkDirectRead(::benchmark::State& state) {
CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION,
/*ReadIntoBuffer=*/false>(state);
template <Compression::type kCompression>
static void CompressedInputStreamZeroCopyBufferReturnedByCallee(
::benchmark::State& state) {
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
BufferReadMode::ReturnedByCallee>(state, kCompression);
}

template <Compression::type COMPRESSION>
static void CompressionInputNonZeroCopyBenchmarkDirectRead(::benchmark::State& state) {
CompressionInputBenchmark<NonZeroCopyBufferReader, COMPRESSION,
/*ReadIntoBuffer=*/false>(state);
template <Compression::type kCompression>
static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee(
::benchmark::State& state) {
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
BufferReadMode::ReturnedByCallee>(state, kCompression);
}

static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
b->ArgNames({"InputBytes", "PerReadBytes"})
b->ArgNames({"num_bytes", "batch_size"})
->Args({8 * 1024, 8 * 1024})
->Args({64 * 1024, 8 * 1024})
->Args({64 * 1024, 64 * 1024})
Expand All @@ -170,17 +183,17 @@ static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
#ifdef ARROW_WITH_LZ4
// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on the
// overhead of the compression input stream.
BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkIntoBuffer,
::arrow::Compression::LZ4_FRAME)
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkIntoBuffer,
::arrow::Compression::LZ4_FRAME)
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkDirectRead,
::arrow::Compression::LZ4_FRAME)
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkDirectRead,
::arrow::Compression::LZ4_FRAME)
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
#endif

Expand Down

0 comments on commit 4c97871

Please sign in to comment.