From db35d8bb35ea1c32383e5ffe4ef6cf614212b09e Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 8 May 2024 17:21:05 +0800 Subject: [PATCH] Fix test and add more tests --- cpp/src/parquet/column_reader.cc | 28 ++++++------ cpp/src/parquet/column_reader_test.cc | 62 ++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 16 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index c717ae256da7d..d72fa6e782041 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -106,8 +106,6 @@ constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues = "Number of decoded rep / def levels did less than num_values in page_header"; constexpr std::string_view kErrorRepDefLevelInEqual = "Number of decoded rep / def levels did not match"; -constexpr std::string_view kErrorValueNotMatchesLevel = - "Number of decoded values did not match def level"; } // namespace @@ -1055,6 +1053,7 @@ class TypedColumnReaderImpl : public TypedColumnReader, std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); } else { // Required field, read all values + *num_def_levels = 0; *non_null_values_to_read = batch_size; } @@ -1136,21 +1135,22 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def &non_null_values_to_read); // Should not return more values than available in the current data page. ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page()); - // Check levels read matches `available_values_current_page() > 0` - ARROW_DCHECK(num_def_levels == 0 || - std::min(batch_size, this->available_values_current_page()) > 0); if (non_null_values_to_read != 0) { *values_read = this->ReadValues(non_null_values_to_read, values); - if (*values_read != non_null_values_to_read) { - throw ParquetException(std::string(kErrorValueNotMatchesLevel) + " read" + - std::to_string(*values_read) + - ", levels has: " + std::to_string(non_null_values_to_read)); - } } else { *values_read = 0; } - this->ConsumeBufferedValues(num_def_levels); - return num_def_levels; + // Adjust total_values, since if max_def_level_ == 0, num_def_levels would + // be 0 and `values_read` would adjust to `available_values_current_page()`. + int64_t total_values = std::max(num_def_levels, *values_read); + int64_t expected_values = std::min(batch_size, this->available_values_current_page()); + if (total_values == 0 && expected_values > 0) { + std::stringstream ss; + ss << "Read 0 values, expected " << expected_values; + ParquetException::EofException(ss.str()); + } + this->ConsumeBufferedValues(total_values); + return total_values; } template @@ -1432,9 +1432,7 @@ class TypedRecordReader : public TypedColumnReaderImpl, } if (ARROW_PREDICT_FALSE(batch_size != levels_read)) { - throw ParquetException(std::string(kErrorRepDefLevelNotMatchesNumValues) + - std::to_string(levels_read) + ", expected " + - std::to_string(batch_size)); + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } levels_written_ += levels_read; diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index a48573966a905..60edc75d05d6a 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) { &descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{}, /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_, /*rep_levels=*/{}, - /*max_rep_level=*/0); + /*max_rep_level=*/max_rep_level_); pages_.push_back(data_page); InitReader(&descr); auto reader = static_cast(reader_.get()); @@ -431,6 +431,66 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) { ParquetException); } +TEST_F(TestPrimitiveReader, DefLevelNotExpected) { + max_def_level_ = 1; + max_rep_level_ = 0; + std::vector values(1, false); + // Less than expected + { + std::vector input_def_levels(1, 1); + NodePtr type = schema::Boolean("a", Repetition::OPTIONAL); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + + // The data page falls back to plain encoding + std::shared_ptr dummy = AllocateBuffer(); + std::shared_ptr data_page = MakeDataPage( + &descr, values, /*num_values=*/3, Encoding::PLAIN, /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_, + /*rep_levels=*/{}, + /*max_rep_level=*/max_rep_level_); + pages_.push_back(data_page); + InitReader(&descr); + auto reader = static_cast(reader_.get()); + ASSERT_TRUE(reader->HasNext()); + + constexpr int batch_size = 3; + std::vector def_levels(batch_size, 0); + std::vector rep_levels(batch_size, 0); + bool values_out[batch_size]; + int64_t values_read; + ASSERT_THROW(reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), + values_out, &values_read), + ParquetException); + } + // More than expected + { + std::vector input_def_levels(2, 1); + NodePtr type = schema::Boolean("a", Repetition::OPTIONAL); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + + // The data page falls back to plain encoding + std::shared_ptr dummy = AllocateBuffer(); + std::shared_ptr data_page = MakeDataPage( + &descr, values, /*num_values=*/1, Encoding::PLAIN, /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_, + /*rep_levels=*/{}, + /*max_rep_level=*/max_rep_level_); + pages_.push_back(data_page); + InitReader(&descr); + auto reader = static_cast(reader_.get()); + ASSERT_TRUE(reader->HasNext()); + + constexpr int batch_size = 3; + std::vector def_levels(batch_size, 0); + std::vector rep_levels(batch_size, 0); + bool values_out[batch_size]; + int64_t values_read; + ASSERT_THROW(reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), + values_out, &values_read), + ParquetException); + } +} + // Repetition level byte length reported in Page but Max Repetition level // is zero for the column. TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {