diff --git a/Cargo.lock b/Cargo.lock index d812fc0fa4b..b12827a3d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2690,6 +2690,7 @@ dependencies = [ "libp2p-metrics", "libp2p-mplex", "libp2p-noise", + "libp2p-peer-store", "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", @@ -3094,6 +3095,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-peer-store" +version = "0.1.0" +dependencies = [ + "libp2p", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "libp2p-swarm-test", + "lru", + "serde_json", + "tokio", +] + [[package]] name = "libp2p-perf" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index f6220cece5e..b4fa59654c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "misc/memory-connection-limits", "misc/metrics", "misc/multistream-select", + "misc/peer-store", "misc/quick-protobuf-codec", "misc/quickcheck-ext", "misc/rw-stream-sink", @@ -89,6 +90,7 @@ libp2p-memory-connection-limits = { version = "0.4.0", path = "misc/memory-conne libp2p-metrics = { version = "0.16.1", path = "misc/metrics" } libp2p-mplex = { version = "0.43.1", path = "muxers/mplex" } libp2p-noise = { version = "0.46.0", path = "transports/noise" } +libp2p-peer-store = { version = "0.1.0", path = "misc/peer-store" } libp2p-perf = { version = "0.4.0", path = "protocols/perf" } libp2p-ping = { version = "0.46.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.43.0", path = "transports/plaintext" } diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index d75bd7fd9fa..d144d39ed79 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -1,6 +1,9 @@ ## 0.55.1 - Introduce `libp2p-webrtc-websys` behind `webrtc-websys` feature flag. See [PR 5819](https://github.com/libp2p/rust-libp2p/pull/5819). + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). - Make the `*-websys` variants (`libp2p-webrtc-websys`, `libp2p-websocket-websys`, `libp2p-webtransport-websys`) only available on wasm32 target architecture. See [PR 5891](https://github.com/libp2p/rust-libp2p/pull/5891). diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 870f0f2e131..778029f4385 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -29,6 +29,7 @@ full = [ "memory-connection-limits", "metrics", "noise", + "peer-store", "ping", "plaintext", "pnet", @@ -69,6 +70,7 @@ mdns = ["dep:libp2p-mdns"] memory-connection-limits = ["dep:libp2p-memory-connection-limits"] metrics = ["dep:libp2p-metrics"] noise = ["dep:libp2p-noise"] +peer-store = ["dep:libp2p-peer-store"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] @@ -111,6 +113,7 @@ libp2p-identity = { workspace = true, features = ["rand"] } libp2p-kad = { workspace = true, optional = true } libp2p-metrics = { workspace = true, optional = true } libp2p-noise = { workspace = true, optional = true } +libp2p-peer-store = { workspace = true, optional = true } libp2p-ping = { workspace = true, optional = true } libp2p-plaintext = { workspace = true, optional = true } libp2p-pnet = { workspace = true, optional = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 51f4eedeb1a..1814bb4fcbb 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -81,6 +81,9 @@ pub use libp2p_metrics as metrics; #[cfg(feature = "noise")] #[doc(inline)] pub use libp2p_noise as noise; +#[cfg(feature = "peer-store")] +#[doc(inline)] +pub use libp2p_peer_store as peer_store; #[cfg(feature = "ping")] #[doc(inline)] pub use libp2p_ping as ping; diff --git a/misc/peer-store/CHANGELOG.md b/misc/peer-store/CHANGELOG.md new file mode 100644 index 00000000000..8353d77b5cf --- /dev/null +++ b/misc/peer-store/CHANGELOG.md @@ -0,0 +1,4 @@ +## 0.1.0 + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml new file mode 100644 index 00000000000..e5775feea0c --- /dev/null +++ b/misc/peer-store/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "libp2p-peer-store" +edition = "2021" +version = "0.1.0" +authors = ["drHuangMHT "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +rust-version.workspace = true + +[dependencies] +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } +lru = "0.12.3" +libp2p-identity = { workspace = true, optional = true} + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +libp2p-identity = { workspace = true, features = ["rand", "serde"] } +libp2p = { workspace = true, features = ["macros", "identify"] } +libp2p-swarm-test = { path = "../../swarm-test", features = ["tokio"]} +serde_json = { version = "1.0.134" } + +[lints] +workspace = true diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs new file mode 100644 index 00000000000..36bbd11d2c1 --- /dev/null +++ b/misc/peer-store/src/behaviour.rs @@ -0,0 +1,161 @@ +use std::{collections::VecDeque, task::Poll}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{dummy, NetworkBehaviour}; + +use crate::store::Store; + +/// Events generated by [`Behaviour`] and emitted back to [`Swarm`](libp2p_swarm::Swarm). +#[derive(Debug, Clone)] +pub enum Event { + /// The peer's record has been updated. + /// Manually updating a record will always emit this event + /// even if it provides no new information. + RecordUpdated { + /// The peer that has an update. + peer: PeerId, + }, + /// Event from the internal store. + Store(T), +} + +/// Behaviour that maintains a peer address book. +/// +/// Usage: +/// ``` +/// use libp2p::swarm::NetworkBehaviour; +/// use libp2p_peer_store::{memory_store::MemoryStore, Behaviour}; +/// +/// // `identify::Behaviour` broadcasts listen addresses of the peer, +/// // `peer_store::Behaviour` will then capture the resulting +/// // `FromSwarm::NewExternalAddrOfPeer` and add the addresses +/// // to address book. +/// #[derive(NetworkBehaviour)] +/// struct ComposedBehaviour { +/// peer_store: Behaviour, +/// identify: libp2p::identify::Behaviour, +/// } +/// ``` +pub struct Behaviour { + /// The internal store. + store: S, + /// Pending Events to be emitted back to [`Swarm`](libp2p_swarm::Swarm). + pending_events: VecDeque>, +} + +impl<'a, S> Behaviour +where + S: Store + 'static, +{ + /// Build a new [`Behaviour`] with the given store. + pub fn new(store: S) -> Self { + Self { + store, + pending_events: VecDeque::new(), + } + } + + /// Try to get all observed address of the given peer. + /// Returns `None` when the peer is not in the store. + pub fn address_of_peer<'b>( + &'a self, + peer: &'b PeerId, + ) -> Option + use<'a, 'b, S>> { + self.store.addresses_of_peer(peer) + } + + /// Get an immutable reference to the internal store. + pub fn store(&self) -> &S { + &self.store + } + + /// Get a mutable reference to the internal store. + pub fn store_mut(&mut self) -> &mut S { + &mut self.store + } + + fn handle_store_event(&mut self, event: crate::store::Event<::FromStore>) { + use crate::store::Event::*; + match event { + RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), + Store(ev) => self.pending_events.push_back(Event::Store(ev)), + } + } +} + +impl NetworkBehaviour for Behaviour +where + S: Store + 'static, + ::FromStore: Send + Sync, +{ + type ConnectionHandler = dummy::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_core::PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + if maybe_peer.is_none() { + return Ok(Vec::new()); + } + let peer = maybe_peer.expect("already handled"); + Ok(self + .store + .addresses_of_peer(&peer) + .map(|i| i.cloned().collect()) + .unwrap_or_default()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_core::PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + _port_use: libp2p_core::transport::PortUse, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + self.store.on_swarm_event(&event); + } + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_core::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + unreachable!("No event will be produced by a dummy handler.") + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if let Some(ev) = self.store.poll(cx) { + self.handle_store_event(ev); + }; + + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); + } + Poll::Pending + } +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs new file mode 100644 index 00000000000..e306b828f51 --- /dev/null +++ b/misc/peer-store/src/lib.rs @@ -0,0 +1,25 @@ +//! Implementation of peer store that stores address information +//! about foreign peers. +//! +//! ## Important Discrepancies +//! - **PeerStore is a local**: The peer store itself doesn't facilitate any information exchange +//! between peers. You will need other protocols like `libp2p-kad` to share addresses you know +//! across the network. +//! - **PeerStore is a standalone**: Other protocols cannot expect the existence of PeerStore, and +//! need to be manually hooked up to PeerStore in order to obtain information it provides. +//! +//! ## Usage +//! Compose [`Behaviour`] with other [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour), +//! and the PeerStore will automatically track addresses from +//! [`FromSwarm::NewExternalAddrOfPeer`](libp2p_swarm::FromSwarm) +//! and provide addresses when dialing peers(require `extend_addresses_through_behaviour` in +//! [`DialOpts`](libp2p_swarm::dial_opts::DialOpts) when dialing). +//! Other protocols need to be manually hooked up to obtain information from +//! or provide information to PeerStore. + +mod behaviour; +pub mod memory_store; +mod store; + +pub use behaviour::{Behaviour, Event}; +pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs new file mode 100644 index 00000000000..cc6a4d29558 --- /dev/null +++ b/misc/peer-store/src/memory_store.rs @@ -0,0 +1,505 @@ +//! An in-memory [`Store`] implementation. +//! +//! ## Usage +//! ``` +//! use libp2p_peer_store::{memory_store::MemoryStore, Behaviour}; +//! +//! let store: MemoryStore<()> = MemoryStore::new(Default::default()); +//! let behaviour = Behaviour::new(store); +//! ``` + +use std::{ + collections::{HashMap, VecDeque}, + num::NonZeroUsize, + task::Waker, +}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; +use lru::LruCache; + +use super::Store; + +/// Event from the store and emitted to [`Swarm`](libp2p_swarm::Swarm). +#[derive(Debug, Clone)] +pub enum Event { + /// Custom data of the peer has been updated. + CustomDataUpdated(PeerId), +} + +/// A in-memory store that uses LRU cache for bounded storage of addresses +/// and a frequency-based ordering of addresses. +#[derive(Default)] +pub struct MemoryStore { + /// The internal store. + records: HashMap>, + /// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm) + pending_events: VecDeque>, + /// Config of the store. + config: Config, + /// Waker for store events. + waker: Option, +} + +impl MemoryStore { + /// Create a new [`MemoryStore`] with the given config. + pub fn new(config: Config) -> Self { + Self { + config, + records: HashMap::new(), + pending_events: VecDeque::default(), + waker: None, + } + } + + /// Update an address record and notify swarm when the address is new. + /// Returns `true` when the address is new. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + let is_updated = self.update_address_silent(peer, address); + if is_updated { + self.pending_events + .push_back(crate::store::Event::RecordUpdated(*peer)); + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + is_updated + } + + /// Update an address record without notifying swarm. + /// Returns `true` when the address is new. + pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + if let Some(record) = self.records.get_mut(peer) { + return record.update_address(address); + } + let mut new_record = PeerRecord::new(self.config.record_capacity); + new_record.update_address(address); + self.records.insert(*peer, new_record); + true + } + + /// Remove an address record. + /// Returns `true` when the address is removed. + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + self.records + .get_mut(peer) + .is_some_and(|r| r.remove_address(address)) + } + + pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { + self.records.get(peer).and_then(|r| r.get_custom_data()) + } + + /// Take ownership of the internal data, leaving `None` in its place. + pub fn take_custom_data(&mut self, peer: &PeerId) -> Option { + self.records + .get_mut(peer) + .and_then(|r| r.take_custom_data()) + } + + /// Insert the data and notify the swarm about the update, dropping the old data if it exists. + pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { + self.insert_custom_data_silent(peer, custom_data); + self.pending_events + .push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + /// Insert the data without notifying the swarm. Old data will be dropped if it exists. + pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { + if let Some(r) = self.records.get_mut(peer) { + return r.insert_custom_data(custom_data); + } + let mut new_record = PeerRecord::new(self.config.record_capacity); + new_record.insert_custom_data(custom_data); + self.records.insert(*peer, new_record); + } + + /// Iterate over all internal records. + pub fn record_iter(&self) -> impl Iterator)> { + self.records.iter() + } + + /// Iterate over all internal records mutably. + /// Will not wake up the task. + pub fn record_iter_mut(&mut self) -> impl Iterator)> { + self.records.iter_mut() + } +} + +impl Store for MemoryStore { + type FromStore = Event; + + fn on_swarm_event(&mut self, swarm_event: &FromSwarm) { + match swarm_event { + FromSwarm::NewExternalAddrOfPeer(info) => { + self.update_address(&info.peer_id, info.addr); + } + FromSwarm::ConnectionEstablished(info) => { + let mut is_record_updated = false; + for failed_addr in info.failed_addresses { + is_record_updated |= self.remove_address(&info.peer_id, failed_addr); + } + is_record_updated |= + self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address()); + if is_record_updated { + self.pending_events + .push_back(crate::store::Event::RecordUpdated(info.peer_id)); + if let Some(waker) = self.waker.take() { + waker.wake(); // wake up because of update + } + } + } + _ => {} + } + } + + fn addresses_of_peer(&self, peer: &PeerId) -> Option> { + self.records.get(peer).map(|record| record.addresses()) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Option> { + if self.pending_events.is_empty() { + self.waker = Some(cx.waker().clone()); + } + self.pending_events.pop_front() + } +} + +/// Config for [`MemoryStore`]. +#[derive(Debug, Clone)] +pub struct Config { + /// The capacaity of an address store. + /// The least active address will be discarded to make room for new address. + record_capacity: NonZeroUsize, +} + +impl Default for Config { + fn default() -> Self { + Self { + record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"), + } + } +} + +impl Config { + /// Capacity for address records. + /// The least active address will be dropped to make room for new address. + pub fn record_capacity(&self) -> &NonZeroUsize { + &self.record_capacity + } + /// Set the capacity for address records. + pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self { + self.record_capacity = capacity; + self + } +} + +/// Internal record of [`MemoryStore`]. +#[derive(Debug, Clone)] +pub struct PeerRecord { + /// A LRU(Least Recently Used) cache for addresses. + /// Will delete the least-recently-used record when full. + addresses: LruCache, + /// Custom data attached to the peer. + custom_data: Option, +} +impl PeerRecord { + pub(crate) fn new(cap: NonZeroUsize) -> Self { + Self { + addresses: LruCache::new(cap), + custom_data: None, + } + } + + /// Iterate over all addresses. More recently-used address comes first. + /// Does not change the order. + pub fn addresses(&self) -> impl Iterator { + self.addresses.iter().map(|(addr, _)| addr) + } + + /// Update the address in the LRU cache, promote it to the front if it exists, + /// insert it to the front if not. + /// Returns true when the address is new. + pub fn update_address(&mut self, address: &Multiaddr) -> bool { + if self.addresses.get(address).is_some() { + return false; + } + self.addresses.get_or_insert(address.clone(), || ()); + true + } + + /// Remove the address in the LRU cache regardless of its position. + /// Returns true when the address is removed, false when not exist. + pub fn remove_address(&mut self, address: &Multiaddr) -> bool { + self.addresses.pop(address).is_some() + } + + pub fn get_custom_data(&self) -> Option<&T> { + self.custom_data.as_ref() + } + + pub fn take_custom_data(&mut self) -> Option { + self.custom_data.take() + } + + pub fn insert_custom_data(&mut self, custom_data: T) { + let _ = self.custom_data.insert(custom_data); + } +} + +#[cfg(test)] +mod test { + use std::{num::NonZero, str::FromStr}; + + use libp2p::Swarm; + use libp2p_core::{Multiaddr, PeerId}; + use libp2p_swarm::{NetworkBehaviour, SwarmEvent}; + use libp2p_swarm_test::SwarmExt; + + use super::MemoryStore; + use crate::Store; + + #[test] + fn recent_use_bubble_up() { + let mut store: MemoryStore = MemoryStore::new(Default::default()); + let peer = PeerId::random(); + let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed"); + store.update_address(&peer, &addr1); + store.update_address(&peer, &addr2); + store.update_address(&peer, &addr3); + assert!( + store + .records + .get(&peer) + .expect("peer to be in the store") + .addresses() + .collect::>() + == vec![&addr3, &addr2, &addr1] + ); + store.update_address(&peer, &addr1); + assert!( + store + .records + .get(&peer) + .expect("peer to be in the store") + .addresses() + .collect::>() + == vec![&addr1, &addr3, &addr2] + ); + store.update_address(&peer, &addr3); + assert!( + store + .records + .get(&peer) + .expect("peer to be in the store") + .addresses() + .collect::>() + == vec![&addr3, &addr1, &addr2] + ); + } + + #[test] + fn bounded_store() { + let mut store: MemoryStore = MemoryStore::new(Default::default()); + let peer = PeerId::random(); + for i in 1..10 { + let addr_string = format!("/ip4/127.0.0.{}", i); + store.update_address( + &peer, + &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), + ); + } + let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + assert!(!store + .addresses_of_peer(&peer) + .expect("peer to be in the store") + .any(|addr| *addr == first_record)); + let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + assert!( + *store + .addresses_of_peer(&peer) + .expect("peer to be in the store") + .last() + .expect("addr to exist") + == second_record + ); + } + + #[test] + fn update_address_on_connect() { + async fn expect_record_update( + swarm: &mut Swarm>, + expected_peer: PeerId, + ) { + swarm + .wait(|ev| match ev { + SwarmEvent::Behaviour(crate::Event::RecordUpdated { peer }) => { + (peer == expected_peer).then_some(()) + } + _ => None, + }) + .await + } + + let store1: MemoryStore<()> = MemoryStore::new( + crate::memory_store::Config::default() + .set_record_capacity(NonZero::new(2).expect("2 > 0")), + ); + let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1)); + let store2: MemoryStore<()> = MemoryStore::new( + crate::memory_store::Config::default() + .set_record_capacity(NonZero::new(2).expect("2 > 0")), + ); + let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2)); + + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + let swarm1_peer_id = *swarm1.local_peer_id(); + swarm2.dial(listen_addr.clone()).expect("dial to succeed"); + let handle = spawn_wait_conn_established(swarm1); + swarm2 + .wait(|ev| match ev { + SwarmEvent::ConnectionEstablished { .. } => Some(()), + _ => None, + }) + .await; + let mut swarm1 = handle.await.expect("future to complete"); + assert!(swarm2 + .behaviour() + .address_of_peer(&swarm1_peer_id) + .expect("swarm should be connected and record about it should be created") + .any(|addr| *addr == listen_addr)); + expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await; + let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + let handle = spawn_wait_conn_established(swarm1); + swarm2 + .dial( + libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id) + .condition(libp2p_swarm::dial_opts::PeerCondition::Always) + .addresses(vec![new_listen_addr.clone()]) + .build(), + ) + .expect("dial to succeed"); + swarm2 + .wait(|ev| match ev { + SwarmEvent::ConnectionEstablished { .. } => Some(()), + _ => None, + }) + .await; + handle.await.expect("future to complete"); + expect_record_update(&mut swarm2, swarm1_peer_id).await; + // The address in store will contain peer ID. + let new_listen_addr = new_listen_addr + .with_p2p(swarm1_peer_id) + .expect("extend to succeed"); + assert!( + swarm2 + .behaviour() + .address_of_peer(&swarm1_peer_id) + .expect("peer to exist") + .collect::>() + == vec![&new_listen_addr, &listen_addr] + ); + }) + } + + #[test] + fn identify_external_addr_report() { + #[derive(NetworkBehaviour)] + struct Behaviour { + peer_store: crate::Behaviour, + identify: libp2p::identify::Behaviour, + } + async fn expect_record_update(swarm: &mut Swarm, expected_peer: PeerId) { + swarm + .wait(|ev| match ev { + SwarmEvent::Behaviour(BehaviourEvent::PeerStore( + crate::Event::RecordUpdated { peer }, + )) => (peer == expected_peer).then_some(()), + _ => None, + }) + .await + } + fn build_swarm() -> Swarm { + Swarm::new_ephemeral_tokio(|kp| Behaviour { + peer_store: crate::Behaviour::new(MemoryStore::new( + crate::memory_store::Config::default() + .set_record_capacity(NonZero::new(4).expect("4 > 0")), + )), + identify: libp2p::identify::Behaviour::new(libp2p::identify::Config::new( + "/TODO/0.0.1".to_string(), + kp.public(), + )), + }) + } + let mut swarm1 = build_swarm(); + let mut swarm2 = build_swarm(); + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + let swarm1_peer_id = *swarm1.local_peer_id(); + let swarm2_peer_id = *swarm2.local_peer_id(); + swarm2.dial(listen_addr.clone()).expect("dial to succeed"); + let handle = spawn_wait_conn_established(swarm1); + let mut swarm2 = spawn_wait_conn_established(swarm2) + .await + .expect("future to complete"); + let mut swarm1 = handle.await.expect("future to complete"); + // expexting update from direct connection. + expect_record_update(&mut swarm2, swarm1_peer_id).await; + assert!(swarm2 + .behaviour() + .peer_store + .address_of_peer(&swarm1_peer_id) + .expect("swarm should be connected and record about it should be created") + .any(|addr| *addr == listen_addr)); + expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await; + swarm1.next_swarm_event().await; // skip `identify::Event::Sent` + swarm1.next_swarm_event().await; // skip `identify::Event::Received` + let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + swarm1.behaviour_mut().identify.push([swarm2_peer_id]); + tokio::spawn(swarm1.loop_on_next()); + // Expecting 3 updates from Identify: + // 2 pair of mem and tcp address for two calls to `::listen()` + // with one address already present through direct connection. + // FLAKY: tcp addresses are not explicitly marked as external addresses. + expect_record_update(&mut swarm2, swarm1_peer_id).await; + expect_record_update(&mut swarm2, swarm1_peer_id).await; + expect_record_update(&mut swarm2, swarm1_peer_id).await; + // The address in store won't contain peer ID because it is from Identify. + assert!(swarm2 + .behaviour() + .peer_store + .address_of_peer(&swarm1_peer_id) + .expect("peer to exist") + .any(|addr| *addr == new_listen_addr)); + }) + } + + fn spawn_wait_conn_established(mut swarm: Swarm) -> tokio::task::JoinHandle> + where + T: NetworkBehaviour + Send + Sync, + Swarm: SwarmExt, + { + tokio::spawn(async move { + swarm + .wait(|ev| match ev { + SwarmEvent::ConnectionEstablished { .. } => Some(()), + _ => None, + }) + .await; + swarm + }) + } +} diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs new file mode 100644 index 00000000000..1226c8f6071 --- /dev/null +++ b/misc/peer-store/src/store.rs @@ -0,0 +1,31 @@ +use std::{fmt::Debug, task::Context}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; + +/// A store that contains all observed addresses of peers. +pub trait Store { + /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). + /// [`Behaviour`](crate::Behaviour) cannot handle this event. + type FromStore: Debug + Send; + + /// How this store handles events from [`Swarm`](libp2p_swarm::Swarm). + fn on_swarm_event(&mut self, event: &FromSwarm); + + /// Get all stored addresses of the peer. + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + + /// Polls for things that the store should do. + /// The task should be waked up to emit events to [`Behaviour`](crate::Behaviour) and + /// [`Swarm`](libp2p_swarm::Swarm). + fn poll(&mut self, cx: &mut Context<'_>) -> Option>; +} + +/// Event that will be handled by [`Behaviour`](crate::Behaviour). +pub enum Event { + /// An address record has been updated. + RecordUpdated(PeerId), + /// Event generated by the store. + /// [`Behaviour`](crate::Behaviour) can only forward the event to swarm. + Store(T), +}