Skip to content

Commit

Permalink
Do not flush database after each insert
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 17, 2024
1 parent 0aa1dad commit aefe625
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
2 changes: 0 additions & 2 deletions src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ async fn wakeup(
info!("delivered notification for {}", device_token);
schedule
.insert_token_now(&key_device_token)
.await
.context("Failed to update latest notification timestamp")?;
metrics.heartbeat_notifications_total.inc();
}
Expand All @@ -136,7 +135,6 @@ async fn wakeup(
// to avoid busy looping.
schedule
.insert_token_now(&key_device_token)
.await
.with_context(|| format!("Failed to update token timestamp: {err:?}"))?;
}
}
Expand Down
19 changes: 12 additions & 7 deletions src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,24 @@ impl Schedule {
///
/// This should also be called after successful notification
/// to update latest notification time.
pub async fn insert_token(&self, token: &str, now: u64) -> Result<()> {
pub fn insert_token(&self, token: &str, now: u64) -> Result<()> {
self.db.insert(token.as_bytes(), &u64::to_be_bytes(now))?;
self.db.flush_async().await?;
let mut heap = self.heap.lock().unwrap();
heap.push((Reverse(now), token.to_owned()));
Ok(())
}

pub async fn insert_token_now(&self, token: &str) -> Result<()> {
pub fn insert_token_now(&self, token: &str) -> Result<()> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.insert_token(token, now).await
self.insert_token(token, now)
}

pub async fn flush(&self) -> Result<()> {
self.db.flush_async().await?;
Ok(())
}

/// Removes token from the schedule.
Expand Down Expand Up @@ -87,13 +91,14 @@ mod tests {
let db_path = dir.path().join("db.sled");
let schedule = Schedule::new(&db_path)?;

schedule.insert_token("foo", 10).await?;
schedule.insert_token("bar", 20).await?;
schedule.insert_token("foo", 10)?;
schedule.insert_token("bar", 20)?;

let (first_timestamp, first_token) = schedule.pop().unwrap();
assert_eq!(first_timestamp, 10);
assert_eq!(first_token, "foo");
schedule.insert_token("foo", 30).await?;
schedule.insert_token("foo", 30)?;
schedule.flush().await?;

// Reopen to test persistence.
drop(schedule);
Expand Down
5 changes: 4 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ async fn register_device(mut req: tide::Request<State>) -> tide::Result<tide::Re
info!("register_device {}", query.token);

let schedule = req.state().schedule();
schedule.insert_token_now(&query.token).await?;
schedule.insert_token_now(&query.token)?;

// Flush database to ensure we don't lose this token in case of restart.
schedule.flush().await?;

req.state().metrics().heartbeat_registrations_total.inc();

Expand Down

0 comments on commit aefe625

Please sign in to comment.