Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline use of default protocol messages #26

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,18 @@ impl Node {
}
let node_id = self.get_node_id();

let handshake_service = protocol::Protocol::new(node_id.clone());
let reliable_sender_service =
ReliableSend::new(handshake_service, reliable_sender_handle);
let timeout_layer = tower::timeout::TimeoutLayer::new(
tokio::time::Duration::from_millis(self.delivery_timeout),
);
let _ = timeout_layer
.layer(reliable_sender_service)
.oneshot(HandshakeMessage::default_as_message())
.await;
// let handshake_service = protocol::Protocol::new(node_id.clone());
// let reliable_sender_service =
// ReliableSend::new(handshake_service, reliable_sender_handle.clone());
// let timeout_layer = tower::timeout::TimeoutLayer::new(
// tokio::time::Duration::from_millis(self.delivery_timeout),
// );
// let _ = timeout_layer
// .layer(reliable_sender_service)
// .oneshot(HandshakeMessage::default().into())
// .await;

// log::info!("Handshake finished");

let ping_service = protocol::Protocol::new(node_id.clone());
let echo_broadcast_service = EchoBroadcast::new(
Expand All @@ -205,7 +207,7 @@ impl Node {
self.state.clone(),
);
let _ = echo_broadcast_service
.oneshot(PingMessage::default_as_message())
.oneshot(PingMessage::default().into())
.await;
}
}
Expand Down
37 changes: 36 additions & 1 deletion src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ extern crate serde;

mod handshake;
mod heartbeat;
mod membership;
pub(crate) mod message_id_generator;
mod ping;

use futures::{Future, FutureExt};
pub use handshake::{Handshake, HandshakeMessage};
pub use heartbeat::{Heartbeat, HeartbeatMessage};
pub use membership::{Membership, MembershipMessage};
pub use ping::{Ping, PingMessage};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
Expand All @@ -38,6 +40,9 @@ pub enum Message {
Handshake(HandshakeMessage),
Heartbeat(HeartbeatMessage),
Ping(PingMessage),
Membership(MembershipMessage),
BroadcastPing(PingMessage),
EchoPing(PingMessage),
}

/// Methods for all protocol messages
Expand All @@ -48,10 +53,37 @@ impl Message {
Message::Handshake(m) => m.sender_id.clone(),
Message::Heartbeat(m) => m.sender_id.clone(),
Message::Ping(m) => m.sender_id.clone(),
Message::Membership(m) => m.sender_id.clone(),
Message::BroadcastPing(m) => m.sender_id.clone(),
Message::EchoPing(m) => m.sender_id.clone(),
}
}
}

impl From<HeartbeatMessage> for Message {
fn from(value: HeartbeatMessage) -> Self {
Message::Heartbeat(value)
}
}

impl From<HandshakeMessage> for Message {
fn from(value: HandshakeMessage) -> Self {
Message::Handshake(value)
}
}

impl From<PingMessage> for Message {
fn from(value: PingMessage) -> Self {
Message::Ping(value)
}
}

impl From<MembershipMessage> for Message {
fn from(value: MembershipMessage) -> Self {
Message::Membership(value)
}
}

#[derive(Debug, Clone)]
pub struct Protocol {
node_id: String,
Expand Down Expand Up @@ -79,6 +111,9 @@ impl Service<Message> for Protocol {
Message::Ping(_m) => BoxService::new(Ping::new(sender_id)),
Message::Handshake(_m) => BoxService::new(Handshake::new(sender_id)),
Message::Heartbeat(_m) => BoxService::new(Heartbeat::new(sender_id)),
Message::Membership(_m) => BoxService::new(Membership::new(sender_id)),
Message::BroadcastPing(_m) => BoxService::new(Ping::new(sender_id)),
Message::EchoPing(_m) => BoxService::new(Ping::new(sender_id)),
};
svc.oneshot(msg).await
}
Expand All @@ -97,7 +132,7 @@ mod protocol_tests {
#[tokio::test]
async fn it_should_create_protocol() {
let p = Protocol::new("local".to_string());
let m = p.oneshot(PingMessage::default_as_message()).await;
let m = p.oneshot(PingMessage::default().into()).await;
assert!(m.unwrap().is_some());
}
}
10 changes: 7 additions & 3 deletions src/node/protocol/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ pub struct HandshakeMessage {
}

impl HandshakeMessage {
pub fn default_as_message() -> Message {
Message::Handshake(HandshakeMessage::default())
pub fn new(sender_id: String, message: String, version: String) -> Self {
Self {
sender_id,
message,
version,
}
}
}

Expand Down Expand Up @@ -102,7 +106,7 @@ mod handshake_tests {
.ready()
.await
.unwrap()
.call(HandshakeMessage::default_as_message())
.call(HandshakeMessage::default().into())
.await
.unwrap();
assert!(res.is_some());
Expand Down
15 changes: 4 additions & 11 deletions src/node/protocol/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ impl Default for HeartbeatMessage {
}

impl HeartbeatMessage {
pub fn default_as_message() -> Message {
Message::Heartbeat(HeartbeatMessage::default())
}

pub fn new(sender_id: String, time: SystemTime) -> Message {
Message::Heartbeat(HeartbeatMessage { sender_id, time })
pub fn new(sender_id: String, time: SystemTime) -> Self {
HeartbeatMessage { sender_id, time }
}
}

Expand Down Expand Up @@ -108,10 +104,7 @@ mod heartbeat_tests {
.ready()
.await
.unwrap()
.call(HeartbeatMessage::new(
"local".to_string(),
SystemTime::now(),
))
.call(HeartbeatMessage::new("local".to_string(), SystemTime::now()).into())
.await
.unwrap();
assert!(res.is_none());
Expand All @@ -126,7 +119,7 @@ mod heartbeat_tests {
.ready()
.await
.unwrap()
.call(HeartbeatMessage::default_as_message())
.call(HeartbeatMessage::default().into())
.await
.unwrap()
{
Expand Down
110 changes: 110 additions & 0 deletions src/node/protocol/membership.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 Kulpreet Singh

// This file is part of Frost-Federation

// Frost-Federation is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.

// Frost-Federation is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Frost-Federation. If not, see
// <https://www.gnu.org/licenses/>.

use crate::node::protocol::Message;
use futures::{Future, FutureExt};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{BoxError, Service};

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)]
pub struct MembershipMessage {
pub sender_id: String,
pub message: Vec<String>,
}

impl MembershipMessage {
pub fn new(sender_id: String, members: Vec<String>) -> Self {
MembershipMessage {
sender_id,
message: members,
}
}
}

#[derive(Debug, Clone, Default)]
pub struct Membership {
sender_id: String,
}

impl Membership {
pub fn new(node_id: String) -> Self {
Membership { sender_id: node_id }
}
}

/// Service for handling Membership protocol.
impl Service<Message> for Membership {
type Response = Option<Message>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Option<Message>, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

/// Membership doesn't respond with anything. It is pushed by a
/// node when anyone connects to it.
fn call(&mut self, _msg: Message) -> Self::Future {
async move { Ok(None) }.boxed()
}
}

#[cfg(test)]
mod membership_tests {

use super::Membership;
use crate::node::protocol::MembershipMessage;
use tower::{Service, ServiceExt};

#[tokio::test]
async fn it_should_create_membership_as_service_and_respond_to_none_with_membership() {
let mut p = Membership::new("local".to_string());
let res = p
.ready()
.await
.unwrap()
.call(MembershipMessage::default().into())
.await
.unwrap();
assert!(res.is_none());
// assert_eq!(
// res,
// Some(MembershipMessage::new("local".to_string(), vec!["a".to_string()]).into())
// );
}

#[tokio::test]
async fn it_should_create_membership_as_service_and_respond_to_membership_with_none() {
let mut p = Membership::new("local".to_string());
let res = p
.ready()
.await
.unwrap()
.call(MembershipMessage::new("local".to_string(), vec!["a".to_string()]).into())
.await
.unwrap();
assert!(res.is_none());
}

#[test]
fn it_should_create_default_membership_message() {
assert_eq!(MembershipMessage::default().sender_id, "".to_string())
}
}
26 changes: 13 additions & 13 deletions src/node/protocol/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ pub struct PingMessage {
}

impl PingMessage {
pub fn default_as_message() -> Message {
Message::Ping(PingMessage::default())
}

pub fn new(sender_id: String, message: String) -> Message {
Message::Ping(PingMessage { sender_id, message })
pub fn new(sender_id: String, message: String) -> Self {
PingMessage { sender_id, message }
}
}

Expand Down Expand Up @@ -68,8 +64,12 @@ impl Service<Message> for Ping {
async move {
match msg {
Message::Ping(m) => match m.message.as_str() {
"" => Ok(Some(PingMessage::new(local_sender_id, "ping".to_string()))),
"ping" => Ok(Some(PingMessage::new(local_sender_id, "pong".to_string()))),
"" => Ok(Some(
PingMessage::new(local_sender_id, "ping".to_string()).into(),
)),
"ping" => Ok(Some(
PingMessage::new(local_sender_id, "pong".to_string()).into(),
)),
_ => Ok(None),
},
_ => Ok(None),
Expand All @@ -93,13 +93,13 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::default_as_message())
.call(PingMessage::default().into())
.await
.unwrap();
assert!(res.is_some());
assert_eq!(
res,
Some(PingMessage::new("local".to_string(), "ping".to_string()))
Some(PingMessage::new("local".to_string(), "ping".to_string()).into())
);
}

Expand All @@ -110,13 +110,13 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::new("local".to_string(), "ping".to_string()))
.call(PingMessage::new("local".to_string(), "ping".to_string()).into())
.await
.unwrap();
assert!(res.is_some());
assert_eq!(
res,
Some(PingMessage::new("local".to_string(), "pong".to_string()))
Some(PingMessage::new("local".to_string(), "pong".to_string()).into())
);
}

Expand All @@ -127,7 +127,7 @@ mod ping_tests {
.ready()
.await
.unwrap()
.call(PingMessage::new("local".to_string(), "pong".to_string()))
.call(PingMessage::new("local".to_string(), "pong".to_string()).into())
.await
.unwrap();
assert!(res.is_none());
Expand Down
Loading