diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index d53649304e0b5..a1f8602d9db49 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1036,6 +1036,9 @@ class TypedColumnReaderImpl : public TypedColumnReader, // Read definition and repetition levels. Also return the number of definition levels // and number of values to read. This function is called before reading values. + // + // ReadLevels will throw exception when any num-levels read is not equal to the number + // of the levels can be read. void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int64_t* num_def_levels, int64_t* non_null_values_to_read) { batch_size = std::min(batch_size, this->available_values_current_page()); @@ -1043,6 +1046,9 @@ class TypedColumnReaderImpl : public TypedColumnReader, // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. *non_null_values_to_read += @@ -1055,7 +1061,7 @@ class TypedColumnReaderImpl : public TypedColumnReader, // Not present for non-repeated fields if (this->max_rep_level_ > 0 && rep_levels != nullptr) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); - if (def_levels != nullptr && *num_def_levels != num_rep_levels) { + if (batch_size != num_rep_levels) { throw ParquetException(kErrorRepDefLevelInEqual); } } @@ -1131,13 +1137,7 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def // Should not return more values than available in the current data page. ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page()); // Check levels read matches `available_values_current_page() > 0` - if (num_def_levels == 0 && - std::min(batch_size, this->available_values_current_page()) > 0) { - std::stringstream ss; - ss << "Read 0 definition levels, expected " - << std::min(batch_size, this->available_values_current_page()); - ParquetException::EofException(ss.str()); - } + ARROW_DCHECK(num_def_levels == 0 || std::min(batch_size, this->available_values_current_page()) > 0); if (non_null_values_to_read != 0) { *values_read = this->ReadValues(non_null_values_to_read, values); if (*values_read != non_null_values_to_read) { @@ -1422,15 +1422,12 @@ class TypedRecordReader : public TypedColumnReaderImpl, int16_t* def_levels = this->def_levels() + levels_written_; int16_t* rep_levels = this->rep_levels() + levels_written_; - // Not present for non-repeated fields - int64_t levels_read = 0; + int64_t levels_read = this->ReadDefinitionLevels(batch_size, def_levels); if (this->max_rep_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); - if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels); + if (rep_levels_read != levels_read) { throw ParquetException(kErrorRepDefLevelInEqual); } - } else if (this->max_def_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); } if (ARROW_PREDICT_FALSE(batch_size != levels_read)) { @@ -1601,6 +1598,9 @@ class TypedRecordReader : public TypedColumnReaderImpl, if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { throw ParquetException(kErrorRepDefLevelInEqual); } + if (ARROW_PREDICT_FALSE(batch_size != levels_read)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } levels_written_ += levels_read; int64_t remaining_records = num_records - skipped_records;