Skip to content

Commit

Permalink
Use From trait for protocol messages
Browse files Browse the repository at this point in the history
  • Loading branch information
pool2win committed Oct 7, 2024
1 parent dbc0f76 commit 08ed8a8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 39 deletions.
24 changes: 13 additions & 11 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,18 @@ impl Node {
}
let node_id = self.get_node_id();

let handshake_service = protocol::Protocol::new(node_id.clone());
let reliable_sender_service =
ReliableSend::new(handshake_service, reliable_sender_handle);
let timeout_layer = tower::timeout::TimeoutLayer::new(
tokio::time::Duration::from_millis(self.delivery_timeout),
);
let _ = timeout_layer
.layer(reliable_sender_service)
.oneshot(HandshakeMessage::default_as_message())
.await;
// let handshake_service = protocol::Protocol::new(node_id.clone());
// let reliable_sender_service =
// ReliableSend::new(handshake_service, reliable_sender_handle.clone());
// let timeout_layer = tower::timeout::TimeoutLayer::new(
// tokio::time::Duration::from_millis(self.delivery_timeout),
// );
// let _ = timeout_layer
// .layer(reliable_sender_service)
// .oneshot(HandshakeMessage::default().into())
// .await;

// log::info!("Handshake finished");

let ping_service = protocol::Protocol::new(node_id.clone());
let echo_broadcast_service = EchoBroadcast::new(
Expand All @@ -205,7 +207,7 @@ impl Node {
self.state.clone(),
);
let _ = echo_broadcast_service
.oneshot(PingMessage::default_as_message())
.oneshot(PingMessage::default().into())
.await;
}
}
Expand Down
37 changes: 36 additions & 1 deletion src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ extern crate serde;

mod handshake;
mod heartbeat;
mod membership;
pub(crate) mod message_id_generator;
mod ping;

use futures::{Future, FutureExt};
pub use handshake::{Handshake, HandshakeMessage};
pub use heartbeat::{Heartbeat, HeartbeatMessage};
pub use membership::{Membership, MembershipMessage};
pub use ping::{Ping, PingMessage};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
Expand All @@ -38,6 +40,9 @@ pub enum Message {
Handshake(HandshakeMessage),
Heartbeat(HeartbeatMessage),
Ping(PingMessage),
Membership(MembershipMessage),
BroadcastPing(PingMessage),
EchoPing(PingMessage),
}

/// Methods for all protocol messages
Expand All @@ -48,10 +53,37 @@ impl Message {
Message::Handshake(m) => m.sender_id.clone(),
Message::Heartbeat(m) => m.sender_id.clone(),
Message::Ping(m) => m.sender_id.clone(),
Message::Membership(m) => m.sender_id.clone(),
Message::BroadcastPing(m) => m.sender_id.clone(),
Message::EchoPing(m) => m.sender_id.clone(),
}
}
}

impl From<HeartbeatMessage> for Message {
fn from(value: HeartbeatMessage) -> Self {
Message::Heartbeat(value)
}
}

impl From<HandshakeMessage> for Message {
fn from(value: HandshakeMessage) -> Self {
Message::Handshake(value)
}
}

impl From<PingMessage> for Message {
fn from(value: PingMessage) -> Self {
Message::Ping(value)
}
}

impl From<MembershipMessage> for Message {
fn from(value: MembershipMessage) -> Self {
Message::Membership(value)
}
}

#[derive(Debug, Clone)]
pub struct Protocol {
node_id: String,
Expand Down Expand Up @@ -79,6 +111,9 @@ impl Service<Message> for Protocol {
Message::Ping(_m) => BoxService::new(Ping::new(sender_id)),
Message::Handshake(_m) => BoxService::new(Handshake::new(sender_id)),
Message::Heartbeat(_m) => BoxService::new(Heartbeat::new(sender_id)),
Message::Membership(_m) => BoxService::new(Membership::new(sender_id)),
Message::BroadcastPing(_m) => BoxService::new(Ping::new(sender_id)),
Message::EchoPing(_m) => BoxService::new(Ping::new(sender_id)),
};
svc.oneshot(msg).await
}
Expand All @@ -97,7 +132,7 @@ mod protocol_tests {
#[tokio::test]
async fn it_should_create_protocol() {
let p = Protocol::new("local".to_string());
let m = p.oneshot(PingMessage::default_as_message()).await;
let m = p.oneshot(PingMessage::default().into()).await;
assert!(m.unwrap().is_some());
}
}
10 changes: 7 additions & 3 deletions src/node/protocol/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ pub struct HandshakeMessage {
}

impl HandshakeMessage {
pub fn default_as_message() -> Message {
Message::Handshake(HandshakeMessage::default())
pub fn new(sender_id: String, message: String, version: String) -> Self {
Self {
sender_id,
message,
version,
}
}
}

Expand Down Expand Up @@ -102,7 +106,7 @@ mod handshake_tests {
.ready()
.await
.unwrap()
.call(HandshakeMessage::default_as_message())
.call(HandshakeMessage::default().into())
.await
.unwrap();
assert!(res.is_some());
Expand Down
15 changes: 4 additions & 11 deletions src/node/protocol/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ impl Default for HeartbeatMessage {
}

impl HeartbeatMessage {
pub fn default_as_message() -> Message {
Message::Heartbeat(HeartbeatMessage::default())
}

pub fn new(sender_id: String, time: SystemTime) -> Message {
Message::Heartbeat(HeartbeatMessage { sender_id, time })
pub fn new(sender_id: String, time: SystemTime) -> Self {
HeartbeatMessage { sender_id, time }
}
}

Expand Down Expand Up @@ -108,10 +104,7 @@ mod heartbeat_tests {
.ready()
.await
.unwrap()
.call(HeartbeatMessage::new(
"local".to_string(),
SystemTime::now(),
))
.call(HeartbeatMessage::new("local".to_string(), SystemTime::now()).into())
.await
.unwrap();
assert!(res.is_none());
Expand All @@ -126,7 +119,7 @@ mod heartbeat_tests {
.ready()
.await
.unwrap()
.call(HeartbeatMessage::default_as_message())
.call(HeartbeatMessage::default().into())
.await
.unwrap()
{
Expand Down
26 changes: 13 additions & 13 deletions src/node/protocol/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ pub struct PingMessage {
}

impl PingMessage {
pub fn default_as_message() -> Message {
Message::Ping(PingMessage::default())
}

pub fn new(sender_id: String, message: String) -> Message {
Message::Ping(PingMessage { sender_id, message })
pub fn new(sender_id: String, message: String) -> Self {
PingMessage { sender_id, message }
}
}

Expand Down Expand Up @@ -68,8 +64,12 @@ impl Service<Message> for Ping {
async move {
match msg {
Message::Ping(m) => match m.message.as_str() {
"" => Ok(Some(PingMessage::new(local_sender_id, "ping".to_string()))),
"ping" => Ok(Some(PingMessage::new(local_sender_id, "pong".to_string()))),
"" => Ok(Some(
PingMessage::new(local_sender_id, "ping".to_string()).into(),
)),
"ping" => Ok(Some(
PingMessage::new(local_sender_id, "pong".to_string()).into(),
)),
_ => Ok(None),
},
_ => Ok(None),
Expand All @@ -93,13 +93,13 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::default_as_message())
.call(PingMessage::default().into())
.await
.unwrap();
assert!(res.is_some());
assert_eq!(
res,
Some(PingMessage::new("local".to_string(), "ping".to_string()))
Some(PingMessage::new("local".to_string(), "ping".to_string()).into())
);
}

Expand All @@ -110,13 +110,13 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::new("local".to_string(), "ping".to_string()))
.call(PingMessage::new("local".to_string(), "ping".to_string()).into())
.await
.unwrap();
assert!(res.is_some());
assert_eq!(
res,
Some(PingMessage::new("local".to_string(), "pong".to_string()))
Some(PingMessage::new("local".to_string(), "pong".to_string()).into())
);
}

Expand All @@ -127,7 +127,7 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::new("local".to_string(), "pong".to_string()))
.call(PingMessage::new("local".to_string(), "pong".to_string()).into())
.await
.unwrap();
assert!(res.is_none());
Expand Down

0 comments on commit 08ed8a8

Please sign in to comment.