Skip to content
This repository has been archived by the owner on Dec 2, 2022. It is now read-only.

Commit

Permalink
FetchRequestStage: avoid log spam on send queue congestion. (#30)
Browse files Browse the repository at this point in the history
When we send too many header slice fetch request at a time
we start getting SendQueueFull, and pending_count stays positive.

In this case execute() was not awaiting anything, and returning immediately
just to get yet another SendQueueFull leading to a live spin.

Fix this by calling sender.reserve_owned().await.

Note: not using the await fn to avoid holding the sentry.read() lock while awaiting.
  • Loading branch information
battlmonstr authored Sep 16, 2021
1 parent 490a90b commit 91c2267
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
16 changes: 15 additions & 1 deletion src/downloader/headers/fetch_request_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ impl FetchRequestStage {
self.pending_count()
);
self.request_pending()?;

// in case of SendQueueFull, await for extra capacity
if self.pending_count() > 0 {
// obtain the sentry lock, and release it before awaiting
let capacity_future = {
let sentry = self.sentry.read();
sentry.reserve_capacity_in_send_queue()
};
capacity_future.await?;
}

debug!("FetchRequestStage: done");
Ok(())
}
Expand All @@ -80,7 +91,10 @@ impl FetchRequestStage {
let result = self.request(request_id, block_num, limit);
match result {
Err(error) => match error.downcast_ref::<SendMessageError>() {
Some(SendMessageError::SendQueueFull) => return Some(Ok(())),
Some(SendMessageError::SendQueueFull) => {
debug!("FetchRequestStage: request send queue is full");
return Some(Ok(()));
}
Some(SendMessageError::ReactorStopped) => return Some(Err(error)),
None => return Some(Err(error)),
},
Expand Down
20 changes: 14 additions & 6 deletions src/downloader/sentry_client_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use crate::downloader::{
messages::{EthMessageId, Message},
sentry_client::*,
};
use anyhow::bail;
use futures_core::Stream;
use futures_util::TryStreamExt;
use futures_core::{Future, Stream};
use futures_util::{FutureExt, TryStreamExt};
use parking_lot::RwLock;
use std::{collections::HashMap, fmt, pin::Pin, sync::Arc};
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -55,8 +54,7 @@ impl std::error::Error for SendMessageError {}

impl SentryClientReactor {
pub fn new(sentry: Box<dyn SentryClient>) -> Self {
let (send_message_sender, send_message_receiver) =
mpsc::channel::<SendMessageCommand>(1024);
let (send_message_sender, send_message_receiver) = mpsc::channel::<SendMessageCommand>(16);

let mut receive_messages_senders =
HashMap::<EthMessageId, broadcast::Sender<Message>>::new();
Expand Down Expand Up @@ -119,7 +117,7 @@ impl SentryClientReactor {
peer_filter,
};
if self.send_message_sender.send(command).await.is_err() {
bail!("Reactor stopped");
return Err(anyhow::Error::new(SendMessageError::ReactorStopped));
}

Ok(())
Expand All @@ -146,6 +144,16 @@ impl SentryClientReactor {
}
}

pub fn reserve_capacity_in_send_queue(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>>>> {
let sender = self.send_message_sender.clone();
Box::pin(sender.reserve_owned().map(|result| match result {
Ok(_) => Ok(()),
Err(_) => Err(anyhow::Error::new(SendMessageError::ReactorStopped)),
}))
}

pub fn receive_messages(
&self,
filter_id: EthMessageId,
Expand Down

0 comments on commit 91c2267

Please sign in to comment.