// Review preamble (free text ahead of the first `diff --git` header; the patch text below is unchanged).
//
// Purpose: unified diff against cpp/src/parquet/column_reader.cc and column_reader.h that
//   * adds three constexpr std::string_view error-message constants in the anonymous namespace
//     (kErrorRepDefLevelNotMatchesNumValues, kErrorRepDefLevelInEqual, kErrorValueNotMatchesLevel);
//   * renames the ReadLevels out-parameter values_to_read -> non_null_values_to_read and removes
//     both the `*num_def_levels != batch_size` check and the DCHECK_GT on max_rep_level_;
//   * rewrites the tail of ReadBatch: it checks the number of definition levels read, calls
//     ReadValues only when non_null_values_to_read != 0, throws when the number of values read
//     differs from non_null_values_to_read, and consumes/returns num_def_levels;
//   * replaces the literal "Number of decoded rep / def levels did not match" with
//     kErrorRepDefLevelInEqual in ReadLevels, ReadBatchSpaced and two TypedRecordReader hunks;
//   * replaces `typedef typename DType::c_type T;` with a `using` alias in column_reader.h.
//
// NOTE(review): this text looks like a flattened extraction of the patch - line breaks are gone and
//   template argument lists seem to be missing (e.g. "template int64_t TypedColumnReaderImpl::ReadBatch(").
//   Presumably it cannot be applied as-is; confirm against the upstream change.
// NOTE(review): ReadLevels assigns *num_def_levels only inside
//   `if (this->max_def_level_ > 0 && def_levels != nullptr)`; the else branch only sets
//   *non_null_values_to_read = batch_size. ReadBatch initialises num_def_levels to 0, so on that
//   else path the new `num_def_levels == 0 && std::min(batch_size, available) > 0` check calls
//   ParquetException::EofException (presumably throwing), and ConsumeBufferedValues / the return
//   value would be 0 where the removed code used std::max(num_def_levels, *values_read).
//   Confirm how columns without definition levels are meant to be handled.
// NOTE(review): with the `*num_def_levels != batch_size` check removed from ReadLevels, a short
//   read of definition levels is no longer reported there; TypedRecordReader still compares
//   batch_size with levels_read. Confirm this asymmetry is intended.
// NOTE(review): two new messages are concatenated without a separator:
//   kErrorValueNotMatchesLevel + " read" + std::to_string(*values_read) and
//   kErrorRepDefLevelNotMatchesNumValues + std::to_string(levels_read), so the number directly
//   follows the preceding word ("... read5", "... page_header5").
// NOTE(review): the wording "did less than num_values" in kErrorRepDefLevelNotMatchesNumValues
//   reads as a typo (perhaps "were fewer than"); it is a runtime string and is left untouched here.
// NOTE(review): ReadLevels accumulates with `+=` in the definition-level branch but assigns with `=`
//   in the else branch; the visible caller passes a zero-initialised counter, other callers are not shown.
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 99a85f910bc61..d53649304e0b5 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -101,6 +101,14 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) { std::to_string(expected)); } } + +constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues = + "Number of decoded rep / def levels did less than num_values in page_header"; +constexpr std::string_view kErrorRepDefLevelInEqual = + "Number of decoded rep / def levels did not match"; +constexpr std::string_view kErrorValueNotMatchesLevel = + "Number of decoded values did not match def level"; + } // namespace LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} @@ -1029,33 +1037,26 @@ class TypedColumnReaderImpl : public TypedColumnReader, // Read definition and repetition levels. Also return the number of definition levels // and number of values to read. This function is called before reading values. void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, - int64_t* num_def_levels, int64_t* values_to_read) { + int64_t* num_def_levels, int64_t* non_null_values_to_read) { batch_size = std::min(batch_size, this->available_values_current_page()); // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); - if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { - throw ParquetException( - "Number of decoded definition levels did not match: def_levels: " + - std::to_string(*num_def_levels) + - ", batch_size:" + std::to_string(batch_size)); - } // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. 
- *values_to_read += + *non_null_values_to_read += std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); } else { // Required field, read all values - *values_to_read = batch_size; + *non_null_values_to_read = batch_size; } // Not present for non-repeated fields if (this->max_rep_level_ > 0 && rep_levels != nullptr) { - DCHECK_GT(this->max_rep_level_, 0); int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (def_levels != nullptr && *num_def_levels != num_rep_levels) { - throw ParquetException("Number of decoded rep / def levels did not match"); + throw ParquetException(kErrorRepDefLevelInEqual); } } } @@ -1113,7 +1114,8 @@ template int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) { - // HasNext invokes ReadNewPage + // HasNext might invoke ReadNewPage until a data page with + // `available_values_current_page() > 0` is found. if (!HasNext()) { *values_read = 0; return 0; @@ -1122,20 +1124,32 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished int64_t num_def_levels = 0; - int64_t values_to_read = 0; - ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); - *values_read = this->ReadValues(values_to_read, values); - ARROW_DCHECK_GE(values_to_read, *values_read); - int64_t total_values = std::max(num_def_levels, *values_read); - int64_t expected_values = std::min(batch_size, this->available_values_current_page()); - if (total_values == 0 && expected_values > 0) { + // Number of non-null values to read within `num_def_levels`. + int64_t non_null_values_to_read = 0; + ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, + &non_null_values_to_read); + // Should not return more values than available in the current data page. 
+ ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page()); + // Check levels read matches `available_values_current_page() > 0` + if (num_def_levels == 0 && + std::min(batch_size, this->available_values_current_page()) > 0) { std::stringstream ss; - ss << "Read 0 values, expected " << expected_values; + ss << "Read 0 definition levels, expected " + << std::min(batch_size, this->available_values_current_page()); ParquetException::EofException(ss.str()); } - this->ConsumeBufferedValues(total_values); - - return total_values; + if (non_null_values_to_read != 0) { + *values_read = this->ReadValues(non_null_values_to_read, values); + if (*values_read != non_null_values_to_read) { + throw ParquetException(std::string(kErrorValueNotMatchesLevel) + " read" + + std::to_string(*values_read) + + ", levels has: " + std::to_string(non_null_values_to_read)); + } + } else { + *values_read = 0; + } + this->ConsumeBufferedValues(num_def_levels); + return num_def_levels; } template @@ -1143,7 +1157,8 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count_out) { - // HasNext invokes ReadNewPage + // HasNext might invoke ReadNewPage until a data page with + // `available_values_current_page() > 0` is found. 
if (!HasNext()) { *levels_read = 0; *values_read = 0; @@ -1165,7 +1180,7 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( if (this->max_rep_level_ > 0) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (num_def_levels != num_rep_levels) { - throw ParquetException("Number of decoded rep / def levels did not match"); + throw ParquetException(kErrorRepDefLevelInEqual); } } @@ -1412,16 +1427,16 @@ class TypedRecordReader : public TypedColumnReaderImpl, if (this->max_rep_level_ > 0) { levels_read = this->ReadDefinitionLevels(batch_size, def_levels); if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { - throw ParquetException("Number of decoded rep / def levels did not match"); + throw ParquetException(kErrorRepDefLevelInEqual); } } else if (this->max_def_level_ > 0) { levels_read = this->ReadDefinitionLevels(batch_size, def_levels); } if (ARROW_PREDICT_FALSE(batch_size != levels_read)) { - throw ParquetException( - "ReadRecords did not read the expected number of levels: read " + - std::to_string(levels_read) + ", expected " + std::to_string(batch_size)); + throw ParquetException(std::string(kErrorRepDefLevelNotMatchesNumValues) + + std::to_string(levels_read) + ", expected " + + std::to_string(batch_size)); } levels_written_ += levels_read; @@ -1584,7 +1599,7 @@ class TypedRecordReader : public TypedColumnReaderImpl, int64_t levels_read = 0; levels_read = this->ReadDefinitionLevels(batch_size, def_levels); if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { - throw ParquetException("Number of decoded rep / def levels did not match"); + throw ParquetException(kErrorRepDefLevelInEqual); } levels_written_ += levels_read; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 086f6c0e55806..29e1b2a25e437 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader { template class 
TypedColumnReader : public ColumnReader { public: - typedef typename DType::c_type T; + using T = typename DType::c_type; // Read a batch of repetition levels, definition levels, and values from the // column.