Remove pending requests from ready_requests #6625

Open · wants to merge 8 commits into base: unstable
96 changes: 89 additions & 7 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -4,19 +4,19 @@ use std::{
time::Duration,
};

use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
};
use crate::rpc::rate_limiter::RateLimiterItem;
use futures::FutureExt;
use libp2p::{swarm::NotifyHandler, PeerId};
use slog::{crit, debug, Logger};
use smallvec::SmallVec;
use tokio_util::time::DelayQueue;
use types::EthSpec;

use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
};

/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, E: EthSpec> {
@@ -165,9 +165,30 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
let mut failed_requests = Vec::new();

self.ready_requests.retain(|event| {
if let BehaviourAction::NotifyHandler {
peer_id: req_peer_id,
handler: _,
event: RPCSend::Request(request_id, request),
} = event
{
if req_peer_id == &peer_id {
failed_requests.push((*request_id, request.protocol()));
// Remove the entry
false
} else {
// Keep the entry
true
}
} else {
unreachable!()
}
});

// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
self.delayed_requests
.retain(|(map_peer_id, protocol), queue| {
if map_peer_id == &peer_id {
@@ -183,6 +204,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
true
}
});

failed_requests
}

Expand Down Expand Up @@ -285,4 +307,64 @@ mod tests {
assert_eq!(limiter.ready_requests.len(), 1);
}
}

/// Test that `peer_disconnected` returns the IDs of pending requests.
#[tokio::test]
async fn test_peer_disconnected_returns_failed_requests() {
let log = logging::test_logger();
let config = OutboundRateLimiterConfig(RateLimiterConfig {
ping_quota: Quota::n_every(1, 2),
..Default::default()
});
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, log).unwrap();
let peer_id = PeerId::random();

for i in 1..=5u32 {
let result = limiter.allows(
peer_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
})),
RequestType::Ping(Ping { data: i as u64 }),
);

// Check that the limiter allows the first request while other requests are added to the queue.
if i == 1 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}

// Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
tokio::time::sleep(Duration::from_secs(3)).await;
limiter.next_peer_request_ready(peer_id, Protocol::Ping);

// Check that one of the pending requests has moved to ready_requests.
assert_eq!(
limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap()
.len(),
3
);
assert_eq!(limiter.ready_requests.len(), 1);

let mut failed_requests = limiter.peer_disconnected(peer_id);

// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
assert_eq!(failed_requests.len(), 4);
for i in 2..=5u32 {
let (request_id, protocol) = failed_requests.remove(0);
assert!(matches!(
request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
));
assert_eq!(protocol, Protocol::Ping);
}
}
}
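
For reference, below is a minimal, self-contained sketch of the retain-and-collect pattern that the new `peer_disconnected` code applies to `ready_requests`. The `PeerId`, `Protocol`, and `ReadyRequest` types here are simplified stand-ins for illustration, not the real `lighthouse_network` types.

// Standalone sketch of the retain-and-collect pattern used in `peer_disconnected`,
// with simplified stand-ins for PeerId, Protocol, and the behaviour event type.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct PeerId(u8);

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Protocol {
    Ping,
}

// Stand-in for a BehaviourAction::NotifyHandler event carrying an outbound request.
struct ReadyRequest {
    peer_id: PeerId,
    request_id: u32,
    protocol: Protocol,
}

/// Drop every ready request addressed to `peer` and return the (id, protocol)
/// pairs of the dropped entries, mirroring the new `peer_disconnected` logic.
fn remove_ready_for_peer(
    ready_requests: &mut Vec<ReadyRequest>,
    peer: PeerId,
) -> Vec<(u32, Protocol)> {
    let mut failed = Vec::new();
    ready_requests.retain(|req| {
        if req.peer_id == peer {
            failed.push((req.request_id, req.protocol));
            false // remove the entry
        } else {
            true // keep the entry
        }
    });
    failed
}

fn main() {
    let mut ready = vec![
        ReadyRequest { peer_id: PeerId(1), request_id: 1, protocol: Protocol::Ping },
        ReadyRequest { peer_id: PeerId(2), request_id: 2, protocol: Protocol::Ping },
        ReadyRequest { peer_id: PeerId(1), request_id: 3, protocol: Protocol::Ping },
    ];
    let failed = remove_ready_for_peer(&mut ready, PeerId(1));
    assert_eq!(failed, vec![(1, Protocol::Ping), (3, Protocol::Ping)]);
    assert_eq!(ready.len(), 1);
    println!("removed: {:?}", failed);
}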