From bec03856799a69bf0e6d4419ab7bc565afd070fe Mon Sep 17 00:00:00 2001 From: Jinpeng Date: Thu, 4 Jan 2024 21:41:01 -0500 Subject: [PATCH] PARQUET-2411: [C++][Parquet] Allow reading dictionary without reading data via ByteArrayDictionaryRecordReader (#39153) ### Rationale for this change This proposes an API to read only the dictionary from ByteArrayDictionaryRecordReader, enabling possible use cases where the caller just wants to check the dictionary content. ### What changes are included in this PR? New APIs to enable reading dictionary with RecordReader. ### Are these changes tested? Unit tests. ### Are there any user-facing changes? New APIs without breaking existing workflow. Authored-by: jp0317 Signed-off-by: mwish --- cpp/src/parquet/column_reader.cc | 20 +++++ cpp/src/parquet/column_reader.h | 10 +++ cpp/src/parquet/file_reader.cc | 79 +++++++++++-------- cpp/src/parquet/file_reader.h | 15 +++- cpp/src/parquet/reader_test.cc | 127 +++++++++++++++++++++++++++++++ 5 files changed, 217 insertions(+), 34 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index a49e58afbdb83..99978e283b46a 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1370,6 +1370,26 @@ class TypedRecordReader : public TypedColumnReaderImpl, return bytes_for_values; } + const void* ReadDictionary(int32_t* dictionary_length) override { + if (this->current_decoder_ == nullptr && !this->HasNextInternal()) { + *dictionary_length = 0; // Write through the out-param: no dictionary to expose. + return nullptr; + } + // Verify the current data page is dictionary encoded. The current_encoding_ should + // have been set as RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or + // PLAIN_DICTIONARY. + if (this->current_encoding_ != Encoding::RLE_DICTIONARY) { + std::stringstream ss; + ss << "Data page is not dictionary encoded. 
Encoding: " + << EncodingToString(this->current_encoding_); + throw ParquetException(ss.str()); + } + auto decoder = dynamic_cast*>(this->current_decoder_); + const T* dictionary = nullptr; + decoder->GetDictionary(&dictionary, dictionary_length); + return reinterpret_cast(dictionary); + } + int64_t ReadRecords(int64_t num_records) override { if (num_records == 0) return 0; // Delimit records, then read values at the end diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 334b8bcffe0b8..086f6c0e55806 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -368,6 +368,16 @@ class PARQUET_EXPORT RecordReader { virtual void DebugPrintState() = 0; + /// \brief Returns the dictionary owned by the current decoder. Throws an + /// exception if the current decoder is not for dictionary encoding. The caller is + /// responsible for casting the returned pointer to proper type depending on the + /// column's physical type. An example: + /// const ByteArray* dict = reinterpret_cast(ReadDictionary(&len)); + /// or: + /// const float* dict = reinterpret_cast(ReadDictionary(&len)); + /// \param[out] dictionary_length The number of dictionary entries. + virtual const void* ReadDictionary(int32_t* dictionary_length) = 0; + /// \brief Decoded definition levels int16_t* def_levels() const { return reinterpret_cast(def_levels_->mutable_data()); diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 1d972b78fb99c..b3dd1d6054ac8 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -54,6 +54,36 @@ using arrow::internal::AddWithOverflow; namespace parquet { +namespace { +bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) { + // Check the encoding_stats to see if all data pages are dictionary encoded. + const std::vector& encoding_stats = col.encoding_stats(); + if (encoding_stats.empty()) { + // Some parquet files may have empty encoding_stats. 
In this case we are + // not sure whether all data pages are dictionary encoded. + return false; + } + // The 1st page should be the dictionary page. + if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE || + (encoding_stats[0].encoding != Encoding::PLAIN && + encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) { + return false; + } + // The following pages should be dictionary encoded data pages. + for (size_t idx = 1; idx < encoding_stats.size(); ++idx) { + if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY && + encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) || + (encoding_stats[idx].page_type != PageType::DATA_PAGE && + encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) { + // Return false if any following page is not a dictionary encoded data + // page. + return false; + } + } + return true; +} +} // namespace + // PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file static constexpr int64_t kDefaultFooterReadSize = 64 * 1024; static constexpr uint32_t kFooterSize = 8; @@ -82,7 +112,8 @@ std::shared_ptr RowGroupReader::Column(int i) { const_cast(contents_->properties())->memory_pool()); } -std::shared_ptr RowGroupReader::RecordReader(int i) { +std::shared_ptr RowGroupReader::RecordReader( + int i, bool read_dictionary) { if (i >= metadata()->num_columns()) { std::stringstream ss; ss << "Trying to read column index " << i << " but row group metadata has only " @@ -96,8 +127,8 @@ std::shared_ptr RowGroupReader::RecordReader(int i) { internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); auto reader = internal::RecordReader::Make( - descr, level_info, contents_->properties()->memory_pool(), - /* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable()); + descr, level_info, contents_->properties()->memory_pool(), read_dictionary, + contents_->properties()->read_dense_for_nullable()); reader->SetPageReader(std::move(page_reader)); return reader; } @@ 
-106,41 +137,23 @@ std::shared_ptr RowGroupReader::ColumnWithExposeEncoding( int i, ExposedEncoding encoding_to_expose) { std::shared_ptr reader = Column(i); - if (encoding_to_expose == ExposedEncoding::DICTIONARY) { - // Check the encoding_stats to see if all data pages are dictionary encoded. - std::unique_ptr col = metadata()->ColumnChunk(i); - const std::vector& encoding_stats = col->encoding_stats(); - if (encoding_stats.empty()) { - // Some parquet files may have empty encoding_stats. In this case we are - // not sure whether all data pages are dictionary encoded. So we do not - // enable exposing dictionary. - return reader; - } - // The 1st page should be the dictionary page. - if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE || - (encoding_stats[0].encoding != Encoding::PLAIN && - encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) { - return reader; - } - // The following pages should be dictionary encoded data pages. - for (size_t idx = 1; idx < encoding_stats.size(); ++idx) { - if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY && - encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) || - (encoding_stats[idx].page_type != PageType::DATA_PAGE && - encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) { - return reader; - } - } - } else { - // Exposing other encodings are not supported for now. - return reader; + if (encoding_to_expose == ExposedEncoding::DICTIONARY && + IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) { + // Set exposed encoding. + reader->SetExposedEncoding(encoding_to_expose); } - // Set exposed encoding. 
- reader->SetExposedEncoding(encoding_to_expose); return reader; } +std::shared_ptr RowGroupReader::RecordReaderWithExposeEncoding( + int i, ExposedEncoding encoding_to_expose) { + return RecordReader( + i, + /*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY && + IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))); +} + std::unique_ptr RowGroupReader::GetColumnPageReader(int i) { if (i >= metadata()->num_columns()) { std::stringstream ss; diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index da85b73fc2dfe..b59b59f95c2d8 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -64,7 +64,8 @@ class PARQUET_EXPORT RowGroupReader { // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group. // Ownership is shared with the RowGroupReader. - std::shared_ptr RecordReader(int i); + std::shared_ptr RecordReader(int i, + bool read_dictionary = false); // Construct a ColumnReader, trying to enable exposed encoding. // @@ -80,6 +81,18 @@ class PARQUET_EXPORT RowGroupReader { std::shared_ptr ColumnWithExposeEncoding( int i, ExposedEncoding encoding_to_expose); + // Construct a RecordReader, trying to enable exposed encoding. + // + // For dictionary encoding, currently we only support column chunks that are + // fully dictionary encoded byte arrays. The caller should verify if the reader can read + // and expose the dictionary by checking the reader's read_dictionary(). If a column + // chunk uses dictionary encoding but then falls back to plain encoding, the returned + // reader will read decoded data without exposing the dictionary. 
+ // + // \note API EXPERIMENTAL + std::shared_ptr RecordReaderWithExposeEncoding( + int i, ExposedEncoding encoding_to_expose); + std::unique_ptr GetColumnPageReader(int i); private: diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 5223158e5f4f9..2c2b62f5d12f6 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -542,6 +542,83 @@ TEST(TestFileReader, GetRecordReader) { ASSERT_EQ(8, col_record_reader_->levels_written()); } +TEST(TestFileReader, RecordReaderWithExposingDictionary) { + const int num_rows = 1000; + + // Make schema + schema::NodeVector fields; + fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::BYTE_ARRAY, + ConvertedType::NONE)); + auto schema = std::static_pointer_cast( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); + + // Write small batches and small data pages + std::shared_ptr writer_props = WriterProperties::Builder() + .write_batch_size(64) + ->data_pagesize(128) + ->enable_dictionary() + ->build(); + + ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); + std::shared_ptr file_writer = + ParquetFileWriter::Open(out_file, schema, writer_props); + + RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // write one column + ::arrow::random::RandomArrayGenerator rag(0); + ByteArrayWriter* writer = static_cast(rg_writer->NextColumn()); + std::vector raw_unique_data = {"a", "bc", "defg"}; + std::vector col_typed; + for (int i = 0; i < num_rows; i++) { + std::string_view chosed_data = raw_unique_data[i % raw_unique_data.size()]; + col_typed.emplace_back(chosed_data); + } + writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data()); + rg_writer->Close(); + file_writer->Close(); + + // Open the reader + ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); + auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); + + ReaderProperties reader_props; + reader_props.enable_buffered_stream(); + 
reader_props.set_buffer_size(64); + std::unique_ptr file_reader = + ParquetFileReader::Open(in_file, reader_props); + + auto row_group = file_reader->RowGroup(0); + auto record_reader = std::dynamic_pointer_cast( + row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::DICTIONARY)); + ASSERT_NE(record_reader, nullptr); + ASSERT_TRUE(record_reader->read_dictionary()); + + int32_t dict_len = 0; + auto dict = + reinterpret_cast(record_reader->ReadDictionary(&dict_len)); + ASSERT_NE(dict, nullptr); + ASSERT_EQ(dict_len, raw_unique_data.size()); + ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows); + std::shared_ptr<::arrow::ChunkedArray> result_array = record_reader->GetResult(); + ASSERT_EQ(result_array->num_chunks(), 1); + const std::shared_ptr<::arrow::Array> chunk = result_array->chunk(0); + auto dictionary_array = std::dynamic_pointer_cast<::arrow::DictionaryArray>(chunk); + const int32_t* indices = + (std::dynamic_pointer_cast<::arrow::Int32Array>(dictionary_array->indices())) + ->raw_values(); + + // Verify values based on the dictionary from ReadDictionary(). + int64_t indices_read = chunk->length(); + ASSERT_EQ(indices_read, num_rows); + for (int i = 0; i < indices_read; ++i) { + ASSERT_LT(indices[i], dict_len); + ASSERT_EQ(std::string_view(reinterpret_cast(dict[indices[i]].ptr), + dict[indices[i]].len), + col_typed[i]); + } +} + class TestLocalFile : public ::testing::Test { public: void SetUp() { @@ -1064,6 +1141,56 @@ TEST(TestFileReader, BufferedReadsWithDictionary) { } } +TEST(TestFileReader, PartiallyDictionaryEncodingNotExposed) { + const int num_rows = 1000; + + // Make schema + schema::NodeVector fields; + fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE, + ConvertedType::NONE)); + auto schema = std::static_pointer_cast( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); + + // Write small batches and small data pages. 
Explicitly set the dictionary page size + // limit such that the column chunk will not be fully dictionary encoded. + std::shared_ptr writer_props = WriterProperties::Builder() + .write_batch_size(64) + ->data_pagesize(128) + ->enable_dictionary() + ->dictionary_pagesize_limit(4) + ->build(); + + ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); + std::shared_ptr file_writer = + ParquetFileWriter::Open(out_file, schema, writer_props); + + RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // write one column + ::arrow::random::RandomArrayGenerator rag(0); + DoubleWriter* writer = static_cast(rg_writer->NextColumn()); + std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100); + const auto& col_typed = static_cast(*col); + writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values()); + rg_writer->Close(); + file_writer->Close(); + + // Open the reader + ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); + auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); + + ReaderProperties reader_props; + reader_props.enable_buffered_stream(); + reader_props.set_buffer_size(64); + std::unique_ptr file_reader = + ParquetFileReader::Open(in_file, reader_props); + + auto row_group = file_reader->RowGroup(0); + auto col_reader = std::static_pointer_cast( + row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY)); + EXPECT_NE(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY); +} + TEST(TestFileReader, BufferedReads) { // PARQUET-1636: Buffered reads were broken before introduction of // RandomAccessFile::GetStream