From 1fc0b1a72bf043422b8544fce4d866e9375e0703 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 24 Apr 2024 11:55:26 +0800 Subject: [PATCH] basic optimization (bug included) --- cpp/src/parquet/column_reader.cc | 64 ++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index af489c70a5233..72fda000e4fe1 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1677,41 +1677,51 @@ class TypedRecordReader : public TypedColumnReaderImpl, int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { int64_t values_to_read = 0; int64_t records_read = 0; - - const int16_t* def_levels = this->def_levels() + levels_position_; - const int16_t* rep_levels = this->rep_levels() + levels_position_; - + const int16_t* const rep_levels = this->rep_levels(); + const int16_t* const def_levels = this->def_levels(); ARROW_DCHECK_GT(this->max_rep_level_, 0); + ARROW_DCHECK_LT(levels_position_, levels_written_); + if (at_record_start_) { + values_to_read += def_levels[levels_position_] == 0; + ++levels_position_; + at_record_start_ = false; + } // Count logical records and number of values to read + ARROW_DCHECK(!at_record_start_); + // std::cout << "read " << num_records << " from " << levels_position_ << " to " + // << levels_written_ << '\n'; while (levels_position_ < levels_written_) { - const int16_t rep_level = *rep_levels++; + int64_t stride = + std::min(levels_written_ - levels_position_, num_records - records_read); + for (int64_t i = 0; i < stride; ++i) { + records_read += rep_levels[levels_position_ + i] == 0; + values_to_read += def_levels[levels_position_ + i] == this->max_def_level_; + } + levels_position_ += stride; + if (records_read == num_records) { + break; + } + } + /* + for (; levels_position_ < levels_written_; ++levels_position_) { + const int16_t rep_level = rep_levels[levels_position_]; if (rep_level == 0) { - // If at_record_start_ is true, we are seeing the start of a record - // for the second time, such as after repeated calls to - // DelimitRecords. In this case we must continue until we find - // another record start or exhausting the ColumnChunk - if (!at_record_start_) { - // We've reached the end of a record; increment the record count. - ++records_read; - if (records_read == num_records) { - // We've found the number of records we were looking for. Set - // at_record_start_ to true and break - at_record_start_ = true; - break; - } + // We've reached the end of a record; increment the record count. + ++records_read; + if (records_read == num_records) { + // We've found the number of records we were looking for. Set + // at_record_start_ to true and break + at_record_start_ = true; + break; } } - // We have decided to consume the level at this position; therefore we - // must advance until we find another record boundary - at_record_start_ = false; - - const int16_t def_level = *def_levels++; - if (def_level == this->max_def_level_) { - ++values_to_read; - } - ++levels_position_; } + */ + // *values_seen = + // std::count(this->def_levels() + current_levels_position, this->def_levels() + // + levels_position_, + // this->max_def_level_); *values_seen = values_to_read; return records_read; }