Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): implement message sizes per topic. #5868

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ipfs-private/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_dns()?
.with_behaviour(|key| {
let gossipsub_config = gossipsub::ConfigBuilder::default()
.max_transmit_size(262144)
.max_transmit_size_for_topic(262144, gossipsub_topic.hash())
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`.
Ok(MyBehaviour {
Expand Down
4 changes: 4 additions & 0 deletions libp2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
- Make the `*-websys` variants (`libp2p-webrtc-websys`, `libp2p-websocket-websys`, `libp2p-webtransport-websys`) only available on wasm32 target architecture.
See [PR 5891](https://github.com/libp2p/rust-libp2p/pull/5891).

- Adds TopicsConfig structure and topic_configuration key to Config and ProtocolConfig to enable
setting message and mesh-*-n sizes on the Gossipsub configuration. Adds appropriate functions to
allow modifying default values from the config interface.

## 0.55.0

- Raise MSRV to 1.83.0.
Expand Down
64 changes: 37 additions & 27 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,17 @@ where
// Transform the data before building a raw_message.
let transformed_data = self
.data_transform
.outbound_transform(&topic, data.clone())?;
.outbound_transform(&topic.clone(), data.clone())?;

let max_transmit_size_for_topic = self.config.max_transmit_size_for_topic(&topic);

// check that the size doesn't exceed the max transmission size.
if transformed_data.len() > self.config.max_transmit_size() {
if transformed_data.len() > max_transmit_size_for_topic {
return Err(PublishError::MessageTooLarge);
}

let raw_message = self.build_raw_message(topic, transformed_data)?;
let raw_message = self.build_raw_message(topic.clone(), transformed_data)?;
let mesh_n = self.config.mesh_n_for_topic(&topic);

// calculate the message id from the un-transformed data
let msg_id = self.config.message_id(&Message {
Expand Down Expand Up @@ -648,7 +651,7 @@ where
Some(mesh_peers) => {
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
// peers (if possible).
let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());

if needed_extra_peers > 0 {
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
Expand Down Expand Up @@ -687,7 +690,6 @@ where
}
} else {
// We have no fanout peers, select mesh_n of them and add them to the fanout
let mesh_n = self.config.mesh_n();
let new_peers =
get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
|p| {
Expand Down Expand Up @@ -967,6 +969,7 @@ where
}

let mut added_peers = HashSet::new();
let mesh_n = self.config.mesh_n_for_topic(topic_hash);

if let Some(m) = self.metrics.as_mut() {
m.joined(topic_hash)
Expand All @@ -989,7 +992,7 @@ where

// Add up to mesh_n of them to the mesh
// NOTE: These aren't randomly added, currently FIFO
let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
let add_peers = std::cmp::min(peers.len(), mesh_n);
tracing::debug!(
topic=%topic_hash,
"JOIN: Adding {:?} peers from the fanout for topic",
Expand All @@ -1012,12 +1015,12 @@ where
}

// check if we need to get more peers, which we randomly select
if added_peers.len() < self.config.mesh_n() {
if added_peers.len() < mesh_n {
// get the peers
let new_peers = get_random_peers(
&self.connected_peers,
topic_hash,
self.config.mesh_n() - added_peers.len(),
mesh_n - added_peers.len(),
|peer| {
!added_peers.contains(peer)
&& !self.explicit_peers.contains(peer)
Expand Down Expand Up @@ -1457,9 +1460,9 @@ where

// check mesh upper bound and only allow graft if the upper bound is not reached
// or if it is an outbound peer
if peers.len() >= self.config.mesh_n_high()
&& !self.outbound_peers.contains(peer_id)
{
let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);

if peers.len() >= mesh_n_high && !self.outbound_peers.contains(peer_id) {
to_prune_topics.insert(topic_hash.clone());
continue;
}
Expand Down Expand Up @@ -1935,9 +1938,9 @@ where
.is_backoff_with_slack(topic_hash, propagation_source)
{
if let Some(peers) = self.mesh.get_mut(topic_hash) {
if peers.len() < self.config.mesh_n_low()
&& peers.insert(*propagation_source)
{
let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);

if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
tracing::debug!(
peer=%propagation_source,
topic=%topic_hash,
Expand Down Expand Up @@ -2091,6 +2094,11 @@ where
let backoffs = &self.backoffs;
let outbound_peers = &self.outbound_peers;

let mesh_n = self.config.mesh_n_for_topic(topic_hash);
let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);

// drop all peers with negative score, without PX
// if there is at some point a stable retain method for BTreeSet the following can be
// written more efficiently with retain.
Expand Down Expand Up @@ -2127,15 +2135,15 @@ where
}

// too little peers - add some
if peers.len() < self.config.mesh_n_low() {
if peers.len() < mesh_n_low {
tracing::debug!(
topic=%topic_hash,
"HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
peers.len(),
self.config.mesh_n_low()
mesh_n_low
);
// not enough peers - get mesh_n - current_length more
let desired_peers = self.config.mesh_n() - peers.len();
let desired_peers = mesh_n - peers.len();
let peer_list =
get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
!peers.contains(peer)
Expand All @@ -2156,14 +2164,14 @@ where
}

// too many peers - remove some
if peers.len() > self.config.mesh_n_high() {
if peers.len() > mesh_n_high {
tracing::debug!(
topic=%topic_hash,
"HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
peers.len(),
self.config.mesh_n_high()
mesh_n_high
);
let excess_peer_no = peers.len() - self.config.mesh_n();
let excess_peer_no = peers.len() - mesh_n;

// shuffle the peers and then sort by score ascending beginning with the worst
let mut rng = thread_rng();
Expand Down Expand Up @@ -2195,7 +2203,7 @@ where
break;
}
if self.outbound_peers.contains(&peer) {
if outbound <= self.config.mesh_outbound_min() {
if outbound <= mesh_outbound_min {
// do not remove anymore outbound peers
continue;
}
Expand All @@ -2216,13 +2224,13 @@ where
}

// do we have enough outbound peers?
if peers.len() >= self.config.mesh_n_low() {
if peers.len() >= mesh_n_low {
// count number of outbound peers we have
let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };

// if we have not enough outbound peers, graft to some new outbound peers
if outbound < self.config.mesh_outbound_min() {
let needed = self.config.mesh_outbound_min() - outbound;
if outbound < mesh_outbound_min {
let needed = mesh_outbound_min - outbound;
let peer_list =
get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
!peers.contains(peer)
Expand Down Expand Up @@ -2345,6 +2353,8 @@ where
Some((_, thresholds, _)) => thresholds.publish_threshold,
_ => 0.0,
};
let mesh_n = self.config.mesh_n_for_topic(topic_hash);

for peer_id in peers.iter() {
// is the peer still subscribed to the topic?
let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
Expand All @@ -2369,13 +2379,13 @@ where
}

// not enough peers
if peers.len() < self.config.mesh_n() {
if peers.len() < mesh_n {
tracing::debug!(
"HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
peers.len(),
self.config.mesh_n()
mesh_n
);
let needed_peers = self.config.mesh_n() - peers.len();
let needed_peers = mesh_n - peers.len();
let explicit_peers = &self.explicit_peers;
let new_peers =
get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
Expand Down
Loading
Loading