-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpgadvisorylock.go
124 lines (102 loc) · 3.92 KB
/
pgadvisorylock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package pgadvisorylock
import (
"context"
"database/sql"
"encoding/json"
"github.com/zeebo/xxh3"
)
// AdvisoryLock describes one advisory-lock row read from the pg_locks view.
// It is the decode target for the JSON objects that FetchAdvisoryLocks asks
// PostgreSQL to build, so the json tags below must match the keys used in that
// query.
type AdvisoryLock struct {
	Pid      int64  `json:"pid"`      // pid column of pg_locks: the process id of the process that acquired the lock
	ObjectID int64  `json:"objectID"` // objid column of pg_locks (cast to integer by the query); the object id when a 32-bit lock is used together with a class id
	ClassID  int64  `json:"classID"`  // classid column of pg_locks; the class id when a 32-bit lock is used together with an object id
	Granted  bool   `json:"granted"`  // granted column of pg_locks: whether the lock is held or not
	Locktype string `json:"locktype"` // locktype column of pg_locks; FetchAdvisoryLocks only selects rows where this is 'advisory'
}
// AcquireLockInt64 tries to take an exclusive session-level PostgreSQL
// advisory lock identified by lockID.
//
// It runs pg_try_advisory_lock, which returns immediately instead of waiting:
// the returned bool reports whether the lock was obtained. If the query or the
// scan of its result fails, it returns false and that error.
//
// NOTE(review): p is a *sql.DB, which hands each query to a pooled connection,
// while session-level advisory locks belong to the session that took them.
// Confirm callers can tolerate acquire and release running on different
// connections (for example by limiting the pool to one connection).
func AcquireLockInt64(p *sql.DB, ctx context.Context, lockID int64) (bool, error) {
	const query = "SELECT pg_try_advisory_lock($1);"

	var acquired bool
	row := p.QueryRowContext(ctx, query, lockID)
	if err := row.Scan(&acquired); err != nil {
		return false, err
	}
	return acquired, nil
}
// AcquireSharedLockInt64 tries to take a shared session-level PostgreSQL
// advisory lock identified by lockID.
//
// It runs pg_try_advisory_lock_shared, which returns immediately instead of
// waiting: the returned bool reports whether the lock was obtained. If the
// query or the scan of its result fails, it returns false and that error.
//
// NOTE(review): p is a *sql.DB connection pool and session-level locks belong
// to the session that took them; confirm the matching release reaches the same
// connection.
func AcquireSharedLockInt64(p *sql.DB, ctx context.Context, lockID int64) (bool, error) {
	var granted bool
	if err := p.QueryRowContext(ctx, "SELECT pg_try_advisory_lock_shared($1);", lockID).Scan(&granted); err != nil {
		return false, err
	}
	return granted, nil
}
// AcquireSharedLock tries to take a shared session-level PostgreSQL advisory
// lock keyed by a string.
//
// lockID is hashed with xxh3 and the 64-bit hash is reinterpreted as an int64
// to form the numeric lock key, which is then passed to
// AcquireSharedLockInt64. Distinct strings can hash to the same key.
//
// It returns whether the lock was obtained and the numeric key that was used.
// On error it returns false, 0 and the error.
func AcquireSharedLock(p *sql.DB, ctx context.Context, lockID string) (bool, int64, error) {
	key := int64(xxh3.HashString(lockID))

	acquired, err := AcquireSharedLockInt64(p, ctx, key)
	if err != nil {
		return false, 0, err
	}
	return acquired, key, nil
}
// AcquireTxnLock tries to take a transaction-level PostgreSQL advisory lock
// identified by lockID, using the transaction p.
//
// It runs pg_try_advisory_xact_lock, which returns immediately instead of
// waiting: the returned bool reports whether the lock was obtained. If the
// query or the scan of its result fails, it returns false and that error.
func AcquireTxnLock(p *sql.Tx, ctx context.Context, lockID int64) (bool, error) {
	const query = "SELECT pg_try_advisory_xact_lock($1);"

	var acquired bool
	if err := p.QueryRowContext(ctx, query, lockID).Scan(&acquired); err != nil {
		return false, err
	}
	return acquired, nil
}
// AcquireLock tries to take an exclusive session-level PostgreSQL advisory
// lock keyed by a string.
//
// lockID is hashed with xxh3 and the 64-bit hash is reinterpreted as an int64
// to form the numeric lock key, which is then passed to AcquireLockInt64.
// Distinct strings can hash to the same key.
//
// It returns whether the lock was obtained and the numeric key that was used;
// that key is what ReleaseLock expects. On error it returns false, 0 and the
// error.
func AcquireLock(p *sql.DB, ctx context.Context, lockID string) (bool, int64, error) {
	key := int64(xxh3.HashString(lockID))

	acquired, err := AcquireLockInt64(p, ctx, key)
	if err != nil {
		return false, 0, err
	}
	return acquired, key, nil
}
// ReleaseLock releases the exclusive session-level advisory lock identified by
// lockID.
//
// It runs pg_advisory_unlock and returns the boolean that function reports,
// i.e. whether a lock was actually released. If the query or the scan of its
// result fails, it returns false and that error.
//
// NOTE(review): p is a *sql.DB connection pool, so this may run on a different
// connection than the one that acquired the lock — confirm how callers ensure
// the unlock reaches the owning session.
func ReleaseLock(p *sql.DB, ctx context.Context, lockID int64) (bool, error) {
	const query = "SELECT pg_advisory_unlock($1)"

	var released bool
	row := p.QueryRowContext(ctx, query, lockID)
	if err := row.Scan(&released); err != nil {
		return false, err
	}
	return released, nil
}
// ReleaseSharedLock releases the shared session-level advisory lock identified
// by lockID.
//
// It runs pg_advisory_unlock_shared (with the key cast to bigint) and returns
// the boolean that function reports, i.e. whether a lock was actually
// released. If the query or the scan of its result fails, it returns false and
// that error.
//
// NOTE(review): p is a *sql.DB connection pool, so this may run on a different
// connection than the one that acquired the lock — confirm how callers ensure
// the unlock reaches the owning session.
func ReleaseSharedLock(p *sql.DB, ctx context.Context, lockID int64) (bool, error) {
	var released bool
	if err := p.QueryRowContext(ctx, "SELECT pg_advisory_unlock_shared($1::bigint)", lockID).Scan(&released); err != nil {
		return false, err
	}
	return released, nil
}
// FetchAdvisoryLocks lists the advisory locks currently visible in pg_locks.
//
// Each row with locktype = 'advisory' is rendered by PostgreSQL as a JSON
// object (via json_build_object) and decoded into an AdvisoryLock. When no such
// rows exist the returned slice is non-nil and empty.
//
// It returns a nil slice and the error if the query fails, if a row cannot be
// scanned or decoded, or if iteration stops early because of an error.
func FetchAdvisoryLocks(conn *sql.DB, ctx context.Context) ([]*AdvisoryLock, error) {
	const query = "SELECT json_build_object('objectID', objid::integer, 'classID', classid, 'pid', pid, 'granted', granted, 'locktype', locktype) FROM pg_locks WHERE locktype = 'advisory'"

	rows, err := conn.QueryContext(ctx, query)
	if err != nil {
		// rows is nil on failure; using it would panic.
		return nil, err
	}
	defer rows.Close()

	advisoryLocks := make([]*AdvisoryLock, 0)
	for rows.Next() {
		var raw []byte
		if err := rows.Scan(&raw); err != nil {
			return nil, err
		}
		lock := new(AdvisoryLock)
		if err := json.Unmarshal(raw, lock); err != nil {
			return nil, err
		}
		advisoryLocks = append(advisoryLocks, lock)
	}
	// Next also returns false when iteration fails part-way; report that
	// instead of handing back a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return advisoryLocks, nil
}