Skip to content

Commit

Permalink
feat(stream): introduce allocator_api in JoinHashMap (#3020)
Browse files Browse the repository at this point in the history
* feat(stream): introduce allocator_api in JoinHashMap

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* fix typo

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* cargo sort

Signed-off-by: TennyZhuang <zty0826@gmail.com>
  • Loading branch information
TennyZhuang authored Jun 7, 2022
1 parent 73d525b commit 82aeacc
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 14 deletions.
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"src/utils/memcomparable",
"src/utils/pgwire",
"src/utils/static-link",
"src/utils/stats_alloc",
"src/utils/value-encoding",
"src/workspace-hack",
]
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ hyper = "0.14"
itertools = "0.10"
lazy_static = "1"
log = "0.4"
lru = "0.7"
lru = { git = "https://github.com/singularity-data/lru-rs.git", rev = "e0e9ddaf8e4a51c244a03676734437e68a336b30" }
madsim = "=0.2.0-alpha.3"
memcomparable = { path = "../utils/memcomparable" }
more-asserts = "0.3"
Expand Down
30 changes: 25 additions & 5 deletions src/common/src/collection/evictable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::{Allocator, Global};
use std::cmp::Eq;
use std::hash::{BuildHasher, Hash};
use std::ops::{Deref, DerefMut};

use lru::{DefaultHasher, LruCache};

/// A wrapper for [`LruCache`] which provides manual eviction.
pub struct EvictableHashMap<K, V, S = DefaultHasher> {
inner: LruCache<K, V, S>,
pub struct EvictableHashMap<K, V, S = DefaultHasher, A: Clone + Allocator = Global> {
inner: LruCache<K, V, S, A>,

/// Target capacity to keep when calling `evict_to_target_cap`.
target_cap: usize,
}

impl<K, V, A> EvictableHashMap<K, V, DefaultHasher, A>
where
    K: Hash + Eq,
    A: Clone + Allocator,
{
    /// Creates an [`EvictableHashMap`] with the given target capacity and allocator.
    ///
    /// A fresh [`DefaultHasher`] is used as the hasher; construction is delegated to
    /// `with_hasher_in`.
    pub fn new_in(target_cap: usize, alloc: A) -> Self {
        let hasher = DefaultHasher::new();
        Self::with_hasher_in(target_cap, hasher, alloc)
    }
}

impl<K, V, S, A> EvictableHashMap<K, V, S, A>
where
    K: Hash + Eq,
    S: BuildHasher,
    A: Clone + Allocator,
{
    /// Creates an [`EvictableHashMap`] with the given target capacity, hasher and allocator.
    ///
    /// The backing cache is built with `LruCache::unbounded_with_hasher_in`, and
    /// `target_cap` is recorded in the wrapper's `target_cap` field.
    pub fn with_hasher_in(target_cap: usize, hasher: S, alloc: A) -> Self {
        let inner = LruCache::unbounded_with_hasher_in(hasher, alloc);
        Self { inner, target_cap }
    }
}

impl<K: Hash + Eq, V> EvictableHashMap<K, V> {
/// Create an [`EvictableHashMap`] with the given target capacity.
pub fn new(target_cap: usize) -> EvictableHashMap<K, V> {
Expand All @@ -41,7 +59,9 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
target_cap,
}
}
}

impl<K: Hash + Eq, V, S: BuildHasher, A: Clone + Allocator> EvictableHashMap<K, V, S, A> {
pub fn target_cap(&self) -> usize {
self.target_cap
}
Expand Down Expand Up @@ -79,15 +99,15 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
}
}

impl<K, V, S> Deref for EvictableHashMap<K, V, S> {
type Target = LruCache<K, V, S>;
impl<K, V, S, A: Clone + Allocator> Deref for EvictableHashMap<K, V, S, A> {
type Target = LruCache<K, V, S, A>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<K, V, S> DerefMut for EvictableHashMap<K, V, S> {
impl<K, V, S, A: Clone + Allocator> DerefMut for EvictableHashMap<K, V, S, A> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#![feature(type_alias_impl_trait)]
#![feature(test)]
#![feature(trusted_len)]
#![feature(allocator_api)]

#[macro_use]
pub mod error;
Expand Down
2 changes: 2 additions & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ memcomparable = { path = "../utils/memcomparable" }
num-traits = "0.2"
parking_lot = "0.12"
paste = "1"
pin-project = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = "0.10"
rdkafka = { version = "0.28", features = ["cmake-build"] }
Expand All @@ -43,6 +44,7 @@ serde-value = "0.7"
serde_json = "1"
smallvec = "1"
static_assertions = "1"
stats_alloc = { path = "../utils/stats_alloc" }
thiserror = "1"
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = [
"rt",
Expand Down
29 changes: 25 additions & 4 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod join_entry_state;
use std::alloc::Global;
use std::ops::{Deref, DerefMut, Index};
use std::sync::Arc;

Expand All @@ -25,6 +26,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult};
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
use risingwave_common::types::{DataType, Datum};
use risingwave_storage::{Keyspace, StateStore};
use stats_alloc::{SharedStatsAlloc, StatsAlloc};

/// This is a row with a match degree
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -116,9 +118,16 @@ type PkType = Row;
pub type StateValueType = JoinRow;
pub type HashValueType<S> = JoinEntryState<S>;

/// Concrete map type used for the `inner` field of [`JoinHashMap`]: an [`EvictableHashMap`]
/// keyed by `K`, holding [`HashValueType<S>`] values, hashed with [`PrecomputedBuildHasher`]
/// and allocating through a [`SharedStatsAlloc`] wrapped around the [`Global`] allocator.
type JoinHashMapInner<K, S> =
EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher, SharedStatsAlloc<Global>>;

pub struct JoinHashMap<K: HashKey, S: StateStore> {
/// Allocator
alloc: SharedStatsAlloc<Global>,
/// Store the join states.
inner: EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>,
// SAFETY: This is a self-referential data structure and the allocator is owned by the struct
// itself. Using the field is safe iff the struct is constructed the [`moveit`](https://crates.io/crates/moveit) way.
inner: JoinHashMapInner<K, S>,
/// Data types of the columns
data_types: Arc<[DataType]>,
/// Data types of the columns
Expand Down Expand Up @@ -148,17 +157,29 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
.iter()
.map(|idx| data_types[*idx].clone())
.collect_vec();

let alloc = StatsAlloc::new(Global).shared();
Self {
inner: EvictableHashMap::with_hasher(target_cap, PrecomputedBuildHasher),
inner: EvictableHashMap::with_hasher_in(
target_cap,
PrecomputedBuildHasher,
alloc.clone(),
),
data_types: data_types.into(),
join_key_data_types: join_key_data_types.into(),
pk_data_types: pk_data_types.into(),
keyspace,
current_epoch: 0,
alloc,
}
}

/// Returns the number of bytes in use as reported by this join map's allocator.
// FIXME: Currently, only memory used in the hash map itself is counted.
#[allow(dead_code)]
pub fn bytes_in_use(&self) -> usize {
    let stats = &self.alloc;
    stats.bytes_in_use()
}

pub fn update_epoch(&mut self, epoch: u64) {
self.current_epoch = epoch;
}
Expand Down Expand Up @@ -296,7 +317,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
}

impl<K: HashKey, S: StateStore> Deref for JoinHashMap<K, S> {
type Target = EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>;
type Target = JoinHashMapInner<K, S>;

fn deref(&self) -> &Self::Target {
&self.inner
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(unzip_option)]
#![feature(allocator_api)]

#[macro_use]
extern crate log;
Expand Down
11 changes: 11 additions & 0 deletions src/utils/stats_alloc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "stats_alloc"
version = "0.1.8"
edition = "2021"
description = "Allocator with statistics"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

[dev-dependencies]
Loading

0 comments on commit 82aeacc

Please sign in to comment.