Skip to content

Commit

Permalink
Try to optimize byte stream split
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Feb 20, 2025
1 parent dad2a1b commit 456b930
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 32 deletions.
22 changes: 11 additions & 11 deletions cpp/src/arrow/array/builder_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ class ARROW_EXPORT ArrayBuilder {
/// \brief Return the type of the built Array
virtual std::shared_ptr<DataType> type() const = 0;

/// \brief Bulk-append validity bits taken from an existing bitmap.
///
/// Forwards `bitmap`, `offset` and `length` to the null bitmap builder, then
/// grows the builder length by `length` and refreshes the null count from the
/// bitmap builder's false-bit count. A null `bitmap` is treated as "all of the
/// `length` appended slots are valid".
void UnsafeAppendToBitmap(const uint8_t* bitmap, int64_t offset, int64_t length) {
  if (bitmap != NULLPTR) {
    null_bitmap_builder_.UnsafeAppend(bitmap, offset, length);
    length_ += length;
    null_count_ = null_bitmap_builder_.false_count();
    return;
  }
  // No bitmap supplied: every appended slot is valid.
  UnsafeSetNotNull(length);
}

protected:
/// Append to null bitmap
Status AppendToBitmap(bool is_valid);
Expand Down Expand Up @@ -242,17 +253,6 @@ class ARROW_EXPORT ArrayBuilder {
null_count_ = null_bitmap_builder_.false_count();
}

/// \brief Bulk-append validity bits taken from an existing bitmap.
///
/// Forwards `bitmap`, `offset` and `length` to the null bitmap builder, then
/// grows the builder length by `length` and refreshes the null count from the
/// bitmap builder's false-bit count. A null `bitmap` is treated as "all of the
/// `length` appended slots are valid".
void UnsafeAppendToBitmap(const uint8_t* bitmap, int64_t offset, int64_t length) {
  if (bitmap != NULLPTR) {
    null_bitmap_builder_.UnsafeAppend(bitmap, offset, length);
    length_ += length;
    null_count_ = null_bitmap_builder_.false_count();
    return;
  }
  // No bitmap supplied: every appended slot is valid.
  UnsafeSetNotNull(length);
}

// Append the same validity value a given number of times.
void UnsafeAppendToBitmap(const int64_t num_bits, bool value) {
if (value) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ class BaseBinaryBuilder
return std::numeric_limits<offset_type>::max() - 1;
}

/// \brief Mutable access to the buffer builder holding the value bytes.
///
/// NOTE(review): writes made through this reference presumably bypass the
/// builder's own length/offset bookkeeping; callers must keep them consistent.
TypedBufferBuilder<uint8_t>& value_data_builder() {
  return value_data_builder_;
}

/// \brief Mutable access to the buffer builder holding the offsets.
///
/// NOTE(review): writes made through this reference presumably bypass the
/// builder's own length/value-data bookkeeping; callers must keep them consistent.
TypedBufferBuilder<offset_type>& offset_builder() {
  return offsets_builder_;
}

protected:
TypedBufferBuilder<offset_type> offsets_builder_;
TypedBufferBuilder<uint8_t> value_data_builder_;
Expand Down
43 changes: 22 additions & 21 deletions cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1679,36 +1679,37 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare());

std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = Decode(values.data(), num_values - null_count);
if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
throw ParquetException("Expected to decode ", num_values - null_count,
" values, but decoded ", num_valid_values, " values.");
}

auto values_ptr = values.data();
int value_idx = 0;

const int32_t* length_ptr = buffered_length_->data_as<int32_t>() + length_idx_;
int bytes_offset = len_ - decoder_->bytes_left();
// get the final bytes offset
int accum_length = 0;
for (int i = 0; i < num_values; ++i) {
if (ARROW_PREDICT_FALSE(length_ptr[i] < 0)) {
return Status::Invalid("Negative string delta length");
}
accum_length += length_ptr[i];
}
RETURN_NOT_OK(out->builder->Reserve(num_values));
out->builder->UnsafeAppendToBitmap(valid_bits, valid_bits_offset, num_values);
RETURN_NOT_OK(
out->builder->value_data_builder().Append(data_ + bytes_offset, accum_length));
auto& offset_builder = out->builder->offset_builder();
accum_length = 0;
int length_idx = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
const auto& val = values_ptr[value_idx];
RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++value_idx;
offset_builder.UnsafeAppend(length_ptr[length_idx]);
++length_idx;
return Status::OK();
},
[&]() {
RETURN_NOT_OK(helper.AppendNull());
--null_count;
offset_builder.UnsafeAppend(0);
return Status::OK();
}));

length_idx_ += (num_values - null_count);
DCHECK_EQ(null_count, 0);
*out_num_values = num_valid_values;
*out_num_values = num_values - null_count;
return Status::OK();
}

Expand Down
26 changes: 26 additions & 0 deletions cpp/src/parquet/encoding_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,27 @@ class BM_ArrowBinaryPlain : public BenchmarkDecodeArrowByteArray {
}
};

/// Benchmark fixture that drives the Arrow byte-array benchmarks through the
/// DELTA_LENGTH_BYTE_ARRAY encoder and decoder.
class BM_ArrowBinaryDeltaLength : public BenchmarkDecodeArrowByteArray {
 public:
  /// Encode the Arrow input array and keep the flushed bytes in `buffer_`.
  void DoEncodeArrow() override {
    auto delta_length_encoder =
        MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
    delta_length_encoder->Put(*input_array_);
    buffer_ = delta_length_encoder->FlushValues();
  }

  /// Encode the first `num_values_` entries of `values_` and keep the flushed
  /// bytes in `buffer_`.
  void DoEncodeLowLevel() override {
    auto delta_length_encoder =
        MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
    delta_length_encoder->Put(values_.data(), num_values_);
    buffer_ = delta_length_encoder->FlushValues();
  }

  /// Build a decoder that reads `num_values_` values from `buffer_`.
  std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override {
    auto delta_length_decoder =
        MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
    // SetData is given the size as an int, so narrow the buffer size here.
    const int encoded_size = static_cast<int>(buffer_->size());
    delta_length_decoder->SetData(num_values_, buffer_->data(), encoded_size);
    return delta_length_decoder;
  }
};

// EncodeArrow benchmark for the BM_ArrowBinaryPlain fixture: the body delegates
// to EncodeArrowBenchmark and is registered with Range(1 << 18, 1 << 20).
BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, EncodeArrow)
(benchmark::State& state) { EncodeArrowBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, EncodeArrow)->Range(1 << 18, 1 << 20);
Expand All @@ -1279,6 +1300,11 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dense)
(benchmark::State& state) { DecodeArrowDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dense)->Range(MIN_RANGE, MAX_RANGE);

// Dense Arrow decode benchmark for the BM_ArrowBinaryDeltaLength fixture: the
// body delegates to DecodeArrowDenseBenchmark and is registered with
// Range(MIN_RANGE, MAX_RANGE).
// NOTE(review): the "DL_" prefix differs from the BM_ArrowBinaryPlain benchmark,
// which is named plain "DecodeArrow_Dense" — confirm the prefix is intentional.
BENCHMARK_DEFINE_F(BM_ArrowBinaryDeltaLength, DL_DecodeArrow_Dense)
(benchmark::State& state) { DecodeArrowDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryDeltaLength, DL_DecodeArrow_Dense)
->Range(MIN_RANGE, MAX_RANGE);

BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
(benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
Expand Down

0 comments on commit 456b930

Please sign in to comment.