Skip to content

Commit

Permalink
optimize hash in build_record_batch
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <chenxu@dmetasoul.com>
  • Loading branch information
dmetasoul01 committed Jan 24, 2025
1 parent 68ade11 commit 0b2e3d6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
7 changes: 7 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lakesoul-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async-recursion = "1.1.1"
ndarray = "0.15.6"
#hdf5 = {version = "0.8.1"}
rayon = "1.10.0"
nohash = "0.2.0"


[features]
Expand Down
53 changes: 34 additions & 19 deletions rust/lakesoul-io/src/sorted_merge/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::cmp::Reverse;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::BuildHasherDefault;
use std::sync::Arc;

use crate::constant::{ConstEmptyArray, ConstNullArray};
Expand All @@ -23,6 +24,7 @@ use arrow::{
};
use arrow_array::types::*;
use dary_heap::QuaternaryHeap;
use nohash::BuildNoHashHasher;
use smallvec::SmallVec;

use super::sort_key_range::{UseLastSortKeyBatchRanges, UseLastSortKeyBatchRangesRef};
Expand All @@ -42,7 +44,7 @@ impl RangeCombiner {
merge_operator: Vec<MergeOperator>,
) -> Self {
if merge_operator.is_empty() || merge_operator.iter().all(|op| *op == MergeOperator::UseLast) {
// if false {
// if false {
RangeCombiner::DefaultUseLastRangeCombiner(UseLastRangeCombiner::new(
schema,
streams_num,
Expand Down Expand Up @@ -297,13 +299,18 @@ pub struct UseLastRangeCombiner {
}

impl UseLastRangeCombiner {
pub fn new(schema: SchemaRef, streams_num: usize, fields_map: Arc<Vec<Vec<usize>>>, target_batch_size: usize) -> Self {
Self {
schema: schema.clone(),
fields_map: fields_map.clone(),
heap: QuaternaryHeap::with_capacity(streams_num),
in_progress: Vec::with_capacity(target_batch_size),
target_batch_size,
pub fn new(
schema: SchemaRef,
streams_num: usize,
fields_map: Arc<Vec<Vec<usize>>>,
target_batch_size: usize,
) -> Self {
Self {
schema: schema.clone(),
fields_map: fields_map.clone(),
heap: QuaternaryHeap::with_capacity(streams_num),
in_progress: Vec::with_capacity(target_batch_size),
target_batch_size,
current_sort_key_range: Arc::new(UseLastSortKeyBatchRanges::new(schema, fields_map)),
const_null_array: ConstNullArray::new(),
}
Expand Down Expand Up @@ -359,25 +366,30 @@ impl UseLastRangeCombiner {
.map(|ranges| ranges.column(column_idx))
.collect::<Vec<_>>();

let mut batch_idx_to_flatten_array_idx = HashMap::<usize, usize>::with_capacity(16);
let mut flatten_arrays = vec![self.const_null_array.get(field.data_type())];
let mut batch_idx_to_flatten_array_idx =
HashMap::<usize, usize, BuildNoHashHasher<usize>>::with_capacity_and_hasher(
capacity,
BuildNoHashHasher::default(),
);
let mut flatten_arrays = Vec::with_capacity(capacity);
flatten_arrays.push(self.const_null_array.get(field.data_type()));
let mut interleave_idx = Vec::with_capacity(capacity);

for range in array_and_idx_per_row.iter() {
if let Some(range) = range {
if batch_idx_to_flatten_array_idx.contains_key(&range.batch_idx) {
let flatten_array_idx = batch_idx_to_flatten_array_idx.get(&range.batch_idx).unwrap();
interleave_idx.push((*flatten_array_idx, range.row_idx));
} else {
flatten_arrays.push(range.array());
batch_idx_to_flatten_array_idx.insert(range.batch_idx, flatten_arrays.len() - 1);
interleave_idx.push((flatten_arrays.len() - 1, range.row_idx));
match batch_idx_to_flatten_array_idx.get(&range.batch_idx) {
Some(flatten_array_idx) => interleave_idx.push((*flatten_array_idx, range.row_idx)),
None => {
flatten_arrays.push(range.array());
batch_idx_to_flatten_array_idx.insert(range.batch_idx, flatten_arrays.len() - 1);
interleave_idx.push((flatten_arrays.len() - 1, range.row_idx));
}
}
} else {
interleave_idx.push((0, 0));
}
}

interleave(
flatten_arrays
.iter()
Expand All @@ -399,6 +411,9 @@ impl UseLastRangeCombiner {
}

fn init_current_sort_key_range(&mut self) {
self.current_sort_key_range = Arc::new(UseLastSortKeyBatchRanges::new(self.schema.clone(), self.fields_map.clone()));
self.current_sort_key_range = Arc::new(UseLastSortKeyBatchRanges::new(
self.schema.clone(),
self.fields_map.clone(),
));
}
}

0 comments on commit 0b2e3d6

Please sign in to comment.