Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send response to round1 package #46

Merged
merged 26 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a10f2dd
Match on none message in package to send new message
pool2win Nov 19, 2024
bf3582e
Add comment to build_round1_package
pool2win Nov 19, 2024
0a5cea4
Add round_two module with build_round2_packages
pool2win Nov 19, 2024
d5a5cbe
Handle incoming round1_package
pool2win Nov 19, 2024
c4d3781
Add test for building round2 packages
pool2win Nov 19, 2024
feb35fe
Rename state secret_package to round1_secret_package
pool2win Nov 19, 2024
100ea76
Rename state secret pkg to round1 secret pkg
pool2win Nov 19, 2024
f1a804f
Improve build_round2 test
pool2win Nov 20, 2024
87eac64
Add round2_package to dkg state
pool2win Nov 20, 2024
5d672cd
Improve log messages in round_one
pool2win Nov 20, 2024
61eafe7
No need to spawn handshake init in a task
pool2win Nov 20, 2024
e593381
Delay start DKG
pool2win Nov 20, 2024
95b422b
Rename handshake svc to protocol svc in init
pool2win Nov 20, 2024
1b8c62e
Don't respond with protocol for Echo messages
pool2win Nov 20, 2024
4f7afb4
Fix echo broadcast svc to handle init msgs
pool2win Nov 20, 2024
11b4bce
Fix and DRY getting party size
pool2win Nov 20, 2024
3e589e5
Remove log statement
pool2win Nov 21, 2024
b01c70c
Use log level from env var, use info as default
pool2win Nov 21, 2024
5318450
Use Send BoxError in Membership async funcs
pool2win Nov 21, 2024
029bfb9
Add building and sending round2 packages
pool2win Nov 21, 2024
ca0021a
Wrap round1 service in timeout
pool2win Nov 21, 2024
2fd6e44
Send round2 pkgs - fail on a single send failure
pool2win Nov 22, 2024
44c4b11
Run round2 sends in parallel
pool2win Nov 22, 2024
88050ee
Add support for round2 packages in dkg state
pool2win Nov 22, 2024
7911b6c
Add a test to check Identifier is deterministic
pool2win Nov 22, 2024
1228712
Support adding received round2 packages to state
pool2win Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# Set default log level if not provided through environment
export LOG_LEVEL := env_var_or_default("RUST_LOG", "info")

default: test

test:
RUST_LOG=debug cargo test
RUST_LOG={{LOG_LEVEL}} cargo test

build:
cargo build

# For log level use RUST_LOG=<<level>> just run
run config="config.toml":
RUST_LOG=debug cargo run -- --config-file={{config}}
RUST_LOG={{LOG_LEVEL}} cargo run -- --config-file={{config}}

check:
cargo check
9 changes: 3 additions & 6 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,15 @@ impl Node {
let state = self.state.clone();
let delivery_timeout = self.delivery_timeout;
let reliable_sender = reliable_sender_handle.clone();
tokio::spawn(async move {
initialize_handshake(node_id, state, reliable_sender, delivery_timeout).await;
});
initialize_handshake(node_id, state, reliable_sender, delivery_timeout).await;

let node_id = self.get_node_id().clone();
let state = self.state.clone();
let echo_broadcast_handle = self.echo_broadcast_handle.clone();
let reliable_sender_handle = reliable_sender_handle.clone();
let interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
tokio::spawn(async move {
dkg::trigger::run_dkg_trigger(
interval,
15000,
node_id,
state,
echo_broadcast_handle,
Expand Down Expand Up @@ -267,7 +264,7 @@ impl Node {
let interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
tokio::spawn(async move {
dkg::trigger::run_dkg_trigger(
interval,
15000,
node_id,
state,
echo_broadcast_handle,
Expand Down
3 changes: 1 addition & 2 deletions src/node/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ where
return
}
let msg = message.unwrap().freeze();
log::debug!("Received from network {:?}", msg.clone());
actor.handle_received(msg.clone()).await.unwrap();
actor.handle_received(msg.clone()).await.unwrap();
},
None => { // Stream closed, return to clear up connection
log::debug!("Connection actor reader closed");
Expand Down
14 changes: 6 additions & 8 deletions src/node/echo_broadcast/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ where
}
}

fn call(&mut self, msg: Message) -> Self::Future {
/// Handles the message by sending echo if required
/// If it is a broadcast message with None in message, we always drop that message and a new one is generated by inner service
fn call(&mut self, mut msg: Message) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let members = this.state.membership_handle.get_members().await.unwrap();
Expand All @@ -87,14 +89,10 @@ where
.await?;
log::info!("Deliver ECHO ...");
}
// mid is not available in Broadcast - implies we are sending this broadcast
Message::Broadcast(m, None) => {
log::debug!("Generating message_id for Send...");
let to_send =
Message::Broadcast(m, Some(this.state.message_id_generator.next()));
this.handle.send(to_send, members.clone()).await?;
}
// mid is not available in Broadcast - we do nothing here, message_id will be generated by inner service
Message::Broadcast(_, None) => {}
};
// call inner service to generate response. This will generate a new message if it is an initial message with None in message field.
let response_message = this.inner.call(msg).await;
match response_message {
Ok(Some(msg)) => {
Expand Down
8 changes: 4 additions & 4 deletions src/node/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use std::collections::HashMap;
use std::error::Error;
use tokio::sync::{mpsc, oneshot};
use tower::BoxError;
pub type ReliableSenderMap = HashMap<String, ReliableSenderHandle>;

pub enum MembershipMessage {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl MembershipHandle {
&self,
member: String,
handle: ReliableSenderHandle,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), BoxError> {
let (respond_to, receiver) = oneshot::channel();
let msg = MembershipMessage::Add(member.clone(), handle, respond_to);
let _ = self.sender.send(msg).await;
Expand All @@ -110,7 +110,7 @@ impl MembershipHandle {
}
}

pub async fn remove_member(&self, member: String) -> Result<(), Box<dyn std::error::Error>> {
pub async fn remove_member(&self, member: String) -> Result<(), BoxError> {
let (respond_to, receiver) = oneshot::channel();
let msg = MembershipMessage::Remove(member, respond_to);
let _ = self.sender.send(msg).await;
Expand All @@ -122,7 +122,7 @@ impl MembershipHandle {
}
}

pub async fn get_members(&self) -> Result<ReliableSenderMap, Box<dyn Error>> {
pub async fn get_members(&self) -> Result<ReliableSenderMap, BoxError> {
let (respond_to, receiver) = oneshot::channel();
if self
.sender
Expand Down
8 changes: 7 additions & 1 deletion src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum Unicast {
Heartbeat(HeartbeatMessage),
Ping(PingMessage),
Membership(MembershipMessage),
DKGRoundTwoPackage(dkg::round_two::PackageMessage),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
Expand All @@ -77,6 +78,7 @@ impl NetworkMessage for Message {
Unicast::Heartbeat(m) => m.sender_id.clone(),
Unicast::Ping(m) => m.sender_id.clone(),
Unicast::Membership(m) => m.sender_id.clone(),
Unicast::DKGRoundTwoPackage(m) => m.sender_id.clone(),
},
Message::Broadcast(m, _) => match m {
BroadcastProtocol::DKGRoundOnePackage(m) => m.sender_id.clone(),
Expand Down Expand Up @@ -186,7 +188,11 @@ impl Service<Message> for Protocol {
BoxService::new(dkg::round_one::Package::new(sender_id, state))
}
Message::Echo(_, _, _) => {
BoxService::new(dkg::round_one::Package::new(sender_id, state))
// Don't respond to echo messages, by returning None
BoxService::new(tower::service_fn(|_| async { Ok(None) }))
}
Message::Unicast(Unicast::DKGRoundTwoPackage(_m)) => {
BoxService::new(dkg::round_two::Package::new(sender_id, state))
}
};
svc.oneshot(msg).await
Expand Down
25 changes: 25 additions & 0 deletions src/node/protocol/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,30 @@
// <https://www.gnu.org/licenses/>.

pub(crate) mod round_one;
pub(crate) mod round_two;
pub(crate) mod state;
pub(crate) mod trigger;

use crate::node::state::State;

/// Get the max and min signers for the DKG
pub(crate) async fn get_max_min_signers(state: &State) -> (usize, usize) {
let members = state.membership_handle.get_members().await.unwrap();
let num_members = members.len() + 1;
(num_members, (num_members * 2).div_ceil(3))
}
#[cfg(test)]
mod tests {
use frost_secp256k1 as frost;

#[test]
/// A test to check that the derive function is deterministic. Keeping it for future reference.
fn test_identifier_derive() {
let id1 = frost::Identifier::derive(b"test_node").unwrap();
let id2 = frost::Identifier::derive(b"test_node").unwrap();
assert_eq!(id1, id2);

let id3 = frost::Identifier::derive(b"different_node").unwrap();
assert_ne!(id1, id3);
}
}
105 changes: 89 additions & 16 deletions src/node/protocol/dkg/round_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// along with Frost-Federation. If not, see
// <https://www.gnu.org/licenses/>.

use crate::node::protocol::dkg::get_max_min_signers;
use crate::node::protocol::BroadcastProtocol;
use crate::node::protocol::Message;
use crate::node::state::State;
Expand All @@ -39,31 +40,31 @@ impl PackageMessage {
}
}

/// Builds a round one package for the given sender id
/// Queries the membership to get the number of members and the threshold
/// Builds a round one package using the frost-secp256k1 crate
async fn build_round1_package(
sender_id: String,
state: crate::node::state::State,
) -> Result<Message, frost::Error> {
let max_min_signers = state
.membership_handle
.get_members()
.await
.map(|members| {
let num_members = members.len();
(num_members, (num_members * 2).div_ceil(3))
})
.unwrap();
let (max_signers, min_signers) = get_max_min_signers(&state).await;

let participant_identifier = frost::Identifier::derive(sender_id.as_bytes()).unwrap();
let rng = thread_rng();
log::debug!("SIGNERS: {} {}", max_min_signers.0, max_min_signers.1);
log::debug!("SIGNERS: {} {}", max_signers, min_signers);
let result = frost::keys::dkg::part1(
participant_identifier,
max_min_signers.0 as u16,
max_min_signers.1 as u16,
max_signers as u16,
min_signers as u16,
rng,
);
match result {
Ok((secret_package, round1_package)) => {
let _ = state.dkg_state.add_secret_package(secret_package).await;
log::debug!("Setting round one package as {:?}", round1_package);
let _ = state
.dkg_state
.add_round1_secret_package(secret_package)
.await;
Ok(Message::Broadcast(
BroadcastProtocol::DKGRoundOnePackage(PackageMessage::new(
sender_id,
Expand Down Expand Up @@ -109,10 +110,44 @@ impl Service<Message> for Package {
/// For now, there is no response.
fn call(&mut self, msg: Message) -> Self::Future {
let state = self.state.clone();
let sender_id = self.sender_id.clone();
let this_sender_id = self.sender_id.clone();
log::debug!("Handle round one package {:?}", msg);
async move {
let response = build_round1_package(sender_id, state).await?;
Ok(Some(response))
match msg {
Message::Broadcast(
BroadcastProtocol::DKGRoundOnePackage(PackageMessage {
sender_id: _,
message: None, // message is None, so we build a new round1 package
}),
_message_id,
) => {
log::debug!("Build round one package");
let response = build_round1_package(this_sender_id, state).await?;
log::debug!("Sending round one package {:?}", response);
Ok(Some(response))
}
Message::Broadcast(
BroadcastProtocol::DKGRoundOnePackage(PackageMessage {
sender_id: from_sender_id,
message: Some(message), // received a message
}),
_message_id,
) => {
log::debug!("Received round one package");
log::debug!("Received message {:?}", message);
let identifier = frost::Identifier::derive(from_sender_id.as_bytes()).unwrap();
state
.dkg_state
.add_round1_package(identifier, message)
.await
.unwrap();
Ok(None)
}
_ => {
log::debug!("Unhandled message {:?}", msg);
Ok(None)
}
}
}
.boxed()
}
Expand Down Expand Up @@ -180,4 +215,42 @@ mod round_one_package_tests {
panic!("Expected DKGRoundOnePackage");
}
}

#[tokio::test]
async fn it_should_store_received_round_one_package_in_state() {
let message_id_generator = MessageIdGenerator::new("localhost".to_string());
let membership_handle = build_membership(3).await;
let state = State::new(membership_handle, message_id_generator);
let state_clone = state.clone();

// First create a round1 package that we'll pretend came from another node
let round1_package = build_round1_package("remote".into(), state).await.unwrap();

// Create our local package service
let mut pkg = Package::new("local".into(), state_clone);

// Send the round1 package to our service
let res = pkg
.ready()
.await
.unwrap()
.call(round1_package)
.await
.unwrap();

// No response expected when receiving a package
assert!(res.is_none());

// Verify the package was stored in state
let received_packages = pkg
.state
.dkg_state
.get_received_round1_packages()
.await
.unwrap();
assert_eq!(received_packages.len(), 1);

let remote_id = frost::Identifier::derive("remote".as_bytes()).unwrap();
assert!(received_packages.contains_key(&remote_id));
}
}
Loading
Loading