From 9c667b3ba3ddb3b2cd7625406e3c30ef4ffc5b0b Mon Sep 17 00:00:00 2001
From: Jonas Platte
Date: Mon, 29 Apr 2024 14:02:21 +0200
Subject: [PATCH 1/4] redis: Change send_raw to take a reference to slice
 instead of Vec

---
 omniqueue/src/backends/redis/mod.rs | 2 +-
 omniqueue/tests/it/redis.rs         | 2 +-
 omniqueue/tests/it/redis_cluster.rs | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
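
Note: with `&[u8]`, callers can pass any borrowed bytes without first
allocating a `Vec`. A minimal sketch of the call-site effect (builder setup
elided; `p` is the producer half of `build_pair`, as in the tests below):

    // Byte-string literals coerce from &[u8; N] to &[u8]...
    p.send_raw(b"{\"test\": \"data\"}").await.unwrap();

    // ...and &Vec<u8> deref-coerces to &[u8], so existing callers holding a
    // Vec keep working, now without the `.to_vec()` seen in the old tests.
    let owned: Vec<u8> = vec![1, 2, 3];
    p.send_raw(&owned).await.unwrap();
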
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index b688625..36b8612 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -640,7 +640,7 @@ pub struct RedisProducer {
 }
 
 impl<R: RedisConnection> RedisProducer<R> {
-    pub async fn send_raw(&self, payload: &Vec<u8>) -> Result<()> {
+    pub async fn send_raw(&self, payload: &[u8]) -> Result<()> {
         self.redis
             .get()
             .await
diff --git a/omniqueue/tests/it/redis.rs b/omniqueue/tests/it/redis.rs
index 154d9ad..6869fa7 100644
--- a/omniqueue/tests/it/redis.rs
+++ b/omniqueue/tests/it/redis.rs
@@ -59,7 +59,7 @@ async fn test_raw_send_recv() {
     let payload = b"{\"test\": \"data\"}";
     let (p, mut c) = builder.build_pair().await.unwrap();
 
-    p.send_raw(&payload.to_vec()).await.unwrap();
+    p.send_raw(payload).await.unwrap();
 
     let d = c.receive().await.unwrap();
     assert_eq!(d.borrow_payload().unwrap(), payload);
diff --git a/omniqueue/tests/it/redis_cluster.rs b/omniqueue/tests/it/redis_cluster.rs
index 5d616c7..70af224 100644
--- a/omniqueue/tests/it/redis_cluster.rs
+++ b/omniqueue/tests/it/redis_cluster.rs
@@ -62,7 +62,7 @@ async fn test_raw_send_recv() {
     let payload = b"{\"test\": \"data\"}";
     let (p, mut c) = builder.build_pair().await.unwrap();
 
-    p.send_raw(&payload.to_vec()).await.unwrap();
+    p.send_raw(payload).await.unwrap();
 
     let d = c.receive().await.unwrap();
     assert_eq!(d.borrow_payload().unwrap(), payload);

From abeb509a4afbcb651fcb589acdf3b38e9e95ac02 Mon Sep 17 00:00:00 2001
From: Jonas Platte
Date: Wed, 24 Apr 2024 14:54:11 +0200
Subject: [PATCH 2/4] redis: Extract v6.2.0+-specific functionality into a
 separate module

---
 omniqueue/src/backends/redis/mod.rs     | 282 ++--------------------
 omniqueue/src/backends/redis/streams.rs | 305 ++++++++++++++++++++++++
 2 files changed, 320 insertions(+), 267 deletions(-)
 create mode 100644 omniqueue/src/backends/redis/streams.rs

diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 36b8612..3d4895a 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -35,13 +35,9 @@ use std::{
     time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
 };
 
-use async_trait::async_trait;
 use bb8::ManageConnection;
 pub use bb8_redis::RedisConnectionManager;
-use redis::{
-    streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
-    AsyncCommands, ExistenceCheck, FromRedisValue, RedisResult, SetExpiry, SetOptions,
-};
+use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};
 use serde::Serialize;
 use svix_ksuid::KsuidLike;
 use thiserror::Error;
@@ -51,12 +47,14 @@ use tracing::{debug, error, trace, warn};
 #[allow(deprecated)]
 use crate::{
     builder::{Dynamic, Static},
-    queue::{Acker, Delivery, QueueBackend},
+    queue::{Delivery, QueueBackend},
     DynConsumer, DynProducer, QueueConsumer as _, QueueError, QueueProducer as _, Result,
 };
 
 #[cfg(feature = "redis_cluster")]
 mod cluster;
+mod streams;
+
 #[cfg(feature = "redis_cluster")]
 pub use cluster::RedisClusterConnectionManager;
@@ -344,31 +342,13 @@ impl RedisBackendBuilder {
             });
         }
 
-        join_set.spawn({
-            let pool = redis.clone();
-            let queue_key = self.config.queue_key.to_owned();
-            let consumer_group = self.config.consumer_group.to_owned();
-            let consumer_name = self.config.consumer_name.to_owned();
-            let ack_deadline_ms = self.config.ack_deadline_ms;
-
-            async move {
-                loop {
-                    if let Err(err) = background_task_pending(
-                        &pool,
-                        &queue_key,
-                        &consumer_group,
-                        &consumer_name,
-                        ack_deadline_ms,
-                    )
-                    .await
-                    {
-                        error!("{err}");
-                        tokio::time::sleep(Duration::from_millis(500)).await;
-                        continue;
-                    }
-                }
-            }
-        });
+        join_set.spawn(streams::background_task_pending(
+            redis.clone(),
+            self.config.queue_key.to_owned(),
+            self.config.consumer_group.to_owned(),
+            self.config.consumer_name.to_owned(),
+            self.config.ack_deadline_ms,
+        ));
 
         join_set.spawn({
             async move {
@@ -403,11 +383,6 @@ impl RedisBackendBuilder {
     }
 }
 
-/// Special ID for XADD command's which generates a stream ID automatically
-const GENERATE_STREAM_ID: &str = "*";
-/// Special ID for XREADGROUP commands which reads any new messages
-const LISTEN_STREAM_ID: &str = ">";
-
 /// Moves "due" messages from a sorted set, where delayed messages are shelved,
 /// back onto the main queue.
 async fn background_task_delayed(
@@ -454,20 +429,7 @@ async fn background_task_delayed(
     if !keys.is_empty() {
         trace!("Moving {} messages from delayed to main queue", keys.len());
 
-        // For each task, XADD them to the MAIN queue
-        let mut pipe = redis::pipe();
-        for key in &keys {
-            let payload = from_delayed_queue_key(key)?;
-            let _ = pipe.xadd(
-                main_queue_name,
-                GENERATE_STREAM_ID,
-                &[(payload_key, payload)],
-            );
-        }
-        let _: () = pipe
-            .query_async(&mut *conn)
-            .await
-            .map_err(QueueError::generic)?;
+        streams::add_to_main_queue(&keys, main_queue_name, payload_key, &mut *conn).await?;
 
         // Then remove the tasks from the delayed queue so they aren't resent
         let _: () = conn
@@ -492,145 +454,6 @@ async fn background_task_delayed(
     Ok(())
 }
 
-struct StreamAutoclaimReply {
-    ids: Vec<StreamId>,
-}
-
-impl FromRedisValue for StreamAutoclaimReply {
-    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
-        // First try the two member array from before Redis 7.0
-        match <((), StreamClaimReply)>::from_redis_value(v) {
-            Ok(res) => Ok(StreamAutoclaimReply { ids: res.1.ids }),
-
-            // If it's a type error, then try the three member array from Redis 7.0 and after
-            Err(e) if e.kind() == redis::ErrorKind::TypeError => {
-                <((), StreamClaimReply, ())>::from_redis_value(v)
-                    .map(|ok| StreamAutoclaimReply { ids: ok.1.ids })
-            }
-            // Any other error should be returned as is
-            Err(e) => Err(e),
-        }
-    }
-}
-
-/// The maximum number of pending messages to reinsert into the queue after
-/// becoming stale per loop
-// FIXME(onelson): expose in config?
-const PENDING_BATCH_SIZE: i16 = 1000;
-
-/// Scoops up messages that have been claimed but not handled by a deadline,
-/// then re-queues them.
-async fn background_task_pending<R: RedisConnection>(
-    pool: &bb8::Pool<R>,
-    main_queue_name: &str,
-    consumer_group: &str,
-    consumer_name: &str,
-    ack_deadline_ms: i64,
-) -> Result<()> {
-    let mut conn = pool.get().await.map_err(QueueError::generic)?;
-
-    // Every iteration checks whether the processing queue has items that should
-    // be picked back up, claiming them in the process
-    let mut cmd = redis::cmd("XAUTOCLAIM");
-    cmd.arg(main_queue_name)
-        .arg(consumer_group)
-        .arg(consumer_name)
-        .arg(ack_deadline_ms)
-        .arg("-")
-        .arg("COUNT")
-        .arg(PENDING_BATCH_SIZE);
-
-    let StreamAutoclaimReply { ids } = cmd
-        .query_async(&mut *conn)
-        .await
-        .map_err(QueueError::generic)?;
-
-    if !ids.is_empty() {
-        trace!("Moving {} unhandled messages back to the queue", ids.len());
-
-        let mut pipe = redis::pipe();
-
-        // And reinsert the map of KV pairs into the MAIN queue with a new stream ID
-        for StreamId { map, .. } in &ids {
-            let _ = pipe.xadd(
-                main_queue_name,
-                GENERATE_STREAM_ID,
-                &map.iter()
-                    .filter_map(|(k, v)| {
-                        if let redis::Value::Data(data) = v {
-                            Some((k.as_str(), data.as_slice()))
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<Vec<_>>(),
-            );
-        }
-
-        let _: () = pipe
-            .query_async(&mut *conn)
-            .await
-            .map_err(QueueError::generic)?;
-
-        // Acknowledge all the stale ones so the pending queue is cleared
-        let ids: Vec<_> = ids.iter().map(|wrapped| &wrapped.id).collect();
-
-        let mut pipe = redis::pipe();
-        pipe.xack(main_queue_name, consumer_group, &ids);
-        pipe.xdel(main_queue_name, &ids);
-
-        let _: () = pipe
-            .query_async(&mut *conn)
-            .await
-            .map_err(QueueError::generic)?;
-    } else {
-        // Wait for half a second before attempting to fetch again if nothing was found
-        tokio::time::sleep(Duration::from_millis(500)).await;
-    }
-
-    Ok(())
-}
-
-struct RedisAcker<M: ManageConnection> {
-    redis: bb8::Pool<M>,
-    queue_key: String,
-    consumer_group: String,
-    entry_id: String,
-
-    already_acked_or_nacked: bool,
-}
-
-#[async_trait]
-impl<M: RedisConnection> Acker for RedisAcker<M> {
-    async fn ack(&mut self) -> Result<()> {
-        if self.already_acked_or_nacked {
-            return Err(QueueError::CannotAckOrNackTwice);
-        }
-
-        self.already_acked_or_nacked = true;
-
-        let mut pipeline = redis::pipe();
-        pipeline.xack(&self.queue_key, &self.consumer_group, &[&self.entry_id]);
-        pipeline.xdel(&self.queue_key, &[&self.entry_id]);
-
-        let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
-        pipeline
-            .query_async(&mut *conn)
-            .await
-            .map_err(QueueError::generic)
-    }
-
-    async fn nack(&mut self) -> Result<()> {
-        if self.already_acked_or_nacked {
-            return Err(QueueError::CannotAckOrNackTwice);
-        }
-
-        self.already_acked_or_nacked = true;
-
-        Ok(())
-    }
-}
-
 pub struct RedisProducer<R: RedisConnection> {
     redis: bb8::Pool<R>,
     queue_key: String,
@@ -641,17 +464,7 @@ pub struct RedisProducer {
 }
 
 impl<R: RedisConnection> RedisProducer<R> {
     pub async fn send_raw(&self, payload: &[u8]) -> Result<()> {
-        self.redis
-            .get()
-            .await
-            .map_err(QueueError::generic)?
-            .xadd(
-                &self.queue_key,
-                GENERATE_STREAM_ID,
-                &[(&self.payload_key, payload)],
-            )
-            .await
-            .map_err(QueueError::generic)
+        streams::send_raw(self, payload).await
     }
 
     pub async fn send_serde_json<P: Serialize>(&self, payload: &P) -> Result<()> {
@@ -736,45 +549,8 @@ pub struct RedisConsumer {
 }
 
 impl<R: RedisConnection> RedisConsumer<R> {
-    fn wrap_entry(&self, entry: StreamId) -> Result<Delivery> {
-        let entry_id = entry.id.clone();
-        let payload = entry.map.get(&self.payload_key).ok_or(QueueError::NoData)?;
-        let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;
-
-        Ok(Delivery {
-            payload: Some(payload),
-            acker: Box::new(RedisAcker {
-                redis: self.redis.clone(),
-                queue_key: self.queue_key.clone(),
-                consumer_group: self.consumer_group.clone(),
-                entry_id,
-                already_acked_or_nacked: false,
-            }),
-        })
-    }
-
     pub async fn receive(&mut self) -> Result<Delivery> {
-        // Ensure an empty vec is never returned
-        let read_out: StreamReadReply = self
-            .redis
-            .get()
-            .await
-            .map_err(QueueError::generic)?
-            .xread_options(
-                &[&self.queue_key],
-                &[LISTEN_STREAM_ID],
-                &StreamReadOptions::default()
-                    .group(&self.consumer_group, &self.consumer_name)
-                    .block(100_000)
-                    .count(1),
-            )
-            .await
-            .map_err(QueueError::generic)?;
-
-        let queue = read_out.keys.into_iter().next().ok_or(QueueError::NoData)?;
-
-        let entry = queue.ids.into_iter().next().ok_or(QueueError::NoData)?;
-        self.wrap_entry(entry)
+        streams::receive(self).await
     }
 
     pub async fn receive_all(
@@ -782,35 +558,7 @@ impl RedisConsumer {
         max_messages: usize,
         deadline: Duration,
     ) -> Result<Vec<Delivery>> {
-        let read_out: StreamReadReply = self
-            .redis
-            .get()
-            .await
-            .map_err(QueueError::generic)?
-            .xread_options(
-                &[&self.queue_key],
-                &[LISTEN_STREAM_ID],
-                &StreamReadOptions::default()
-                    .group(&self.consumer_group, &self.consumer_name)
-                    .block(
-                        deadline
-                            .as_millis()
-                            .try_into()
-                            .map_err(QueueError::generic)?,
-                    )
-                    .count(max_messages),
-            )
-            .await
-            .map_err(QueueError::generic)?;
-
-        let mut out = Vec::with_capacity(max_messages);
-
-        if let Some(queue) = read_out.keys.into_iter().next() {
-            for entry in queue.ids {
-                out.push(self.wrap_entry(entry)?);
-            }
-        }
-        Ok(out)
+        streams::receive_all(self, deadline, max_messages).await
     }
 }
 
diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs
new file mode 100644
index 0000000..3f863fc
--- /dev/null
+++ b/omniqueue/src/backends/redis/streams.rs
@@ -0,0 +1,305 @@
+//! Implementation of the main queue using redis streams.
+
+use std::time::Duration;
+
+use async_trait::async_trait;
+use bb8::ManageConnection;
+use redis::{
+    streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
+    AsyncCommands as _, FromRedisValue, RedisResult,
+};
+use tracing::{error, trace};
+
+use super::{from_delayed_queue_key, RedisConnection, RedisConsumer, RedisProducer};
+use crate::{queue::Acker, Delivery, QueueError, Result};
+
+/// Special ID for XADD commands, which tells Redis to generate a stream ID automatically
+const GENERATE_STREAM_ID: &str = "*";
+/// Special ID for XREADGROUP commands, which tells Redis to read any new messages
+const LISTEN_STREAM_ID: &str = ">";
+
+/// The maximum number of pending messages to reinsert into the queue after
+/// becoming stale per loop
+// FIXME(onelson): expose in config?
+const PENDING_BATCH_SIZE: i16 = 1000;
+
+pub(super) async fn send_raw<R: RedisConnection>(
+    producer: &RedisProducer<R>,
+    payload: &[u8],
+) -> Result<()> {
+    producer
+        .redis
+        .get()
+        .await
+        .map_err(QueueError::generic)?
+        .xadd(
+            &producer.queue_key,
+            GENERATE_STREAM_ID,
+            &[(&producer.payload_key, payload)],
+        )
+        .await
+        .map_err(QueueError::generic)
+}
+
+pub(super) async fn receive<R: RedisConnection>(consumer: &RedisConsumer<R>) -> Result<Delivery> {
+    // Ensure an empty vec is never returned
+    let read_out: StreamReadReply = consumer
+        .redis
+        .get()
+        .await
+        .map_err(QueueError::generic)?
+        .xread_options(
+            &[&consumer.queue_key],
+            &[LISTEN_STREAM_ID],
+            &StreamReadOptions::default()
+                .group(&consumer.consumer_group, &consumer.consumer_name)
+                .block(100_000)
+                .count(1),
+        )
+        .await
+        .map_err(QueueError::generic)?;
+
+    let queue = read_out.keys.into_iter().next().ok_or(QueueError::NoData)?;
+    let entry = queue.ids.into_iter().next().ok_or(QueueError::NoData)?;
+
+    wrap_entry(consumer, entry)
+}
+
+pub(super) async fn receive_all<R: RedisConnection>(
+    consumer: &RedisConsumer<R>,
+    deadline: Duration,
+    max_messages: usize,
+) -> Result<Vec<Delivery>> {
+    let read_out: StreamReadReply = consumer
+        .redis
+        .get()
+        .await
+        .map_err(QueueError::generic)?
+        .xread_options(
+            &[&consumer.queue_key],
+            &[LISTEN_STREAM_ID],
+            &StreamReadOptions::default()
+                .group(&consumer.consumer_group, &consumer.consumer_name)
+                .block(
+                    deadline
+                        .as_millis()
+                        .try_into()
+                        .map_err(QueueError::generic)?,
+                )
+                .count(max_messages),
+        )
+        .await
+        .map_err(QueueError::generic)?;
+
+    let mut out = Vec::with_capacity(max_messages);
+
+    if let Some(queue) = read_out.keys.into_iter().next() {
+        for entry in queue.ids {
+            let wrapped = wrap_entry(consumer, entry)?;
+            out.push(wrapped);
+        }
+    }
+    Ok(out)
+}
+
+fn wrap_entry<R: RedisConnection>(
+    consumer: &RedisConsumer<R>,
+    entry: StreamId,
+) -> Result<Delivery> {
+    let entry_id = entry.id.clone();
+    let payload = entry
+        .map
+        .get(&consumer.payload_key)
+        .ok_or(QueueError::NoData)?;
+    let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;
+
+    Ok(Delivery {
+        payload: Some(payload),
+        acker: Box::new(RedisStreamsAcker {
+            redis: consumer.redis.clone(),
+            queue_key: consumer.queue_key.to_owned(),
+            consumer_group: consumer.consumer_group.to_owned(),
+            entry_id,
+            already_acked_or_nacked: false,
+        }),
+    })
+}
+
+struct RedisStreamsAcker<M: ManageConnection> {
+    redis: bb8::Pool<M>,
+    queue_key: String,
+    consumer_group: String,
+    entry_id: String,
+
+    already_acked_or_nacked: bool,
+}
+
+#[async_trait]
+impl<M: RedisConnection> Acker for RedisStreamsAcker<M> {
+    async fn ack(&mut self) -> Result<()> {
+        if self.already_acked_or_nacked {
+            return Err(QueueError::CannotAckOrNackTwice);
+        }
+
+        self.already_acked_or_nacked = true;
+
+        let mut pipeline = redis::pipe();
+        pipeline.xack(&self.queue_key, &self.consumer_group, &[&self.entry_id]);
+        pipeline.xdel(&self.queue_key, &[&self.entry_id]);
+
+        let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
+        pipeline
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)
+    }
+
+    async fn nack(&mut self) -> Result<()> {
+        if self.already_acked_or_nacked {
+            return Err(QueueError::CannotAckOrNackTwice);
+        }
+
+        self.already_acked_or_nacked = true;
+
+        Ok(())
+    }
+}
+
+pub(super) async fn add_to_main_queue(
+    keys: &[Vec<u8>],
+    main_queue_name: &str,
+    payload_key: &str,
+    conn: &mut impl redis::aio::ConnectionLike,
+) -> Result<()> {
+    let mut pipe = redis::pipe();
+    for key in keys {
+        let payload = from_delayed_queue_key(key)?;
+        let _ = pipe.xadd(
+            main_queue_name,
+            GENERATE_STREAM_ID,
+            &[(payload_key, payload)],
+        );
+    }
+
+    let _: () = pipe.query_async(conn).await.map_err(QueueError::generic)?;
+
+    Ok(())
+}
+
+struct StreamAutoclaimReply {
+    ids: Vec<StreamId>,
+}
+
+impl FromRedisValue for StreamAutoclaimReply {
+    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
+        // First try the two member array from before Redis 7.0
+        match <((), StreamClaimReply)>::from_redis_value(v) {
+            Ok(res) => Ok(StreamAutoclaimReply { ids: res.1.ids }),
+
+            // If it's a type error, then try the three member array from Redis 7.0 and after
+            Err(e) if e.kind() == redis::ErrorKind::TypeError => {
+                <((), StreamClaimReply, ())>::from_redis_value(v)
+                    .map(|ok| StreamAutoclaimReply { ids: ok.1.ids })
+            }
+            // Any other error should be returned as is
+            Err(e) => Err(e),
+        }
+    }
+}
+
+/// Scoops up messages that have been claimed but not handled by a deadline,
+/// then re-queues them.
+pub(super) async fn background_task_pending<R: RedisConnection>(
+    pool: bb8::Pool<R>,
+    queue_key: String,
+    consumer_group: String,
+    consumer_name: String,
+    ack_deadline_ms: i64,
+) -> Result<()> {
+    loop {
+        if let Err(err) = reenqueue_timed_out_messages(
+            &pool,
+            &queue_key,
+            &consumer_group,
+            &consumer_name,
+            ack_deadline_ms,
+        )
+        .await
+        {
+            error!("{err}");
+            tokio::time::sleep(Duration::from_millis(500)).await;
+            continue;
+        }
+    }
+}
+
+async fn reenqueue_timed_out_messages<R: RedisConnection>(
+    pool: &bb8::Pool<R>,
+    main_queue_name: &str,
+    consumer_group: &str,
+    consumer_name: &str,
+    ack_deadline_ms: i64,
+) -> Result<()> {
+    let mut conn = pool.get().await.map_err(QueueError::generic)?;
+
+    // Every iteration checks whether the processing queue has items that should
+    // be picked back up, claiming them in the process
+    let mut cmd = redis::cmd("XAUTOCLAIM");
+    cmd.arg(main_queue_name)
+        .arg(consumer_group)
+        .arg(consumer_name)
+        .arg(ack_deadline_ms)
+        .arg("-")
+        .arg("COUNT")
+        .arg(PENDING_BATCH_SIZE);
+
+    let StreamAutoclaimReply { ids } = cmd
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+
+    if !ids.is_empty() {
+        trace!("Moving {} unhandled messages back to the queue", ids.len());
+
+        let mut pipe = redis::pipe();
+
+        // And reinsert the map of KV pairs into the MAIN queue with a new stream ID
+        for StreamId { map, .. } in &ids {
+            let _ = pipe.xadd(
+                main_queue_name,
+                GENERATE_STREAM_ID,
+                &map.iter()
+                    .filter_map(|(k, v)| {
+                        if let redis::Value::Data(data) = v {
+                            Some((k.as_str(), data.as_slice()))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<_>>(),
+            );
+        }
+
+        let _: () = pipe
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)?;
+
+        // Acknowledge all the stale ones so the pending queue is cleared
+        let ids: Vec<_> = ids.iter().map(|wrapped| &wrapped.id).collect();
+
+        let mut pipe = redis::pipe();
+        pipe.xack(main_queue_name, consumer_group, &ids);
+        pipe.xdel(main_queue_name, &ids);
+
+        let _: () = pipe
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)?;
+    } else {
+        // Wait for half a second before attempting to fetch again if nothing was found
+        tokio::time::sleep(Duration::from_millis(500)).await;
+    }
+
+    Ok(())
+}
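
Note: this patch is a pure code move; the public surface of the redis backend
is unchanged. A minimal round-trip against that unchanged surface, mirroring
the tests elsewhere in this series (`config` is a `RedisConfig` built as in
those tests):

    let (p, mut c) = RedisBackend::builder(config).build_pair().await.unwrap();

    // Internally these now go through streams::send_raw / streams::receive,
    // but call sites are untouched.
    p.send_raw(b"hello").await.unwrap();
    let d = c.receive().await.unwrap();
    assert_eq!(d.borrow_payload().unwrap(), b"hello");
    d.ack().await.unwrap();
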
From 028a1fa710610c80ae02f1c106f9f422d5680471 Mon Sep 17 00:00:00 2001
From: Jonas Platte
Date: Mon, 29 Apr 2024 17:36:06 +0200
Subject: [PATCH 3/4] redis: Add a fallback implementation that works without
 redis streams

---
 omniqueue/Cargo.toml                     |   4 +-
 omniqueue/src/backends/redis/fallback.rs | 196 +++++++++++++++++++++++
 omniqueue/src/backends/redis/mod.rs      | 132 +++++++++++----
 omniqueue/src/backends/redis/streams.rs  |   4 +-
 4 files changed, 297 insertions(+), 39 deletions(-)
 create mode 100644 omniqueue/src/backends/redis/fallback.rs
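
Opting into the fallback is a single builder call. A sketch, assuming a
`RedisConfig` named `config` as constructed in the tests added by the next
patch:

    // Selects the two-list implementation (LPUSH + BRPOPLPUSH) instead of
    // streams, for redis servers older than 6.2.0. The setting has to match
    // between producers and consumers; see the doc comment on
    // `use_redis_streams` below.
    let (p, mut c) = RedisBackend::builder(config)
        .use_redis_streams(false)
        .build_pair()
        .await
        .unwrap();
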
diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml
index a413e23..0480cf9 100644
--- a/omniqueue/Cargo.toml
+++ b/omniqueue/Cargo.toml
@@ -27,7 +27,7 @@ serde = "1.0.196"
 serde_json = "1"
 svix-ksuid = { version = "0.8.0", optional = true }
 thiserror = "1"
-time = { version = "0.3.34", optional = true }
+time = "0.3.34"
 tokio = { version = "1", features = ["rt", "sync", "time"] }
 tracing = "0.1"
 
@@ -45,7 +45,7 @@ in_memory = []
 gcp_pubsub = ["dep:futures-util", "dep:google-cloud-googleapis", "dep:google-cloud-pubsub"]
 rabbitmq = ["dep:futures-util", "dep:lapin"]
 # Generate message IDs for queue items. Likely not needed outside of Svix.
-rabbitmq-with-message-ids = ["rabbitmq", "dep:time", "dep:svix-ksuid"]
+rabbitmq-with-message-ids = ["rabbitmq", "dep:svix-ksuid"]
 redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
 redis_cluster = ["redis", "redis/cluster-async"]
 sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs
new file mode 100644
index 0000000..a774414
--- /dev/null
+++ b/omniqueue/src/backends/redis/fallback.rs
@@ -0,0 +1,196 @@
+//! Implementation of the main queue using two lists instead of redis streams,
+//! for compatibility with redis versions older than 6.2.0.
+
+use std::time::Duration;
+
+use async_trait::async_trait;
+use bb8::ManageConnection;
+use redis::AsyncCommands;
+use svix_ksuid::{KsuidLike as _, KsuidMs};
+use time::OffsetDateTime;
+use tracing::{error, trace};
+
+use super::{from_key, to_key, RawPayload, RedisConnection, RedisConsumer, RedisProducer};
+use crate::{queue::Acker, Delivery, QueueError, Result};
+
+pub(super) async fn send_raw<R: RedisConnection>(
+    producer: &RedisProducer<R>,
+    payload: &[u8],
+) -> Result<()> {
+    producer
+        .redis
+        .get()
+        .await
+        .map_err(QueueError::generic)?
+        .lpush(&producer.queue_key, to_key(payload))
+        .await
+        .map_err(QueueError::generic)
+}
+
+pub(super) async fn receive<R: RedisConnection>(consumer: &RedisConsumer<R>) -> Result<Delivery> {
+    let res = receive_with_timeout(consumer, Duration::ZERO).await?;
+    res.ok_or_else(|| QueueError::Generic("No data".into()))
+}
+
+pub(super) async fn receive_all<R: RedisConnection>(
+    consumer: &RedisConsumer<R>,
+    deadline: Duration,
+    _max_messages: usize,
+) -> Result<Vec<Delivery>> {
+    // FIXME: Run up to max_messages RPOPLPUSH'es until there is a null reply?
+    let delivery = receive_with_timeout(consumer, deadline).await?;
+    Ok(delivery.into_iter().collect())
+}
+
+async fn receive_with_timeout<R: RedisConnection>(
+    consumer: &RedisConsumer<R>,
+    timeout: Duration,
+) -> Result<Option<Delivery>> {
+    let key: Option<Vec<u8>> = consumer
+        .redis
+        .get()
+        .await
+        .map_err(QueueError::generic)?
+        .brpoplpush(
+            &consumer.queue_key,
+            &consumer.processing_queue_key,
+            // The documentation at https://redis.io/docs/latest/commands/brpoplpush/ does not
+            // state what unit the timeout is in, but `BLPOP` and `BLMPOP` have similar timeout
+            // parameters that are documented as being seconds.
+            timeout.as_secs_f64(),
+        )
+        .await
+        .map_err(QueueError::generic)?;
+
+    key.map(|key| make_delivery(consumer, &key)).transpose()
+}
+
+fn make_delivery<R: RedisConnection>(consumer: &RedisConsumer<R>, key: &[u8]) -> Result<Delivery> {
+    let (_, payload) = from_key(key)?;
+
+    Ok(Delivery {
+        payload: Some(payload.to_owned()),
+        acker: Box::new(RedisFallbackAcker {
+            redis: consumer.redis.clone(),
+            processing_queue_key: consumer.processing_queue_key.clone(),
+            key: key.to_owned(),
+            already_acked_or_nacked: false,
+        }),
+    })
+}
+
+struct RedisFallbackAcker<M: ManageConnection> {
+    redis: bb8::Pool<M>,
+    processing_queue_key: String,
+    key: RawPayload,
+
+    already_acked_or_nacked: bool,
+}
+
+#[async_trait]
+impl<M: RedisConnection> Acker for RedisFallbackAcker<M> {
+    async fn ack(&mut self) -> Result<()> {
+        if self.already_acked_or_nacked {
+            return Err(QueueError::CannotAckOrNackTwice);
+        }
+
+        self.already_acked_or_nacked = true;
+
+        self.redis
+            .get()
+            .await
+            .map_err(QueueError::generic)?
+            .lrem(&self.processing_queue_key, 1, &self.key)
+            .await
+            .map_err(QueueError::generic)?;
+
+        Ok(())
+    }
+
+    async fn nack(&mut self) -> Result<()> {
+        if self.already_acked_or_nacked {
+            return Err(QueueError::CannotAckOrNackTwice);
+        }
+
+        self.already_acked_or_nacked = true;
+
+        Ok(())
+    }
+}
+
+pub(super) async fn add_to_main_queue(
+    keys: &[Vec<u8>],
+    main_queue_name: &str,
+    conn: &mut impl AsyncCommands,
+) -> Result<()> {
+    let new_keys = keys
+        .iter()
+        .map(|x| regenerate_key(x))
+        .collect::<Result<Vec<_>>>()?;
+    let _: () = conn
+        .lpush(main_queue_name, new_keys)
+        .await
+        .map_err(QueueError::generic)?;
+    Ok(())
+}
+
+pub(super) async fn background_task_processing<R: RedisConnection>(
+    pool: bb8::Pool<R>,
+    queue_key: String,
+    processing_queue_key: String,
+    ack_deadline_ms: i64,
+) -> Result<()> {
+    // FIXME: ack_deadline_ms should be unsigned
+    let ack_deadline = Duration::from_millis(ack_deadline_ms as _);
+    loop {
+        if let Err(err) =
+            reenqueue_timed_out_messages(&pool, &queue_key, &processing_queue_key, ack_deadline)
+                .await
+        {
+            error!("{err}");
+            tokio::time::sleep(Duration::from_millis(500)).await;
+            continue;
+        }
+    }
+}
+
+async fn reenqueue_timed_out_messages<R: RedisConnection>(
+    pool: &bb8::Pool<R>,
+    queue_key: &str,
+    processing_queue_key: &str,
+    ack_deadline: Duration,
+) -> Result<(), Box<dyn std::error::Error>> {
+    const BATCH_SIZE: isize = 50;
+
+    let mut conn = pool.get().await?;
+
+    let keys: Vec<Vec<u8>> = conn.lrange(processing_queue_key, 0, 1).await?;
+
+    // If the key is older than now, it means we should be processing keys
+    let validity_limit = KsuidMs::new(Some(OffsetDateTime::now_utc() - ack_deadline), None)
+        .to_string()
+        .into_bytes();
+
+    if !keys.is_empty() && keys[0] <= validity_limit {
+        let keys: Vec<Vec<u8>> = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?;
+        for key in keys {
+            if key <= validity_limit {
+                // We use LREM to be sure we only delete the keys we should be deleting
+                trace!("Pushing back overdue task to queue");
+                let refreshed_key = regenerate_key(&key)?;
+                let _: () = conn.rpush(queue_key, &refreshed_key).await?;
+                let _: () = conn.lrem(processing_queue_key, 1, &key).await?;
+            }
+        }
+    } else {
+        // Sleep before attempting to fetch again if nothing was found
+        tokio::time::sleep(Duration::from_millis(500)).await;
+    }
+
+    Ok(())
+}
+
+fn regenerate_key(key: &[u8]) -> Result<RawPayload> {
+    let (_, payload) = from_key(key)?;
+    Ok(to_key(payload))
+}
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 3d4895a..9f838ec 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -1,4 +1,7 @@
-//! Redis stream-based queue implementation
+//! Redis queue implementation.
+//!
+//! By default, this uses redis streams. There is a fallback implementation that
+//! you can select via `RedisBackend::builder(cfg).use_redis_streams(false)`.
 //!
 //! # Redis Streams in Brief
 //!
@@ -31,6 +34,7 @@
 use std::{
     marker::PhantomData,
+    str,
     sync::Arc,
     time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
 };
@@ -53,6 +57,7 @@ use crate::{
 #[cfg(feature = "redis_cluster")]
 mod cluster;
+mod fallback;
 mod streams;
 
 #[cfg(feature = "redis_cluster")]
 pub use cluster::RedisClusterConnectionManager;
@@ -191,6 +196,7 @@ impl QueueBackend for RedisBackend {
 
 pub struct RedisBackendBuilder<R = RedisConnectionManager, S = Static> {
     config: RedisConfig,
+    use_redis_streams: bool,
     _phantom: PhantomData<fn() -> (R, S)>,
 }
 
@@ -201,6 +207,15 @@ impl RedisBackendBuilder {
     fn new(config: RedisConfig) -> Self {
         Self {
             config,
+            use_redis_streams: true,
+            _phantom: PhantomData,
+        }
+    }
+
+    fn map_phantom<R2, S2>(self) -> RedisBackendBuilder<R2, S2> {
+        RedisBackendBuilder {
+            config: self.config,
+            use_redis_streams: self.use_redis_streams,
             _phantom: PhantomData,
         }
     }
@@ -211,10 +226,7 @@ impl RedisBackendBuilder {
     /// manager implementation. For clustered redis, use
     /// [`.cluster()`][Self::cluster].
     pub fn connection_manager<R2: RedisConnection>(self) -> RedisBackendBuilder<R2, S> {
-        RedisBackendBuilder {
-            config: self.config,
-            _phantom: PhantomData,
-        }
+        self.map_phantom()
     }
 
     #[cfg(feature = "redis_cluster")]
@@ -222,6 +234,21 @@ impl RedisBackendBuilder {
         self.connection_manager()
     }
 
+    /// Whether to use redis streams.
+    ///
+    /// Default: `true`.\
+    /// Set this to `false` if you want to use this backend with a version of
+    /// redis older than 6.2.0.
+    ///
+    /// Note: Make sure this setting matches between producers and consumers,
+    /// and don't change it as part of an upgrade unless you have made sure to
+    /// either empty the previous data, migrate it yourself, or use different
+    /// queue keys.
+    pub fn use_redis_streams(mut self, value: bool) -> Self {
+        self.use_redis_streams = value;
+        self
+    }
+
     pub async fn build_pair(self) -> Result<(RedisProducer<R>, RedisConsumer<R>)> {
         let redis = R::from_dsn(&self.config.dsn)?;
         let redis = bb8::Pool::builder()
@@ -231,6 +258,7 @@ impl RedisBackendBuilder {
             .map_err(QueueError::generic)?;
 
         let background_tasks = self.start_background_tasks(redis.clone()).await;
+        let processing_queue_key = format!("{}_processing", self.config.queue_key);
 
         Ok((
             RedisProducer {
@@ -238,14 +266,17 @@ impl RedisBackendBuilder {
                 queue_key: self.config.queue_key.clone(),
                 delayed_queue_key: self.config.delayed_queue_key,
                 payload_key: self.config.payload_key.clone(),
+                use_redis_streams: self.use_redis_streams,
                 _background_tasks: background_tasks.clone(),
             },
             RedisConsumer {
                 redis,
                 queue_key: self.config.queue_key,
+                processing_queue_key,
                 consumer_group: self.config.consumer_group,
                 consumer_name: self.config.consumer_name,
                 payload_key: self.config.payload_key,
+                use_redis_streams: self.use_redis_streams,
                 _background_tasks: background_tasks.clone(),
             },
         ))
@@ -265,6 +296,7 @@ impl RedisBackendBuilder {
             queue_key: self.config.queue_key,
             delayed_queue_key: self.config.delayed_queue_key,
             payload_key: self.config.payload_key,
+            use_redis_streams: self.use_redis_streams,
            _background_tasks,
         })
     }
@@ -278,22 +310,22 @@ impl RedisBackendBuilder {
             .map_err(QueueError::generic)?;
 
         let _background_tasks = self.start_background_tasks(redis.clone()).await;
+        let processing_queue_key = format!("{}_processing", self.config.queue_key);
 
         Ok(RedisConsumer {
             redis,
             queue_key: self.config.queue_key,
+            processing_queue_key,
             consumer_group: self.config.consumer_group,
             consumer_name: self.config.consumer_name,
             payload_key: self.config.payload_key,
+            use_redis_streams: self.use_redis_streams,
             _background_tasks,
         })
     }
 
     pub fn make_dynamic(self) -> RedisBackendBuilder<R, Dynamic> {
-        RedisBackendBuilder {
-            config: self.config,
-            _phantom: PhantomData,
-        }
+        self.map_phantom()
     }
 
     // FIXME(onelson): there's a trait, `SchedulerBackend`, but no obvious way to
@@ -315,6 +347,7 @@ impl RedisBackendBuilder {
         let delayed_queue_key = self.config.delayed_queue_key.to_owned();
         let delayed_lock_key = self.config.delayed_lock_key.to_owned();
         let payload_key = self.config.payload_key.to_owned();
+        let use_redis_streams = self.use_redis_streams;
 
         #[rustfmt::skip]
         debug!(
@@ -330,6 +363,7 @@ impl RedisBackendBuilder {
                     &delayed_queue_key,
                     &delayed_lock_key,
                     &payload_key,
+                    use_redis_streams,
                 )
                 .await
                 {
@@ -342,13 +376,22 @@ impl RedisBackendBuilder {
             });
         }
 
-        join_set.spawn(streams::background_task_pending(
-            redis.clone(),
-            self.config.queue_key.to_owned(),
-            self.config.consumer_group.to_owned(),
-            self.config.consumer_name.to_owned(),
-            self.config.ack_deadline_ms,
-        ));
+        if self.use_redis_streams {
+            join_set.spawn(streams::background_task_pending(
+                redis.clone(),
+                self.config.queue_key.to_owned(),
+                self.config.consumer_group.to_owned(),
+                self.config.consumer_name.to_owned(),
+                self.config.ack_deadline_ms,
+            ));
+        } else {
+            join_set.spawn(fallback::background_task_processing(
+                redis.clone(),
+                self.config.queue_key.to_owned(),
+                format!("{}_processing", self.config.queue_key),
+                self.config.ack_deadline_ms,
+            ));
+        }
 
         join_set.spawn({
             async move {
@@ -391,6 +434,7 @@ async fn background_task_delayed(
     delayed_queue_name: &str,
     delayed_lock: &str,
     payload_key: &str,
+    use_redis_streams: bool,
 ) -> Result<()> {
     const BATCH_SIZE: isize = 50;
 
@@ -429,7 +473,11 @@ async fn background_task_delayed(
     if !keys.is_empty() {
         trace!("Moving {} messages from delayed to main queue", keys.len());
 
-        streams::add_to_main_queue(&keys, main_queue_name, payload_key, &mut *conn).await?;
+        if use_redis_streams {
+            streams::add_to_main_queue(&keys, main_queue_name, payload_key, &mut *conn).await?;
+        } else {
+            fallback::add_to_main_queue(&keys, main_queue_name, &mut *conn).await?;
+        }
 
         // Then remove the tasks from the delayed queue so they aren't resent
         let _: () = conn
@@ -459,12 +507,17 @@ pub struct RedisProducer {
     redis: bb8::Pool<R>,
     queue_key: String,
     delayed_queue_key: String,
     payload_key: String,
+    use_redis_streams: bool,
     _background_tasks: Arc<JoinSet<Result<()>>>,
 }
 
 impl<R: RedisConnection> RedisProducer<R> {
     pub async fn send_raw(&self, payload: &[u8]) -> Result<()> {
-        streams::send_raw(self, payload).await
+        if self.use_redis_streams {
+            streams::send_raw(self, payload).await
+        } else {
+            fallback::send_raw(self, payload).await
+        }
     }
 
     pub async fn send_serde_json<P: Serialize>(&self, payload: &P) -> Result<()> {
@@ -479,11 +532,7 @@ impl RedisProducer {
             .get()
             .await
             .map_err(QueueError::generic)?
-            .zadd(
-                &self.delayed_queue_key,
-                to_delayed_queue_key(payload),
-                timestamp,
-            )
+            .zadd(&self.delayed_queue_key, to_key(payload), timestamp)
             .await
             .map_err(QueueError::generic)?;
 
@@ -513,44 +562,53 @@ fn unix_timestamp(time: SystemTime) -> Result {
 /// This ensures that messages with identical payloads:
 /// - don't get delivered only once instead of N times.
 /// - don't replace each other's "delivery due" timestamp.
-fn delayed_key_id() -> RawPayload {
-    svix_ksuid::Ksuid::new(None, None).to_base62().into_bytes()
+fn delayed_key_id() -> String {
+    svix_ksuid::Ksuid::new(None, None).to_base62()
 }
 
 /// Prefixes a payload with an id, separated by a pipe, e.g `ID|payload`.
-fn to_delayed_queue_key(payload: &[u8]) -> RawPayload {
-    // Base62-encoded KSUID is always 27 bytes long, 1 byte for separator.
-    let mut result = Vec::with_capacity(payload.len() + 28);
+fn to_key(payload: &[u8]) -> RawPayload {
+    let id = delayed_key_id();
 
-    result.extend(delayed_key_id());
+    let mut result = Vec::with_capacity(id.len() + payload.len() + 1);
+    result.extend(id.as_bytes());
     result.push(b'|');
-    result.extend(payload.iter().copied());
+    result.extend(payload);
 
     result
 }
 
-/// Returns the payload portion of a delayed zset key.
-fn from_delayed_queue_key(key: &[u8]) -> Result<&[u8]> {
+/// Splits a key encoded with [`to_key`] into ID and payload.
+fn from_key(key: &[u8]) -> Result<(&str, &[u8])> {
     // All information is stored in the key in which the ID and JSON formatted task
     // are separated by a `|`. So, take the key, then take the part after the `|`.
     let sep_pos = key
         .iter()
        .position(|&byte| byte == b'|')
         .ok_or_else(|| QueueError::Generic("Improper key format".into()))?;
-    Ok(&key[sep_pos + 1..])
+    let id = str::from_utf8(&key[..sep_pos])
+        .map_err(|_| QueueError::Generic("Non-UTF8 key ID".into()))?;
+
+    Ok((id, &key[sep_pos + 1..]))
 }
 
 pub struct RedisConsumer<R: RedisConnection> {
     redis: bb8::Pool<R>,
     queue_key: String,
+    processing_queue_key: String,
     consumer_group: String,
     consumer_name: String,
     payload_key: String,
+    use_redis_streams: bool,
     _background_tasks: Arc<JoinSet<Result<()>>>,
 }
 
 impl<R: RedisConnection> RedisConsumer<R> {
     pub async fn receive(&mut self) -> Result<Delivery> {
-        streams::receive(self).await
+        if self.use_redis_streams {
+            streams::receive(self).await
+        } else {
+            fallback::receive(self).await
+        }
     }
 
     pub async fn receive_all(
@@ -558,7 +616,11 @@ impl RedisConsumer {
         max_messages: usize,
         deadline: Duration,
     ) -> Result<Vec<Delivery>> {
-        streams::receive_all(self, deadline, max_messages).await
+        if self.use_redis_streams {
+            streams::receive_all(self, deadline, max_messages).await
+        } else {
+            fallback::receive_all(self, deadline, max_messages).await
+        }
     }
 }
 
diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs
index 3f863fc..3f50843 100644
--- a/omniqueue/src/backends/redis/streams.rs
+++ b/omniqueue/src/backends/redis/streams.rs
@@ -10,7 +10,7 @@ use redis::{
 };
 use tracing::{error, trace};
 
-use super::{from_delayed_queue_key, RedisConnection, RedisConsumer, RedisProducer};
+use super::{from_key, RedisConnection, RedisConsumer, RedisProducer};
 use crate::{queue::Acker, Delivery, QueueError, Result};
 
 /// Special ID for XADD commands, which tells Redis to generate a stream ID automatically
@@ -173,7 +173,7 @@ pub(super) async fn add_to_main_queue(
 ) -> Result<()> {
     let mut pipe = redis::pipe();
     for key in keys {
-        let payload = from_delayed_queue_key(key)?;
+        let (_, payload) = from_key(key)?;
         let _ = pipe.xadd(
             main_queue_name,
             GENERATE_STREAM_ID,
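
Note on the shared key format: both the delayed zset and the new fallback
lists store entries as `ID|payload`, where the ID is a 27-character base62
KSUID. A worked example (the KSUID shown is illustrative; `to_key`/`from_key`
are the private helpers added above):

    let key = to_key(b"{\"a\":1}");
    // key now looks like b"0ujtsYcgvSTl8PAuAdqWYSMnLOv|{\"a\":1}":
    // a 27-character base62 KSUID, a `|` separator, then the raw payload.
    let (id, payload) = from_key(&key)?;
    assert_eq!(id.len(), 27);
    assert_eq!(payload, b"{\"a\":1}");
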
From 8907130ac3e2032d774cd0398bbc6111c1ecac3b Mon Sep 17 00:00:00 2001
From: Jonas Platte
Date: Tue, 30 Apr 2024 12:48:39 +0200
Subject: [PATCH 4/4] redis: Add tests for new fallback implementation

---
 omniqueue/tests/it/main.rs           |   2 +
 omniqueue/tests/it/redis_fallback.rs | 291 +++++++++++++++++++++++++++
 2 files changed, 293 insertions(+)
 create mode 100644 omniqueue/tests/it/redis_fallback.rs
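
Note: each test binds the cleanup guard as `let (builder, _drop) = ...` rather
than discarding it with a bare `_`. That is deliberate: a plain `_` pattern
does not bind, so the guard would be dropped (and the key deleted) immediately.
A minimal illustration of the difference:

    struct Guard;
    impl Drop for Guard {
        fn drop(&mut self) {
            println!("cleaned up");
        }
    }

    let _guard = Guard; // named binding: lives to the end of the scope
    let _ = Guard;      // `_` does not bind: "cleaned up" prints right here
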
diff --git a/omniqueue/tests/it/main.rs b/omniqueue/tests/it/main.rs
index 14a9d89..34eba23 100644
--- a/omniqueue/tests/it/main.rs
+++ b/omniqueue/tests/it/main.rs
@@ -8,5 +8,7 @@ mod rabbitmq;
 mod redis;
 #[cfg(feature = "redis_cluster")]
 mod redis_cluster;
+#[cfg(feature = "redis")]
+mod redis_fallback;
 #[cfg(feature = "sqs")]
 mod sqs;
diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs
new file mode 100644
index 0000000..5606770
--- /dev/null
+++ b/omniqueue/tests/it/redis_fallback.rs
@@ -0,0 +1,291 @@
+use std::time::{Duration, Instant};
+
+use omniqueue::backends::{redis::RedisBackendBuilder, RedisBackend, RedisConfig};
+use redis::{Client, Commands};
+use serde::{Deserialize, Serialize};
+
+const ROOT_URL: &str = "redis://localhost";
+
+pub struct RedisKeyDrop(String);
+impl Drop for RedisKeyDrop {
+    fn drop(&mut self) {
+        let client = Client::open(ROOT_URL).unwrap();
+        let mut conn = client.get_connection().unwrap();
+        let _: () = conn.del(&self.0).unwrap();
+    }
+}
+
+/// Returns a [`RedisBackendBuilder`] configured to connect to the Redis
+/// instance spawned by the file `testing-docker-compose.yaml` in the root of
+/// the repository.
+///
+/// Additionally this will make a temporary queue key on that instance for the
+/// duration of the test, so that tests don't steal each other's messages.
+///
+/// This will also return a [`RedisKeyDrop`] to clean up the key after the
+/// test ends.
+async fn make_test_queue() -> (RedisBackendBuilder, RedisKeyDrop) {
+    let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: queue_key.clone(),
+        delayed_queue_key: format!("{queue_key}::delayed"),
+        delayed_lock_key: format!("{queue_key}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 5_000,
+    };
+
+    (
+        RedisBackend::builder(config).use_redis_streams(false),
+        RedisKeyDrop(queue_key),
+    )
+}
+
+#[tokio::test]
+async fn test_raw_send_recv() {
+    let (builder, _drop) = make_test_queue().await;
+    let payload = b"{\"test\": \"data\"}";
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_raw(payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+}
+
+#[tokio::test]
+async fn test_bytes_send_recv() {
+    use omniqueue::QueueProducer as _;
+
+    let (builder, _drop) = make_test_queue().await;
+    let payload = b"hello";
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_bytes(payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct ExType {
+    a: u8,
+}
+
+#[tokio::test]
+async fn test_serde_send_recv() {
+    let (builder, _drop) = make_test_queue().await;
+    let payload = ExType { a: 2 };
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+// Fallback implementation currently implements receive_all such that it always
+// only returns the first item; uncomment when the implementation is changed.
+/*
+/// Consumer will return immediately if there are fewer than max messages to
+/// start with.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let (builder, _drop) = make_test_queue().await;
+
+    let payload = ExType { a: 2 };
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    assert!(now.elapsed() <= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on
+/// the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too
+    // slowly.
+    assert!(now.elapsed() < deadline);
+}
+
+/// Consumer will return the full batch immediately, but also return
+/// immediately if a partial batch is ready.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    assert!(now2.elapsed() < deadline);
+}
+
+/// Consumer will NOT wait indefinitely for at least one item.
+#[tokio::test] +async fn test_send_recv_all_late_arriving_items() { + let (builder, _drop) = make_test_queue().await; + + let (_p, mut c) = builder.build_pair().await.unwrap(); + + let deadline = Duration::from_secs(1); + let now = Instant::now(); + let xs = c.receive_all(2, deadline).await.unwrap(); + let elapsed = now.elapsed(); + + assert_eq!(xs.len(), 0); + // Elapsed should be around the deadline, ballpark + assert!(elapsed >= deadline); + assert!(elapsed <= deadline + Duration::from_millis(200)); +} +*/ + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} + +#[tokio::test] +async fn test_pending() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + let delivery1 = c.receive().await.unwrap(); + let delivery2 = c.receive().await.unwrap(); + + // All items claimed, but not yet ack'd. There shouldn't be anything available + // yet. + assert!(c + .receive_all(1, Duration::from_millis(1)) + .await + .unwrap() + .is_empty()); + + assert_eq!( + Some(&payload1), + delivery1.payload_serde_json().unwrap().as_ref() + ); + assert_eq!( + Some(&payload2), + delivery2.payload_serde_json().unwrap().as_ref() + ); + + // ack 2, but neglect 1 + let _ = delivery2.ack().await; + + // After the deadline, the first payload should appear again. + let delivery3 = c.receive().await.unwrap(); + assert_eq!( + Some(&payload1), + delivery3.payload_serde_json().unwrap().as_ref() + ); + + // queue should be empty once again + assert!(c + .receive_all(1, Duration::from_millis(1)) + .await + .unwrap() + .is_empty()); +}