Skip to content

Commit

Permalink
Refactor handling for error
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 8, 2024
1 parent 7df0415 commit 4930aae
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
75 changes: 45 additions & 30 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) {
std::to_string(expected));
}
}

// Shared ParquetException message texts for level / value count mismatches.
// Some call sites append the actual counts after the text, so none of the
// messages ends with punctuation.

// Used when the number of levels read differs from the requested batch size.
constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
    "Number of decoded rep / def levels is less than num_values in page_header";
// Used when the repetition-level count differs from the definition-level count.
constexpr std::string_view kErrorRepDefLevelInEqual =
    "Number of decoded rep / def levels did not match";
// Used when the number of decoded values differs from the number of non-null
// values counted from the definition levels.
constexpr std::string_view kErrorValueNotMatchesLevel =
    "Number of decoded values did not match def level";

} // namespace

LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
Expand Down Expand Up @@ -1029,33 +1037,26 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* values_to_read) {
int64_t* num_def_levels, int64_t* non_null_values_to_read) {
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
throw ParquetException(
"Number of decoded definition levels did not match: def_levels: " +
std::to_string(*num_def_levels) +
", batch_size:" + std::to_string(batch_size));
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*values_to_read +=
*non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
*values_to_read = batch_size;
*non_null_values_to_read = batch_size;
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
DCHECK_GT(this->max_rep_level_, 0);
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
throw ParquetException(kErrorRepDefLevelInEqual);
}
}
}
Expand Down Expand Up @@ -1113,7 +1114,8 @@ template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*values_read = 0;
return 0;
Expand All @@ -1122,28 +1124,41 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
int64_t values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read);
*values_read = this->ReadValues(values_to_read, values);
ARROW_DCHECK_GE(values_to_read, *values_read);
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
// Number of non-null values to read within `num_def_levels`.
int64_t non_null_values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
&non_null_values_to_read);
// Should not return more values than available in the current data page.
ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page());
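// Report EOF if no definition levels were decoded although the current page
// still has values available for this batch.
// NOTE(review): when `max_def_level_ == 0` or `def_levels == nullptr`,
// ReadLevels does not write `num_def_levels`, so it is still 0 here and this
// branch appears to fire for required columns as well -- confirm.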
if (num_def_levels == 0 &&
std::min(batch_size, this->available_values_current_page()) > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ss << "Read 0 definition levels, expected "
<< std::min(batch_size, this->available_values_current_page());
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);

return total_values;
if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
if (*values_read != non_null_values_to_read) {
throw ParquetException(std::string(kErrorValueNotMatchesLevel) + " read" +
std::to_string(*values_read) +
", levels has: " + std::to_string(non_null_values_to_read));
}
} else {
*values_read = 0;
}
this->ConsumeBufferedValues(num_def_levels);
return num_def_levels;
}

template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count_out) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*levels_read = 0;
*values_read = 0;
Expand All @@ -1165,7 +1180,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
throw ParquetException(kErrorRepDefLevelInEqual);
}
}

Expand Down Expand Up @@ -1412,16 +1427,16 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
if (this->max_rep_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
throw ParquetException(kErrorRepDefLevelInEqual);
}
} else if (this->max_def_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
}

if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
throw ParquetException(
"ReadRecords did not read the expected number of levels: read " +
std::to_string(levels_read) + ", expected " + std::to_string(batch_size));
throw ParquetException(std::string(kErrorRepDefLevelNotMatchesNumValues) +
std::to_string(levels_read) + ", expected " +
std::to_string(batch_size));
}

levels_written_ += levels_read;
Expand Down Expand Up @@ -1584,7 +1599,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int64_t levels_read = 0;
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
throw ParquetException(kErrorRepDefLevelInEqual);
}

levels_written_ += levels_read;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader {
template <typename DType>
class TypedColumnReader : public ColumnReader {
public:
typedef typename DType::c_type T;
using T = typename DType::c_type;

// Read a batch of repetition levels, definition levels, and values from the
// column.
Expand Down

0 comments on commit 4930aae

Please sign in to comment.