Skip to content

Commit

Permalink
address comment: extract CreateColumnWriterForColumn to build column …
Browse files Browse the repository at this point in the history
…writer
  • Loading branch information
mapleFU committed Mar 26, 2024
1 parent 9265b08 commit aaac93b
Showing 1 changed file with 36 additions and 61 deletions.
97 changes: 36 additions & 61 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
}

const int32_t column_ordinal = next_column_index_++;
const auto& path = col_meta->descr()->path();
const ColumnProperties& column_properties = properties_->column_properties(path);
auto meta_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
: nullptr;
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
auto ci_builder = page_index_builder_ && column_properties.page_index_enabled() &&
column_properties.statistics_enabled()
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
auto oi_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
const CodecOptions* codec_options = column_properties.codec_options()
? column_properties.codec_options().get()
: nullptr;

CodecOptions default_codec_options;
if (!codec_options) {
codec_options = &default_codec_options;
}
DCHECK_NE(nullptr, codec_options);
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, column_properties.compression(), col_meta, row_group_ordinal_,
static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
/*buffered_row_group=*/false, meta_encryptor, data_encryptor,
properties_->page_checksum_enabled(), ci_builder, oi_builder, *codec_options);

column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
column_writers_[0] = CreateColumnWriterForColumn(col_meta, column_ordinal);
return column_writers_[0].get();
}

Expand Down Expand Up @@ -286,43 +256,48 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
}

void InitColumns() {
for (int i = 0; i < num_columns(); i++) {
for (int i = 0; i < RowGroupSerializer::num_columns(); i++) {
auto col_meta = metadata_->NextColumnChunk();
const auto& path = col_meta->descr()->path();
const int32_t column_ordinal = next_column_index_++;
const ColumnProperties& column_properties = properties_->column_properties(path);
auto meta_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
column_writers_.push_back(CreateColumnWriterForColumn(col_meta, column_ordinal));
}
}

std::shared_ptr<ColumnWriter> CreateColumnWriterForColumn(
ColumnChunkMetaDataBuilder* col_meta, int32_t column_ordinal) const {
const auto& path = col_meta->descr()->path();
const ColumnProperties& column_properties = properties_->column_properties(path);
auto meta_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
: nullptr;
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
auto ci_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
auto oi_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
auto ci_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
auto oi_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;

const CodecOptions* codec_options = column_properties.codec_options()
? column_properties.codec_options().get()
: nullptr;
CodecOptions default_codec_options;
if (!codec_options) {
codec_options = &default_codec_options;
}
DCHECK_NE(nullptr, codec_options);
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, column_properties.compression(), col_meta, row_group_ordinal_,
static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
buffered_row_group_, meta_encryptor, data_encryptor,
properties_->page_checksum_enabled(), ci_builder, oi_builder, *codec_options);

column_writers_.push_back(
ColumnWriter::Make(col_meta, std::move(pager), properties_));

const CodecOptions* codec_options = column_properties.codec_options()
? column_properties.codec_options().get()
: nullptr;
CodecOptions default_codec_options;
if (!codec_options) {
codec_options = &default_codec_options;
}
DCHECK_NE(nullptr, codec_options);
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, column_properties.compression(), col_meta, row_group_ordinal_,
static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
buffered_row_group_, meta_encryptor, data_encryptor,
properties_->page_checksum_enabled(), ci_builder, oi_builder, *codec_options);
return ColumnWriter::Make(col_meta, std::move(pager), properties_);
}

// If buffered_row_group_ is false, only column_writers_[0] is used as current writer.
// If buffered_row_group_ is true, multiple column writers are used.
std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
};

Expand Down

0 comments on commit aaac93b

Please sign in to comment.