Skip to content

Commit

Permalink
update impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 16, 2024
1 parent f50a393 commit 3d58afe
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1176,11 +1176,14 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0) {
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (num_def_levels != num_rep_levels) {
if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
Expand Down Expand Up @@ -1423,19 +1426,18 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

int64_t levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) !=
batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->max_rep_level_ > 0) {
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
if (rep_levels_read != levels_read) {
if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}

if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
levels_written_ += batch_size;
records_read += ReadRecordData(num_records - records_read);
} else {
// No repetition and definition levels, we can read values directly
Expand Down Expand Up @@ -1592,16 +1594,14 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

int64_t levels_read = 0;
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
levels_written_ += batch_size;
int64_t remaining_records = num_records - skipped_records;
// This updates at_record_start_.
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
Expand Down

0 comments on commit 3d58afe

Please sign in to comment.