Enable arithmetic lint in rate-limiter #7025

Open · wants to merge 4 commits into base: unstable
30 changes: 18 additions & 12 deletions beacon_node/lighthouse_network/src/rpc/config.rs
@@ -1,11 +1,11 @@
+use super::{rate_limiter::Quota, Protocol};
+use std::num::NonZeroU64;
 use std::{
     fmt::{Debug, Display},
     str::FromStr,
     time::Duration,
 };
 
-use super::{rate_limiter::Quota, Protocol};
-
 use serde::{Deserialize, Serialize};
 
 /// Auxiliary struct to aid on configuration parsing.
@@ -100,24 +100,30 @@ pub struct RateLimiterConfig {
 }
 
 impl RateLimiterConfig {
-    pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10);
-    pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
-    pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
+    pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 10);
+    pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 5);
+    pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(NonZeroU64::new(5).unwrap(), 15);
     pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
     // The number is chosen to balance between upload bandwidth required to serve
     // blocks and a decent syncing rate for honest nodes. Malicious nodes would need to
     // spread out their requests over the time window to max out bandwidth on the server.
-    pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(128, 10);
-    pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
+    pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
+    pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
     // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage
-    pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(896, 10);
-    pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(896, 10);
+    pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(896).unwrap(), 10);
+    pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(896).unwrap(), 10);
     // 320 blocks worth of columns for regular node, or 40 blocks for supernode.
     // Range sync load balances when requesting blocks, and each batch is 32 blocks.
-    pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = Quota::n_every(5120, 10);
+    pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(5120).unwrap(), 10);
     // 512 columns per request from spec. This should be plenty as peers are unlikely to send all
     // sampling requests to a single peer.
-    pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(512, 10);
+    pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota =
+        Quota::n_every(NonZeroU64::new(512).unwrap(), 10);
     pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
     pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10);
     pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10);
@@ -275,7 +281,7 @@ mod tests {
             protocol: Protocol::Goodbye,
             quota: Quota {
                 replenish_all_every: Duration::from_secs(10),
-                max_tokens: 8,
+                max_tokens: NonZeroU64::new(8).unwrap(),
             },
         };
         assert_eq!(quota.to_string().parse(), Ok(quota))
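For context on the `NonZeroU64::new(n).unwrap()` pattern above: because these are `const` items, the `unwrap()` is evaluated at compile time, so a zero literal would fail the build rather than panic at runtime (this relies on `Option::unwrap` being callable in const context, stabilized in Rust 1.83). A minimal self-contained sketch, with `Quota` reduced to a stand-in struct rather than the crate's real type:

```rust
use std::num::NonZeroU64;
use std::time::Duration;

/// Simplified stand-in for the crate's `Quota`; only the fields used here.
struct Quota {
    replenish_all_every: Duration,
    max_tokens: NonZeroU64,
}

impl Quota {
    /// Allow `n` tokens every `seconds`; `n` cannot be zero by construction.
    const fn n_every(n: NonZeroU64, seconds: u64) -> Self {
        Quota {
            replenish_all_every: Duration::from_secs(seconds),
            max_tokens: n,
        }
    }
}

// Evaluated at compile time: `NonZeroU64::new(0)` would be `None`, and the
// `unwrap()` would turn it into a build error instead of a runtime panic.
const PING_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 10);

fn main() {
    assert_eq!(PING_QUOTA.max_tokens.get(), 2);
    assert_eq!(PING_QUOTA.replenish_all_every, Duration::from_secs(10));
}
```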
53 changes: 38 additions & 15 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -1,10 +1,13 @@
+#![deny(clippy::arithmetic_side_effects)]
+
 use super::config::RateLimiterConfig;
 use crate::rpc::Protocol;
 use fnv::FnvHashMap;
 use libp2p::PeerId;
 use serde::{Deserialize, Serialize};
 use std::future::Future;
 use std::hash::Hash;
+use std::num::NonZeroU64;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -55,20 +58,20 @@ pub struct Quota {
     pub(super) replenish_all_every: Duration,
     /// Token limit. This determines how large an instantaneous batch of
     /// tokens can be.
-    pub(super) max_tokens: u64,
+    pub(super) max_tokens: NonZeroU64,
 }
 
 impl Quota {
     /// A hard limit of one token every `seconds`.
     pub const fn one_every(seconds: u64) -> Self {
         Quota {
             replenish_all_every: Duration::from_secs(seconds),
-            max_tokens: 1,
+            max_tokens: NonZeroU64::new(1).unwrap(),
         }
     }
 
     /// Allow `n` tokens to be used every `seconds`.
-    pub const fn n_every(n: u64, seconds: u64) -> Self {
+    pub const fn n_every(n: NonZeroU64, seconds: u64) -> Self {
         Quota {
             replenish_all_every: Duration::from_secs(seconds),
             max_tokens: n,
@@ -236,7 +239,9 @@ impl RPCRateLimiterBuilder {
 
         // check for peers to prune every 30 seconds, starting in 30 seconds
         let prune_every = tokio::time::Duration::from_secs(30);
-        let prune_start = tokio::time::Instant::now() + prune_every;
+        let prune_start = tokio::time::Instant::now()
+            .checked_add(prune_every)
+            .ok_or("prune time overflow")?;
         let prune_interval = tokio::time::interval_at(prune_start, prune_every);
         Ok(RPCRateLimiter {
             prune_interval,
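The `checked_add` change above satisfies `clippy::arithmetic_side_effects`, which flags the `+` operator on `Instant` because its `Add` impl can panic on overflow. A small sketch of the same pattern using `std::time::Instant` (the helper name is illustrative, not from the PR):

```rust
use std::time::{Duration, Instant};

/// Illustrative helper mirroring the PR's pattern: propagate a static error
/// string instead of panicking if the addition is unrepresentable.
fn first_prune_at(every: Duration) -> Result<Instant, &'static str> {
    Instant::now().checked_add(every).ok_or("prune time overflow")
}

fn main() {
    // Overflow is practically impossible for 30 seconds, but the lint asks
    // the code to acknowledge the possibility explicitly.
    let start = first_prune_at(Duration::from_secs(30)).expect("no overflow");
    assert!(start > Instant::now());
}
```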
@@ -401,14 +406,13 @@ pub struct Limiter<Key: Hash + Eq + Clone> {
 
 impl<Key: Hash + Eq + Clone> Limiter<Key> {
     pub fn from_quota(quota: Quota) -> Result<Self, &'static str> {
-        if quota.max_tokens == 0 {
-            return Err("Max number of tokens should be positive");
-        }
         let tau = quota.replenish_all_every.as_nanos();
         if tau == 0 {
             return Err("Replenish time must be positive");
         }
-        let t = (tau / quota.max_tokens as u128)
+        let t = tau
+            .checked_div(quota.max_tokens.get() as u128)
+            .expect("Division by zero never occurs, since Quota::max_tokens is of type NonZeroU64.")
             .try_into()
             .map_err(|_| "total replenish time is too long")?;
         let tau = tau
@@ -431,7 +435,7 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
         let tau = self.tau;
         let t = self.t;
         // how long does it take to replenish these tokens
-        let additional_time = t * tokens;
+        let additional_time = t.saturating_mul(tokens);
         if additional_time > tau {
             // the time required to process this amount of tokens is longer than the time that
             // makes the bucket full. So, this batch can _never_ be processed
@@ -444,16 +448,16 @@
             .entry(key.clone())
             .or_insert(time_since_start);
         // check how soon could the request be made
-        let earliest_time = (*tat + additional_time).saturating_sub(tau);
+        let earliest_time = (*tat).saturating_add(additional_time).saturating_sub(tau);
         // earliest_time is in the future
         if time_since_start < earliest_time {
             Err(RateLimitedErr::TooSoon(Duration::from_nanos(
                 /* time they need to wait, i.e. how soon were they */
-                earliest_time - time_since_start,
+                earliest_time.saturating_sub(time_since_start),
             )))
         } else {
             // calculate the new TAT
-            *tat = time_since_start.max(*tat) + additional_time;
+            *tat = time_since_start.max(*tat).saturating_add(additional_time);
             Ok(())
         }
     }
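The arithmetic above is the core of the limiter's GCRA ("leaky bucket as a meter") logic: `tau` is the time to replenish a full bucket, `t` the time per token, and `tat` the theoretical arrival time. A self-contained sketch with concrete numbers, mirroring the logic of `allows` but not the PR's actual code:

```rust
// GCRA arithmetic as in `Limiter::allows`, with saturating operations so a
// huge `tokens` value cannot overflow; all times are in nanoseconds.
fn allows(tat: &mut u64, now: u64, tokens: u64, t: u64, tau: u64) -> Result<(), &'static str> {
    // How long it takes to replenish the requested tokens.
    let additional_time = t.saturating_mul(tokens);
    if additional_time > tau {
        // Needs longer than a full-bucket refill: can never be served.
        return Err("too large");
    }
    // Earliest instant at which the request fits under the quota.
    let earliest_time = (*tat).saturating_add(additional_time).saturating_sub(tau);
    if now < earliest_time {
        return Err("too soon");
    }
    *tat = now.max(*tat).saturating_add(additional_time);
    Ok(())
}

fn main() {
    // Quota of 4 tokens every 2 s: tau = 2e9 ns, t = tau / 4 = 5e8 ns.
    let (t, tau) = (500_000_000u64, 2_000_000_000u64);
    let mut tat = 0;
    // A burst of 4 tokens at time 0 drains the whole bucket.
    assert!(allows(&mut tat, 0, 4, t, tau).is_ok());
    // One more token 0.4 s later is too soon: the earliest slot is 0.5 s.
    assert!(allows(&mut tat, 400_000_000, 1, t, tau).is_err());
    // At 0.5 s one token's worth of time has replenished, so it passes.
    assert!(allows(&mut tat, 500_000_000, 1, t, tau).is_ok());
}
```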
@@ -468,14 +472,15 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
 
 #[cfg(test)]
 mod tests {
-    use crate::rpc::rate_limiter::{Limiter, Quota};
+    use crate::rpc::rate_limiter::{Limiter, Quota, RateLimitedErr};
+    use std::num::NonZeroU64;
     use std::time::Duration;
 
     #[test]
     fn it_works_a() {
         let mut limiter = Limiter::from_quota(Quota {
             replenish_all_every: Duration::from_secs(2),
-            max_tokens: 4,
+            max_tokens: NonZeroU64::new(4).unwrap(),
         })
         .unwrap();
         let key = 10;
@@ -512,7 +517,7 @@
     fn it_works_b() {
         let mut limiter = Limiter::from_quota(Quota {
             replenish_all_every: Duration::from_secs(2),
-            max_tokens: 4,
+            max_tokens: NonZeroU64::new(4).unwrap(),
         })
         .unwrap();
         let key = 10;
@@ -536,4 +541,22 @@
             .allows(Duration::from_secs_f32(0.4), &key, 1)
             .is_err());
     }
+
+    #[test]
+    fn large_tokens() {
+        // These have been adjusted so that an overflow occurs when calculating
+        // `additional_time` in `Limiter::allows`. If we don't handle overflow properly,
+        // `Limiter::allows` returns `Ok` in this case.
+        let replenish_all_every = 2;
+        let tokens = u64::MAX / 2 + 1;
+
+        let mut limiter = Limiter::from_quota(Quota {
+            replenish_all_every: Duration::from_nanos(replenish_all_every),
+            max_tokens: NonZeroU64::new(1).unwrap(),
+        })
+        .unwrap();
+
+        let result = limiter.allows(Duration::from_secs_f32(0.0), &10, tokens);
+        assert!(matches!(result, Err(RateLimitedErr::TooLarge)));
+    }
 }
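To see why the `large_tokens` test's numbers trip the old code: with `max_tokens = 1` and `replenish_all_every = 2` ns, `t = tau = 2`, and the request asks for `u64::MAX / 2 + 1 = 2^63` tokens. The unchecked product `t * tokens = 2^64` panics in debug builds and wraps to 0 in release builds, where it slips past the `additional_time > tau` guard; `saturating_mul` instead pins the product at `u64::MAX`, so the guard fires and the request is rejected as `TooLarge`. The arithmetic, checked directly:

```rust
fn main() {
    let t: u64 = 2; // ns per token, i.e. tau / max_tokens
    let tau: u64 = 2; // full-bucket replenish time in ns
    let tokens: u64 = u64::MAX / 2 + 1; // 2^63

    // What an unchecked `t * tokens` computes in release builds:
    // 2 * 2^63 mod 2^64 == 0, which is not > tau, so the guard is bypassed.
    assert_eq!(t.wrapping_mul(tokens), 0);

    // With the fix the product saturates at u64::MAX, the guard fires, and
    // the oversized request is rejected.
    assert!(t.saturating_mul(tokens) > tau);
}
```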
3 changes: 2 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -215,6 +215,7 @@ mod tests {
     use crate::rpc::{Ping, Protocol, RequestType};
     use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
     use libp2p::PeerId;
+    use std::num::NonZeroU64;
     use std::time::Duration;
     use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};
 
@@ -223,7 +224,7 @@
     async fn test_next_peer_request_ready() {
         let log = logging::test_logger();
         let config = OutboundRateLimiterConfig(RateLimiterConfig {
-            ping_quota: Quota::n_every(1, 2),
+            ping_quota: Quota::n_every(NonZeroU64::new(1).unwrap(), 2),
             ..Default::default()
         });
         let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(