Skip to content

Commit

Permalink
Fix test and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 8, 2024
1 parent 2cf9591 commit fbafc33
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 16 deletions.
29 changes: 14 additions & 15 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
"Number of decoded rep / def levels did less than num_values in page_header";
constexpr std::string_view kErrorRepDefLevelInEqual =
"Number of decoded rep / def levels did not match";
constexpr std::string_view kErrorValueNotMatchesLevel =
"Number of decoded values did not match def level";

} // namespace

Expand Down Expand Up @@ -1055,6 +1053,7 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
*num_def_levels = 0;
*non_null_values_to_read = batch_size;
}

Expand Down Expand Up @@ -1136,21 +1135,23 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
&non_null_values_to_read);
// Should not return more values than available in the current data page.
ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page());
// Check levels read matches `available_values_current_page() > 0`
ARROW_DCHECK(num_def_levels == 0 ||
std::min(batch_size, this->available_values_current_page()) > 0);
if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
if (*values_read != non_null_values_to_read) {
throw ParquetException(std::string(kErrorValueNotMatchesLevel) + " read" +
std::to_string(*values_read) +
", levels has: " + std::to_string(non_null_values_to_read));
}
} else {
*values_read = 0;
}
this->ConsumeBufferedValues(num_def_levels);
return num_def_levels;
// Adjust total_values, since if max_def_level_ == 0, num_def_levels would
// be 0 and `values_read` would adjust to `available_values_current_page()`.
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
int64_t expected_values =
std::min(batch_size, this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);
return total_values;
}

template <typename DType>
Expand Down Expand Up @@ -1432,9 +1433,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
}

if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
throw ParquetException(std::string(kErrorRepDefLevelNotMatchesNumValues) +
std::to_string(levels_read) + ", expected " +
std::to_string(batch_size));
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
Expand Down
62 changes: 61 additions & 1 deletion cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
&descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
/*max_rep_level=*/0);
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
Expand All @@ -431,6 +431,66 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
ParquetException);
}

TEST_F(TestPrimitiveReader, DefLevelNotExpected) {
max_def_level_ = 1;
max_rep_level_ = 0;
std::vector<bool> values(1, false);
// Less than expected
{
std::vector<int16_t> input_def_levels(1, 1);
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

// The data page falls back to plain encoding
std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
&descr, values, /*num_values=*/3, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
ASSERT_TRUE(reader->HasNext());

constexpr int batch_size = 3;
std::vector<int16_t> def_levels(batch_size, 0);
std::vector<int16_t> rep_levels(batch_size, 0);
bool values_out[batch_size];
int64_t values_read;
ASSERT_THROW(reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(),
values_out, &values_read),
ParquetException);
}
// More than expected
{
std::vector<int16_t> input_def_levels(2, 1);
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

// The data page falls back to plain encoding
std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
&descr, values, /*num_values=*/1, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
ASSERT_TRUE(reader->HasNext());

constexpr int batch_size = 3;
std::vector<int16_t> def_levels(batch_size, 0);
std::vector<int16_t> rep_levels(batch_size, 0);
bool values_out[batch_size];
int64_t values_read;
ASSERT_THROW(reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(),
values_out, &values_read),
ParquetException);
}
}

// Repetition level byte length reported in Page but Max Repetition level
// is zero for the column.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
Expand Down

0 comments on commit fbafc33

Please sign in to comment.