Skip to content

Commit

Permalink
Use HTTP/2 pipelining to speedup notification delivery
Browse files Browse the repository at this point in the history
Delivery is no longer limited by RTT
as up to 50 notifications are delivered in parallel.
Even with RTT of 1 second this means we can
deliver 50 * 60 * 20 = 60000 notifications in 20 minutes.
  • Loading branch information
link2xt committed Apr 17, 2024
1 parent 998a597 commit d020345
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{Context, Result};
use async_std::prelude::*;
use structopt::StructOpt;

use notifiers::{metrics, notifier, server, state};
Expand Down Expand Up @@ -53,18 +52,24 @@ async fn main() -> Result<()> {
opt.interval,
)?;

let state2 = state.clone();
let host = opt.host.clone();
let port = opt.port;
let interval = opt.interval;

if let Some(metrics_address) = opt.metrics.clone() {
async_std::task::spawn(async move { metrics::start(metrics_state, metrics_address).await });
}
let server = async_std::task::spawn(async move { server::start(state2, host, port).await });

let notif = async_std::task::spawn(async move { notifier::start(state, opt.interval).await });
// Setup mulitple parallel notifiers.
// This is needed to utilize HTTP/2 pipelining.
// Notifiers take tokens for notifications from the same schedule
// and use the same HTTP/2 clients, one for production and one for sandbox server.
for _ in 0..50 {
let state = state.clone();
async_std::task::spawn(async move { notifier::start(state, interval).await });
}

server.try_join(notif).await?;
server::start(state, host, port).await?;

Ok(())
}
9 changes: 8 additions & 1 deletion src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
.unwrap_or(now)
.duration_since(now)
.unwrap_or_default();
async_std::task::sleep(delay).await;

if !delay.is_zero() {
info!(
"Sleeping for {} before next notification.",
humantime::format_duration(delay)
);
async_std::task::sleep(delay).await;
}

if let Err(err) = wakeup(
schedule,
Expand Down

0 comments on commit d020345

Please sign in to comment.