Skip to content

Commit

Permalink
[CLUSTER] Introduce NodeId. Make it cheap to copy. (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
mstyura authored May 24, 2023
1 parent 67c0516 commit da6dc4d
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions src/network/cluster_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,31 @@ use serde::{
};
use smallvec::{smallvec, SmallVec};
use std::{
sync::Arc,
cmp::Ordering,
collections::VecDeque,
fmt::{self, Debug, Formatter},
iter::zip,
};

#[derive(Clone, PartialEq, Eq, Debug, PartialOrd, Ord)]
#[repr(transparent)]
struct NodeId(Arc<str>);

impl From<&str> for NodeId {
fn from(value: &str) -> Self {
Self(Arc::from(value))
}
}

impl AsRef<str> for NodeId {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}

struct Node {
pub id: String,
pub id: NodeId,
pub is_master: bool,
pub address: (String, u16),
pub connection: StandaloneConnection,
Expand All @@ -45,12 +62,12 @@ struct SlotRange {
pub slot_range: (u16, u16),
/// node ids of the shard that owns the slot range,
/// the first node id being the master node id
pub node_ids: SmallVec<[String; 6]>,
pub node_ids: SmallVec<[NodeId; 6]>,
}

#[derive(Debug)]
struct SubRequest {
pub node_id: String,
pub node_id: NodeId,
pub keys: SmallVec<[String; 10]>,
pub result: Option<Option<Result<RespBuf>>>,
}
Expand Down Expand Up @@ -891,18 +908,19 @@ impl ClusterConnection {
let Some(master_info) = shard_info.nodes.into_iter().find(|n| n.role == "master") else {
return Err(Error::Client("Cluster misconfiguration".to_owned()));
};
let master_id: NodeId = master_info.id.as_str().into();

let port = master_info.get_port()?;

let connection = StandaloneConnection::connect(&master_info.ip, port, config).await?;

slot_ranges.extend(shard_info.slots.iter().map(|s| SlotRange {
slot_range: *s,
node_ids: smallvec![master_info.id.clone()],
node_ids: smallvec![master_id.clone()],
}));

nodes.push(Node {
id: master_info.id,
id: master_id.clone(),
is_master: true,
address: (master_info.ip, port),
connection,
Expand Down Expand Up @@ -933,20 +951,22 @@ impl ClusterConnection {
for shard_info in shard_info_list {
for node_info in shard_info.nodes.into_iter().filter(|n| n.role == "replica") {
let port = node_info.get_port()?;
let node_id: NodeId = node_info.id.as_str().into();


let connection =
StandaloneConnection::connect(&node_info.ip, port, &self.config).await?;

for slot_range_info in &shard_info.slots {
if let Some(slot_range) = self.get_slot_range_by_slot_mut(slot_range_info.0) {
if slot_range.slot_range.1 == slot_range_info.1 {
slot_range.node_ids.push(node_info.id.clone())
slot_range.node_ids.push(node_id.clone())
}
}
}

self.nodes.push(Node {
id: node_info.id,
id: node_id,
is_master: false,
address: (node_info.ip.clone(), port),
connection,
Expand Down Expand Up @@ -986,7 +1006,7 @@ impl ClusterConnection {
.collect::<Vec<_>>();
node_ids.sort();
self.nodes
.retain(|node| node_ids.binary_search(&node.id.as_str()).is_ok());
.retain(|node| node_ids.binary_search_by(|n| (*n).cmp(node.id.as_ref())).is_ok());

// create slot_ranges from scratch
self.slot_ranges.clear();
Expand All @@ -1007,12 +1027,13 @@ impl ClusterConnection {
for slot_range_info in &shard_info.slots {
self.slot_ranges.push(SlotRange {
slot_range: *slot_range_info,
node_ids: shard_info.nodes.iter().map(|n| n.id.clone()).collect(),
node_ids: shard_info.nodes.iter().map(|n| n.id.as_str().into()).collect(),
});
}

for node_info in shard_info.nodes {
if let Some(node) = self.nodes.iter_mut().find(|n| n.id == node_info.id) {
let node_id: NodeId = node_info.id.as_str().into();
if let Some(node) = self.nodes.iter_mut().find(|n| n.id == node_id) {
// refresh is_master flag in case a failover happened
node.is_master = node_info.role == "master";
} else {
Expand All @@ -1023,7 +1044,7 @@ impl ClusterConnection {
StandaloneConnection::connect(&node_info.ip, port, &self.config).await?;

self.nodes.push(Node {
id: node_info.id,
id: node_id,
is_master: node_info.role == "master",
address: (node_info.ip, port),
connection,
Expand All @@ -1044,7 +1065,7 @@ impl ClusterConnection {
}

#[inline]
fn get_node_index_by_id(&self, id: &str) -> Option<usize> {
fn get_node_index_by_id(&self, id: &NodeId) -> Option<usize> {
self.nodes.binary_search_by_key(&id, |n| &n.id).ok()
}

Expand Down

0 comments on commit da6dc4d

Please sign in to comment.