Skip to content

Commit

Permalink
Parallelize notification sending
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Feb 22, 2024
1 parent 00f4bf5 commit 481341f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ serde = { version = "1.0.114", features = ["derive"] }
femme = "2.1.0"
humantime = "2.0.1"
hyper = { version = "0.14", features = ["tcp"] }
futures = "*"
73 changes: 40 additions & 33 deletions src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use a2::{
};
use anyhow::Result;
use async_std::prelude::*;
use futures::future::join_all;
use log::*;

pub async fn start(
Expand Down Expand Up @@ -32,6 +33,39 @@ pub async fn start(
Ok(())
}

async fn wakeup_token(db: &sled::Db, client: &Client, device_token: &str, topic: Option<&str>) {
info!("notify: {}", device_token);

let payload = SilentNotificationBuilder::new().build(
device_token,
NotificationOptions {
apns_priority: Some(Priority::Normal),
apns_topic: topic,
..Default::default()
},
);

match client.send(payload).await {
Ok(res) => match res.code {
200 => {
info!("delivered notification for {}", device_token);
}
_ => {
warn!("unexpected status: {:?}", res);
}
},
Err(ResponseError(res)) => {
info!("Removing token {} due to error {:?}.", device_token, res);
if let Err(err) = db.remove(device_token) {
error!("failed to remove {}: {:?}", device_token, err);
}
}
Err(err) => {
error!("failed to send notification: {}, {:?}", device_token, err);
}
}
}

async fn wakeup(db: &sled::Db, client: &Client, topic: Option<&str>) {
let tokens = db
.iter()
Expand All @@ -42,37 +76,10 @@ async fn wakeup(db: &sled::Db, client: &Client, topic: Option<&str>) {
.collect::<Vec<_>>();

info!("sending notifications to {} devices", tokens.len());

for device_token in tokens {
info!("notify: {}", device_token);

let payload = SilentNotificationBuilder::new().build(
&device_token,
NotificationOptions {
apns_priority: Some(Priority::Normal),
apns_topic: topic,
..Default::default()
},
);

match client.send(payload).await {
Ok(res) => match res.code {
200 => {
info!("delivered notification for {}", device_token);
}
_ => {
warn!("unexpected status: {:?}", res);
}
},
Err(ResponseError(res)) => {
info!("Removing token {} due to error {:?}.", &device_token, res);
if let Err(err) = db.remove(&device_token) {
error!("failed to remove {}: {:?}", &device_token, err);
}
}
Err(err) => {
error!("failed to send notification: {}, {:?}", device_token, err);
}
}
}
join_all(
tokens.iter().map(|device_token| async move {
wakeup_token(db, client, &device_token, topic).await
}),
)
.await;
}

0 comments on commit 481341f

Please sign in to comment.