diff --git a/Cargo.lock b/Cargo.lock index b917d0a96e8..7d19c1b4fce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -654,6 +654,14 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "behaviour-compose" +version = "0.1.0" +dependencies = [ + "futures", + "libp2p", +] + [[package]] name = "bimap" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 0d1880eb2b1..5a96d427b6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", + "wasm-tests/webtransport-tests", "examples/behaviour-compose", ] resolver = "2" diff --git a/examples/behaviour-compose/Cargo.toml b/examples/behaviour-compose/Cargo.toml new file mode 100644 index 00000000000..e4f67899f22 --- /dev/null +++ b/examples/behaviour-compose/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "behaviour-compose" +version = "0.1.0" +edition = "2021" +rust-version.workspace = true + +[dependencies] +libp2p = { workspace = true, features = ["identify", "ping", "mdns", "tokio"] } +futures = { workspace = true } + +[lints] +workspace = true diff --git a/examples/behaviour-compose/src/behaviour.rs b/examples/behaviour-compose/src/behaviour.rs new file mode 100644 index 00000000000..7c1a80254e5 --- /dev/null +++ b/examples/behaviour-compose/src/behaviour.rs @@ -0,0 +1,326 @@ +//! # Composing `NetworkBehaviour`s +//! +//! Manually composing behaviours is more straightforward, +//! you combine the behaviours in a struct and delegate calls, +//! then combine their respective `ToSwarm` events in an enum. +//! +//! ``` +//! use libp2p::{identify, mdns, ping}; +//! +//! struct Behaviour { +//! identify: identify::Behaviour, +//! ping: ping::Behaviour, +//! mdns: mdns::tokio::Behaviour, // using `tokio` runtime +//! } +//! ``` + +use std::task::Poll; + +use libp2p::{identify, mdns, ping, swarm::NetworkBehaviour}; + +use crate::connection_handler::Handler; + +pub struct Behaviour { + identify: identify::Behaviour, + ping: ping::Behaviour, + mdns: mdns::tokio::Behaviour, +} + +pub enum ToSwarm { + Identify(identify::Event), + Ping(ping::Event), + Mdns(mdns::Event), +} + +macro_rules! poll_behaviour { + (($self:ident,$cx:ident)=>{$($name:ident:$behaviour:ident,)+}) => { + $( + // I know it looks kind of dumb but the type system requires this + // to get rid of the generics. + match ::libp2p::swarm::NetworkBehaviour::poll( + &mut $self.$name, + $cx, + ){ + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::GenerateEvent( + event, + ), + ) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::GenerateEvent( + ToSwarm::$behaviour(event), + ), + ); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::Dial { opts }, + ) => { + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::Dial { + opts, + }); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ListenOn { opts }, + ) => { + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ListenOn { + opts, + }); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::RemoveListener{id})=>{ + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::RemoveListener { + id + }); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NotifyHandler { + peer_id, + handler, + event, + }, + ) => { + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NotifyHandler { + peer_id, + handler, + event: crate::connection_handler::FromBehaviour::$behaviour(event) + }); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrCandidate(addr))=>{ + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrCandidate(addr)); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrConfirmed(addr))=>{ + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrConfirmed(addr)); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrExpired(addr))=>{ + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrExpired(addr)); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::CloseConnection { + peer_id, + connection, + }, + ) => { + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::CloseConnection { + peer_id, + connection, + }); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrOfPeer{peer_id,address})=>{ + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrOfPeer{peer_id,address}); + } + std::task::Poll::Ready(uncovered)=>{ + unimplemented!("New branch {:?} not covered", uncovered) + } + std::task::Poll::Pending => {} + } + )* + }; +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = crate::connection_handler::Handler; + + type ToSwarm = ToSwarm; + + fn handle_established_inbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: libp2p::PeerId, + local_addr: &libp2p::Multiaddr, + remote_addr: &libp2p::Multiaddr, + ) -> Result, libp2p::swarm::ConnectionDenied> { + let ping_handler = self.ping.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + )?; + let identify_handler = self.identify.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + )?; + let mdns_handler = self.mdns.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + )?; + Ok(Handler { + ping: ping_handler, + identify: identify_handler, + mdns: mdns_handler, + }) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: libp2p::PeerId, + addr: &libp2p::Multiaddr, + role_override: libp2p::core::Endpoint, + port_use: libp2p::core::transport::PortUse, + ) -> Result, libp2p::swarm::ConnectionDenied> { + let ping_handler = self.ping.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + )?; + let identify_handler = self.identify.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + )?; + let mdns_handler = self.mdns.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + )?; + // Yes, the entire connection will be denied when any of the behaviours rejects. + // I'm not sure what will happen if the connection is partially rejected, e.g. making + // `Handler` containing `Option`s. + Ok(Handler { + ping: ping_handler, + identify: identify_handler, + mdns: mdns_handler, + }) + } + + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + // `FromSwarm` needs to be broadcast to all `NetworkBehaviour`s. + self.identify.on_swarm_event(event); + self.mdns.on_swarm_event(event); + self.ping.on_swarm_event(event); + } + + fn on_connection_handler_event( + &mut self, + peer_id: libp2p::PeerId, + connection_id: libp2p::swarm::ConnectionId, + event: libp2p::swarm::THandlerOutEvent, + ) { + // Events collected from the `Handler` need to be delegated to the corresponding `ConnectionHandler`. + use super::connection_handler::ToBehaviour::*; + match event { + Ping(ev) => self + .ping + .on_connection_handler_event(peer_id, connection_id, ev), + Mdns(ev) => self + .mdns + .on_connection_handler_event(peer_id, connection_id, ev), + Identify(ev) => self + .identify + .on_connection_handler_event(peer_id, connection_id, ev), + } + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + // Yes, composed behaviour has a hidden polling order, but is not a hard one. + // You can change the order they're polled by rearrange the statements, + // though the order can be changed by the compiler. + // Here we need to properly delegate the events. + + match ::libp2p::swarm::NetworkBehaviour::poll(&mut self.ping, cx) { + // `ToSwarm::GenerateEvent` needs to be aggregated. + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::GenerateEvent( + event, + )) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::GenerateEvent(ToSwarm::Ping(event)), + ); + } + // `ToSwarm::NotifyHandler` needs to be delegated. + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::NotifyHandler { + peer_id, + handler, + event, + }) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NotifyHandler { + peer_id, + handler, + event: crate::connection_handler::FromBehaviour::Ping(event), + }, + ); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::Dial { opts }) => { + return std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::Dial { + opts, + }); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::ListenOn { opts }) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ListenOn { opts }, + ); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::RemoveListener { + id, + }) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::RemoveListener { id }, + ); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrCandidate(addr), + ) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrCandidate(addr), + ); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrConfirmed(addr), + ) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrConfirmed(addr), + ); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrExpired(addr), + ) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::ExternalAddrExpired(addr), + ); + } + std::task::Poll::Ready(::libp2p::swarm::derive_prelude::ToSwarm::CloseConnection { + peer_id, + connection, + }) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::CloseConnection { + peer_id, + connection, + }, + ); + } + std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrOfPeer { + peer_id, + address, + }, + ) => { + return std::task::Poll::Ready( + ::libp2p::swarm::derive_prelude::ToSwarm::NewExternalAddrOfPeer { + peer_id, + address, + }, + ); + } + std::task::Poll::Ready(uncovered) => { + unimplemented!("New branch {:?} not covered", uncovered) + } + std::task::Poll::Pending => {} + } + poll_behaviour!((self, cx)=>{identify:Identify,mdns:Mdns,}); + Poll::Pending + } +} diff --git a/examples/behaviour-compose/src/connection_handler.rs b/examples/behaviour-compose/src/connection_handler.rs new file mode 100644 index 00000000000..71d871c2692 --- /dev/null +++ b/examples/behaviour-compose/src/connection_handler.rs @@ -0,0 +1,872 @@ +use std::task::Poll; + +use libp2p::{ + identify, mdns, ping, + swarm::{handler::ConnectionEvent, ConnectionHandler, NetworkBehaviour}, +}; + +// The actual types are not accessible from the crates but visible through trait. +// There will be TONS of types like these with `as` casts. +type IdentifyHandler = ::ConnectionHandler; +type PingHandler = ::ConnectionHandler; +type MdnsHandler = ::ConnectionHandler; + +/// The composed `ConnectionHandler` +pub struct Handler { + pub ping: PingHandler, + pub identify: IdentifyHandler, + pub mdns: MdnsHandler, +} + +/// Aggregated `FromBehaviour` type that needs to be delegated to the respective +/// `ConnectionHandler`. +#[derive(Debug)] +pub enum FromBehaviour { + Ping(::FromBehaviour), + Identify(::FromBehaviour), + Mdns(::FromBehaviour), +} + +/// Aggregated `ToBehaviour` type that needs to be aggregated to the upper +/// `NetworkBehaviour`. +#[derive(Debug)] +pub enum ToBehaviour { + Ping(::ToBehaviour), + Identify(::ToBehaviour), + Mdns(::ToBehaviour), +} + +/// Module for code concerning inbound substream upgrade. +pub mod inbound_upgrade { + use libp2p::{ + core::UpgradeInfo, swarm::handler::FullyNegotiatedInbound, InboundUpgrade, Stream, + }; + + use super::*; + + // Shorten them somewhat with type ailas. + pub type PingProtocol = <::ConnectionHandler as ConnectionHandler>::InboundProtocol; + pub type IdentifyProtocol = <::ConnectionHandler as ConnectionHandler>::InboundProtocol; + pub type MdnsProtocol = <::ConnectionHandler as ConnectionHandler>::InboundProtocol; + + /// Aggregated inbound `UpgradeInfo` + #[derive(Clone)] + pub enum UpgradeInfoSelect { + Ping(::Info), + Identify(::Info), + Mdns(::Info), + } + + // "Serialize" the type to compare against the protocol of incoming substream request, + // as well as type erasure. + impl AsRef for UpgradeInfoSelect { + fn as_ref(&self) -> &str { + use UpgradeInfoSelect::*; + match self { + Ping(info) => AsRef::::as_ref(info), + Identify(info) => AsRef::::as_ref(info), + Mdns(info) => AsRef::::as_ref(info), + } + } + } + + /// Aggregated `UpgradeInfo` iterator. + /// ### Difference from outbound upgrade + /// The peer accepting an inbound stream needs to know all the + /// protocol it supports, but the other peer requesting an outbound only needs + /// to know what kind of substream it wants. So for inbound upgrade this is a + /// struct containing all `UpgradeInfo` from all behaviours that get composed. + type UpgradeInfoIter = IntoIter; + + #[expect(deprecated)] + pub struct OpenInfo { + pub ping: ::InboundOpenInfo, + pub mdns: ::InboundOpenInfo, + pub identify: ::InboundOpenInfo, + } + + pub struct Protocols { + pub ping: PingProtocol, + pub identify: IdentifyProtocol, + pub mdns: MdnsProtocol, + } + + impl UpgradeInfo for Protocols { + type Info = UpgradeInfoSelect; + type InfoIter = UpgradeInfoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.ping + .protocol_info() + .into_iter() + .map(UpgradeInfoSelect::Ping) + .chain( + self.identify + .protocol_info() + .into_iter() + .map(UpgradeInfoSelect::Identify), + ) + .chain( + self.mdns + .protocol_info() + .into_iter() + .map(UpgradeInfoSelect::Mdns), + ) + .collect::>() + .into_iter() + } + } + + /// Aggregated type for upgrade errors. + pub enum UpgradeErrorSelect { + Ping(>::Error), + Identify(>::Error), + Mdns(>::Error), + } + + use std::{pin::Pin, vec::IntoIter}; + + type IdentifyFuture = + >::Future; + type PingFuture = >::Future; + type MdnsFuture = >::Future; + + pub enum SubstreamFuture { + Ping(PingFuture), + Identify(IdentifyFuture), + Mdns(MdnsFuture), + } + + pub enum Pinned<'a> { + Ping(Pin<&'a mut PingFuture>), + Mdns(Pin<&'a mut MdnsFuture>), + Identify(Pin<&'a mut IdentifyFuture>), + } + + /// Aggregated upgrade output, aka substream, that needs to be delegated to the respective + /// `ConnectionHandler`. + pub enum SubstreamSelect { + Ping(>::Output), + Identify(>::Output), + Mdns(>::Output), + } + + impl SubstreamFuture { + pub fn as_pin_mut(self: ::std::pin::Pin<&mut Self>) -> Pinned<'_> { + unsafe { + match *::std::pin::Pin::get_unchecked_mut(self) { + SubstreamFuture::Ping(ref mut inner) => { + Pinned::Ping(::std::pin::Pin::new_unchecked(inner)) + } + SubstreamFuture::Mdns(ref mut inner) => { + Pinned::Mdns(::std::pin::Pin::new_unchecked(inner)) + } + SubstreamFuture::Identify(ref mut inner) => { + Pinned::Identify(::std::pin::Pin::new_unchecked(inner)) + } + } + } + } + } + + impl ::futures::Future for SubstreamFuture { + type Output = Result; + + fn poll( + self: ::std::pin::Pin<&mut Self>, + cx: &mut ::std::task::Context<'_>, + ) -> std::task::Poll { + use ::std::task::Poll; + match self.as_pin_mut() { + Pinned::Ping(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Ping(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Ping(upg_err)), + }), + }, + Pinned::Mdns(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Mdns(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Mdns(upg_err)), + }), + }, + Pinned::Identify(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Identify(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Identify(upg_err)), + }), + }, + } + } + } + + // Make the composed `Protocols` be able to actually accept inbound upgrades. + impl ::libp2p::core::upgrade::InboundUpgrade<::libp2p::swarm::Stream> for Protocols { + type Output = SubstreamSelect; + type Future = SubstreamFuture; + type Error = UpgradeErrorSelect; + + fn upgrade_inbound(self, sock: ::libp2p::Stream, info: UpgradeInfoSelect) -> Self::Future { + match info { + UpgradeInfoSelect::Ping(info) => { + Self::Future::Ping(self.ping.upgrade_inbound(sock, info)) + } + UpgradeInfoSelect::Mdns(info) => { + Self::Future::Mdns(self.mdns.upgrade_inbound(sock, info)) + } + UpgradeInfoSelect::Identify(info) => { + Self::Future::Identify(self.identify.upgrade_inbound(sock, info)) + } + } + } + } + + #[expect(deprecated)] + pub enum NegotiatedSelect { + Ping( + FullyNegotiatedInbound< + PingProtocol, + ::OutboundOpenInfo, + >, + ), + Mdns( + FullyNegotiatedInbound< + MdnsProtocol, + ::OutboundOpenInfo, + >, + ), + Identify( + FullyNegotiatedInbound< + IdentifyProtocol, + ::OutboundOpenInfo, + >, + ), + } + + pub fn transpose_negotiated_inbound( + inbound: FullyNegotiatedInbound, + ) -> NegotiatedSelect { + match inbound { + FullyNegotiatedInbound { + protocol: SubstreamSelect::Ping(stream), + info, + } => NegotiatedSelect::Ping(FullyNegotiatedInbound { + protocol: stream, + info: info.ping, + }), + FullyNegotiatedInbound { + protocol: SubstreamSelect::Mdns(stream), + info, + } => NegotiatedSelect::Mdns(FullyNegotiatedInbound { + protocol: stream, + info: info.mdns, + }), + FullyNegotiatedInbound { + protocol: SubstreamSelect::Identify(stream), + info, + } => NegotiatedSelect::Identify(FullyNegotiatedInbound { + protocol: stream, + info: info.identify, + }), + #[allow(unreachable_patterns)] + _ => panic!("protocol mismatch!"), + } + } +} + +mod outbound_upgrade { + use libp2p::{core::UpgradeInfo, OutboundUpgrade, Stream}; + + use super::*; + + type PingProtocol = ::OutboundProtocol; + type IdentifyProtocol = ::OutboundProtocol; + type MdnsProtocol = ::OutboundProtocol; + + /// Aggreated type for outbound `UpgradeInfo`. + /// ### Difference from inbound upgrade + /// Does not require an accompanying iterator that outputs all supported protocols because you + /// can only open one kind of substream at a time. + #[derive(Clone)] + pub enum UpgradeInfoSelect { + Ping(::Info), + Identify(::Info), + Mdns(::Info), + } + + // "Serialize" the protocol to send over the wire, as well as type erasure. + impl AsRef for UpgradeInfoSelect { + fn as_ref(&self) -> &str { + use UpgradeInfoSelect::*; + match self { + Ping(info) => AsRef::::as_ref(info), + Identify(info) => AsRef::::as_ref(info), + Mdns(info) => AsRef::::as_ref(info), + } + } + } + + /// A marker for the to-be-opened substream. You can put in information about + /// what the substream is negotiated for. + #[expect(deprecated)] + pub enum OpenInfoSelect { + Ping(::OutboundOpenInfo), + Mdns(::OutboundOpenInfo), + Identify(::OutboundOpenInfo), + } + + /// Aggregated type for outbound protocols. + pub enum ProtocolSelect { + Ping(PingProtocol), + Identify(IdentifyProtocol), + Mdns(MdnsProtocol), + } + + impl UpgradeInfo for ProtocolSelect { + type Info = UpgradeInfoSelect; + // Only produce one `UpgradeInfo` because `ProtocolSelect` is also an enum. + type InfoIter = core::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + match self { + Self::Ping(inner) => core::iter::once(UpgradeInfoSelect::Ping( + inner.protocol_info().into_iter().next().unwrap(), + )), + Self::Mdns(inner) => core::iter::once(UpgradeInfoSelect::Mdns( + inner.protocol_info().into_iter().next().unwrap(), + // Yes, this will panic when we're trying to negotiate a substream for `mdns` + // protocol. But in practice it never does, because the + // protocol never negotiate a substream for itself. + )), + Self::Identify(inner) => core::iter::once(UpgradeInfoSelect::Identify( + inner.protocol_info().into_iter().next().unwrap(), + )), + } + } + } + + pub enum UpgradeErrorSelect { + Ping(>::Error), + Identify(>::Error), + Mdns(>::Error), + } + + use std::pin::Pin; + + type IdentifyFuture = + >::Future; + type PingFuture = >::Future; + type MdnsFuture = >::Future; + + pub enum SubstreamFuture { + Ping(PingFuture), + Identify(IdentifyFuture), + Mdns(MdnsFuture), + } + + pub enum Pinned<'a> { + Ping(Pin<&'a mut PingFuture>), + Mdns(Pin<&'a mut MdnsFuture>), + Identify(Pin<&'a mut IdentifyFuture>), + } + + pub enum SubstreamSelect { + Ping(>::Output), + Identify(>::Output), + Mdns(>::Output), + } + + impl SubstreamFuture { + pub fn as_pin_mut(self: ::std::pin::Pin<&mut Self>) -> Pinned<'_> { + unsafe { + match *::std::pin::Pin::get_unchecked_mut(self) { + SubstreamFuture::Ping(ref mut inner) => { + Pinned::Ping(::std::pin::Pin::new_unchecked(inner)) + } + SubstreamFuture::Mdns(ref mut inner) => { + Pinned::Mdns(::std::pin::Pin::new_unchecked(inner)) + } + SubstreamFuture::Identify(ref mut inner) => { + Pinned::Identify(::std::pin::Pin::new_unchecked(inner)) + } + } + } + } + } + + impl ::futures::Future for SubstreamFuture { + type Output = Result; + + fn poll( + self: ::std::pin::Pin<&mut Self>, + cx: &mut ::std::task::Context<'_>, + ) -> std::task::Poll { + use ::std::task::Poll; + match self.as_pin_mut() { + Pinned::Ping(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Ping(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Ping(upg_err)), + }), + }, + Pinned::Mdns(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Mdns(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Mdns(upg_err)), + }), + }, + Pinned::Identify(inner) => match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(inner) => Poll::Ready(match inner { + Ok(stream) => Ok(SubstreamSelect::Identify(stream)), + Err(upg_err) => Err(UpgradeErrorSelect::Identify(upg_err)), + }), + }, + } + } + } + + impl ::libp2p::core::upgrade::OutboundUpgrade<::libp2p::swarm::Stream> for ProtocolSelect { + type Output = SubstreamSelect; + type Future = SubstreamFuture; + type Error = UpgradeErrorSelect; + + fn upgrade_outbound(self, sock: ::libp2p::Stream, info: UpgradeInfoSelect) -> Self::Future { + match info { + UpgradeInfoSelect::Ping(info) => Self::Future::Ping(match self { + Self::Ping(inner) => inner.upgrade_outbound(sock, info), + _ => panic!("upgrade info and upgrade mismatch!"), + }), + UpgradeInfoSelect::Mdns(info) => Self::Future::Mdns(match self { + Self::Mdns(inner) => inner.upgrade_outbound(sock, info), + _ => panic!("upgrade info and upgrade mismatch!"), + }), + UpgradeInfoSelect::Identify(info) => Self::Future::Identify(match self { + Self::Identify(inner) => inner.upgrade_outbound(sock, info), + _ => panic!("upgrade info and upgrade mismatch!"), + }), + } + } + } + + use libp2p::swarm::handler::FullyNegotiatedOutbound; + + #[expect(deprecated)] + pub(crate) enum NegotiatedSelect { + Ping( + FullyNegotiatedOutbound< + PingProtocol, + ::OutboundOpenInfo, + >, + ), + Mdns( + FullyNegotiatedOutbound< + MdnsProtocol, + ::OutboundOpenInfo, + >, + ), + Identify( + FullyNegotiatedOutbound< + IdentifyProtocol, + ::OutboundOpenInfo, + >, + ), + } + + /// Delegate the negotiated outbound substream. + pub(crate) fn transpose_full_outbound( + outbound: FullyNegotiatedOutbound, + ) -> NegotiatedSelect { + match outbound { + FullyNegotiatedOutbound { + protocol: SubstreamSelect::Ping(stream), + info: OpenInfoSelect::Ping(info), + } => NegotiatedSelect::Ping(FullyNegotiatedOutbound { + protocol: stream, + info, + }), + FullyNegotiatedOutbound { + protocol: SubstreamSelect::Mdns(stream), + info: OpenInfoSelect::Mdns(info), + } => NegotiatedSelect::Mdns(FullyNegotiatedOutbound { + protocol: stream, + info, + }), + FullyNegotiatedOutbound { + protocol: SubstreamSelect::Identify(stream), + info: OpenInfoSelect::Identify(info), + } => NegotiatedSelect::Identify(FullyNegotiatedOutbound { + protocol: stream, + info, + }), + #[allow(unreachable_patterns)] + _ => panic!("protocol mismatch!"), + } + } +} + +impl Handler { + #[expect(deprecated)] + fn on_listen_upgrade_error( + &mut self, + ::libp2p::swarm::handler::ListenUpgradeError { + info, + error, + }: ::libp2p::swarm::handler::ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, + ) { + use inbound_upgrade::UpgradeErrorSelect::*; + use libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError; + // delegate the error to their respective `ConnectionHandler` + match error { + Identify(error) => self.identify.on_connection_event(ListenUpgradeError( + libp2p::swarm::handler::ListenUpgradeError { + info: info.identify, + error, + }, + )), + inbound_upgrade::UpgradeErrorSelect::Mdns(error) => self.mdns.on_connection_event( + ListenUpgradeError(libp2p::swarm::handler::ListenUpgradeError { + info: info.identify, + error, + }), + ), + inbound_upgrade::UpgradeErrorSelect::Ping(error) => self.ping.on_connection_event( + ListenUpgradeError(libp2p::swarm::handler::ListenUpgradeError { + info: info.identify, + error, + }), + ), + } + } +} + +#[expect(deprecated)] +impl ConnectionHandler for Handler { + type FromBehaviour = FromBehaviour; + + type ToBehaviour = ToBehaviour; + + type InboundProtocol = inbound_upgrade::Protocols; + + type OutboundProtocol = outbound_upgrade::ProtocolSelect; + + type InboundOpenInfo = inbound_upgrade::OpenInfo; + + type OutboundOpenInfo = outbound_upgrade::OpenInfoSelect; + + fn listen_protocol( + &self, + ) -> libp2p::swarm::SubstreamProtocol { + let mdns = self.mdns.listen_protocol().into_upgrade(); + let ping = self.ping.listen_protocol().into_upgrade(); + let identify = self.identify.listen_protocol().into_upgrade(); + generate_substream_protocol(mdns, ping, identify) + } + + fn connection_keep_alive(&self) -> bool { + [ + self.mdns.connection_keep_alive(), + self.ping.connection_keep_alive(), + self.identify.connection_keep_alive(), + ] + .into_iter() + .max() + .unwrap_or(false) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll< + libp2p::swarm::ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + >, + > { + // Polling all the `ConnectionHandler`. The order shouldn't matter because of the compiler. + use libp2p::swarm::ConnectionHandlerEvent::*; + match self.ping.poll(cx) { + Poll::Ready(NotifyBehaviour(event)) => { + return Poll::Ready(NotifyBehaviour(ToBehaviour::Ping(event))); + } + Poll::Ready(OutboundSubstreamRequest { protocol }) => { + return Poll::Ready(OutboundSubstreamRequest { + protocol: protocol + .map_upgrade(|u| outbound_upgrade::ProtocolSelect::Ping(u)) + .map_info(|i| outbound_upgrade::OpenInfoSelect::Ping(i)), + }); + } + Poll::Ready(ReportRemoteProtocols(report)) => { + return Poll::Ready(ReportRemoteProtocols(report)); + } + Poll::Pending => (), + _ => (), + }; + match self.identify.poll(cx) { + Poll::Ready(NotifyBehaviour(event)) => { + return Poll::Ready(NotifyBehaviour(ToBehaviour::Identify(event))); + } + Poll::Ready(OutboundSubstreamRequest { protocol }) => { + return Poll::Ready(OutboundSubstreamRequest { + protocol: protocol + .map_upgrade(|u| outbound_upgrade::ProtocolSelect::Identify(u)) + .map_info(|i| outbound_upgrade::OpenInfoSelect::Identify(i)), + }); + } + Poll::Ready(ReportRemoteProtocols(report)) => { + return Poll::Ready(ReportRemoteProtocols(report)); + } + Poll::Pending => (), + _ => (), + }; + match self.mdns.poll(cx) { + Poll::Ready(NotifyBehaviour(event)) => { + return Poll::Ready(NotifyBehaviour(ToBehaviour::Mdns(event))); + } + Poll::Ready(OutboundSubstreamRequest { protocol }) => { + return Poll::Ready(OutboundSubstreamRequest { + protocol: protocol + .map_upgrade(|u| outbound_upgrade::ProtocolSelect::Mdns(u)) + .map_info(|i| outbound_upgrade::OpenInfoSelect::Mdns(i)), + }); + } + Poll::Ready(ReportRemoteProtocols(report)) => { + return Poll::Ready(ReportRemoteProtocols(report)); + } + Poll::Pending => (), + _ => (), + }; + Poll::Pending + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + use FromBehaviour::*; + match event { + Ping(ev) => self.ping.on_behaviour_event(ev), + Mdns(ev) => self.mdns.on_behaviour_event(ev), + Identify(ev) => self.identify.on_behaviour_event(ev), + } + } + + fn on_connection_event( + &mut self, + event: libp2p::swarm::handler::ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + // Hand the negotiated substream over to the corresponding `ConnectionHandler`. + // Substreams are negotiated and owned by their `ConnectionHandler`, not the composed + // handler. + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + use outbound_upgrade::NegotiatedSelect::*; + use ConnectionEvent::FullyNegotiatedOutbound; + match outbound_upgrade::transpose_full_outbound(fully_negotiated_outbound) { + Mdns(inner) => self + .mdns + .on_connection_event(FullyNegotiatedOutbound(inner)), + Ping(inner) => self + .ping + .on_connection_event(FullyNegotiatedOutbound(inner)), + Identify(inner) => self + .identify + .on_connection_event(FullyNegotiatedOutbound(inner)), + } + } + ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + use inbound_upgrade::NegotiatedSelect::*; + use ConnectionEvent::FullyNegotiatedInbound; + match inbound_upgrade::transpose_negotiated_inbound(fully_negotiated_inbound) { + Mdns(inner) => self.mdns.on_connection_event(FullyNegotiatedInbound(inner)), + Ping(inner) => self.ping.on_connection_event(FullyNegotiatedInbound(inner)), + Identify(inner) => self + .identify + .on_connection_event(FullyNegotiatedInbound(inner)), + } + } + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + match transpose_upgr_error(dial_upgrade_error) { + DialUpgradeErrorSelect::Ping(inner) => self + .ping + .on_connection_event(ConnectionEvent::DialUpgradeError(inner)), + DialUpgradeErrorSelect::Mdns(inner) => self + .mdns + .on_connection_event(ConnectionEvent::DialUpgradeError(inner)), + DialUpgradeErrorSelect::Identify(inner) => self + .identify + .on_connection_event(ConnectionEvent::DialUpgradeError(inner)), + } + } + // Below are events that need to be "broadcast" to all `ConnectionHandler`s. + // We need to re-package the event to do "type-erasure" because even though + // no generics are involved, they are still a part of the type. + ConnectionEvent::AddressChange(address) => { + self.mdns + .on_connection_event(ConnectionEvent::AddressChange( + libp2p::swarm::handler::AddressChange { + new_address: address.new_address, + }, + )); + self.ping + .on_connection_event(ConnectionEvent::AddressChange( + libp2p::swarm::handler::AddressChange { + new_address: address.new_address, + }, + )); + self.identify + .on_connection_event(ConnectionEvent::AddressChange( + libp2p::swarm::handler::AddressChange { + new_address: address.new_address, + }, + )); + } + ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + ConnectionEvent::LocalProtocolsChange(supported_protocols) => { + self.mdns + .on_connection_event(ConnectionEvent::LocalProtocolsChange( + supported_protocols.clone(), + )); + self.ping + .on_connection_event(ConnectionEvent::LocalProtocolsChange( + supported_protocols.clone(), + )); + self.identify + .on_connection_event(ConnectionEvent::LocalProtocolsChange( + supported_protocols.clone(), + )); + } + ConnectionEvent::RemoteProtocolsChange(supported_protocols) => { + self.mdns + .on_connection_event(ConnectionEvent::RemoteProtocolsChange( + supported_protocols.clone(), + )); + self.ping + .on_connection_event(ConnectionEvent::RemoteProtocolsChange( + supported_protocols.clone(), + )); + self.identify + .on_connection_event(ConnectionEvent::RemoteProtocolsChange( + supported_protocols.clone(), + )); + } + _ => unimplemented!("New branch not covered"), + } + } +} + +/// Aggregates all supported protocols. +#[expect(deprecated)] +fn generate_substream_protocol( + mdns: ( + ::InboundProtocol, + ::InboundOpenInfo, + ), + ping: ( + ::InboundProtocol, + ::InboundOpenInfo, + ), + identify: ( + ::InboundProtocol, + ::InboundOpenInfo, + ), +) -> ::libp2p::swarm::SubstreamProtocol { + ::libp2p::swarm::SubstreamProtocol::new( + inbound_upgrade::Protocols { + mdns: mdns.0, + identify: identify.0, + ping: ping.0, + }, + inbound_upgrade::OpenInfo { + mdns: mdns.1, + identify: identify.1, + ping: ping.1, + }, + ) +} + +use libp2p::swarm::handler::{DialUpgradeError, StreamUpgradeError}; +#[expect(deprecated)] +enum DialUpgradeErrorSelect { + Ping( + DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ), + Mdns( + DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ), + Identify( + DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ), +} + +fn transpose_upgr_error( + error: DialUpgradeError, +) -> DialUpgradeErrorSelect { + match error { + DialUpgradeError { + error: StreamUpgradeError::Apply(outbound_upgrade::UpgradeErrorSelect::Ping(error)), + info: outbound_upgrade::OpenInfoSelect::Ping(info), + } => DialUpgradeErrorSelect::Ping(DialUpgradeError { + info, + error: StreamUpgradeError::Apply(error), + }), + DialUpgradeError { + error: StreamUpgradeError::Apply(outbound_upgrade::UpgradeErrorSelect::Mdns(error)), + info: outbound_upgrade::OpenInfoSelect::Mdns(info), + } => DialUpgradeErrorSelect::Mdns(DialUpgradeError { + info, + error: StreamUpgradeError::Apply(error), + }), + DialUpgradeError { + error: StreamUpgradeError::Apply(outbound_upgrade::UpgradeErrorSelect::Identify(error)), + info: outbound_upgrade::OpenInfoSelect::Identify(info), + } => DialUpgradeErrorSelect::Identify(DialUpgradeError { + info, + error: StreamUpgradeError::Apply(error), + }), + + DialUpgradeError { + error: e, + info: outbound_upgrade::OpenInfoSelect::Ping(info), + } => DialUpgradeErrorSelect::Ping(DialUpgradeError { + info, + error: e.map_upgrade_err(|_| panic!("already handled above")), + }), + DialUpgradeError { + error: e, + info: outbound_upgrade::OpenInfoSelect::Mdns(info), + } => DialUpgradeErrorSelect::Mdns(DialUpgradeError { + info, + error: e.map_upgrade_err(|_| panic!("already handled above")), + }), + DialUpgradeError { + error: e, + info: outbound_upgrade::OpenInfoSelect::Identify(info), + } => DialUpgradeErrorSelect::Identify(DialUpgradeError { + info, + error: e.map_upgrade_err(|_| panic!("already handled above")), + }), + } +} diff --git a/examples/behaviour-compose/src/main.rs b/examples/behaviour-compose/src/main.rs new file mode 100644 index 00000000000..fc34a0039d7 --- /dev/null +++ b/examples/behaviour-compose/src/main.rs @@ -0,0 +1,26 @@ +#![no_main] +//! # Manually composing `NetworkBehaviour`s +//! +//! In most cases, `NetworkBehaviour` derive macro should be able to +//! do the heavy lifting of composing behaviours. But manually composing +//! behaviours can bring about some degree of freedom. +//! +//! This example not only shows how to compose `NetworkBehaviour`s, +//! but also some core concepts of `rust-libp2p`. +//! +//! To fully understand how to compose behaviours, the core concepts cannot be forgotten: +//! - [`Swarm`] only holds **one `NetworkBehaviour` instance**: All composed behaviour instances +//! should live under one composed instance, typically a struct. +//! - [`NetworkBehaviour`] only requires **one `ConnectionHandler` type** +//! and only assign one instance of it per connection: All `ConnectionHandler`s should also reside +//! in the same instance, also a struct. +//! The "illusion" of multiple behaviour acting together comes from properly **delegating** calls +//! and **aggregating** events. +//! This tutorial is broken down into two parts: composing `NetworkBehaviour` and composing +//! `ConnectionHandler`. The former is easier to understand and implement, while the latter is more +//! complex because it involves how peers negotiate protocols. But they need to work together. +//! +//! Proceeding to their dedicated modules for more information. + +pub mod behaviour; +pub mod connection_handler; diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 8c8c5998f67..0b0d66f2809 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -121,6 +121,13 @@ use crate::{ /// } /// } /// ``` +/// +/// ## Manually compose multiple `NetworkBehaviour`s without derive macro +/// +/// Although it is not the recommended way to compose behaviours, you can gain more +/// insight into how `rust-libp2p` works under the hood this way. +/// See the dedicated example for more information. + pub trait NetworkBehaviour: 'static { /// Handler for all the protocols the network behaviour supports. type ConnectionHandler: ConnectionHandler;