Skip to content

Commit

Permalink
refactor(networking): stop using Box<dyn Error> (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
Serial-ATA authored Feb 12, 2025
1 parent c5d590b commit 89f298a
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 309 deletions.
653 changes: 405 additions & 248 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ tracing-subscriber = { version = "0.3", default-features = false }
# Networking & HTTP
jsonrpc-core = { version = "18.0.0", default-features = false }
jsonrpc-http-server = { version = "18.0.0", default-features = false }
libp2p = { version = "0.54", default-features = false }
libp2p = { version = "0.55.0", default-features = false }
reqwest = { version = "0.12.7", default-features = false }
url = { version = "2.5.2", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion crates/macros/context-derive/tests/ui/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Network for StubNetwork {
async fn send_message(
&self,
message: ProtocolMessage,
) -> Result<(), blueprint_sdk::networking::Error> {
) -> Result<(), blueprint_sdk::networking::error::Error> {
drop(message);
Ok(())
}
Expand Down
55 changes: 55 additions & 0 deletions crates/networking/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Network error: {0}")]
NetworkError(String),

#[error("Channel error: {0}")]
ChannelError(String),

#[error("Gossip error: {0}")]
GossipError(String),

#[error("Messaging error: {0}")]
MessagingError(String),

#[error("Round based error: {0}")]
RoundBasedError(String),

#[error("Serde JSON error: {0}")]
SerdeJson(#[from] serde_json::Error),

#[error("Connection error: {0}")]
ConnectionError(String),

#[error("Protocol error: {0}")]
ProtocolError(String),

#[error("Attempted to start a network with too many topics: {0}")]
TooManyTopics(usize),
#[error("All topics must be unique")]
DuplicateTopics,

#[error("No network found")]
NoNetworkFound,

#[error("Other error: {0}")]
Other(String),

// std compat
#[error(transparent)]
Io(#[from] std::io::Error),

// libp2p compat
#[error(transparent)]
Dial(#[from] libp2p::swarm::DialError),
#[error(transparent)]
Noise(#[from] libp2p::noise::Error),
#[error(transparent)]
Behaviour(#[from] libp2p::BehaviourBuilderError),
#[error(transparent)]
Subscription(#[from] libp2p::gossipsub::SubscriptionError),
#[error(transparent)]
TransportIo(#[from] libp2p::TransportError<std::io::Error>),
#[error(transparent)]
Multiaddr(#[from] libp2p::multiaddr::Error),
}
2 changes: 1 addition & 1 deletion crates/networking/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
clippy::exhaustive_enums
)]

use crate::error::Error;
use crate::key_types::{GossipMsgKeyPair, GossipMsgPublicKey, GossipSignedMsgSignature};
use crate::Error;
use async_trait::async_trait;
use gadget_crypto::hashing::blake3_256;
use gadget_std::collections::BTreeMap;
Expand Down
17 changes: 11 additions & 6 deletions crates/networking/src/handlers/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use crate::gossip::{GossipMessage, NetworkService};
use gadget_std::string::ToString;
use gadget_std::sync::atomic::AtomicUsize;
use gadget_std::sync::Arc;
use libp2p::gossipsub::TopicHash;
use libp2p::gossipsub::{Event, TopicHash};
use libp2p::{gossipsub, PeerId};

impl NetworkService<'_> {
#[tracing::instrument(skip(self, event))]
pub(crate) async fn handle_gossip(&mut self, event: gossipsub::Event) {
use gossipsub::Event::{GossipsubNotSupported, Message, Subscribed, Unsubscribed};
let with_connected_peers = |topic: &TopicHash, f: fn(&Arc<AtomicUsize>)| {
let maybe_mapping = self
.inbound_mapping
Expand All @@ -25,15 +24,15 @@ impl NetworkService<'_> {
}
};
match event {
Message {
Event::Message {
propagation_source,
message_id,
message,
} => {
self.handle_gossip_message(propagation_source, message_id, message)
.await;
}
Subscribed { peer_id, topic } => {
Event::Subscribed { peer_id, topic } => {
let added = with_connected_peers(&topic, |_connected_peers| {
// Code commented out because each peer needs to do a request-response
// direct P2P handshake, which is where the connected_peers counter is
Expand All @@ -46,7 +45,7 @@ impl NetworkService<'_> {
gadget_logging::error!("{peer_id} subscribed to unknown topic: {topic}");
}
}
Unsubscribed { peer_id, topic } => {
Event::Unsubscribed { peer_id, topic } => {
let removed = with_connected_peers(&topic, |_connected_peers| {
// Code commented out because each peer needs to do a request-response
// direct P2P handshake, which is where the connected_peers counter is
Expand All @@ -59,9 +58,15 @@ impl NetworkService<'_> {
gadget_logging::error!("{peer_id} unsubscribed from unknown topic: {topic}");
}
}
GossipsubNotSupported { peer_id } => {
Event::GossipsubNotSupported { peer_id } => {
gadget_logging::trace!("{peer_id} does not support gossipsub!");
}
Event::SlowPeer {
peer_id,
failed_messages: _,
} => {
gadget_logging::error!("{peer_id} wasn't able to download messages in time!");
}
}
}

Expand Down
14 changes: 12 additions & 2 deletions crates/networking/src/handlers/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,35 @@ impl NetworkService<'_> {
) {
use request_response::Event::{InboundFailure, Message, OutboundFailure, ResponseSent};
match event {
Message { peer, message } => {
Message {
peer,
message,
connection_id: _,
} => {
gadget_logging::trace!("Received P2P message from: {peer}");
self.handle_p2p_message(peer, message).await;
}
OutboundFailure {
peer,
request_id,
error,
connection_id: _,
} => {
gadget_logging::error!("Failed to send message to peer: {peer} with request_id: {request_id} and error: {error}");
}
InboundFailure {
peer,
request_id,
error,
connection_id: _,
} => {
gadget_logging::error!("Failed to receive message from peer: {peer} with request_id: {request_id} and error: {error}");
}
ResponseSent { peer, request_id } => {
ResponseSent {
peer,
request_id,
connection_id: _,
} => {
gadget_logging::debug!(
"Sent response to peer: {peer} with request_id: {request_id}"
);
Expand Down
33 changes: 1 addition & 32 deletions crates/networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,12 @@ pub mod round_based_compat;
#[cfg(feature = "round-based-compat")]
pub use round_based;

pub mod error;
pub mod setup;

use gadget_std::string::String;

/// Unique identifier for a party
pub type UserID = u16;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Network error: {0}")]
NetworkError(String),

#[error("Channel error: {0}")]
ChannelError(String),

#[error("Gossip error: {0}")]
GossipError(String),

#[error("Messaging error: {0}")]
MessagingError(String),

#[error("Round based error: {0}")]
RoundBasedError(String),

#[error("Serde JSON error: {0}")]
SerdeJson(#[from] serde_json::Error),

#[error("Connection error: {0}")]
ConnectionError(String),

#[error("Protocol error: {0}")]
ProtocolError(String),

#[error("Other error: {0}")]
Other(String),
}

pub use key_types::*;

#[cfg(all(
Expand Down
8 changes: 4 additions & 4 deletions crates/networking/src/networking.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::key_types::GossipMsgPublicKey;
use crate::Error;
use async_trait::async_trait;
use dashmap::DashMap;
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -599,7 +599,7 @@ mod tests {

const TOPIC: &str = "/gadget/test/1.0.0";

fn deserialize<'a, T>(data: &'a [u8]) -> Result<T, crate::Error>
fn deserialize<'a, T>(data: &'a [u8]) -> Result<T, crate::error::Error>
where
T: Deserialize<'a>,
{
Expand Down Expand Up @@ -704,7 +704,7 @@ mod tests {
node: N,
i: u16,
mapping: BTreeMap<u16, crate::GossipMsgPublicKey>,
) -> Result<(), crate::Error> {
) -> Result<(), crate::error::Error> {
let task_hash = [0u8; 32];
// Safety note: We should be passed a NetworkMultiplexer, and all uses of the N: Network
// used throughout the program must also use the multiplexer to prevent mixed messages.
Expand Down Expand Up @@ -749,7 +749,7 @@ mod tests {
gadget_logging::debug!("Broadcast Message");
round1_network
.send(msg)
.map_err(|_| crate::Error::Other("Failed to send message".into()))?;
.map_err(|_| crate::error::Error::Other("Failed to send message".into()))?;

// Wait for all other nodes to send their messages
let mut msgs = BTreeMap::new();
Expand Down
14 changes: 8 additions & 6 deletions crates/networking/src/round_based_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ where
{
type Send = SplitSink<NetworkWrapper<M>, Outgoing<M>>;
type Receive = SplitStream<NetworkWrapper<M>>;
type SendError = crate::Error;
type ReceiveError = crate::Error;
type SendError = crate::error::Error;
type ReceiveError = crate::error::Error;

fn split(self) -> (Self::Receive, Self::Send) {
let (sink, stream) = self.network.split();
Expand All @@ -104,7 +104,7 @@ where
M: serde::de::DeserializeOwned + Unpin,
M: round_based::ProtocolMessage,
{
type Item = Result<Incoming<M>, crate::Error>;
type Item = Result<Incoming<M>, crate::error::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let res = ready!(self.get_mut().rx.poll_recv(cx));
Expand All @@ -121,7 +121,7 @@ where
Ok(msg) => msg,
Err(err) => {
gadget_logging::error!(%err, "Failed to deserialize message (round_based_compat)");
return Poll::Ready(Some(Err(crate::Error::Other(err.to_string()))));
return Poll::Ready(Some(Err(crate::error::Error::Other(err.to_string()))));
}
};

Expand All @@ -142,7 +142,7 @@ where
M: Unpin + serde::Serialize,
M: round_based::ProtocolMessage,
{
type Error = crate::Error;
type Error = crate::error::Error;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -183,7 +183,9 @@ where

if matches!(out.recipient, MessageDestination::OneParty(_)) && to_network_id.is_none() {
gadget_logging::warn!("Recipient not found when required for {:?}", out.recipient);
return Err(crate::Error::Other("Recipient not found".to_string()));
return Err(crate::error::Error::Other(
"Recipient not found".to_string(),
));
}

// Manually construct a `ProtocolMessage` since rounds-based
Expand Down
13 changes: 6 additions & 7 deletions crates/networking/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#![allow(unused_results, missing_docs)]

use crate::error::Error;
use crate::gossip::{
GossipHandle, IntraNodePayload, MyBehaviour, NetworkServiceWithoutSwarm, MAX_MESSAGE_SIZE,
};
pub use crate::key_types::GossipMsgKeyPair;
use futures::StreamExt;
use gadget_std as std;
use gadget_std::boxed::Box;
use gadget_std::collections::BTreeMap;
use gadget_std::error::Error;
use gadget_std::format;
use gadget_std::io;
use gadget_std::net::IpAddr;
Expand Down Expand Up @@ -110,17 +109,17 @@ impl NetworkConfig {
/// # Errors
///
/// Returns an error if the network setup fails.
pub fn start_p2p_network(config: NetworkConfig) -> Result<GossipHandle, Box<dyn Error>> {
pub fn start_p2p_network(config: NetworkConfig) -> Result<GossipHandle, Error> {
if config.topics.len() != 1 {
return Err("Only one network topic is allowed when running this function".into());
return Err(Error::TooManyTopics(config.topics.len()));
}

let (networks, _) = multiplexed_libp2p_network(config)?;
let network = networks.into_iter().next().ok_or("No network found")?.1;
let network = networks.into_iter().next().ok_or(Error::NoNetworkFound)?.1;
Ok(network)
}

pub type NetworkResult = Result<(BTreeMap<String, GossipHandle>, JoinHandle<()>), Box<dyn Error>>;
pub type NetworkResult = Result<(BTreeMap<String, GossipHandle>, JoinHandle<()>), Error>;

#[allow(clippy::collapsible_else_if, clippy::too_many_lines)]
/// Starts the multiplexed libp2p network with the given configuration.
Expand Down Expand Up @@ -157,7 +156,7 @@ pub fn multiplexed_libp2p_network(config: NetworkConfig) -> NetworkResult {
.collect::<Vec<_>>();

if topics_unique.len() != topics.len() {
return Err("All topics must be unique".into());
return Err(Error::DuplicateTopics);
}

let networks = topics;
Expand Down
2 changes: 1 addition & 1 deletion crates/sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {
// Specific to Networking
#[cfg(feature = "networking")]
#[error("Networking error: {0}")]
Networking(#[from] gadget_networking::Error),
Networking(#[from] gadget_networking::error::Error),
}

#[cfg(any(feature = "evm", feature = "eigenlayer"))]
Expand Down

0 comments on commit 89f298a

Please sign in to comment.