Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new experimental gc friendly flatten cache #712

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b2d37ed
feat: new experimental gc friendly flatten cache
rueian Dec 30, 2024
76151e2
feat: new experimental gc friendly flatten cache
rueian Jan 5, 2025
d9d2fb0
feat: new experimental gc friendly flatten cache
rueian Jan 5, 2025
180872a
refactor
rueian Jan 17, 2025
b8175b2
perf: avoid CacheUnmarshalView in locks
rueian Jan 20, 2025
4aebfc9
perf: avoid unnecessary llTailBatch
rueian Jan 20, 2025
a7301ca
feat: batch delete expired cache with llTailBatch
rueian Jan 20, 2025
0d30713
perf: avoid CacheUnmarshalView in locks
rueian Jan 20, 2025
b9b2beb
refactor
rueian Jan 23, 2025
b91898b
feat: correct the way of handling expired cache
rueian Jan 23, 2025
2db704a
feat: correct the way of handling expired cache
rueian Jan 23, 2025
26ba6aa
perf: add map estimated entry size into cache size estimation
rueian Jan 23, 2025
35822ea
refactor: move the flatten map to internal/cache
rueian Feb 10, 2025
14b1b58
Merge branch 'main' into flatten-cache
rueian Feb 12, 2025
4fe09da
fix: flatten cache concurrency and ll
rueian Feb 12, 2025
5c44807
feat: extend the CacheStore interface to have a chance to cleanup on …
rueian Feb 12, 2025
ee6bb13
feat: delay ll and fast path on Insert
rueian Feb 12, 2025
fb5c7f5
Merge branch 'main' into flatten-cache
rueian Feb 28, 2025
212e642
chore: rename to chained
rueian Feb 28, 2025
67db02b
feat: clean lru map in batch
rueian Feb 28, 2025
b323885
test: batch lru delete
rueian Mar 1, 2025
1603eb4
fix: avoid race on adapterEntry
rueian Mar 1, 2025
1740377
docs: add a comment on NewChainedCache
rueian Mar 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 91 additions & 8 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/redis/rueidis/internal/cache"
)

// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
Expand All @@ -28,7 +30,7 @@ type CacheStore interface {
// Update is called when receiving the response of the request sent by the above Flight Case 1 from redis.
// It should not only update the store but also deliver the response to all CacheEntry.Wait and return a desired client side PXAT of the response.
// Note that the server side expire time can be retrieved from RedisMessage.CachePXAT.
Update(key, cmd string, val RedisMessage) (pxat int64)
Update(key, cmd string, val RedisMessage, now time.Time) (pxat int64)
// Cancel is called when the request sent by the above Flight Case 1 failed.
// It should not only deliver the error to all CacheEntry.Wait but also remove the CacheEntry from the store.
Cancel(key, cmd string, err error)
Expand Down Expand Up @@ -89,7 +91,7 @@ func (a *adapter) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red
return RedisMessage{}, flight
}

func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) {
func (a *adapter) Update(key, cmd string, val RedisMessage, _ time.Time) (sxat int64) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
Expand All @@ -99,7 +101,7 @@ func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) {
val.setExpireAt(sxat)
}
a.store.Set(key+cmd, val)
flight.set(val, nil)
flight.setVal(val)
entries[cmd] = nil
}
a.mu.Unlock()
Expand All @@ -110,7 +112,7 @@ func (a *adapter) Cancel(key, cmd string, err error) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
flight.set(RedisMessage{}, err)
flight.setErr(err)
entries[cmd] = nil
}
a.mu.Unlock()
Expand Down Expand Up @@ -152,7 +154,7 @@ func (a *adapter) Close(err error) {
for _, entries := range flights {
for _, e := range entries {
if e != nil {
e.(*adapterEntry).set(RedisMessage{}, err)
e.(*adapterEntry).setErr(err)
}
}
}
Expand All @@ -165,16 +167,97 @@ type adapterEntry struct {
xat int64
}

func (a *adapterEntry) set(val RedisMessage, err error) {
a.err, a.val = err, val
// setVal publishes a successful response to all waiters: it stores val and then
// closes a.ch so every blocked Wait call wakes up and reads a.val.
// The assignment must happen before close(a.ch) so that waiters observe it.
func (a *adapterEntry) setVal(val RedisMessage) {
	a.val = val
	close(a.ch)
}

// setErr publishes a failure to all waiters: it stores err and then closes
// a.ch so every blocked Wait call wakes up and reads a.err.
// The assignment must happen before close(a.ch) so that waiters observe it.
func (a *adapterEntry) setErr(err error) {
	a.err = err
	close(a.ch)
}

// Wait blocks until the in-flight request resolves (a.ch is closed by setVal
// or setErr) or until ctx is done, whichever comes first. When the context can
// never be canceled (ctx.Done() returns nil) the select is skipped entirely.
//
// Fix: the source contained both the stale `case <-ctx.Done():` (empty body,
// leaving a path with no return) and its replacement `case <-ctxCh:`; only the
// hoisted-channel case is kept.
func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
	ctxCh := ctx.Done()
	if ctxCh == nil {
		<-a.ch
		return a.val, a.err
	}
	select {
	case <-ctxCh:
		return RedisMessage{}, ctx.Err()
	case <-a.ch:
		return a.val, a.err
	}
}

// NewChainedCache returns a CacheStore optimized for concurrency, memory
// efficiency and GC, compared to the default client side caching CacheStore.
// However, it is not yet optimized for DoMultiCache. The limit is the cache
// capacity in bytes (see DefaultCacheBytes).
func NewChainedCache(limit int) CacheStore {
	store := &chained{}
	store.flights = cache.NewDoubleMap[*adapterEntry](64)
	store.cache = cache.NewLRUDoubleMap[[]byte](64, int64(limit))
	return store
}

// chained is a CacheStore backed by two sharded maps: one tracking in-flight
// requests awaiting a redis response, and one LRU-bounded map holding
// completed responses in their CacheMarshal binary form.
type chained struct {
	flights *cache.DoubleMap[*adapterEntry] // in-flight entries, keyed by (key, cmd)
	cache   *cache.LRUDoubleMap[[]byte]     // completed responses as marshaled bytes
}

// Flight implements CacheStore.Flight. It first tries the completed-response
// cache; on a hit the stored bytes are unmarshaled via CacheUnmarshalView and
// returned with a nil CacheEntry. On a miss it finds or creates an in-flight
// entry: when FindOrInsert reports ok, an existing flight entry is returned
// for the caller to Wait on; otherwise a new entry was created and the caller
// is expected to send the request to redis (Flight Case 1).
func (f *chained) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
	// current time in milliseconds, used for the expiry check below.
	ts := now.UnixMilli()
	if e, ok := f.cache.Find(key, cmd, ts); ok {
		var ret RedisMessage
		// error ignored: e was produced by our own CacheMarshal in Update.
		_ = ret.CacheUnmarshalView(e)
		return ret, nil
	}
	// client-side deadline (PXAT, in ms) recorded on the new flight entry.
	xat := ts + ttl.Milliseconds()
	if af, ok := f.flights.FindOrInsert(key, cmd, func() *adapterEntry {
		return &adapterEntry{ch: make(chan struct{}), xat: xat}
	}); ok {
		return RedisMessage{}, af
	}
	return RedisMessage{}, nil
}

// Update implements CacheStore.Update: it receives the redis response for a
// request previously admitted by Flight. It clamps the server-side PXAT to the
// client-side deadline recorded on the flight entry, stores the marshaled
// response in the LRU cache, and only then deletes the flight entry and wakes
// its waiters. Inserting into the cache before deleting the flight entry means
// a concurrent Flight that misses the flights map can still hit the cache.
func (f *chained) Update(key, cmd string, val RedisMessage, now time.Time) (sxat int64) {
	if af, ok := f.flights.Find(key, cmd); ok {
		// server-side PXAT carried in the message; 0 when the server set none.
		sxat = val.getExpireAt()
		if af.xat < sxat || sxat == 0 {
			// fall back to the client-side deadline when the server PXAT is
			// missing or later than what the client asked for.
			sxat = af.xat
			val.setExpireAt(sxat)
		}
		bs := val.CacheMarshal(nil)
		// size estimate: payload + both key strings + LRU entry overhead.
		f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, now.UnixMilli(), bs)
		if f.flights.Delete(key, cmd) {
			// only the goroutine that wins the Delete race delivers the value.
			af.setVal(val)
		}
	}
	return sxat
}

// Cancel implements CacheStore.Cancel: it aborts the in-flight request for
// (key, cmd), delivering err to the waiters of the flight entry. Only the
// goroutine that wins the Delete race delivers the error.
func (f *chained) Cancel(key, cmd string, err error) {
	af, ok := f.flights.Find(key, cmd)
	if !ok {
		return
	}
	if f.flights.Delete(key, cmd) {
		af.setErr(err)
	}
}

// Delete implements CacheStore.Delete: a nil slice clears the entire response
// cache, otherwise each listed redis key is evicted individually.
func (f *chained) Delete(keys []RedisMessage) {
	if keys == nil {
		f.cache.Reset()
		return
	}
	for i := range keys {
		f.cache.Delete(keys[i].string)
	}
}

// Close implements CacheStore.Close: it drops every cached response and then
// closes the flights map, delivering err to each still-pending flight entry
// so their waiters unblock.
func (f *chained) Close(err error) {
	f.cache.DeleteAll()
	f.flights.Close(func(entry *adapterEntry) {
		entry.setErr(err)
	})
}
13 changes: 9 additions & 4 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func test(t *testing.T, storeFn func() CacheStore) {

v = RedisMessage{typ: '+', string: "val"}
v.setExpireAt(now.Add(time.Second).UnixMilli())
if pttl := store.Update("key", "cmd", v); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() {
if pttl := store.Update("key", "cmd", v, now); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() {
t.Fatal("Update should return a desired pttl")
}

Expand Down Expand Up @@ -104,8 +104,8 @@ func test(t *testing.T, storeFn func() CacheStore) {
} {
store.Flight("key", "cmd1", time.Millisecond*100, now)
store.Flight("key", "cmd2", time.Millisecond*100, now)
store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"})
store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"})
store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"}, now)
store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"}, now)

store.Delete(deletions)

Expand All @@ -130,7 +130,7 @@ func test(t *testing.T, storeFn func() CacheStore) {

v = RedisMessage{typ: '+', string: "val"}
v.setExpireAt(now.Add(time.Millisecond).UnixMilli())
store.Update("key", "cmd", v)
store.Update("key", "cmd", v, now)

v, e = store.Flight("key", "cmd", time.Second, now.Add(time.Millisecond))
if v.typ != 0 || e != nil {
Expand Down Expand Up @@ -183,6 +183,11 @@ func TestCacheStore(t *testing.T) {
return NewSimpleCacheAdapter(&simple{store: map[string]RedisMessage{}})
})
})
t.Run("FlattenCache", func(t *testing.T) {
test(t, func() CacheStore {
return NewChainedCache(DefaultCacheBytes)
})
})
}

type simple struct {
Expand Down
74 changes: 74 additions & 0 deletions internal/cache/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cache

// node is a singly linked hash-chain entry holding one key/value pair.
// An empty key marks the slot as vacant so the node can be reused without
// reallocating.
type node[V any] struct {
	key  string
	next *node[V]
	val  V
}

// chain is an intrusive collision chain whose head node is embedded in the
// struct itself, so a chain holding a single entry needs no extra allocation.
// The empty string is used as the "vacant slot" sentinel, so "" must not be
// stored as a real key.
type chain[V any] struct {
	node[V]
}

// find returns the value stored under key and whether it was found.
func (h *chain[V]) find(key string) (val V, ok bool) {
	if h.node.key == key {
		return h.node.val, true
	}
	for curr := h.node.next; curr != nil; curr = curr.next {
		if curr.key == key {
			return curr.val, true
		}
	}
	return val, ok
}

// insert stores val under key, updating the value in place when the key is
// already present anywhere in the chain.
//
// Fix: the original only checked the head node for an existing key before
// placing the entry (reusing a vacant head slot, or prepending). If the key
// already lived in a non-head node, that produced a shadowing duplicate; once
// the newer copy was deleted, find would resurface the stale old value. The
// whole chain is now scanned for an existing entry first, so a key is never
// stored twice.
func (h *chain[V]) insert(key string, val V) {
	if h.node.key == key && key != "" {
		h.node.val = val
		return
	}
	for curr := h.node.next; curr != nil; curr = curr.next {
		if curr.key == key {
			curr.val = val
			return
		}
	}
	if h.node.key == "" {
		// reuse the vacant embedded head slot.
		h.node.key = key
		h.node.val = val
	} else {
		// prepend a fresh node right after the embedded head.
		h.node.next = &node[V]{key: key, val: val, next: h.node.next}
	}
}

// empty reports whether the chain holds no entries: the embedded head is
// vacant and no overflow nodes are linked.
func (h *chain[V]) empty() bool {
	return h.node.next == nil && h.node.key == ""
}

// delete removes key from the chain, zeroing the stored value so the GC can
// reclaim what it references. It returns (whether the chain is now empty,
// whether the key was found and removed). Overflow nodes are unlinked; the
// embedded head is only cleared so a later insert can reuse it.
func (h *chain[V]) delete(key string) (bool, bool) {
	var zero V
	if h.node.key == key {
		h.node.key = ""
		h.node.val = zero
		return h.node.next == nil, true
	}

	if h.node.next == nil {
		return h.node.key == "", false
	}

	if h.node.next.key == key {
		h.node.next.key = ""
		h.node.next.val = zero
		h.node.next, h.node.next.next = h.node.next.next, nil
		return h.empty(), true
	}

	prev := h.node.next
	curr := h.node.next.next
	deleted := false
	for curr != nil {
		if curr.key == key {
			curr.key = ""
			curr.val = zero
			prev.next, curr.next = curr.next, nil
			deleted = true
			break
		}
		prev, curr = curr, curr.curr()
	}
	return h.empty(), deleted
}

// curr is a trivial accessor used by delete's traversal.
func (n *node[V]) curr() *node[V] { return n.next }
64 changes: 64 additions & 0 deletions internal/cache/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cache

import (
"testing"
)

// TestChain exercises the intrusive collision chain: queries on an empty
// chain, insert/find round trips, deleting the embedded head node, deleting
// interior nodes, and inserting again after the head slot has been vacated.
func TestChain(t *testing.T) {
	h := chain[int]{}
	// a fresh chain is empty; lookups and deletes must miss.
	if h.empty() != true {
		t.Fatal("chain is not empty")
	}
	if _, ok := h.find("any"); ok {
		t.Fatal("value is found")
	}
	if empty, deleted := h.delete("any"); !empty || deleted {
		t.Fatal("not empty")
	}
	// populate three entries and verify each is retrievable.
	h.insert("1", 1)
	h.insert("2", 2)
	h.insert("3", 3)
	if v, ok := h.find("1"); !ok || v != 1 {
		t.Fatal("value is not found")
	}
	if v, ok := h.find("2"); !ok || v != 2 {
		t.Fatal("value is not found")
	}
	if v, ok := h.find("3"); !ok || v != 3 {
		t.Fatal("value is not found")
	}
	// deleting the head entry must leave the rest intact.
	if empty, deleted := h.delete("1"); empty || !deleted {
		t.Fatal("empty")
	}
	if _, ok := h.find("1"); ok {
		t.Fatal("value is found")
	}
	if v, ok := h.find("2"); !ok || v != 2 {
		t.Fatal("value is not found")
	}
	if v, ok := h.find("3"); !ok || v != 3 {
		t.Fatal("value is not found")
	}
	// deleting an interior entry must leave the rest intact.
	if empty, deleted := h.delete("2"); empty || !deleted {
		t.Fatal("empty")
	}
	if _, ok := h.find("2"); ok {
		t.Fatal("value is found")
	}
	if v, ok := h.find("3"); !ok || v != 3 {
		t.Fatal("value is not found")
	}
	// inserting after deletions reuses the vacated head slot.
	h.insert("4", 4)
	if v, ok := h.find("3"); !ok || v != 3 {
		t.Fatal("value is not found")
	}
	if v, ok := h.find("4"); !ok || v != 4 {
		t.Fatal("value is not found")
	}
	// removing the final two entries empties the chain again.
	if empty, deleted := h.delete("3"); empty || !deleted {
		t.Fatal("empty")
	}
	if empty, deleted := h.delete("4"); !empty || !deleted {
		t.Fatal("not empty")
	}
}
Loading
Loading