Skip to content

Commit

Permalink
Add test for respond to unicast
Browse files Browse the repository at this point in the history
  • Loading branch information
pool2win committed Oct 9, 2024
1 parent 53c67a6 commit f2eb630
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 16 deletions.
61 changes: 46 additions & 15 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,18 @@ impl Node {

let node_id = self.get_node_id();

// let handshake_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
// let reliable_sender_service =
// ReliableSend::new(handshake_service, reliable_sender_handle.clone());
// let timeout_layer = tower::timeout::TimeoutLayer::new(
// tokio::time::Duration::from_millis(self.delivery_timeout),
// );
// let _ = timeout_layer
// .layer(reliable_sender_service)
// .oneshot(HandshakeMessage::default().into())
// .await;

// log::info!("Handshake finished");
let handshake_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
let reliable_sender_service =
ReliableSend::new(handshake_service, reliable_sender_handle.clone());
let timeout_layer = tower::timeout::TimeoutLayer::new(
tokio::time::Duration::from_millis(self.delivery_timeout),
);
let _ = timeout_layer
.layer(reliable_sender_service)
.oneshot(HandshakeMessage::default().into())
.await;

log::info!("Handshake finished");

let round_one_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
let echo_broadcast_service = EchoBroadcast::new(
Expand Down Expand Up @@ -356,8 +356,7 @@ impl Node {
state: State,
) {
let protocol_service = protocol::Protocol::new(node_id.clone(), state);
let reliable_sender_service =
ReliableSend::new(protocol_service, reliable_sender_handle.clone());
let reliable_sender_service = ReliableSend::new(protocol_service, reliable_sender_handle);
let timeout_layer =
tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(timeout));
let _ = timeout_layer
Expand Down Expand Up @@ -398,7 +397,13 @@ impl Node {

#[cfg(test)]
mod node_tests {
use super::Node;
use super::{membership, Node};
use crate::node::membership::MembershipHandle;
use crate::node::protocol::message_id_generator::MessageIdGenerator;
use crate::node::protocol::{Message, PingMessage};
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use futures::FutureExt;

#[tokio::test]
async fn it_should_return_well_formed_node_id() {
Expand Down Expand Up @@ -434,4 +439,30 @@ mod node_tests {
let mut node = Node::new().await.bind_address("localhost:6880".to_string());
assert!(node.listen().await.is_ok());
}

#[tokio::test]
async fn it_should_respond_to_unicast_messages() {
let unicast_message: Message = PingMessage::default().into();
let mut reliable_sender_handle = ReliableSenderHandle::default();

reliable_sender_handle.expect_clone().return_once(|| {
let mut cloned = ReliableSenderHandle::default();
cloned
.expect_send()
.times(1)
.return_once(|_| async { Ok(()) }.boxed());
cloned
});

let membership_handle = MembershipHandle::start("local".into()).await;
let state = super::State::new(membership_handle, MessageIdGenerator::new("local".into()));
let res = Node::respond_to_unicast_message(
"local".into(),
100,
unicast_message,
reliable_sender_handle,
state,
)
.await;
}
}
2 changes: 1 addition & 1 deletion src/node/reliable_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ mockall::mock! {
mod reliable_sender_tests {
use super::ReliableNetworkMessage;
use crate::node::connection::MockConnectionHandle;
use crate::node::protocol::{Broadcast, Message, PingMessage, Unicast};
use crate::node::protocol::{Message, PingMessage};
use serde::Serialize;
use tokio::sync::mpsc;
use tokio_util::bytes::Bytes;
Expand Down

0 comments on commit f2eb630

Please sign in to comment.