From c0187d9ea22ae599491c076df954f2fbd3aa2649 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 6 Feb 2024 11:18:25 +0100 Subject: [PATCH] server: Extract Retry state machine out of run_with_retries For now the state machine is only used for the implementation of run_with_retries, but it will be used directly in the future, to get around control flow analysis issues with run_with_retries. --- server/svix-server/src/core/cache/mod.rs | 2 +- server/svix-server/src/core/mod.rs | 2 +- server/svix-server/src/core/retry.rs | 67 +++++++++++++++++++ .../svix-server/src/core/run_with_retries.rs | 30 --------- server/svix-server/src/queue/mod.rs | 2 +- 5 files changed, 70 insertions(+), 33 deletions(-) create mode 100644 server/svix-server/src/core/retry.rs delete mode 100644 server/svix-server/src/core/run_with_retries.rs diff --git a/server/svix-server/src/core/cache/mod.rs b/server/svix-server/src/core/cache/mod.rs index 9427729f1..dec81d387 100644 --- a/server/svix-server/src/core/cache/mod.rs +++ b/server/svix-server/src/core/cache/mod.rs @@ -8,7 +8,7 @@ use axum::async_trait; use enum_dispatch::enum_dispatch; use serde::{de::DeserializeOwned, Serialize}; -use crate::core::run_with_retries::run_with_retries; +use crate::core::retry::run_with_retries; pub mod memory; pub mod none; diff --git a/server/svix-server/src/core/mod.rs b/server/svix-server/src/core/mod.rs index 3660b7d81..e36e0731f 100644 --- a/server/svix-server/src/core/mod.rs +++ b/server/svix-server/src/core/mod.rs @@ -8,7 +8,7 @@ pub mod message_app; pub mod operational_webhooks; pub mod otel_spans; pub mod permissions; -pub mod run_with_retries; +pub mod retry; pub mod security; pub mod types; pub mod webhook_http_client; diff --git a/server/svix-server/src/core/retry.rs b/server/svix-server/src/core/retry.rs new file mode 100644 index 000000000..2af0e58a9 --- /dev/null +++ b/server/svix-server/src/core/retry.rs @@ -0,0 +1,67 @@ +use std::{future::Future, time::Duration}; + +use tracing::warn; + +pub 
async fn run_with_retries< + T, + E: std::error::Error, + F: Future>, + FN: FnMut() -> F, +>( + mut fun: FN, + should_retry: impl Fn(&E) -> bool, + retry_schedule: &[Duration], +) -> Result { + let mut retry = Retry::new(should_retry, retry_schedule); + loop { + if let Some(result) = retry.run(&mut fun).await { + return result; + } + } +} + +/// A state machine for retrying an asynchronous operation. +/// +/// Unfortunately needed to get around Rust's lack of `AttachedFn*` traits. +/// For usage, check the implementation of `run_with_retries`. +pub struct Retry<'a, Re> { + retry_schedule: &'a [Duration], + should_retry: Re, +} + +impl<'a, Re> Retry<'a, Re> { + pub fn new(should_retry: Re, retry_schedule: &'a [Duration]) -> Self { + Self { + retry_schedule, + should_retry, + } + } + + pub async fn run(&mut self, f: F) -> Option> + where + E: std::error::Error, + F: FnOnce() -> Fut, + Fut: Future>, + Re: Fn(&E) -> bool, + { + match f().await { + // If the function succeeded, we're done + Ok(t) => Some(Ok(t)), + Err(e) => { + let should_retry = &self.should_retry; + if self.retry_schedule.is_empty() || !should_retry(&e) { + // If we already used up all the retries or should_retry returns false, + // return the latest error and stop retrying. + self.retry_schedule = &[]; + Some(Err(e)) + } else { + // Otherwise, wait and let the caller call retry.run() again. 
+ warn!("Retrying after error: {e}"); + tokio::time::sleep(self.retry_schedule[0]).await; + self.retry_schedule = &self.retry_schedule[1..]; + None + } + } + } + } +} diff --git a/server/svix-server/src/core/run_with_retries.rs b/server/svix-server/src/core/run_with_retries.rs deleted file mode 100644 index 6477504ef..000000000 --- a/server/svix-server/src/core/run_with_retries.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::{future::Future, time::Duration}; - -pub async fn run_with_retries< - T, - E: std::error::Error, - F: Future>, - FN: FnMut() -> F, ->( - mut fun: FN, - should_retry: impl Fn(&E) -> bool, - retry_schedule: &[Duration], -) -> Result { - for duration in retry_schedule { - match fun().await { - Ok(ret) => return Ok(ret), - Err(e) => { - if should_retry(&e) { - tracing::warn!("Retrying after error {}", e); - tokio::time::sleep(*duration).await; - } else { - return Err(e); - } - } - } - } - - // Loop sleeps after a failed attempt so you need this last fun call to avoid a fencepost error - // with durations between tries. - fun().await -} diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs index db6d00124..66b17b283 100644 --- a/server/svix-server/src/queue/mod.rs +++ b/server/svix-server/src/queue/mod.rs @@ -10,7 +10,7 @@ use crate::error::Traceable; use crate::{ cfg::{Configuration, QueueBackend}, core::{ - run_with_retries::run_with_retries, + retry::run_with_retries, types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId}, }, error::{Error, ErrorType, Result},