Skip to content

Commit

Permalink
Persist latest notification timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 17, 2024
1 parent f79a513 commit 38c21ec
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 91 deletions.
51 changes: 45 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ tide = "0.16.0"

# Workaround for <https://github.com/WalletConnect/a2/issues/76>
hyper = { version = "0.14", features = ["tcp"] }

[dev-dependencies]
tempfile = "3"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod metrics;
pub mod notifier;
pub mod schedule;
pub mod server;
pub mod state;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn main() -> Result<()> {
&opt.password,
opt.topic.clone(),
metrics_state.clone(),
opt.interval,
)?;

let state2 = state.clone();
Expand Down
169 changes: 98 additions & 71 deletions src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::time::{Duration, SystemTime};

use a2::{
Client, DefaultNotificationBuilder, Error::ResponseError, NotificationBuilder,
NotificationOptions, Priority,
};
use anyhow::Result;
use anyhow::{bail, Context as _, Result};
use log::*;

use crate::metrics::Metrics;
use crate::schedule::Schedule;
use crate::state::State;

pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
let db = state.db();
let schedule = state.schedule();
let metrics = state.metrics();
let production_client = state.production_client();
let sandbox_client = state.sandbox_client();
Expand All @@ -21,88 +24,112 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
);

loop {
let wakeup_start = std::time::Instant::now();
wakeup(db, metrics, production_client, sandbox_client, topic).await;
let elapsed = wakeup_start.elapsed();
info!(
"Waking up all devices took {}",
humantime::format_duration(elapsed)
);
async_std::task::sleep(interval.saturating_sub(elapsed)).await;
metrics.heartbeat_token_count.set(schedule.len() as i64);

let Some((timestamp, token)) = schedule.pop() else {
info!("No tokens to notify, sleeping for a minute.");
async_std::task::sleep(Duration::from_secs(60)).await;
continue;
};

// Sleep until we need to notify the token.
let now = SystemTime::now();
let timestamp: SystemTime = SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(timestamp))
.unwrap_or(now);
let timestamp = std::cmp::min(timestamp, now);
let delay = timestamp
.checked_add(interval)
.unwrap_or(now)
.duration_since(now)
.unwrap_or_default();
async_std::task::sleep(delay).await;

if let Err(err) = wakeup(
schedule,
metrics,
production_client,
sandbox_client,
topic,
token,
)
.await
{
error!("Failed to notify token: {err:#}");

// Sleep to avoid busy looping and flooding APNS
// with requests in case of database errors.
async_std::task::sleep(Duration::from_secs(60)).await;
}
}
}

async fn wakeup(
db: &sled::Db,
schedule: &Schedule,
metrics: &Metrics,
production_client: &Client,
sandbox_client: &Client,
topic: Option<&str>,
) {
let tokens = db
.iter()
.filter_map(|entry| match entry {
Ok((key, _)) => Some(String::from_utf8(key.to_vec()).unwrap()),
Err(_) => None,
})
.collect::<Vec<_>>();

info!("sending notifications to {} devices", tokens.len());
metrics.heartbeat_token_count.set(tokens.len() as i64);
key_device_token: String,
) -> Result<()> {
info!("notify: {}", key_device_token);

for key_device_token in tokens {
info!("notify: {}", key_device_token);
let (client, device_token) =
if let Some(sandbox_token) = key_device_token.strip_prefix("sandbox:") {
(sandbox_client, sandbox_token)
} else {
(production_client, key_device_token.as_str())
};

let (client, device_token) =
if let Some(sandbox_token) = key_device_token.strip_prefix("sandbox:") {
(sandbox_client, sandbox_token)
} else {
(production_client, key_device_token.as_str())
};

// Send silent notification.
// According to <https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification>
// to send a silent notification you need to set background notification flag `content-available` to 1
// and don't include `alert`, `badge` or `sound`.
let payload = DefaultNotificationBuilder::new()
.set_content_available()
.build(
device_token,
NotificationOptions {
// Normal priority (5) means
// "send the notification based on power considerations on the user’s device".
// <https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns>
apns_priority: Some(Priority::Normal),
apns_topic: topic,
..Default::default()
},
);

match client.send(payload).await {
Ok(res) => match res.code {
200 => {
info!("delivered notification for {}", device_token);
metrics.heartbeat_notifications_total.inc();
}
_ => {
warn!("unexpected status: {:?}", res);
}
// Send silent notification.
// According to <https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification>
// to send a silent notification you need to set background notification flag `content-available` to 1
// and don't include `alert`, `badge` or `sound`.
let payload = DefaultNotificationBuilder::new()
.set_content_available()
.build(
device_token,
NotificationOptions {
// Normal priority (5) means
// "send the notification based on power considerations on the user’s device".
// <https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns>
apns_priority: Some(Priority::Normal),
apns_topic: topic,
..Default::default()
},
Err(ResponseError(res)) => {
info!(
"Removing token {} due to error {:?}.",
&key_device_token, res
);
if let Err(err) = db.remove(&key_device_token) {
error!("failed to remove {}: {:?}", &key_device_token, err);
}
);

match client.send(payload).await {
Ok(res) => match res.code {
200 => {
info!("delivered notification for {}", device_token);
schedule
.insert_token_now(&key_device_token)
.await
.context("Failed to update latest notification timestamp")?;
metrics.heartbeat_notifications_total.inc();
}
Err(err) => {
error!(
"failed to send notification: {}, {:?}",
key_device_token, err
);
_ => {
bail!("unexpected status: {:?}", res);
}
},
Err(ResponseError(res)) => {
info!(
"Removing token {} due to error {:?}.",
&key_device_token, res
);
schedule
.remove_token(&key_device_token)
.with_context(|| format!("Failed to remove {}", &key_device_token))?;
}
Err(err) => {
// Update notification time regardless of success
// to avoid busy looping.
schedule
.insert_token_now(&key_device_token)
.await
.with_context(|| format!("Failed to update token timestamp: {err:?}"))?;
}
}
Ok(())
}
Loading

0 comments on commit 38c21ec

Please sign in to comment.