From 8907130ac3e2032d774cd0398bbc6111c1ecac3b Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 30 Apr 2024 12:48:39 +0200 Subject: [PATCH] redis: Add tests for new fallback implementation --- omniqueue/tests/it/main.rs | 2 + omniqueue/tests/it/redis_fallback.rs | 291 +++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) create mode 100644 omniqueue/tests/it/redis_fallback.rs diff --git a/omniqueue/tests/it/main.rs b/omniqueue/tests/it/main.rs index 14a9d89..34eba23 100644 --- a/omniqueue/tests/it/main.rs +++ b/omniqueue/tests/it/main.rs @@ -8,5 +8,7 @@ mod rabbitmq; mod redis; #[cfg(feature = "redis_cluster")] mod redis_cluster; +#[cfg(feature = "redis")] +mod redis_fallback; #[cfg(feature = "sqs")] mod sqs; diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs new file mode 100644 index 0000000..5606770 --- /dev/null +++ b/omniqueue/tests/it/redis_fallback.rs @@ -0,0 +1,291 @@ +use std::time::{Duration, Instant}; + +use omniqueue::backends::{redis::RedisBackendBuilder, RedisBackend, RedisConfig}; +use redis::{Client, Commands}; +use serde::{Deserialize, Serialize}; + +const ROOT_URL: &str = "redis://localhost"; + +pub struct RedisKeyDrop(String); +impl Drop for RedisKeyDrop { + fn drop(&mut self) { + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_connection().unwrap(); + let _: () = conn.del(&self.0).unwrap(); + } +} + +/// Returns a [`QueueBuilder`] configured to connect to the Redis instance +/// spawned by the file `testing-docker-compose.yaml` in the root of the +/// repository. +/// +/// Additionally this will make a temporary stream on that instance for the +/// duration of the test such as to ensure there is no stealing +/// +/// This will also return a [`RedisKeyDrop`] to clean up the stream after the +/// test ends. +async fn make_test_queue() -> (RedisBackendBuilder, RedisKeyDrop) { + let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: queue_key.clone(), + delayed_queue_key: format!("{queue_key}::delayed"), + delayed_lock_key: format!("{queue_key}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 5_000, + }; + + ( + RedisBackend::builder(config).use_redis_streams(false), + RedisKeyDrop(queue_key), + ) +} + +#[tokio::test] +async fn test_raw_send_recv() { + let (builder, _drop) = make_test_queue().await; + let payload = b"{\"test\": \"data\"}"; + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_raw(payload).await.unwrap(); + + let d = c.receive().await.unwrap(); + assert_eq!(d.borrow_payload().unwrap(), payload); +} + +#[tokio::test] +async fn test_bytes_send_recv() { + use omniqueue::QueueProducer as _; + + let (builder, _drop) = make_test_queue().await; + let payload = b"hello"; + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_bytes(payload).await.unwrap(); + + let d = c.receive().await.unwrap(); + assert_eq!(d.borrow_payload().unwrap(), payload); + d.ack().await.unwrap(); +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct ExType { + a: u8, +} + +#[tokio::test] +async fn test_serde_send_recv() { + let (builder, _drop) = make_test_queue().await; + let payload = ExType { a: 2 }; + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload).await.unwrap(); + + let d = c.receive().await.unwrap(); + assert_eq!(d.payload_serde_json::().unwrap().unwrap(), payload); + d.ack().await.unwrap(); +} + +// Fallback implementation currently implements receive_all such that it always +// only returns the first item, uncomment when the implementation is changed. +/* +/// Consumer will return immediately if there are fewer than max messages to +/// start with. +#[tokio::test] +async fn test_send_recv_all_partial() { + let (builder, _drop) = make_test_queue().await; + + let payload = ExType { a: 2 }; + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload).await.unwrap(); + let deadline = Duration::from_secs(1); + + let now = Instant::now(); + let mut xs = c.receive_all(2, deadline).await.unwrap(); + assert_eq!(xs.len(), 1); + let d = xs.remove(0); + assert_eq!(d.payload_serde_json::().unwrap().unwrap(), payload); + d.ack().await.unwrap(); + assert!(now.elapsed() <= deadline); +} + +/// Consumer should yield items immediately if there's a full batch ready on the +/// first poll. +#[tokio::test] +async fn test_send_recv_all_full() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + let deadline = Duration::from_secs(1); + + let now = Instant::now(); + let mut xs = c.receive_all(2, deadline).await.unwrap(); + assert_eq!(xs.len(), 2); + let d1 = xs.remove(0); + assert_eq!( + d1.payload_serde_json::().unwrap().unwrap(), + payload1 + ); + d1.ack().await.unwrap(); + + let d2 = xs.remove(0); + assert_eq!( + d2.payload_serde_json::().unwrap().unwrap(), + payload2 + ); + d2.ack().await.unwrap(); + // N.b. it's still possible this could turn up false if the test runs too + // slow. + assert!(now.elapsed() < deadline); +} + +/// Consumer will return the full batch immediately, but also return immediately +/// if a partial batch is ready. +#[tokio::test] +async fn test_send_recv_all_full_then_partial() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let payload3 = ExType { a: 3 }; + + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + p.send_serde_json(&payload3).await.unwrap(); + + let deadline = Duration::from_secs(1); + let now1 = Instant::now(); + let mut xs = c.receive_all(2, deadline).await.unwrap(); + assert_eq!(xs.len(), 2); + let d1 = xs.remove(0); + assert_eq!( + d1.payload_serde_json::().unwrap().unwrap(), + payload1 + ); + d1.ack().await.unwrap(); + + let d2 = xs.remove(0); + assert_eq!( + d2.payload_serde_json::().unwrap().unwrap(), + payload2 + ); + d2.ack().await.unwrap(); + assert!(now1.elapsed() < deadline); + + // 2nd call + let now2 = Instant::now(); + let mut ys = c.receive_all(2, deadline).await.unwrap(); + assert_eq!(ys.len(), 1); + let d3 = ys.remove(0); + assert_eq!( + d3.payload_serde_json::().unwrap().unwrap(), + payload3 + ); + d3.ack().await.unwrap(); + assert!(now2.elapsed() < deadline); +} + +/// Consumer will NOT wait indefinitely for at least one item. +#[tokio::test] +async fn test_send_recv_all_late_arriving_items() { + let (builder, _drop) = make_test_queue().await; + + let (_p, mut c) = builder.build_pair().await.unwrap(); + + let deadline = Duration::from_secs(1); + let now = Instant::now(); + let xs = c.receive_all(2, deadline).await.unwrap(); + let elapsed = now.elapsed(); + + assert_eq!(xs.len(), 0); + // Elapsed should be around the deadline, ballpark + assert!(elapsed >= deadline); + assert!(elapsed <= deadline + Duration::from_millis(200)); +} +*/ + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} + +#[tokio::test] +async fn test_pending() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + let delivery1 = c.receive().await.unwrap(); + let delivery2 = c.receive().await.unwrap(); + + // All items claimed, but not yet ack'd. There shouldn't be anything available + // yet. + assert!(c + .receive_all(1, Duration::from_millis(1)) + .await + .unwrap() + .is_empty()); + + assert_eq!( + Some(&payload1), + delivery1.payload_serde_json().unwrap().as_ref() + ); + assert_eq!( + Some(&payload2), + delivery2.payload_serde_json().unwrap().as_ref() + ); + + // ack 2, but neglect 1 + let _ = delivery2.ack().await; + + // After the deadline, the first payload should appear again. + let delivery3 = c.receive().await.unwrap(); + assert_eq!( + Some(&payload1), + delivery3.payload_serde_json().unwrap().as_ref() + ); + + // queue should be empty once again + assert!(c + .receive_all(1, Duration::from_millis(1)) + .await + .unwrap() + .is_empty()); +}