Skip to content

Commit

Permalink
feat: submit node's cpu cores number to metasrv in heartbeat (#5571)
Browse files Browse the repository at this point in the history
* feat: submit node's cpu cores number to metasrv in heartbeat

* update greptime-proto dep
  • Loading branch information
MichaelScofield authored Feb 20, 2025
1 parent e878808 commit f6f617d
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 102 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fc09a5696608d2a0aa718cc835d5cb9c4e8e9387" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a25adc8a01340231121646d8f0a29d0e92f45461" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Instance {
}

impl Instance {
fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
instance,
_guard: guard,
Expand Down
58 changes: 56 additions & 2 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;

use api::v1::meta::HeartbeatRequest;
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
Expand Down Expand Up @@ -58,7 +60,7 @@ pub trait ClusterInfo {
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: ClusterId,
Expand All @@ -69,6 +71,28 @@ pub struct NodeInfoKey {
}

impl NodeInfoKey {
/// Try to create a `NodeInfoKey` from a "good" heartbeat request. "good" as in every needed
/// piece of information is provided and valid.
pub fn new(request: &HeartbeatRequest) -> Option<Self> {
let HeartbeatRequest { header, peer, .. } = request;
let header = header.as_ref()?;
let peer = peer.as_ref()?;

let role = header.role.try_into().ok()?;
let node_id = match role {
// Because the Frontend is stateless, it's too easy to neglect choosing a unique id
// for it when setting up a cluster. So we calculate its id from its address.
Role::Frontend => calculate_node_id(&peer.addr),
_ => peer.id,
};

Some(NodeInfoKey {
cluster_id: header.cluster_id,
role,
node_id,
})
}

pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String {
format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
}
Expand All @@ -83,6 +107,13 @@ impl NodeInfoKey {
}
}

/// Calculate (by using the DefaultHasher) the node's id from its address.
fn calculate_node_id(addr: &str) -> u64 {
let mut hasher = DefaultHasher::new();
addr.hash(&mut hasher);
hasher.finish()
}

/// The information of a node in the cluster.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
Expand All @@ -100,7 +131,7 @@ pub struct NodeInfo {
pub start_time_ms: u64,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
Datanode,
Frontend,
Expand Down Expand Up @@ -271,6 +302,7 @@ impl TryFrom<i32> for Role {
mod tests {
use std::assert_matches::assert_matches;

use super::*;
use crate::cluster::Role::{Datanode, Frontend};
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::peer::Peer;
Expand Down Expand Up @@ -338,4 +370,26 @@ mod tests {
let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend);
assert_eq!(prefix, "__meta_cluster_node_info-2-1-");
}

#[test]
fn test_calculate_node_id_from_addr() {
// Test empty string
assert_eq!(calculate_node_id(""), calculate_node_id(""));

// Test same addresses return same ids
let addr1 = "127.0.0.1:8080";
let id1 = calculate_node_id(addr1);
let id2 = calculate_node_id(addr1);
assert_eq!(id1, id2);

// Test different addresses return different ids
let addr2 = "127.0.0.1:8081";
let id3 = calculate_node_id(addr2);
assert_ne!(id1, id3);

// Test long address
let long_addr = "very.long.domain.name.example.com:9999";
let id4 = calculate_node_id(long_addr);
assert!(id4 > 0);
}
}
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ log-store.workspace = true
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
num_cpus.workspace = true
object-store.workspace = true
prometheus.workspace = true
prost.workspace = true
Expand Down
28 changes: 16 additions & 12 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ impl HeartbeatTask {
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let build_info = common_version::build_info();
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
node_epoch,
info: Some(NodeInfo {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: node_epoch,
cpus: num_cpus::get() as u32,
}),
..Default::default()
};

loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
Expand All @@ -235,9 +249,8 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
peer: self_peer.clone(),
mailbox_message: Some(message),
..Default::default()
..heartbeat_request.clone()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
Some(req)
Expand All @@ -253,22 +266,13 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let build_info = common_version::build_info();
let region_stats = Self::load_region_stats(&region_server_clone);
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
peer: self_peer.clone(),
region_stats,
duration_since_epoch,
node_epoch,
info: Some(NodeInfo {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// The start timestamp is the same as node_epoch currently.
start_time_ms: node_epoch,
}),
..Default::default()
..heartbeat_request.clone()
};
sleep.as_mut().reset(now + Duration::from_millis(interval));
Some(req)
Expand Down
1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ lazy_static.workspace = true
meta-client.workspace = true
nom = "7.1.3"
num-traits = "0.2"
num_cpus.workspace = true
operator.workspace = true
partition.workspace = true
prometheus.workspace = true
Expand Down
20 changes: 12 additions & 8 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ impl HeartbeatTask {
}
}

fn create_heartbeat_request(
fn new_heartbeat_request(
heartbeat_request: &HeartbeatRequest,
message: Option<OutgoingMessage>,
peer: Option<Peer>,
start_time_ms: u64,
latest_report: &Option<FlowStat>,
) -> Option<HeartbeatRequest> {
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
Expand All @@ -161,10 +160,8 @@ impl HeartbeatTask {

Some(HeartbeatRequest {
mailbox_message,
peer,
info: Self::build_node_info(start_time_ms),
flow_stat,
..Default::default()
..heartbeat_request.clone()
})
}

Expand All @@ -174,6 +171,7 @@ impl HeartbeatTask {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms,
cpus: num_cpus::get() as u32,
})
}

Expand All @@ -198,18 +196,24 @@ impl HeartbeatTask {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut latest_report = None;

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
};

loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms, &latest_report)
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = interval.tick() => {
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms, &latest_report)
Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
}
};

Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ lazy_static.workspace = true
log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
partition.workspace = true
Expand Down
20 changes: 12 additions & 8 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ impl HeartbeatTask {
});
}

fn create_heartbeat_request(
fn new_heartbeat_request(
heartbeat_request: &HeartbeatRequest,
message: Option<OutgoingMessage>,
peer: Option<Peer>,
start_time_ms: u64,
) -> Option<HeartbeatRequest> {
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
Some(Ok(message)) => Some(message),
Expand All @@ -134,9 +133,7 @@ impl HeartbeatTask {

Some(HeartbeatRequest {
mailbox_message,
peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
..heartbeat_request.clone()
})
}

Expand All @@ -147,6 +144,7 @@ impl HeartbeatTask {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms,
cpus: num_cpus::get() as u32,
})
}

Expand All @@ -167,19 +165,25 @@ impl HeartbeatTask {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
};

loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
Self::new_heartbeat_request(&heartbeat_request, None)
}
};

Expand Down
Loading

0 comments on commit f6f617d

Please sign in to comment.