Skip to content

Commit

Permalink
basic optimization (bug included)
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Apr 24, 2024
1 parent 25bb627 commit 1fc0b1a
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1677,41 +1677,51 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
/// \brief Delimit up to `num_records` records among the buffered levels.
///
/// Scans the repetition levels in [levels_position_, levels_written_). A
/// repetition level of 0 marks the first level of a record, so a record is
/// known to be complete only once the *next* level with repetition level 0
/// is seen. Advances levels_position_ past every consumed level and updates
/// at_record_start_ accordingly.
///
/// \param[in] num_records maximum number of records to delimit
/// \param[out] values_seen number of consumed levels whose definition level
///   equals max_def_level_
/// \return the number of records whose end was found; a record that is still
///   open when the buffered levels run out is not counted
int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
  ARROW_DCHECK_GT(this->max_rep_level_, 0);

  // Nothing requested or nothing buffered: do not touch the level buffers.
  // This also guarantees that every stride computed below is at least 1.
  if (num_records <= 0 || levels_position_ >= levels_written_) {
    *values_seen = 0;
    return 0;
  }

  const int16_t* const rep_levels = this->rep_levels();
  const int16_t* const def_levels = this->def_levels();
  const int64_t first_level = levels_position_;
  int64_t records_read = 0;

  if (at_record_start_) {
    // The level at levels_position_ opens a record that a previous call
    // stopped in front of. Consume it without counting a finished record; we
    // must then advance until another record boundary (or the end of the
    // buffered levels) is found.
    ARROW_DCHECK(rep_levels[levels_position_] == 0);
    ++levels_position_;
    at_record_start_ = false;
  }

  // Scan the repetition levels in branch-free batches. A batch never holds
  // more levels than there are records left to find, so records_read cannot
  // overshoot num_records, and it can only reach num_records when the last
  // level of the batch is itself a record boundary.
  while (levels_position_ < levels_written_) {
    const int64_t stride =
        std::min(levels_written_ - levels_position_, num_records - records_read);
    const int64_t stride_end = levels_position_ + stride;
    for (int64_t i = levels_position_; i < stride_end; ++i) {
      records_read += (rep_levels[i] == 0) ? 1 : 0;
    }
    levels_position_ = stride_end;
    if (records_read == num_records) {
      // The boundary level belongs to the next record: un-consume it so the
      // next call starts exactly at that record's first level.
      ARROW_DCHECK(rep_levels[levels_position_ - 1] == 0);
      --levels_position_;
      at_record_start_ = true;
      break;
    }
  }

  // Only the levels actually consumed by this call contribute values.
  *values_seen = static_cast<int64_t>(std::count(
      def_levels + first_level, def_levels + levels_position_, this->max_def_level_));
  return records_read;
}
Expand Down

0 comments on commit 1fc0b1a

Please sign in to comment.