Skip to content

Commit

Permalink
Remove enqueue unique
Browse files Browse the repository at this point in the history
  • Loading branch information
Joker666 committed Oct 14, 2024
1 parent 4f9fad2 commit 80e543f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 43 deletions.
8 changes: 4 additions & 4 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
base.PendingKey(msg.Queue),
}
argv := []interface{}{
encoded,
msg.ID,
encoded,
r.clock.Now().UnixNano(),
}
n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueCmd, keys, argv...)
Expand All @@ -118,17 +118,17 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
msg.UniqueKey,
base.TaskKey(msg.Queue, msg.ID),
base.PendingKey(msg.Queue),
msg.UniqueKey,
}
argv := []interface{}{
msg.ID,
int(ttl.Seconds()),
encoded,
r.clock.Now().UnixNano(),
int(ttl.Seconds()),
}
n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueUniqueCmd, keys, argv...)
n, err := r.runScriptWithErrorCode(ctx, op, script.EnqueueCmd, keys, argv...)
if err != nil {
return err
}
Expand Down
32 changes: 25 additions & 7 deletions internal/script/enqueue.lua
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
-- enqueueCmd enqueues a given task message.
--
-- Input:
-- KEYS[1] -> asynq:{<queueName>}:t:<task_id>
-- KEYS[2] -> asynq:{<queueName>}:pending
-- --
-- ARGV[1] -> task message data
-- ARGV[2] -> task ID
-- KEYS[3] -> unique key (optional, only for unique enqueue)
--
-- ARGV[1] -> task ID
-- ARGV[2] -> task message data
-- ARGV[3] -> current unix time in nsec
-- ARGV[4] -> uniqueness lock TTL (optional, only for unique enqueue)
--
-- Output:
-- Returns 1 if successfully enqueued
-- Returns 0 if task ID already exists
-- Returns -1 if task unique key already exists (only for unique enqueue)

local is_unique = (#KEYS == 3 and #ARGV == 4)

if is_unique then
-- Unique enqueue logic
local ok = redis.call("SET", KEYS[3], ARGV[1], "NX", "EX", ARGV[4])
if not ok then
return -1
end
end

if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "pending", "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])

if is_unique then
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "pending", "pending_since", ARGV[3], "unique_key", KEYS[3])
else
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "pending", "pending_since", ARGV[3])
end

redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
25 changes: 0 additions & 25 deletions internal/script/enqueue_unique.lua

This file was deleted.

7 changes: 0 additions & 7 deletions internal/script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func loadLuaScript(name string) (*redis.Script, error) {

var (
EnqueueCmd *redis.Script
EnqueueUniqueCmd *redis.Script
DequeueCmd *redis.Script
DoneCmd *redis.Script
DoneUniqueCmd *redis.Script
Expand Down Expand Up @@ -73,7 +72,6 @@ var (

const (
enqueueCmd = "enqueue"
enqueueUniqueCmd = "enqueue_unique"
dequeueCmd = "dequeue"
doneCmd = "done"
doneUniqueCmd = "done_unique"
Expand Down Expand Up @@ -106,11 +104,6 @@ func init() {
panic(err)
}

EnqueueUniqueCmd, err = loadLuaScript(enqueueUniqueCmd)
if err != nil {
panic(err)
}

DequeueCmd, err = loadLuaScript(dequeueCmd)
if err != nil {
panic(err)
Expand Down

0 comments on commit 80e543f

Please sign in to comment.