Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): introduce allocator_api in JoinHashMap #3020

Merged
merged 6 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"src/utils/memcomparable",
"src/utils/pgwire",
"src/utils/static-link",
"src/utils/stats_alloc",
"src/utils/value-encoding",
"src/workspace-hack",
]
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ hyper = "0.14"
itertools = "0.10"
lazy_static = "1"
log = "0.4"
lru = "0.7"
lru = { git = "https://github.com/singularity-data/lru-rs.git", rev = "e0e9ddaf8e4a51c244a03676734437e68a336b30" }
madsim = "=0.2.0-alpha.3"
memcomparable = { path = "../utils/memcomparable" }
more-asserts = "0.2"
Expand Down
30 changes: 25 additions & 5 deletions src/common/src/collection/evictable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::{Allocator, Global};
use std::cmp::Eq;
use std::hash::{BuildHasher, Hash};
use std::ops::{Deref, DerefMut};

use lru::{DefaultHasher, LruCache};

/// A wrapper for [`LruCache`] which provides manual eviction.
pub struct EvictableHashMap<K, V, S = DefaultHasher> {
inner: LruCache<K, V, S>,
pub struct EvictableHashMap<K, V, S = DefaultHasher, A: Clone + Allocator = Global> {
inner: LruCache<K, V, S, A>,

/// Target capacity to keep when calling `evict_to_target_cap`.
target_cap: usize,
}

impl<K: Hash + Eq, V, A: Clone + Allocator> EvictableHashMap<K, V, DefaultHasher, A> {
/// Create a [`EvictableHashMap`] with the given target capacity and allocator.
pub fn new_in(target_cap: usize, alloc: A) -> Self {
Self::with_hasher_in(target_cap, DefaultHasher::new(), alloc)
}
}

impl<K: Hash + Eq, V, S: BuildHasher, A: Clone + Allocator> EvictableHashMap<K, V, S, A> {
/// Create a [`EvictableHashMap`] with the given target capacity, hasher and allocator.
pub fn with_hasher_in(target_cap: usize, hasher: S, alloc: A) -> Self {
Self {
inner: LruCache::unbounded_with_hasher_in(hasher, alloc),
target_cap,
}
}
}

impl<K: Hash + Eq, V> EvictableHashMap<K, V> {
/// Create a [`EvictableHashMap`] with the given target capacity.
pub fn new(target_cap: usize) -> EvictableHashMap<K, V> {
Expand All @@ -41,7 +59,9 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
target_cap,
}
}
}

impl<K: Hash + Eq, V, S: BuildHasher, A: Clone + Allocator> EvictableHashMap<K, V, S, A> {
pub fn target_cap(&self) -> usize {
self.target_cap
}
Expand Down Expand Up @@ -79,15 +99,15 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
}
}

impl<K, V, S> Deref for EvictableHashMap<K, V, S> {
type Target = LruCache<K, V, S>;
impl<K, V, S, A: Clone + Allocator> Deref for EvictableHashMap<K, V, S, A> {
type Target = LruCache<K, V, S, A>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<K, V, S> DerefMut for EvictableHashMap<K, V, S> {
impl<K, V, S, A: Clone + Allocator> DerefMut for EvictableHashMap<K, V, S, A> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#![feature(type_alias_impl_trait)]
#![feature(test)]
#![feature(trusted_len)]
#![feature(allocator_api)]

#[macro_use]
pub mod error;
Expand Down
2 changes: 2 additions & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ memcomparable = { path = "../utils/memcomparable" }
num-traits = "0.2"
parking_lot = "0.12"
paste = "1"
pin-project = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = "0.10"
rdkafka = { version = "0.28", features = ["cmake-build"] }
Expand All @@ -43,6 +44,7 @@ serde-value = "0.7"
serde_json = "1"
smallvec = "1"
static_assertions = "1"
stats_alloc = { path = "../utils/stats_alloc" }
thiserror = "1"
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = [
"rt",
Expand Down
29 changes: 25 additions & 4 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod join_entry_state;
use std::alloc::Global;
use std::ops::{Deref, DerefMut, Index};
use std::sync::Arc;

Expand All @@ -25,6 +26,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult};
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
use risingwave_common::types::{DataType, Datum};
use risingwave_storage::{Keyspace, StateStore};
use stats_alloc::{SharedStatsAlloc, StatsAlloc};

/// This is a row with a match degree
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -116,9 +118,16 @@ type PkType = Row;
pub type StateValueType = JoinRow;
pub type HashValueType<S> = JoinEntryState<S>;

type JoinHashMapInner<K, S> =
EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher, SharedStatsAlloc<Global>>;

pub struct JoinHashMap<K: HashKey, S: StateStore> {
/// Allocator
alloc: SharedStatsAlloc<Global>,
/// Store the join states.
inner: EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>,
// SAFETY: This is a self-referential data structure and the allocator is owned by the struct
// itself. Use the field is safe iff the struct is constructed with [`moveit`](https://crates.io/crates/moveit)'s way.
inner: JoinHashMapInner<K, S>,
/// Data types of the columns
data_types: Arc<[DataType]>,
/// Data types of the columns
Expand Down Expand Up @@ -148,17 +157,29 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
.iter()
.map(|idx| data_types[*idx].clone())
.collect_vec();

let alloc = StatsAlloc::new(Global).shared();
Self {
inner: EvictableHashMap::with_hasher(target_cap, PrecomputedBuildHasher),
inner: EvictableHashMap::with_hasher_in(
target_cap,
PrecomputedBuildHasher,
alloc.clone(),
),
data_types: data_types.into(),
join_key_data_types: join_key_data_types.into(),
pk_data_types: pk_data_types.into(),
keyspace,
current_epoch: 0,
alloc,
}
}

#[allow(dead_code)]
/// Report the bytes used by the join map.
// FIXME: Currently, only memory used in the hash map itself is counted.
pub fn bytes_in_use(&self) -> usize {
self.alloc.bytes_in_use()
}

pub fn update_epoch(&mut self, epoch: u64) {
self.current_epoch = epoch;
}
Expand Down Expand Up @@ -296,7 +317,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
}

impl<K: HashKey, S: StateStore> Deref for JoinHashMap<K, S> {
type Target = EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>;
type Target = JoinHashMapInner<K, S>;

fn deref(&self) -> &Self::Target {
&self.inner
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(unzip_option)]
#![feature(allocator_api)]

#[macro_use]
extern crate log;
Expand Down
11 changes: 11 additions & 0 deletions src/utils/stats_alloc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "stats_alloc"
version = "0.1.8"
edition = "2021"
description = "Allocator with statistics"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

[dev-dependencies]
Loading