Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(redis): add "pending" task to re-queue unhandled messages #20

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 203 additions & 41 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
// have generic return types. This is cleaner than the turbofish operator in my opinion.
#![allow(clippy::let_unit_value)]

use std::sync::Arc;
use std::time::Duration;
use std::{any::TypeId, collections::HashMap, marker::PhantomData};

use async_trait::async_trait;
use bb8::ManageConnection;
pub use bb8_redis::RedisMultiplexedConnectionManager;
use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
use redis::{
streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
FromRedisValue, RedisResult,
};
use svix_ksuid::KsuidLike;
use tokio::task::JoinHandle;
use tokio::task::JoinSet;

use crate::{
decoding::DecoderRegistry,
Expand Down Expand Up @@ -75,6 +79,7 @@ pub struct RedisConfig {
pub consumer_group: String,
pub consumer_name: String,
pub payload_key: String,
pub ack_deadline_ms: i64,
}

pub struct RedisQueueBackend<R = RedisMultiplexedConnectionManager>(PhantomData<R>);
Expand Down Expand Up @@ -110,13 +115,18 @@ where
.await
.map_err(QueueError::generic)?;

let _ = start_scheduler_background_task(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
)
.await;
let background_tasks = Arc::new(
start_background_tasks(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
&cfg.consumer_group,
&cfg.consumer_name,
cfg.ack_deadline_ms,
)
.await,
);

Ok((
RedisStreamProducer {
Expand All @@ -125,6 +135,7 @@ where
queue_key: cfg.queue_key.clone(),
delayed_queue_key: cfg.delayed_queue_key,
payload_key: cfg.payload_key.clone(),
_background_tasks: background_tasks.clone(),
},
RedisStreamConsumer {
registry: custom_decoders,
Expand All @@ -133,6 +144,7 @@ where
consumer_group: cfg.consumer_group,
consumer_name: cfg.consumer_name,
payload_key: cfg.payload_key,
_background_tasks: background_tasks.clone(),
},
))
}
Expand All @@ -148,19 +160,25 @@ where
.await
.map_err(QueueError::generic)?;

let _ = start_scheduler_background_task(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
)
.await;
let background_tasks = Arc::new(
start_background_tasks(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
&cfg.consumer_group,
&cfg.consumer_name,
cfg.ack_deadline_ms,
)
.await,
);
Ok(RedisStreamProducer {
registry: custom_encoders,
redis,
queue_key: cfg.queue_key,
delayed_queue_key: cfg.delayed_queue_key,
payload_key: cfg.payload_key,
_background_tasks: background_tasks,
})
}

Expand All @@ -175,20 +193,27 @@ where
.await
.map_err(QueueError::generic)?;

let _ = start_scheduler_background_task(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
)
.await;
let background_tasks = Arc::new(
start_background_tasks(
redis.clone(),
&cfg.queue_key,
&cfg.delayed_queue_key,
&cfg.payload_key,
&cfg.consumer_group,
&cfg.consumer_name,
cfg.ack_deadline_ms,
)
.await,
);

Ok(RedisStreamConsumer {
registry: custom_decoders,
redis,
queue_key: cfg.queue_key,
consumer_group: cfg.consumer_group,
consumer_name: cfg.consumer_name,
payload_key: cfg.payload_key,
_background_tasks: background_tasks,
})
}
}
Expand All @@ -198,51 +223,86 @@ where
// We need access to the pool, and various bits of config to spawn a task, but none of that is
// available where it matters right now.
// Doing my own thing for now - standalone function that takes what it needs.
async fn start_scheduler_background_task<R>(
async fn start_background_tasks<R>(
redis: bb8::Pool<R>,
queue_key: &str,
delayed_queue_key: &str,
payload_key: &str,
) -> Option<JoinHandle<Result<(), QueueError>>>
consumer_group: &str,
consumer_name: &str,
task_timeout_ms: i64,
) -> JoinSet<Result<(), QueueError>>
where
R: RedisConnection,
R::Connection: redis::aio::ConnectionLike + Send + Sync,
R::Error: 'static + std::error::Error + Send + Sync,
{
let mut join_set = JoinSet::new();

// FIXME(onelson): does it even make sense to treat delay support as optional here?
if delayed_queue_key.is_empty() {
tracing::warn!("no delayed_queue_key specified - delayed task scheduler disabled");
return None;
} else {
join_set.spawn({
let pool = redis.clone();
let mqn = queue_key.to_string();
let dqn = delayed_queue_key.to_string();
// FIXME(onelson): should delayed_lock be configurable?
// Should `delayed_queue_name` even? Could be a suffix on `queue_name`.
let delayed_lock = format!("{delayed_queue_key}__lock");
let payload_key = payload_key.to_string();
tracing::debug!(
"spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \
delayed_lock=`{delayed_lock}`"
);

async move {
loop {
if let Err(err) = background_task_delayed(
pool.clone(),
mqn.clone(),
dqn.clone(),
&delayed_lock,
&payload_key,
)
.await
{
tracing::error!("{}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
};
}
}
});
}

Some(tokio::spawn({
join_set.spawn({
let pool = redis.clone();

let mqn = queue_key.to_string();
let dqn = delayed_queue_key.to_string();
let delayed_lock = format!("{delayed_queue_key}__lock");
let payload_key = payload_key.to_string();
tracing::debug!(
"spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \
delayed_lock=`{delayed_lock}`"
);
// FIXME(onelson): expose in config and confirm this is milliseconds
let consumer_group = consumer_group.to_string();
let consumer_name = consumer_name.to_string();

async move {
loop {
if let Err(err) = background_task_delayed(
if let Err(err) = background_task_pending(
pool.clone(),
mqn.clone(),
dqn.clone(),
&delayed_lock,
&payload_key,
&mqn,
&consumer_group,
&consumer_name,
task_timeout_ms,
)
.await
{
tracing::error!("{}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
};
}
}
}
}))
});
join_set
}

/// Special ID for XADD command's which generates a stream ID automatically
Expand Down Expand Up @@ -353,6 +413,106 @@ where
Ok(())
}

/// The portion of an `XAUTOCLAIM` reply this backend cares about: the
/// stream entries that were successfully claimed.
struct StreamAutoclaimReply {
    ids: Vec<StreamId>,
}

impl FromRedisValue for StreamAutoclaimReply {
    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
        // Redis < 7.0 answers `XAUTOCLAIM` with a two element array:
        // [next-cursor, claimed-entries]. Redis >= 7.0 appends a third
        // element (the IDs deleted from the PEL), so when the two element
        // parse fails with a type mismatch we retry with the wider layout.
        type Pre70 = ((), StreamClaimReply);
        type Post70 = ((), StreamClaimReply, ());

        match Pre70::from_redis_value(v) {
            Ok(((), claimed)) => Ok(Self { ids: claimed.ids }),
            Err(err) if err.kind() == redis::ErrorKind::TypeError => {
                let ((), claimed, ()) = Post70::from_redis_value(v)?;
                Ok(Self { ids: claimed.ids })
            }
            // Any error other than a type mismatch is passed through as-is.
            Err(err) => Err(err),
        }
    }
}

/// The maximum number of pending messages to reinsert into the queue after becoming stale per loop
// Passed as the COUNT argument to XAUTOCLAIM, so it bounds how much work a
// single pass of the pending-recovery task performs.
// FIXME(onelson): expose in config?
const PENDING_BATCH_SIZE: i16 = 1000;

/// Scoops up messages that have been claimed but not handled by a deadline, then re-queues them.
/// Scoops up messages that have been claimed but not handled by a deadline, then re-queues them.
///
/// One pass of the "pending" recovery task:
/// 1. `XAUTOCLAIM` up to [`PENDING_BATCH_SIZE`] entries from the stream's
///    pending entries list (PEL) that have been idle for at least
///    `pending_duration` milliseconds, claiming them for this consumer.
/// 2. Re-insert each claimed entry's field/value map into the main stream
///    under a fresh, auto-generated stream ID.
/// 3. `XACK` + `XDEL` the stale originals so they leave the PEL and stream.
///
/// If nothing was claimed, sleeps 500 ms before returning so the caller's
/// retry loop doesn't spin.
///
/// * `pool` - connection pool to draw a Redis connection from.
/// * `main_queue_name` - the stream key holding queue messages.
/// * `consumer_group` / `consumer_name` - the group/consumer that claims
///   stale entries.
/// * `pending_duration` - minimum idle time in milliseconds before an
///   unacked message is considered stale and re-queued.
async fn background_task_pending<R>(
    pool: bb8::Pool<R>,
    main_queue_name: &str,
    consumer_group: &str,
    consumer_name: &str,
    pending_duration: i64,
) -> Result<(), QueueError>
where
    R: RedisConnection,
    R::Connection: redis::aio::ConnectionLike + Send + Sync,
    R::Error: 'static + std::error::Error + Send + Sync,
{
    let mut conn = pool.get().await.map_err(QueueError::generic)?;

    // Every iteration checks whether the processing queue has items that should be picked back up,
    // claiming them in the process
    //
    // The "-" start cursor means each call scans the PEL from the beginning;
    // that's fine because claimed entries are acked/deleted below, so they
    // won't be seen again on the next pass.
    let mut cmd = redis::cmd("XAUTOCLAIM");
    cmd.arg(main_queue_name)
        .arg(consumer_group)
        .arg(consumer_name)
        .arg(pending_duration)
        .arg("-")
        .arg("COUNT")
        .arg(PENDING_BATCH_SIZE);

    let StreamAutoclaimReply { ids } = cmd
        .query_async(&mut *conn)
        .await
        .map_err(QueueError::generic)?;

    if !ids.is_empty() {
        let mut pipe = redis::pipe();

        // And reinsert the map of KV pairs into the MAIN queue with a new stream ID
        //
        // Re-inserting BEFORE acking/deleting means a crash between the two
        // pipelines duplicates messages rather than losing them
        // (at-least-once delivery).
        for StreamId { map, .. } in &ids {
            let _ = pipe.xadd(
                main_queue_name,
                GENERATE_STREAM_ID,
                &map.iter()
                    .filter_map(|(k, v)| {
                        // Only binary (`Data`) values are carried over; any
                        // other value type is silently dropped.
                        if let redis::Value::Data(data) = v {
                            Some((k.as_str(), data.as_slice()))
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<(&str, &[u8])>>(),
            );
        }
        // NOTE(review): if an entry's map contains no `Data` values, the XADD
        // above is issued with an empty field list, which Redis rejects —
        // confirm payloads always contain at least one binary field.

        let _: () = pipe
            .query_async(&mut *conn)
            .await
            .map_err(QueueError::generic)?;

        // Acknowledge all the stale ones so the pending queue is cleared
        let ids: Vec<_> = ids.iter().map(|wrapped| &wrapped.id).collect();

        let mut pipe = redis::pipe();
        pipe.add_command(redis::Cmd::xack(main_queue_name, consumer_group, &ids));
        pipe.add_command(redis::Cmd::xdel(main_queue_name, &ids));

        let _: () = pipe
            .query_async(&mut *conn)
            .await
            .map_err(QueueError::generic)?;
    } else {
        // Wait for half a second before attempting to fetch again if nothing was found
        tokio::time::sleep(Duration::from_millis(500)).await;
    }

    Ok(())
}

pub struct RedisStreamAcker<M: ManageConnection> {
redis: bb8::Pool<M>,
queue_key: String,
Expand Down Expand Up @@ -402,6 +562,7 @@ pub struct RedisStreamProducer<M: ManageConnection> {
queue_key: String,
delayed_queue_key: String,
payload_key: String,
_background_tasks: Arc<JoinSet<Result<(), QueueError>>>,
}

#[async_trait]
Expand Down Expand Up @@ -502,6 +663,7 @@ pub struct RedisStreamConsumer<M: ManageConnection> {
consumer_group: String,
consumer_name: String,
payload_key: String,
_background_tasks: Arc<JoinSet<Result<(), QueueError>>>,
}

impl<M> RedisStreamConsumer<M>
Expand Down
Loading