Skip to content

Commit

Permalink
Remove unique lua scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
Joker666 committed Oct 14, 2024
1 parent 80e543f commit c4f38d9
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 158 deletions.
12 changes: 5 additions & 7 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
// Note: We cannot pass empty unique key when running this script in redis-cluster.
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
return r.runScript(ctx, op, script.DoneUniqueCmd, keys, argv...)
}
return r.runScript(ctx, op, script.DoneCmd, keys, argv...)
}
Expand Down Expand Up @@ -232,7 +231,6 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
// Note: We cannot pass empty unique key when running this script in redis-cluster.
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
return r.runScript(ctx, op, script.MarkAsCompleteUniqueCmd, keys, argv...)
}
return r.runScript(ctx, op, script.MarkAsCompleteCmd, keys, argv...)
}
Expand Down Expand Up @@ -301,7 +299,7 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group
groupKey,
int(ttl.Seconds()),
}
n, err := r.runScriptWithErrorCode(ctx, op, script.AddToGroupUniqueCmd, keys, argv...)
n, err := r.runScriptWithErrorCode(ctx, op, script.AddToGroupCmd, keys, argv...)
if err != nil {
return err
}
Expand Down Expand Up @@ -355,17 +353,17 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
msg.UniqueKey,
base.TaskKey(msg.Queue, msg.ID),
base.ScheduledKey(msg.Queue),
msg.UniqueKey,
}
argv := []interface{}{
encoded,
processAt.Unix(),
msg.ID,
int(ttl.Seconds()),
processAt.Unix(),
encoded,
}
n, err := r.runScriptWithErrorCode(ctx, op, script.ScheduleUniqueCmd, keys, argv...)
n, err := r.runScriptWithErrorCode(ctx, op, script.ScheduleCmd, keys, argv...)
if err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions internal/script/add_to_group.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
-- KEYS[1] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[2] -> asynq:{<queueName>}:g:<group_key>
-- KEYS[3] -> asynq:{<queueName>}:groups
-- KEYS[4] -> unique key (optional, only for unique group additions)
-- -------
-- ARGV[1] -> task message data
-- ARGV[2] -> task ID
-- ARGV[3] -> current time in Unix time
-- ARGV[4] -> group key
-- ARGV[5] -> uniqueness lock TTL (optional, only for unique group additions)
--
-- Output:
-- Returns 1 if successfully added
-- Returns 0 if task ID already exists
-- Returns -1 if task unique key already exists (only for unique group additions)

local is_unique = (#KEYS == 4 and #ARGV == 5)

if is_unique then
local ok = redis.call("SET", KEYS[4], ARGV[2], "NX", "EX", ARGV[5])
if not ok then
return -1
end
end

if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end

redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "aggregating", "group", ARGV[4])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("SADD", KEYS[3], ARGV[4])
Expand Down
26 changes: 0 additions & 26 deletions internal/script/add_to_group_unique.lua

This file was deleted.

17 changes: 16 additions & 1 deletion internal/script/done.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@
-- KEYS[3] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[4] -> asynq:{<queueName>}:processed:<yyyy-mm-dd>
-- KEYS[5] -> asynq:{<queueName>}:processed
-- -------
-- KEYS[6] -> unique key (optional, only for unique tasks)
--
-- ARGV[1] -> task ID
-- ARGV[2] -> stats expiration timestamp
-- ARGV[3] -> max int64 value
--
-- Output:
-- Returns "OK" if successfully marked as done
-- Returns error if task is not found

local is_unique = (#KEYS == 6)

if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
Expand All @@ -26,4 +34,11 @@ if tonumber(total) == tonumber(ARGV[3]) then
else
redis.call("INCR", KEYS[5])
end

if is_unique then
if redis.call("GET", KEYS[6]) == ARGV[1] then
redis.call("DEL", KEYS[6])
end
end

return redis.status_reply("OK")
33 changes: 0 additions & 33 deletions internal/script/done_unique.lua

This file was deleted.

1 change: 0 additions & 1 deletion internal/script/enqueue.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
--
-- KEYS[1] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[2] -> asynq:{<queueName>}:pending
-- KEYS[3] -> unique key (optional, only for unique enqueue)
Expand Down
15 changes: 15 additions & 0 deletions internal/script/mark_as_completed.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
-- KEYS[4] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[5] -> asynq:{<queueName>}:processed:<yyyy-mm-dd>
-- KEYS[6] -> asynq:{<queueName>}:processed
-- KEYS[7] -> asynq:{<queueName>}:unique:{<checksum>} (optional, only for unique tasks)
--
-- ARGV[1] -> task ID
-- ARGV[2] -> stats expiration timestamp
-- ARGV[3] -> task expiration time in unix time
-- ARGV[4] -> task message data
-- ARGV[5] -> max int64 value
--
-- Output:
-- Returns "OK" if successfully marked as completed
-- Returns error if task is not found or internal error occurs

local is_unique = (#KEYS == 7)

if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
Expand All @@ -30,4 +38,11 @@ if tonumber(total) == tonumber(ARGV[5]) then
else
redis.call("INCR", KEYS[6])
end

if is_unique then
if redis.call("GET", KEYS[7]) == ARGV[1] then
redis.call("DEL", KEYS[7])
end
end

return redis.status_reply("OK")
37 changes: 0 additions & 37 deletions internal/script/mark_as_completed_unique.lua

This file was deleted.

25 changes: 22 additions & 3 deletions internal/script/schedule.lua
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
-- KEYS[1] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[2] -> asynq:{<queueName>}:scheduled
-- KEYS[3] -> unique key (optional, only for unique scheduling)
-- -------
-- ARGV[1] -> task message data
-- ARGV[2] -> process_at time in Unix time
-- ARGV[2] -> score (process_at timestamp)
-- ARGV[3] -> task ID
-- ARGV[4] -> uniqueness lock TTL (optional, only for unique scheduling)
--
-- Output:
-- Returns 1 if successfully enqueued
-- Returns 1 if successfully scheduled
-- Returns 0 if task ID already exists
-- Returns -1 if task unique key already exists (only for unique scheduling)

local is_unique = (#KEYS == 3 and #ARGV == 4)

if is_unique then
local ok = redis.call("SET", KEYS[3], ARGV[3], "NX", "EX", ARGV[4])
if not ok then
return -1
end
end

if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled")

if is_unique then
redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled", "unique_key", KEYS[3])
else
redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled")
end

redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
23 changes: 0 additions & 23 deletions internal/script/schedule_unique.lua

This file was deleted.

27 changes: 0 additions & 27 deletions internal/script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,10 @@ var (
EnqueueCmd *redis.Script
DequeueCmd *redis.Script
DoneCmd *redis.Script
DoneUniqueCmd *redis.Script
MarkAsCompleteCmd *redis.Script
MarkAsCompleteUniqueCmd *redis.Script
RequeueCmd *redis.Script
AddToGroupCmd *redis.Script
AddToGroupUniqueCmd *redis.Script
ScheduleCmd *redis.Script
ScheduleUniqueCmd *redis.Script
RetryCmd *redis.Script
ArchiveCmd *redis.Script
ForwardCmd *redis.Script
Expand All @@ -74,12 +70,9 @@ const (
enqueueCmd = "enqueue"
dequeueCmd = "dequeue"
doneCmd = "done"
doneUniqueCmd = "done_unique"
markAsCompleteCmd = "mark_as_completed"
markAsCompleteUniqueCmd = "mark_as_completed_unique"
requeueCmd = "requeue"
addToGroupCmd = "add_to_group"
addToGroupUniqueCmd = "add_to_group_unique"
scheduleCmd = "schedule"
scheduleUniqueCmd = "schedule_unique"
retryCmd = "retry"
Expand Down Expand Up @@ -114,21 +107,11 @@ func init() {
panic(err)
}

DoneUniqueCmd, err = loadLuaScript(doneUniqueCmd)
if err != nil {
panic(err)
}

MarkAsCompleteCmd, err = loadLuaScript(markAsCompleteCmd)
if err != nil {
panic(err)
}

MarkAsCompleteUniqueCmd, err = loadLuaScript(markAsCompleteUniqueCmd)
if err != nil {
panic(err)
}

RequeueCmd, err = loadLuaScript(requeueCmd)
if err != nil {
panic(err)
Expand All @@ -139,21 +122,11 @@ func init() {
panic(err)
}

AddToGroupUniqueCmd, err = loadLuaScript(addToGroupUniqueCmd)
if err != nil {
panic(err)
}

ScheduleCmd, err = loadLuaScript(scheduleCmd)
if err != nil {
panic(err)
}

ScheduleUniqueCmd, err = loadLuaScript(scheduleUniqueCmd)
if err != nil {
panic(err)
}

RetryCmd, err = loadLuaScript(retryCmd)
if err != nil {
panic(err)
Expand Down

0 comments on commit c4f38d9

Please sign in to comment.