From c4f38d920c6d608a304215997f6c9aaf90dad2e5 Mon Sep 17 00:00:00 2001 From: Rafi Date: Sun, 13 Oct 2024 23:32:08 -0300 Subject: [PATCH] Remove unique lua scripts --- internal/rdb/rdb.go | 12 +++---- internal/script/add_to_group.lua | 14 ++++++++ internal/script/add_to_group_unique.lua | 26 -------------- internal/script/done.lua | 17 ++++++++- internal/script/done_unique.lua | 33 ----------------- internal/script/enqueue.lua | 1 - internal/script/mark_as_completed.lua | 15 ++++++++ internal/script/mark_as_completed_unique.lua | 37 -------------------- internal/script/schedule.lua | 25 +++++++++++-- internal/script/schedule_unique.lua | 23 ------------ internal/script/script.go | 27 -------------- 11 files changed, 72 insertions(+), 158 deletions(-) delete mode 100644 internal/script/add_to_group_unique.lua delete mode 100644 internal/script/done_unique.lua delete mode 100644 internal/script/mark_as_completed_unique.lua delete mode 100644 internal/script/schedule_unique.lua diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 6a3ef437..458a4396 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -198,7 +198,6 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error { // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) - return r.runScript(ctx, op, script.DoneUniqueCmd, keys, argv...) } return r.runScript(ctx, op, script.DoneCmd, keys, argv...) } @@ -232,7 +231,6 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error { // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) - return r.runScript(ctx, op, script.MarkAsCompleteUniqueCmd, keys, argv...) } return r.runScript(ctx, op, script.MarkAsCompleteCmd, keys, argv...) } @@ -301,7 +299,7 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group groupKey, int(ttl.Seconds()), } - n, err := r.runScriptWithErrorCode(ctx, op, script.AddToGroupUniqueCmd, keys, argv...) + n, err := r.runScriptWithErrorCode(ctx, op, script.AddToGroupCmd, keys, argv...) if err != nil { return err } @@ -355,17 +353,17 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ - msg.UniqueKey, base.TaskKey(msg.Queue, msg.ID), base.ScheduledKey(msg.Queue), + msg.UniqueKey, } argv := []interface{}{ + encoded, + processAt.Unix(), msg.ID, int(ttl.Seconds()), - processAt.Unix(), - encoded, } - n, err := r.runScriptWithErrorCode(ctx, op, script.ScheduleUniqueCmd, keys, argv...) + n, err := r.runScriptWithErrorCode(ctx, op, script.ScheduleCmd, keys, argv...) if err != nil { return err } diff --git a/internal/script/add_to_group.lua b/internal/script/add_to_group.lua index 8099d73b..c859c1ad 100644 --- a/internal/script/add_to_group.lua +++ b/internal/script/add_to_group.lua @@ -1,18 +1,32 @@ -- KEYS[1] -> asynq:{}:t: -- KEYS[2] -> asynq:{}:g: -- KEYS[3] -> asynq:{}:groups +-- KEYS[4] -> unique key (optional, only for unique group additions) -- ------- -- ARGV[1] -> task message data -- ARGV[2] -> task ID -- ARGV[3] -> current time in Unix time -- ARGV[4] -> group key +-- ARGV[5] -> uniqueness lock TTL (optional, only for unique group additions) -- -- Output: -- Returns 1 if successfully added -- Returns 0 if task ID already exists +-- Returns -1 if task unique key already exists (only for unique group additions) + +local is_unique = (#KEYS == 4 and #ARGV == 5) + +if is_unique then + local ok = redis.call("SET", KEYS[4], ARGV[2], "NX", "EX", ARGV[5]) + if not ok then + return -1 + end +end + if redis.call("EXISTS", KEYS[1]) == 1 then return 0 end + redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "aggregating", "group", ARGV[4]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("SADD", KEYS[3], ARGV[4]) diff --git a/internal/script/add_to_group_unique.lua b/internal/script/add_to_group_unique.lua deleted file mode 100644 index 5b3066e6..00000000 --- a/internal/script/add_to_group_unique.lua +++ /dev/null @@ -1,26 +0,0 @@ --- KEYS[1] -> asynq:{}:t: --- KEYS[2] -> asynq:{}:g: --- KEYS[3] -> asynq:{}:groups --- KEYS[4] -> unique key --- ------- --- ARGV[1] -> task message data --- ARGV[2] -> task ID --- ARGV[3] -> current time in Unix time --- ARGV[4] -> group key --- ARGV[5] -> uniqueness lock TTL --- --- Output: --- Returns 1 if successfully added --- Returns 0 if task ID already exists --- Returns -1 if task unique key already exists -local ok = redis.call("SET", KEYS[4], ARGV[2], "NX", "EX", ARGV[5]) -if not ok then - return -1 -end -if redis.call("EXISTS", KEYS[1]) == 1 then - return 0 -end -redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "aggregating", "group", ARGV[4]) -redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) -redis.call("SADD", KEYS[3], ARGV[4]) -return 1 diff --git a/internal/script/done.lua b/internal/script/done.lua index 1f086315..0fb4b974 100644 --- a/internal/script/done.lua +++ b/internal/script/done.lua @@ -3,10 +3,18 @@ -- KEYS[3] -> asynq:{}:t: -- KEYS[4] -> asynq:{}:processed: -- KEYS[5] -> asynq:{}:processed --- ------- +-- KEYS[6] -> unique key (optional, only for unique tasks) +-- -- ARGV[1] -> task ID -- ARGV[2] -> stats expiration timestamp -- ARGV[3] -> max int64 value +-- +-- Output: +-- Returns "OK" if successfully marked as done +-- Returns error if task is not found + +local is_unique = (#KEYS == 6) + if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end @@ -26,4 +34,11 @@ if tonumber(total) == tonumber(ARGV[3]) then else redis.call("INCR", KEYS[5]) end + +if is_unique then + if redis.call("GET", KEYS[6]) == ARGV[1] then + redis.call("DEL", KEYS[6]) + end +end + return redis.status_reply("OK") diff --git a/internal/script/done_unique.lua b/internal/script/done_unique.lua deleted file mode 100644 index 4cc2e65a..00000000 --- a/internal/script/done_unique.lua +++ /dev/null @@ -1,33 +0,0 @@ --- KEYS[1] -> asynq:{}:active --- KEYS[2] -> asynq:{}:lease --- KEYS[3] -> asynq:{}:t: --- KEYS[4] -> asynq:{}:processed: --- KEYS[5] -> asynq:{}:processed --- KEYS[6] -> unique key --- ------- --- ARGV[1] -> task ID --- ARGV[2] -> stats expiration timestamp --- ARGV[3] -> max int64 value -if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then - return redis.error_reply("NOT FOUND") -end -if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then - return redis.error_reply("NOT FOUND") -end -if redis.call("DEL", KEYS[3]) == 0 then - return redis.error_reply("NOT FOUND") -end -local n = redis.call("INCR", KEYS[4]) -if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[4], ARGV[2]) -end -local total = redis.call("GET", KEYS[5]) -if tonumber(total) == tonumber(ARGV[3]) then - redis.call("SET", KEYS[5], 1) -else - redis.call("INCR", KEYS[5]) -end -if redis.call("GET", KEYS[6]) == ARGV[1] then - redis.call("DEL", KEYS[6]) -end -return redis.status_reply("OK") diff --git a/internal/script/enqueue.lua b/internal/script/enqueue.lua index 9d16ef31..a9d3949f 100644 --- a/internal/script/enqueue.lua +++ b/internal/script/enqueue.lua @@ -1,4 +1,3 @@ --- -- KEYS[1] -> asynq:{}:t: -- KEYS[2] -> asynq:{}:pending -- KEYS[3] -> unique key (optional, only for unique enqueue) diff --git a/internal/script/mark_as_completed.lua b/internal/script/mark_as_completed.lua index 2a329081..8cfdfcc6 100644 --- a/internal/script/mark_as_completed.lua +++ b/internal/script/mark_as_completed.lua @@ -4,12 +4,20 @@ -- KEYS[4] -> asynq:{}:t: -- KEYS[5] -> asynq:{}:processed: -- KEYS[6] -> asynq:{}:processed +-- KEYS[7] -> asynq:{}:unique:{} (optional, only for unique tasks) -- -- ARGV[1] -> task ID -- ARGV[2] -> stats expiration timestamp -- ARGV[3] -> task expiration time in unix time -- ARGV[4] -> task message data -- ARGV[5] -> max int64 value +-- +-- Output: +-- Returns "OK" if successfully marked as completed +-- Returns error if task is not found or internal error occurs + +local is_unique = (#KEYS == 7) + if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end @@ -30,4 +38,11 @@ if tonumber(total) == tonumber(ARGV[5]) then else redis.call("INCR", KEYS[6]) end + +if is_unique then + if redis.call("GET", KEYS[7]) == ARGV[1] then + redis.call("DEL", KEYS[7]) + end +end + return redis.status_reply("OK") diff --git a/internal/script/mark_as_completed_unique.lua b/internal/script/mark_as_completed_unique.lua deleted file mode 100644 index e946a268..00000000 --- a/internal/script/mark_as_completed_unique.lua +++ /dev/null @@ -1,37 +0,0 @@ --- KEYS[1] -> asynq:{}:active --- KEYS[2] -> asynq:{}:lease --- KEYS[3] -> asynq:{}:completed --- KEYS[4] -> asynq:{}:t: --- KEYS[5] -> asynq:{}:processed: --- KEYS[6] -> asynq:{}:processed --- KEYS[7] -> asynq:{}:unique:{} --- --- ARGV[1] -> task ID --- ARGV[2] -> stats expiration timestamp --- ARGV[3] -> task expiration time in unix time --- ARGV[4] -> task message data --- ARGV[5] -> max int64 value -if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then - return redis.error_reply("NOT FOUND") -end -if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then - return redis.error_reply("NOT FOUND") -end -if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then - return redis.error_reply("INTERNAL") -end -redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed") -local n = redis.call("INCR", KEYS[5]) -if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[5], ARGV[2]) -end -local total = redis.call("GET", KEYS[6]) -if tonumber(total) == tonumber(ARGV[5]) then - redis.call("SET", KEYS[6], 1) -else - redis.call("INCR", KEYS[6]) -end -if redis.call("GET", KEYS[7]) == ARGV[1] then - redis.call("DEL", KEYS[7]) -end -return redis.status_reply("OK") diff --git a/internal/script/schedule.lua b/internal/script/schedule.lua index e7689628..9a91df3e 100644 --- a/internal/script/schedule.lua +++ b/internal/script/schedule.lua @@ -1,16 +1,35 @@ -- KEYS[1] -> asynq:{}:t: -- KEYS[2] -> asynq:{}:scheduled +-- KEYS[3] -> unique key (optional, only for unique scheduling) -- ------- -- ARGV[1] -> task message data --- ARGV[2] -> process_at time in Unix time +-- ARGV[2] -> score (process_at timestamp) -- ARGV[3] -> task ID +-- ARGV[4] -> uniqueness lock TTL (optional, only for unique scheduling) -- -- Output: --- Returns 1 if successfully enqueued +-- Returns 1 if successfully scheduled -- Returns 0 if task ID already exists +-- Returns -1 if task unique key already exists (only for unique scheduling) + +local is_unique = (#KEYS == 3 and #ARGV == 4) + +if is_unique then + local ok = redis.call("SET", KEYS[3], ARGV[3], "NX", "EX", ARGV[4]) + if not ok then + return -1 + end +end + if redis.call("EXISTS", KEYS[1]) == 1 then return 0 end -redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled") + +if is_unique then + redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled", "unique_key", KEYS[3]) +else + redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled") +end + redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) return 1 diff --git a/internal/script/schedule_unique.lua b/internal/script/schedule_unique.lua deleted file mode 100644 index 071c8197..00000000 --- a/internal/script/schedule_unique.lua +++ /dev/null @@ -1,23 +0,0 @@ --- KEYS[1] -> unique key --- KEYS[2] -> asynq:{}:t: --- KEYS[3] -> asynq:{}:scheduled --- ------- --- ARGV[1] -> task ID --- ARGV[2] -> uniqueness lock TTL --- ARGV[3] -> score (process_at timestamp) --- ARGV[4] -> task message --- --- Output: --- Returns 1 if successfully scheduled --- Returns 0 if task ID already exists --- Returns -1 if task unique key already exists -local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) -if not ok then - return -1 -end -if redis.call("EXISTS", KEYS[2]) == 1 then - return 0 -end -redis.call("HSET", KEYS[2], "msg", ARGV[4], "state", "scheduled", "unique_key", KEYS[1]) -redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) -return 1 diff --git a/internal/script/script.go b/internal/script/script.go index 8e3d7a0b..e15924af 100644 --- a/internal/script/script.go +++ b/internal/script/script.go @@ -47,14 +47,10 @@ var ( EnqueueCmd *redis.Script DequeueCmd *redis.Script DoneCmd *redis.Script - DoneUniqueCmd *redis.Script MarkAsCompleteCmd *redis.Script - MarkAsCompleteUniqueCmd *redis.Script RequeueCmd *redis.Script AddToGroupCmd *redis.Script - AddToGroupUniqueCmd *redis.Script ScheduleCmd *redis.Script - ScheduleUniqueCmd *redis.Script RetryCmd *redis.Script ArchiveCmd *redis.Script ForwardCmd *redis.Script @@ -74,12 +70,9 @@ const ( enqueueCmd = "enqueue" dequeueCmd = "dequeue" doneCmd = "done" - doneUniqueCmd = "done_unique" markAsCompleteCmd = "mark_as_completed" - markAsCompleteUniqueCmd = "mark_as_completed_unique" requeueCmd = "requeue" addToGroupCmd = "add_to_group" - addToGroupUniqueCmd = "add_to_group_unique" scheduleCmd = "schedule" scheduleUniqueCmd = "schedule_unique" retryCmd = "retry" @@ -114,21 +107,11 @@ func init() { panic(err) } - DoneUniqueCmd, err = loadLuaScript(doneUniqueCmd) - if err != nil { - panic(err) - } - MarkAsCompleteCmd, err = loadLuaScript(markAsCompleteCmd) if err != nil { panic(err) } - MarkAsCompleteUniqueCmd, err = loadLuaScript(markAsCompleteUniqueCmd) - if err != nil { - panic(err) - } - RequeueCmd, err = loadLuaScript(requeueCmd) if err != nil { panic(err) @@ -139,21 +122,11 @@ func init() { panic(err) } - AddToGroupUniqueCmd, err = loadLuaScript(addToGroupUniqueCmd) - if err != nil { - panic(err) - } - ScheduleCmd, err = loadLuaScript(scheduleCmd) if err != nil { panic(err) } - ScheduleUniqueCmd, err = loadLuaScript(scheduleUniqueCmd) - if err != nil { - panic(err) - } - RetryCmd, err = loadLuaScript(retryCmd) if err != nil { panic(err)