Skip to content

Commit

Permalink
Make reliable sender optional in Protocol Service
Browse files Browse the repository at this point in the history
  • Loading branch information
pool2win committed Nov 26, 2024
1 parent 52ea26d commit cb83c84
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl Node {
) {
tokio::spawn(async move {
let protocol_service =
Protocol::new(node_id.clone(), state, reliable_sender_handle.clone());
Protocol::new(node_id.clone(), state, Some(reliable_sender_handle.clone()));
let reliable_sender_service =
ReliableSend::new(protocol_service, reliable_sender_handle);
let timeout_layer =
Expand Down Expand Up @@ -390,7 +390,7 @@ impl Node {
let protocol_service = Protocol::new(
node_id.clone(),
state.clone(),
reliable_sender_handle.clone(),
Some(reliable_sender_handle.clone()),
);
let echo_broadcast_service =
EchoBroadcast::new(protocol_service, echo_broadcast_handle, state, node_id);
Expand Down
7 changes: 5 additions & 2 deletions src/node/echo_broadcast/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ mod echo_broadcast_service_tests {
}
.into();

let handshake_service =
Protocol::new("localhost".to_string(), state.clone(), mock_reliable_sender);
let handshake_service = Protocol::new(
"localhost".to_string(),
state.clone(),
Some(mock_reliable_sender),
);
let echo_broadcast_service = EchoBroadcast::new(
handshake_service,
echo_bcast_handle,
Expand Down
6 changes: 3 additions & 3 deletions src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ impl From<BroadcastProtocol> for Message {
pub struct Protocol {
node_id: String,
state: State,
peer_sender: ReliableSenderHandle,
peer_sender: Option<ReliableSenderHandle>,
}

impl Protocol {
pub fn new(node_id: String, state: State, peer_sender: ReliableSenderHandle) -> Self {
pub fn new(node_id: String, state: State, peer_sender: Option<ReliableSenderHandle>) -> Self {
Protocol {
node_id,
state,
Expand Down Expand Up @@ -230,7 +230,7 @@ mod protocol_tests {
let membership_handle = MembershipHandle::start("localhost".to_string()).await;
let state = State::new(membership_handle, message_id_generator).await;

let p = Protocol::new("local".into(), state, reliable_sender_handle);
let p = Protocol::new("local".into(), state, Some(reliable_sender_handle));
let m = p.oneshot(PingMessage::default().into()).await;
assert!(m.unwrap().is_some());
}
Expand Down
2 changes: 1 addition & 1 deletion src/node/protocol/dkg/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub(crate) async fn trigger_dkg(
reliable_sender_handle: ReliableSenderHandle,
) -> Result<(KeyPackage, PublicKeyPackage), BoxError> {
let protocol_service: Protocol =
Protocol::new(node_id.clone(), state.clone(), reliable_sender_handle);
Protocol::new(node_id.clone(), state.clone(), Some(reliable_sender_handle));

let round1_future = build_round1_future(
node_id.clone(),
Expand Down
14 changes: 7 additions & 7 deletions src/node/protocol/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ impl HandshakeMessage {
pub struct Handshake {
node_id: String,
state: State,
peer_sender: ReliableSenderHandle,
peer_sender: Option<ReliableSenderHandle>,
}

impl Handshake {
pub fn new(node_id: String, state: State, peer_sender: ReliableSenderHandle) -> Self {
pub fn new(node_id: String, state: State, peer_sender: Option<ReliableSenderHandle>) -> Self {
Handshake {
node_id,
state,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Service<Message> for Handshake {
})) => match message.as_str() {
"helo" => {
if membership_handle
.add_member(sender_id, peer_sender)
.add_member(sender_id, peer_sender.unwrap())
.await
.is_err()
{
Expand All @@ -102,7 +102,7 @@ impl Service<Message> for Handshake {
}
"oleh" => {
if membership_handle
.add_member(sender_id, peer_sender)
.add_member(sender_id, peer_sender.unwrap())
.await
.is_err()
{
Expand Down Expand Up @@ -154,7 +154,7 @@ mod handshake_tests {
.expect_clone()
.returning(ReliableSenderHandle::default);

let mut p = Handshake::new("local".to_string(), state, reliable_sender_handle);
let mut p = Handshake::new("local".to_string(), state, Some(reliable_sender_handle));

let res = p
.ready()
Expand Down Expand Up @@ -185,7 +185,7 @@ mod handshake_tests {
.expect_clone()
.returning(ReliableSenderHandle::default);

let mut p = Handshake::new("local".to_string(), state, reliable_sender_handle);
let mut p = Handshake::new("local".to_string(), state, Some(reliable_sender_handle));

let res = p
.ready()
Expand Down Expand Up @@ -224,7 +224,7 @@ mod handshake_tests {
mock
});

let mut p = Handshake::new("local".to_string(), state, reliable_sender_handle);
let mut p = Handshake::new("local".to_string(), state, Some(reliable_sender_handle));

let res = p
.ready()
Expand Down
4 changes: 2 additions & 2 deletions src/node/protocol/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) async fn initialize_handshake(
reliable_sender_handle: ReliableSenderHandle,
delivery_timeout: u64,
) {
let protocol_service = Protocol::new(node_id, state, reliable_sender_handle.clone());
let protocol_service = Protocol::new(node_id, state, Some(reliable_sender_handle.clone()));
let reliable_sender_service = ReliableSend::new(protocol_service, reliable_sender_handle);
let timeout_layer = TimeoutLayer::new(Duration::from_millis(delivery_timeout));
let _ = timeout_layer
Expand All @@ -50,7 +50,7 @@ pub(crate) async fn send_membership(
delivery_time: u64,
) {
log::info!("Sending membership information");
let protocol_service = Protocol::new(node_id.clone(), state, sender.clone());
let protocol_service = Protocol::new(node_id.clone(), state, Some(sender.clone()));
let reliable_sender_service = ReliableSend::new(protocol_service, sender);
let timeout_layer =
tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(delivery_time));
Expand Down

0 comments on commit cb83c84

Please sign in to comment.