Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39398: [C++][Parquet] DNM: benchmark for readLevels #39486

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* values_to_read) {
int64_t* num_def_levels, int64_t* values_to_read) final {
batch_size =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

Expand All @@ -1062,11 +1062,13 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
for (int64_t i = 0; i < *num_def_levels; ++i) {
if (def_levels[i] == this->max_def_level_) {
++(*values_to_read);
}
}
// for (int64_t i = 0; i < *num_def_levels; ++i) {
// if (def_levels[i] == this->max_def_level_) {
// ++(*values_to_read);
// }
// }
*values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
*values_to_read = batch_size;
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ class TypedColumnReader : public ColumnReader {
int16_t* rep_levels, int32_t* indices,
int64_t* indices_read, const T** dict,
int32_t* dict_len) = 0;

virtual void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making this private with a Test Peer that can be used in the benchmark, so we can check this PR in?

int64_t* num_def_levels, int64_t* values_to_read);
};

namespace internal {
Expand Down
43 changes: 40 additions & 3 deletions cpp/src/parquet/column_reader_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ class BenchmarkHelper {
NodePtr type = schema::Int32("b", repetition);

if (repetition == Repetition::REQUIRED) {
descr_ = std::make_unique<ColumnDescriptor>(type, 0, 0);
descr_ = std::make_unique<ColumnDescriptor>(type, /*max_definition_level=*/0,
/*max_repetition_level=*/0);
} else if (repetition == Repetition::OPTIONAL) {
descr_ = std::make_unique<ColumnDescriptor>(type, 1, 0);
descr_ = std::make_unique<ColumnDescriptor>(type, /*max_definition_level=*/1,
/*max_repetition_level=*/0);
} else {
descr_ = std::make_unique<ColumnDescriptor>(type, 1, 1);
descr_ = std::make_unique<ColumnDescriptor>(type, /*max_definition_level=*/1,
/*max_repetition_level=*/1);
}

// Vectors filled with random rep/defs and values to make pages.
Expand Down Expand Up @@ -141,6 +144,35 @@ static void ColumnReaderReadBatchInt32(::benchmark::State& state) {
state.SetBytesProcessed(state.iterations() * helper.total_size());
}

// Benchmarks ReadBatch for ColumnReader with the following parameters in order:
// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
// - batch_size: sets how many values to read at each call.
static void ColumnReaderReadLevels(::benchmark::State& state) {
const auto repetition = static_cast<Repetition::type>(state.range(0));
const auto batch_size = static_cast<int64_t>(state.range(1));

BenchmarkHelper helper(repetition, /*num_pages=*/1, /*levels_per_page=*/16 * 80000);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using one page to make it simple


// Vectors to read the values into.
std::vector<int32_t> read_values(batch_size, -1);
std::vector<int16_t> read_defs(batch_size, -1);
std::vector<int16_t> read_reps(batch_size, -1);
for (auto _ : state) {
state.PauseTiming();
Int32Reader* reader = helper.ResetColumnReader();
[[maybe_unused]] bool v = reader->HasNext();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using hasNext to trigger initialization

state.ResumeTiming();
int64_t num_levels = 0;
do {
int64_t values_read = 0;
reader->ReadLevels(batch_size, read_defs.data(), read_reps.data(), &num_levels,
&values_read);
} while (num_levels != 0);
}

state.SetBytesProcessed(state.iterations() * helper.total_size());
}

// Benchmarks ReadRecords for RecordReader with the following parameters in order:
// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
// - batch_size: sets how many values to read at each call.
Expand Down Expand Up @@ -204,6 +236,11 @@ BENCHMARK(ColumnReaderReadBatchInt32)
->Args({1, 1000})
->Args({2, 1000});

BENCHMARK(ColumnReaderReadLevels)
->ArgNames({"Repetition", "BatchSize"})
->Args({1, 1000})
->Args({2, 1000});

BENCHMARK(RecordReaderSkipRecords)
->ArgNames({"Repetition", "BatchSize"})
->Args({0, 1000})
Expand Down
Loading