feat: submit node's cpu cores number to metasrv in heartbeat #5571

Merged · 2 commits · Feb 20, 2025
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fc09a5696608d2a0aa718cc835d5cb9c4e8e9387" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a25adc8a01340231121646d8f0a29d0e92f45461" }
hex = "0.4"
http = "1"
humantime = "2.1"
2 changes: 1 addition & 1 deletion src/cmd/src/metasrv.rs
@@ -42,7 +42,7 @@ pub struct Instance {
}

impl Instance {
fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
instance,
_guard: guard,
58 changes: 56 additions & 2 deletions src/common/meta/src/cluster.rs
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;

use api::v1::meta::HeartbeatRequest;
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
@@ -58,7 +60,7 @@ pub trait ClusterInfo {
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: ClusterId,
@@ -69,6 +71,28 @@ pub struct NodeInfoKey {
}

impl NodeInfoKey {
/// Try to create a `NodeInfoKey` from a "good" heartbeat request. "good" as in every needed
/// piece of information is provided and valid.
pub fn new(request: &HeartbeatRequest) -> Option<Self> {
let HeartbeatRequest { header, peer, .. } = request;
let header = header.as_ref()?;
let peer = peer.as_ref()?;

let role = header.role.try_into().ok()?;
let node_id = match role {
// Because the Frontend is stateless, it's too easy to neglect choosing a unique id
// for it when setting up a cluster. So we calculate its id from its address.
Role::Frontend => calculate_node_id(&peer.addr),
_ => peer.id,
};

Some(NodeInfoKey {
cluster_id: header.cluster_id,
role,
node_id,
})
}

pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String {
format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
}
@@ -83,6 +107,13 @@ impl NodeInfoKey {
}
}

/// Calculate (by using the DefaultHasher) the node's id from its address.
fn calculate_node_id(addr: &str) -> u64 {
let mut hasher = DefaultHasher::new();
addr.hash(&mut hasher);
hasher.finish()
}

/// The information of a node in the cluster.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
@@ -100,7 +131,7 @@ pub struct NodeInfo {
pub start_time_ms: u64,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
Datanode,
Frontend,
@@ -271,6 +302,7 @@ impl TryFrom<i32> for Role {
mod tests {
use std::assert_matches::assert_matches;

use super::*;
use crate::cluster::Role::{Datanode, Frontend};
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::peer::Peer;
@@ -338,4 +370,26 @@ mod tests {
let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend);
assert_eq!(prefix, "__meta_cluster_node_info-2-1-");
}

#[test]
fn test_calculate_node_id_from_addr() {
// Test empty string
assert_eq!(calculate_node_id(""), calculate_node_id(""));

// Test same addresses return same ids
let addr1 = "127.0.0.1:8080";
let id1 = calculate_node_id(addr1);
let id2 = calculate_node_id(addr1);
assert_eq!(id1, id2);

// Test different addresses return different ids
let addr2 = "127.0.0.1:8081";
let id3 = calculate_node_id(addr2);
assert_ne!(id1, id3);

// Test long address
let long_addr = "very.long.domain.name.example.com:9999";
let id4 = calculate_node_id(long_addr);
assert!(id4 > 0);
}
}
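
For reference, the address-derived frontend id introduced above can be exercised in isolation. A minimal std-only sketch, outside the GreptimeDB crates; the addresses are placeholders:

```rust
use std::hash::{DefaultHasher, Hash, Hasher};

// Mirrors the PR's `calculate_node_id`: hash the peer address with the
// standard library's `DefaultHasher`. `DefaultHasher::new()` uses fixed keys,
// so the same address yields the same id on every node built with the same
// toolchain, keeping a stateless frontend's id stable across heartbeats.
fn calculate_node_id(addr: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    addr.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Repeated heartbeats from one frontend address map to one node id.
    assert_eq!(
        calculate_node_id("10.0.0.5:4001"),
        calculate_node_id("10.0.0.5:4001")
    );
    // Different addresses map to different ids (barring a hash collision).
    assert_ne!(
        calculate_node_id("10.0.0.5:4001"),
        calculate_node_id("10.0.0.6:4001")
    );
    println!("frontend node id = {}", calculate_node_id("10.0.0.5:4001"));
}
```

As the comment in `NodeInfoKey::new` notes, deriving the id from the address spares operators from having to configure a unique id for each stateless frontend.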
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
@@ -47,6 +47,7 @@ log-store.workspace = true
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
num_cpus.workspace = true
object-store.workspace = true
prometheus.workspace = true
prost.workspace = true
28 changes: 16 additions & 12 deletions src/datanode/src/heartbeat.rs
@@ -224,6 +224,20 @@ impl HeartbeatTask {
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let build_info = common_version::build_info();
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
node_epoch,
info: Some(NodeInfo {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: node_epoch,
cpus: num_cpus::get() as u32,
}),
..Default::default()
};

loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
@@ -235,9 +249,8 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
peer: self_peer.clone(),
mailbox_message: Some(message),
..Default::default()
..heartbeat_request.clone()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
Some(req)
@@ -253,22 +266,13 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let build_info = common_version::build_info();
let region_stats = Self::load_region_stats(&region_server_clone);
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
peer: self_peer.clone(),
region_stats,
duration_since_epoch,
node_epoch,
info: Some(NodeInfo {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// The start timestamp is the same as node_epoch currently.
start_time_ms: node_epoch,
}),
..Default::default()
..heartbeat_request.clone()
};
sleep.as_mut().reset(now + Duration::from_millis(interval));
Some(req)
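
The datanode loop above (and the flow and frontend heartbeat tasks further down) now builds one template `HeartbeatRequest` before the loop and fills in only the per-iteration fields with struct-update syntax (`..heartbeat_request.clone()`), so static data such as the new `cpus` count is computed once. A rough sketch of that pattern with simplified stand-in types; the real prost-generated `HeartbeatRequest`/`NodeInfo` and the `num_cpus` dependency from this PR are not reproduced here:

```rust
// Stand-ins for the prost-generated message types used in the diff.
#[derive(Clone, Default)]
struct NodeInfo {
    version: String,
    git_commit: String,
    start_time_ms: u64,
    cpus: u32, // the field this PR starts reporting to the metasrv
}

#[derive(Clone, Default)]
struct HeartbeatRequest {
    peer: Option<String>,
    node_epoch: u64,
    info: Option<NodeInfo>,
    duration_since_epoch: u64,
    mailbox_message: Option<String>,
}

fn main() {
    // Static fields are filled once, before the heartbeat loop starts.
    // The PR uses `num_cpus::get()`; std's available_parallelism() is a
    // dependency-free stand-in for this sketch.
    let cpus = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1) as u32;

    let template = HeartbeatRequest {
        peer: Some("127.0.0.1:3001".to_string()),
        node_epoch: 1_700_000_000_000,
        info: Some(NodeInfo {
            version: "x.y.z".to_string(),      // placeholder build info
            git_commit: "abcdef0".to_string(), // placeholder commit
            start_time_ms: 1_700_000_000_000,
            cpus,
        }),
        ..Default::default()
    };

    // Each tick only overrides the per-iteration fields, mirroring
    // `..heartbeat_request.clone()` in the diff.
    let tick_req = HeartbeatRequest {
        duration_since_epoch: 5_000,
        ..template.clone()
    };
    assert_eq!(tick_req.info.unwrap().cpus, cpus);
}
```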
1 change: 1 addition & 0 deletions src/flow/Cargo.toml
@@ -53,6 +53,7 @@ lazy_static.workspace = true
meta-client.workspace = true
nom = "7.1.3"
num-traits = "0.2"
num_cpus.workspace = true
operator.workspace = true
partition.workspace = true
prometheus.workspace = true
20 changes: 12 additions & 8 deletions src/flow/src/heartbeat.rs
@@ -134,10 +134,9 @@ impl HeartbeatTask {
}
}

fn create_heartbeat_request(
fn new_heartbeat_request(
heartbeat_request: &HeartbeatRequest,
message: Option<OutgoingMessage>,
peer: Option<Peer>,
start_time_ms: u64,
latest_report: &Option<FlowStat>,
) -> Option<HeartbeatRequest> {
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
@@ -161,10 +160,8 @@ impl HeartbeatTask {

Some(HeartbeatRequest {
mailbox_message,
peer,
info: Self::build_node_info(start_time_ms),
flow_stat,
..Default::default()
..heartbeat_request.clone()
})
}

@@ -174,6 +171,7 @@ impl HeartbeatTask {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms,
cpus: num_cpus::get() as u32,
})
}

@@ -198,18 +196,24 @@ impl HeartbeatTask {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut latest_report = None;

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
};

loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms, &latest_report)
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = interval.tick() => {
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms, &latest_report)
Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
}
};

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
@@ -44,6 +44,7 @@ lazy_static.workspace = true
log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
partition.workspace = true
20 changes: 12 additions & 8 deletions src/frontend/src/heartbeat.rs
@@ -118,10 +118,9 @@ impl HeartbeatTask {
});
}

fn create_heartbeat_request(
fn new_heartbeat_request(
heartbeat_request: &HeartbeatRequest,
message: Option<OutgoingMessage>,
peer: Option<Peer>,
start_time_ms: u64,
) -> Option<HeartbeatRequest> {
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
Some(Ok(message)) => Some(message),
@@ -134,9 +133,7 @@ impl HeartbeatTask {

Some(HeartbeatRequest {
mailbox_message,
peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
..heartbeat_request.clone()
})
}

@@ -147,6 +144,7 @@ impl HeartbeatTask {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms,
cpus: num_cpus::get() as u32,
})
}

@@ -167,19 +165,25 @@ impl HeartbeatTask {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
..Default::default()
};

loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
Self::new_heartbeat_request(&heartbeat_request, None)
}
};
