From 80e543fe4fd0a8c90cc07ca11059afa181485663 Mon Sep 17 00:00:00 2001 From: Rafi Date: Sun, 13 Oct 2024 22:56:38 -0300 Subject: [PATCH] Remove enqueue unique --- internal/rdb/rdb.go | 8 ++++---- internal/script/enqueue.lua | 32 +++++++++++++++++++++++------- internal/script/enqueue_unique.lua | 25 ----------------------- internal/script/script.go | 7 ------- 4 files changed, 29 insertions(+), 43 deletions(-) delete mode 100644 internal/script/enqueue_unique.lua diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 1cda9fa4..6a3ef437 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -92,8 +92,8 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { base.PendingKey(msg.Queue), } argv := []interface{}{ - encoded, msg.ID, + encoded, r.clock.Now().UnixNano(), } n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueCmd, keys, argv...) @@ -118,17 +118,17 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ - msg.UniqueKey, base.TaskKey(msg.Queue, msg.ID), base.PendingKey(msg.Queue), + msg.UniqueKey, } argv := []interface{}{ msg.ID, - int(ttl.Seconds()), encoded, r.clock.Now().UnixNano(), + int(ttl.Seconds()), } - n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueUniqueCmd, keys, argv...) + n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueCmd, keys, argv...) if err != nil { return err } diff --git a/internal/script/enqueue.lua b/internal/script/enqueue.lua index df904d6c..9d16ef31 100644 --- a/internal/script/enqueue.lua +++ b/internal/script/enqueue.lua @@ -1,19 +1,37 @@ --- enqueueCmd enqueues a given task message. -- --- Input: -- KEYS[1] -> asynq:{}:t: -- KEYS[2] -> asynq:{}:pending --- -- --- ARGV[1] -> task message data --- ARGV[2] -> task ID +-- KEYS[3] -> unique key (optional, only for unique enqueue) +-- +-- ARGV[1] -> task ID +-- ARGV[2] -> task message data -- ARGV[3] -> current unix time in nsec +-- ARGV[4] -> uniqueness lock TTL (optional, only for unique enqueue) -- -- Output: -- Returns 1 if successfully enqueued -- Returns 0 if task ID already exists +-- Returns -1 if task unique key already exists (only for unique enqueue) + +local is_unique = (#KEYS == 3 and #ARGV == 4) + +if is_unique then + -- Unique enqueue logic + local ok = redis.call("SET", KEYS[3], ARGV[1], "NX", "EX", ARGV[4]) + if not ok then + return -1 + end +end + if redis.call("EXISTS", KEYS[1]) == 1 then return 0 end -redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "pending", "pending_since", ARGV[3]) -redis.call("LPUSH", KEYS[2], ARGV[2]) + +if is_unique then + redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "pending", "pending_since", ARGV[3], "unique_key", KEYS[3]) +else + redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "pending", "pending_since", ARGV[3]) +end + +redis.call("LPUSH", KEYS[2], ARGV[1]) return 1 diff --git a/internal/script/enqueue_unique.lua b/internal/script/enqueue_unique.lua deleted file mode 100644 index b23761f0..00000000 --- a/internal/script/enqueue_unique.lua +++ /dev/null @@ -1,25 +0,0 @@ --- enqueueUniqueCmd enqueues the task message if the task is unique. --- --- KEYS[1] -> unique key --- KEYS[2] -> asynq:{}:t: --- KEYS[3] -> asynq:{}:pending --- -- --- ARGV[1] -> task ID --- ARGV[2] -> uniqueness lock TTL --- ARGV[3] -> task message data --- ARGV[4] -> current unix time in nsec --- --- Output: --- Returns 1 if successfully enqueued --- Returns 0 if task ID conflicts with another task --- Returns -1 if task unique key already exists -local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) -if not ok then - return -1 -end -if redis.call("EXISTS", KEYS[2]) == 1 then - return 0 -end -redis.call("HSET", KEYS[2], "msg", ARGV[3], "state", "pending", "pending_since", ARGV[4], "unique_key", KEYS[1]) -redis.call("LPUSH", KEYS[3], ARGV[1]) -return 1 diff --git a/internal/script/script.go b/internal/script/script.go index 6f834812..8e3d7a0b 100644 --- a/internal/script/script.go +++ b/internal/script/script.go @@ -45,7 +45,6 @@ func loadLuaScript(name string) (*redis.Script, error) { var ( EnqueueCmd *redis.Script - EnqueueUniqueCmd *redis.Script DequeueCmd *redis.Script DoneCmd *redis.Script DoneUniqueCmd *redis.Script @@ -73,7 +72,6 @@ var ( const ( enqueueCmd = "enqueue" - enqueueUniqueCmd = "enqueue_unique" dequeueCmd = "dequeue" doneCmd = "done" doneUniqueCmd = "done_unique" @@ -106,11 +104,6 @@ func init() { panic(err) } - EnqueueUniqueCmd, err = loadLuaScript(enqueueUniqueCmd) - if err != nil { - panic(err) - } - DequeueCmd, err = loadLuaScript(dequeueCmd) if err != nil { panic(err)