Skip to content

Commit

Permalink
fix: remove duplicate entries if tokens are registered multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 17, 2024
1 parent 6ef14e3 commit cda6cb6
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
.heartbeat_token_count
.set(schedule.token_count() as i64);

let Some((timestamp, token)) = schedule.pop() else {
let Some((timestamp, token)) = schedule.pop()? else {
info!("No tokens to notify, sleeping for a minute.");
async_std::task::sleep(Duration::from_secs(60)).await;
continue;
Expand Down
55 changes: 49 additions & 6 deletions src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,23 @@ impl Schedule {
Ok(())
}

pub fn pop(&self) -> Option<(u64, String)> {
pub fn pop(&self) -> Result<Option<(u64, String)>> {
let mut heap = self.heap.lock().unwrap();
let (timestamp, token) = heap.pop()?;
Some((timestamp.0, token))
loop {
let Some((timestamp, token)) = heap.pop() else {
return Ok(None);
};
let Some(value) = self.db.get(token.as_bytes())? else {
// Token was removed from the database already.
continue;
};
if timestamp.0.to_be_bytes() != *value {
// Token was reinserted with a different timestamp,
// e.g. by reregistration.
continue;
}
return Ok(Some((timestamp.0, token)));
}
}

/// Returns the number of tokens in the schedule.
Expand All @@ -96,7 +109,7 @@ mod tests {
schedule.insert_token("bar", 20)?;
assert_eq!(schedule.token_count(), 2);

let (first_timestamp, first_token) = schedule.pop().unwrap();
let (first_timestamp, first_token) = schedule.pop()?.unwrap();
assert_eq!(first_timestamp, 10);
assert_eq!(first_token, "foo");
schedule.insert_token("foo", 30)?;
Expand All @@ -108,7 +121,7 @@ mod tests {
let schedule = Schedule::new(&db_path)?;
assert_eq!(schedule.token_count(), 2);

let (second_timestamp, second_token) = schedule.pop().unwrap();
let (second_timestamp, second_token) = schedule.pop()?.unwrap();
assert_eq!(second_timestamp, 20);
assert_eq!(second_token, "bar");
assert_eq!(schedule.token_count(), 1);
Expand All @@ -119,10 +132,40 @@ mod tests {
assert_eq!(schedule.token_count(), 2);

// Token "bar" is still there.
let (second_timestamp, second_token) = schedule.pop().unwrap();
let (second_timestamp, second_token) = schedule.pop()?.unwrap();
assert_eq!(second_timestamp, 20);
assert_eq!(second_token, "bar");

Ok(())
}

#[test]
fn test_insert_deduplication() -> Result<()> {
let dir = tempdir()?;
let db_path = dir.path().join("db.sled");
let schedule = Schedule::new(&db_path)?;
assert_eq!(schedule.token_count(), 0);

schedule.insert_token("foo", 10)?;
schedule.insert_token("bar", 20)?;
schedule.insert_token("baz", 30)?;
assert_eq!(schedule.token_count(), 3);

schedule.insert_token("bar", 50)?;
// There are two items for token "bar", but old one is invalid now
// because the database contains different timestamp.
// It will be dropped when encountered.
assert_eq!(schedule.token_count(), 4);

assert_eq!(schedule.pop()?.unwrap(), (10, "foo".to_string()));
assert_eq!(schedule.token_count(), 3);

assert_eq!(schedule.pop()?.unwrap(), (30, "baz".to_string()));
// Invalid "bar" entry is also dropped before we reach "baz" entry.
assert_eq!(schedule.token_count(), 1);

assert_eq!(schedule.pop()?.unwrap(), (50, "bar".to_string()));
assert_eq!(schedule.token_count(), 0);
Ok(())
}
}

0 comments on commit cda6cb6

Please sign in to comment.