From 232f1e851837fec88e49e6939596eeb9aa7af882 Mon Sep 17 00:00:00 2001
From: Owen Nelson
Date: Wed, 1 Nov 2023 22:25:17 -0700
Subject: [PATCH] feat(redis): add "pending" task to re-queue unhandled messages

In svix-server's redis queue implementation, there's a spawned task to look
for messages that have been "claimed" (i.e. pulled off the queue) but not
ack'd or nack'd within some deadline period. That task was missing from the
omniqueue version; this diff adds it.

This refactors the prior background task spawned for delayed messages so it
and the new one for the pending sweeper are bundled together in a `JoinSet`.
Since dropping a `JoinSet` means the spawned tasks under it will abort, the
handle is now held by any consumers/producers that share the same config.
---
 omniqueue/src/backends/redis/mod.rs | 244 +++++++++++++++++++++++-----
 omniqueue/tests/redis.rs            |  48 ++++++
 omniqueue/tests/redis_cluster.rs    |  48 ++++++
 3 files changed, 299 insertions(+), 41 deletions(-)

diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 294c934..a7d00c7 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -24,15 +24,19 @@
 // have generic return types. This is cleaner than the turbofish operator in my opinion.
 #![allow(clippy::let_unit_value)]
 
+use std::sync::Arc;
 use std::time::Duration;
 use std::{any::TypeId, collections::HashMap, marker::PhantomData};
 
 use async_trait::async_trait;
 use bb8::ManageConnection;
 pub use bb8_redis::RedisMultiplexedConnectionManager;
-use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
+use redis::{
+    streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
+    FromRedisValue, RedisResult,
+};
 use svix_ksuid::KsuidLike;
-use tokio::task::JoinHandle;
+use tokio::task::JoinSet;
 
 use crate::{
     decoding::DecoderRegistry,
@@ -75,6 +79,7 @@ pub struct RedisConfig {
     pub consumer_group: String,
     pub consumer_name: String,
     pub payload_key: String,
+    pub ack_deadline_ms: i64,
 }
 
 pub struct RedisQueueBackend<R = RedisMultiplexedConnectionManager>(PhantomData<R>);
@@ -110,13 +115,18 @@ where
             .await
             .map_err(QueueError::generic)?;
 
-        let _ = start_scheduler_background_task(
-            redis.clone(),
-            &cfg.queue_key,
-            &cfg.delayed_queue_key,
-            &cfg.payload_key,
-        )
-        .await;
+        let background_tasks = Arc::new(
+            start_background_tasks(
+                redis.clone(),
+                &cfg.queue_key,
+                &cfg.delayed_queue_key,
+                &cfg.payload_key,
+                &cfg.consumer_group,
+                &cfg.consumer_name,
+                cfg.ack_deadline_ms,
+            )
+            .await,
+        );
 
         Ok((
             RedisStreamProducer {
@@ -125,6 +135,7 @@
                 queue_key: cfg.queue_key.clone(),
                 delayed_queue_key: cfg.delayed_queue_key,
                 payload_key: cfg.payload_key.clone(),
+                _background_tasks: background_tasks.clone(),
             },
             RedisStreamConsumer {
                 registry: custom_decoders,
@@ -133,6 +144,7 @@
                 consumer_group: cfg.consumer_group,
                 consumer_name: cfg.consumer_name,
                 payload_key: cfg.payload_key,
+                _background_tasks: background_tasks.clone(),
             },
         ))
     }
@@ -148,19 +160,25 @@ where
             .await
             .map_err(QueueError::generic)?;
 
-        let _ = start_scheduler_background_task(
-            redis.clone(),
-            &cfg.queue_key,
-            &cfg.delayed_queue_key,
-            &cfg.payload_key,
-        )
-        .await;
+        let background_tasks = Arc::new(
+            start_background_tasks(
+                redis.clone(),
+                &cfg.queue_key,
+                &cfg.delayed_queue_key,
+                &cfg.payload_key,
+                &cfg.consumer_group,
+                &cfg.consumer_name,
+                cfg.ack_deadline_ms,
+            )
+            .await,
+        );
 
         Ok(RedisStreamProducer {
             registry: custom_encoders,
             redis,
             queue_key: cfg.queue_key,
             delayed_queue_key: cfg.delayed_queue_key,
             payload_key: cfg.payload_key,
+            _background_tasks: background_tasks,
        })
    }
@@ -175,13 +193,19 @@ where
            .await
            .map_err(QueueError::generic)?;
 
-        let _ = start_scheduler_background_task(
-            redis.clone(),
-            &cfg.queue_key,
-            &cfg.delayed_queue_key,
-            &cfg.payload_key,
-        )
-        .await;
+        let background_tasks = Arc::new(
+            start_background_tasks(
+                redis.clone(),
+                &cfg.queue_key,
+                &cfg.delayed_queue_key,
+                &cfg.payload_key,
+                &cfg.consumer_group,
+                &cfg.consumer_name,
+                cfg.ack_deadline_ms,
+            )
+            .await,
+        );
+
         Ok(RedisStreamConsumer {
             registry: custom_decoders,
             redis,
@@ -189,6 +213,7 @@ where
             consumer_group: cfg.consumer_group,
             consumer_name: cfg.consumer_name,
             payload_key: cfg.payload_key,
+            _background_tasks: background_tasks,
         })
     }
 }
@@ -198,51 +223,86 @@ where
 // We need access to the pool, and various bits of config to spawn a task, but none of that is
 // available where it matters right now.
 // Doing my own thing for now - standalone function that takes what it needs.
-async fn start_scheduler_background_task<R>(
+async fn start_background_tasks<R>(
     redis: bb8::Pool<R>,
     queue_key: &str,
     delayed_queue_key: &str,
     payload_key: &str,
-) -> Option<JoinHandle<Result<(), QueueError>>>
+    consumer_group: &str,
+    consumer_name: &str,
+    task_timeout_ms: i64,
+) -> JoinSet<Result<(), QueueError>>
 where
     R: RedisConnection,
     R::Connection: redis::aio::ConnectionLike + Send + Sync,
     R::Error: 'static + std::error::Error + Send + Sync,
 {
+    let mut join_set = JoinSet::new();
+
+    // FIXME(onelson): does it even make sense to treat delay support as optional here?
     if delayed_queue_key.is_empty() {
         tracing::warn!("no delayed_queue_key specified - delayed task scheduler disabled");
-        return None;
+    } else {
+        join_set.spawn({
+            let pool = redis.clone();
+            let mqn = queue_key.to_string();
+            let dqn = delayed_queue_key.to_string();
+            // FIXME(onelson): should delayed_lock be configurable?
+            // Should `delayed_queue_name` even? Could be a suffix on `queue_name`.
+            let delayed_lock = format!("{delayed_queue_key}__lock");
+            let payload_key = payload_key.to_string();
+            tracing::debug!(
+                "spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \
+                 delayed_lock=`{delayed_lock}`"
+            );
+
+            async move {
+                loop {
+                    if let Err(err) = background_task_delayed(
+                        pool.clone(),
+                        mqn.clone(),
+                        dqn.clone(),
+                        &delayed_lock,
+                        &payload_key,
+                    )
+                    .await
+                    {
+                        tracing::error!("{}", err);
+                        tokio::time::sleep(Duration::from_millis(500)).await;
+                        continue;
+                    };
+                }
+            }
+        });
     }
 
-    Some(tokio::spawn({
+    join_set.spawn({
         let pool = redis.clone();
+
         let mqn = queue_key.to_string();
-        let dqn = delayed_queue_key.to_string();
-        let delayed_lock = format!("{delayed_queue_key}__lock");
-        let payload_key = payload_key.to_string();
-        tracing::debug!(
-            "spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \
-             delayed_lock=`{delayed_lock}`"
-        );
+        // FIXME(onelson): expose in config and confirm this is milliseconds
+        let consumer_group = consumer_group.to_string();
+        let consumer_name = consumer_name.to_string();
 
         async move {
             loop {
-                if let Err(err) = background_task_delayed(
+                if let Err(err) = background_task_pending(
                     pool.clone(),
-                    mqn.clone(),
-                    dqn.clone(),
-                    &delayed_lock,
-                    &payload_key,
+                    &mqn,
+                    &consumer_group,
+                    &consumer_name,
+                    task_timeout_ms,
                 )
                 .await
                 {
                     tracing::error!("{}", err);
                     tokio::time::sleep(Duration::from_millis(500)).await;
                     continue;
-                };
+                }
             }
         }
-    }))
+    });
+    join_set
 }
 
 /// Special ID for XADD command's which generates a stream ID automatically
@@ -353,6 +413,106 @@ where
     Ok(())
 }
 
+struct StreamAutoclaimReply {
+    ids: Vec<StreamId>,
+}
+
+impl FromRedisValue for StreamAutoclaimReply {
+    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
+        // First try the two member array from before Redis 7.0
+        match <((), StreamClaimReply)>::from_redis_value(v) {
+            Ok(res) => Ok(StreamAutoclaimReply { ids: res.1.ids }),
+
+            // If it's a type error, then try the three member array from Redis 7.0 and after
+            Err(e) if e.kind() == redis::ErrorKind::TypeError => {
+                <((), StreamClaimReply, ())>::from_redis_value(v)
+                    .map(|ok| StreamAutoclaimReply { ids: ok.1.ids })
+            }
+            // Any other error should be returned as is
+            Err(e) => Err(e),
+        }
+    }
+}
+
+/// The maximum number of pending messages to reinsert into the queue after becoming stale per loop
+// FIXME(onelson): expose in config?
+const PENDING_BATCH_SIZE: i16 = 1000;
+
+/// Scoops up messages that have been claimed but not handled by a deadline, then re-queues them.
+async fn background_task_pending<R>(
+    pool: bb8::Pool<R>,
+    main_queue_name: &str,
+    consumer_group: &str,
+    consumer_name: &str,
+    pending_duration: i64,
+) -> Result<(), QueueError>
+where
+    R: RedisConnection,
+    R::Connection: redis::aio::ConnectionLike + Send + Sync,
+    R::Error: 'static + std::error::Error + Send + Sync,
+{
+    let mut conn = pool.get().await.map_err(QueueError::generic)?;
+
+    // Every iteration checks whether the processing queue has items that should be picked back up,
+    // claiming them in the process
+    let mut cmd = redis::cmd("XAUTOCLAIM");
+    cmd.arg(main_queue_name)
+        .arg(consumer_group)
+        .arg(consumer_name)
+        .arg(pending_duration)
+        .arg("-")
+        .arg("COUNT")
+        .arg(PENDING_BATCH_SIZE);
+
+    let StreamAutoclaimReply { ids } = cmd
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+
+    if !ids.is_empty() {
+        let mut pipe = redis::pipe();
+
+        // And reinsert the map of KV pairs into the MAIN queue with a new stream ID
+        for StreamId { map, .. } in &ids {
+            let _ = pipe.xadd(
+                main_queue_name,
+                GENERATE_STREAM_ID,
+                &map.iter()
+                    .filter_map(|(k, v)| {
+                        if let redis::Value::Data(data) = v {
+                            Some((k.as_str(), data.as_slice()))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<_>>(),
+            );
+        }
+
+        let _: () = pipe
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)?;
+
+        // Acknowledge all the stale ones so the pending queue is cleared
+        let ids: Vec<_> = ids.iter().map(|wrapped| &wrapped.id).collect();
+
+        let mut pipe = redis::pipe();
+        pipe.add_command(redis::Cmd::xack(main_queue_name, consumer_group, &ids));
+        pipe.add_command(redis::Cmd::xdel(main_queue_name, &ids));
+
+        let _: () = pipe
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)?;
+    } else {
+        // Wait for half a second before attempting to fetch again if nothing was found
+        tokio::time::sleep(Duration::from_millis(500)).await;
+    }
+
+    Ok(())
+}
+
 pub struct RedisStreamAcker<R: RedisConnection> {
     redis: bb8::Pool<R>,
     queue_key: String,
@@ -402,6 +562,7 @@ pub struct RedisStreamProducer<R: RedisConnection> {
     queue_key: String,
     delayed_queue_key: String,
     payload_key: String,
+    _background_tasks: Arc<JoinSet<Result<(), QueueError>>>,
 }
 
 #[async_trait]
@@ -502,6 +663,7 @@ pub struct RedisStreamConsumer<R: RedisConnection> {
     consumer_group: String,
     consumer_name: String,
     payload_key: String,
+    _background_tasks: Arc<JoinSet<Result<(), QueueError>>>,
 }
 
 impl<R> RedisStreamConsumer<R>
diff --git a/omniqueue/tests/redis.rs b/omniqueue/tests/redis.rs
index 8f5f50e..3e2c546 100644
--- a/omniqueue/tests/redis.rs
+++ b/omniqueue/tests/redis.rs
@@ -47,6 +47,7 @@ async fn make_test_queue() -> (QueueBuilder, RedisStr
         consumer_group: "test_cg".to_owned(),
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
+        ack_deadline_ms: 5_000,
     };
 
     (
@@ -267,3 +268,50 @@ async fn test_scheduled() {
     assert!(now.elapsed() < delay * 2);
     assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
 }
+
+#[tokio::test]
+async fn test_pending() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let delivery1 = c.receive().await.unwrap();
+    let delivery2 = c.receive().await.unwrap();
+
+    // All items claimed, but not yet ack'd. There shouldn't be anything available yet.
+    assert!(c
+        .receive_all(1, Duration::from_millis(1))
+        .await
+        .unwrap()
+        .is_empty());
+
+    assert_eq!(
+        Some(&payload1),
+        delivery1.payload_serde_json().unwrap().as_ref()
+    );
+    assert_eq!(
+        Some(&payload2),
+        delivery2.payload_serde_json().unwrap().as_ref()
+    );
+
+    // ack 2, but neglect 1
+    let _ = delivery2.ack().await;
+
+    // After the deadline, the first payload should appear again.
+    let delivery3 = c.receive().await.unwrap();
+    assert_eq!(
+        Some(&payload1),
+        delivery3.payload_serde_json().unwrap().as_ref()
+    );
+
+    // queue should be empty once again
+    assert!(c
+        .receive_all(1, Duration::from_millis(1))
+        .await
+        .unwrap()
+        .is_empty());
+}
diff --git a/omniqueue/tests/redis_cluster.rs b/omniqueue/tests/redis_cluster.rs
index 24683cb..a4c2ecd 100644
--- a/omniqueue/tests/redis_cluster.rs
+++ b/omniqueue/tests/redis_cluster.rs
@@ -50,6 +50,7 @@ async fn make_test_queue() -> (
         consumer_group: "test_cg".to_owned(),
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
+        ack_deadline_ms: 5_000,
     };
 
     (
@@ -270,3 +271,50 @@ async fn test_scheduled() {
     assert!(now.elapsed() < delay * 2);
     assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
 }
+
+#[tokio::test]
+async fn test_pending() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let delivery1 = c.receive().await.unwrap();
+    let delivery2 = c.receive().await.unwrap();
+
+    // All items claimed, but not yet ack'd. There shouldn't be anything available yet.
+    assert!(c
+        .receive_all(1, Duration::from_millis(1))
+        .await
+        .unwrap()
+        .is_empty());
+
+    assert_eq!(
+        Some(&payload1),
+        delivery1.payload_serde_json().unwrap().as_ref()
+    );
+    assert_eq!(
+        Some(&payload2),
+        delivery2.payload_serde_json().unwrap().as_ref()
+    );
+
+    // ack 2, but neglect 1
+    let _ = delivery2.ack().await;
+
+    // After the deadline, the first payload should appear again.
+    let delivery3 = c.receive().await.unwrap();
+    assert_eq!(
+        Some(&payload1),
+        delivery3.payload_serde_json().unwrap().as_ref()
+    );
+
+    // queue should be empty once again
+    assert!(c
+        .receive_all(1, Duration::from_millis(1))
+        .await
+        .unwrap()
+        .is_empty());
+}
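
Not part of the diff: a minimal, self-contained sketch of the drop semantics the
commit message leans on, using stand-in `Producer` and `Consumer` types and
assuming only tokio. Tasks spawned on a `JoinSet` are aborted when the set is
dropped, so parking the set behind a shared `Arc` keeps the background sweepers
alive until the last producer or consumer handle goes away:

    use std::{sync::Arc, time::Duration};

    use tokio::task::JoinSet;

    // Stand-ins mirroring the `_background_tasks` field added in this patch.
    struct Producer {
        _background_tasks: Arc<JoinSet<()>>,
    }

    struct Consumer {
        _background_tasks: Arc<JoinSet<()>>,
    }

    #[tokio::main]
    async fn main() {
        let mut tasks: JoinSet<()> = JoinSet::new();
        tasks.spawn(async {
            loop {
                // A real sweeper would do work here; this one just parks.
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });
        let tasks = Arc::new(tasks);

        let producer = Producer { _background_tasks: tasks.clone() };
        let consumer = Consumer { _background_tasks: tasks };

        // Dropping one handle leaves the sweeper running...
        drop(producer);
        // ...dropping the last handle drops the `JoinSet`, aborting the task.
        drop(consumer);
    }

This is also why both halves built from the same config receive clones of the
same `Arc`: the delayed scheduler and the pending sweeper stay up for as long
as either half of the pair is still in use.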