Skip to content

Commit

Permalink
PARQUET-2411: [C++][Parquet] Allow reading dictionary without reading…
Browse files Browse the repository at this point in the history
… data via ByteArrayDictionaryRecordReader (apache#39153)

### Rationale for this change
This proposes an API to read only the dictionary  from ByteArrayDictionaryRecordReader, enabling possible uses cases where the caller just want to check the dictionary content.

### What changes are included in this PR?

New APIs to enable reading dictionary with RecordReader.

### Are these changes tested?
Unit tests.

### Are there any user-facing changes?

New APIs without breaking existing workflow.

Authored-by: jp0317 <zjpzlz@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
  • Loading branch information
jp0317 authored Jan 5, 2024
1 parent 7b0c6f9 commit bec0385
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 34 deletions.
20 changes: 20 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,26 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
return bytes_for_values;
}

const void* ReadDictionary(int32_t* dictionary_length) override {
if (this->current_decoder_ == nullptr && !this->HasNextInternal()) {
dictionary_length = 0;
return nullptr;
}
// Verify the current data page is dictionary encoded. The current_encoding_ should
// have been set as RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or
// PLAIN_DICTIONARY.
if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
std::stringstream ss;
ss << "Data page is not dictionary encoded. Encoding: "
<< EncodingToString(this->current_encoding_);
throw ParquetException(ss.str());
}
auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
const T* dictionary = nullptr;
decoder->GetDictionary(&dictionary, dictionary_length);
return reinterpret_cast<const void*>(dictionary);
}

int64_t ReadRecords(int64_t num_records) override {
if (num_records == 0) return 0;
// Delimit records, then read values at the end
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@ class PARQUET_EXPORT RecordReader {

virtual void DebugPrintState() = 0;

/// \brief Returns the dictionary owned by the current decoder. Throws an
/// exception if the current decoder is not for dictionary encoding. The caller is
/// responsible for casting the returned pointer to proper type depending on the
/// column's physical type. An example:
/// const ByteArray* dict = reinterpret_cast<const ByteArray*>(ReadDictionary(&len));
/// or:
/// const float* dict = reinterpret_cast<const float*>(ReadDictionary(&len));
/// \param[out] dictionary_length The number of dictionary entries.
virtual const void* ReadDictionary(int32_t* dictionary_length) = 0;

/// \brief Decoded definition levels
int16_t* def_levels() const {
return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
Expand Down
79 changes: 46 additions & 33 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,36 @@ using arrow::internal::AddWithOverflow;

namespace parquet {

namespace {
bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) {
// Check the encoding_stats to see if all data pages are dictionary encoded.
const std::vector<PageEncodingStats>& encoding_stats = col.encoding_stats();
if (encoding_stats.empty()) {
// Some parquet files may have empty encoding_stats. In this case we are
// not sure whether all data pages are dictionary encoded.
return false;
}
// The 1st page should be the dictionary page.
if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE ||
(encoding_stats[0].encoding != Encoding::PLAIN &&
encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) {
return false;
}
// The following pages should be dictionary encoded data pages.
for (size_t idx = 1; idx < encoding_stats.size(); ++idx) {
if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY &&
encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) ||
(encoding_stats[idx].page_type != PageType::DATA_PAGE &&
encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) {
// Return false if any following page is not a dictionary encoded data
// page.
return false;
}
}
return true;
}
} // namespace

// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
static constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
static constexpr uint32_t kFooterSize = 8;
Expand Down Expand Up @@ -82,7 +112,8 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}

std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
int i, bool read_dictionary) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
ss << "Trying to read column index " << i << " but row group metadata has only "
Expand All @@ -96,8 +127,8 @@ std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);

auto reader = internal::RecordReader::Make(
descr, level_info, contents_->properties()->memory_pool(),
/* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable());
descr, level_info, contents_->properties()->memory_pool(), read_dictionary,
contents_->properties()->read_dense_for_nullable());
reader->SetPageReader(std::move(page_reader));
return reader;
}
Expand All @@ -106,41 +137,23 @@ std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
std::shared_ptr<ColumnReader> reader = Column(i);

if (encoding_to_expose == ExposedEncoding::DICTIONARY) {
// Check the encoding_stats to see if all data pages are dictionary encoded.
std::unique_ptr<ColumnChunkMetaData> col = metadata()->ColumnChunk(i);
const std::vector<PageEncodingStats>& encoding_stats = col->encoding_stats();
if (encoding_stats.empty()) {
// Some parquet files may have empty encoding_stats. In this case we are
// not sure whether all data pages are dictionary encoded. So we do not
// enable exposing dictionary.
return reader;
}
// The 1st page should be the dictionary page.
if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE ||
(encoding_stats[0].encoding != Encoding::PLAIN &&
encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) {
return reader;
}
// The following pages should be dictionary encoded data pages.
for (size_t idx = 1; idx < encoding_stats.size(); ++idx) {
if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY &&
encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) ||
(encoding_stats[idx].page_type != PageType::DATA_PAGE &&
encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) {
return reader;
}
}
} else {
// Exposing other encodings are not supported for now.
return reader;
if (encoding_to_expose == ExposedEncoding::DICTIONARY &&
IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) {
// Set exposed encoding.
reader->SetExposedEncoding(encoding_to_expose);
}

// Set exposed encoding.
reader->SetExposedEncoding(encoding_to_expose);
return reader;
}

std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReaderWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
return RecordReader(
i,
/*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY &&
IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i)));
}

std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class PARQUET_EXPORT RowGroupReader {

// EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
// Ownership is shared with the RowGroupReader.
std::shared_ptr<internal::RecordReader> RecordReader(int i);
std::shared_ptr<internal::RecordReader> RecordReader(int i,
bool read_dictionary = false);

// Construct a ColumnReader, trying to enable exposed encoding.
//
Expand All @@ -80,6 +81,18 @@ class PARQUET_EXPORT RowGroupReader {
std::shared_ptr<ColumnReader> ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose);

// Construct a RecordReader, trying to enable exposed encoding.
//
// For dictionary encoding, currently we only support column chunks that are
// fully dictionary encoded byte arrays. The caller should verify if the reader can read
// and expose the dictionary by checking the reader's read_dictionary(). If a column
// chunk uses dictionary encoding but then falls back to plain encoding, the returned
// reader will read decoded data without exposing the dictionary.
//
// \note API EXPERIMENTAL
std::shared_ptr<internal::RecordReader> RecordReaderWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose);

std::unique_ptr<PageReader> GetColumnPageReader(int i);

private:
Expand Down
127 changes: 127 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,83 @@ TEST(TestFileReader, GetRecordReader) {
ASSERT_EQ(8, col_record_reader_->levels_written());
}

TEST(TestFileReader, RecordReaderWithExposingDictionary) {
const int num_rows = 1000;

// Make schema
schema::NodeVector fields;
fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::BYTE_ARRAY,
ConvertedType::NONE));
auto schema = std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, fields));

// Write small batches and small data pages
std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
.write_batch_size(64)
->data_pagesize(128)
->enable_dictionary()
->build();

ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
std::shared_ptr<ParquetFileWriter> file_writer =
ParquetFileWriter::Open(out_file, schema, writer_props);

RowGroupWriter* rg_writer = file_writer->AppendRowGroup();

// write one column
::arrow::random::RandomArrayGenerator rag(0);
ByteArrayWriter* writer = static_cast<ByteArrayWriter*>(rg_writer->NextColumn());
std::vector<std::string> raw_unique_data = {"a", "bc", "defg"};
std::vector<ByteArray> col_typed;
for (int i = 0; i < num_rows; i++) {
std::string_view chosed_data = raw_unique_data[i % raw_unique_data.size()];
col_typed.emplace_back(chosed_data);
}
writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data());
rg_writer->Close();
file_writer->Close();

// Open the reader
ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);

ReaderProperties reader_props;
reader_props.enable_buffered_stream();
reader_props.set_buffer_size(64);
std::unique_ptr<ParquetFileReader> file_reader =
ParquetFileReader::Open(in_file, reader_props);

auto row_group = file_reader->RowGroup(0);
auto record_reader = std::dynamic_pointer_cast<internal::DictionaryRecordReader>(
row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
ASSERT_NE(record_reader, nullptr);
ASSERT_TRUE(record_reader->read_dictionary());

int32_t dict_len = 0;
auto dict =
reinterpret_cast<const ByteArray*>(record_reader->ReadDictionary(&dict_len));
ASSERT_NE(dict, nullptr);
ASSERT_EQ(dict_len, raw_unique_data.size());
ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows);
std::shared_ptr<::arrow::ChunkedArray> result_array = record_reader->GetResult();
ASSERT_EQ(result_array->num_chunks(), 1);
const std::shared_ptr<::arrow::Array> chunk = result_array->chunk(0);
auto dictionary_array = std::dynamic_pointer_cast<::arrow::DictionaryArray>(chunk);
const int32_t* indices =
(std::dynamic_pointer_cast<::arrow::Int32Array>(dictionary_array->indices()))
->raw_values();

// Verify values based on the dictionary from ReadDictionary().
int64_t indices_read = chunk->length();
ASSERT_EQ(indices_read, num_rows);
for (int i = 0; i < indices_read; ++i) {
ASSERT_LT(indices[i], dict_len);
ASSERT_EQ(std::string_view(reinterpret_cast<const char* const>(dict[indices[i]].ptr),
dict[indices[i]].len),
col_typed[i]);
}
}

class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
Expand Down Expand Up @@ -1064,6 +1141,56 @@ TEST(TestFileReader, BufferedReadsWithDictionary) {
}
}

TEST(TestFileReader, PartiallyDictionaryEncodingNotExposed) {
const int num_rows = 1000;

// Make schema
schema::NodeVector fields;
fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE,
ConvertedType::NONE));
auto schema = std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, fields));

// Write small batches and small data pages. Explicitly set the dictionary page size
// limit such that the column chunk will not be fully dictionary encoded.
std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
.write_batch_size(64)
->data_pagesize(128)
->enable_dictionary()
->dictionary_pagesize_limit(4)
->build();

ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
std::shared_ptr<ParquetFileWriter> file_writer =
ParquetFileWriter::Open(out_file, schema, writer_props);

RowGroupWriter* rg_writer = file_writer->AppendRowGroup();

// write one column
::arrow::random::RandomArrayGenerator rag(0);
DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
rg_writer->Close();
file_writer->Close();

// Open the reader
ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);

ReaderProperties reader_props;
reader_props.enable_buffered_stream();
reader_props.set_buffer_size(64);
std::unique_ptr<ParquetFileReader> file_reader =
ParquetFileReader::Open(in_file, reader_props);

auto row_group = file_reader->RowGroup(0);
auto col_reader = std::static_pointer_cast<DoubleReader>(
row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
EXPECT_NE(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY);
}

TEST(TestFileReader, BufferedReads) {
// PARQUET-1636: Buffered reads were broken before introduction of
// RandomAccessFile::GetStream
Expand Down

0 comments on commit bec0385

Please sign in to comment.