Skip to content

Commit

Permalink
ProcessCache: Remove stale entries
Browse files Browse the repository at this point in the history
Add stale entry removal functionality to the process cache. This won't
affect normal operations, but in the event that a custom ref count has been
placed on an entry and not removed (for whatever reason, maybe logic
error, maybe missed event, etc), remove entries where the process and
all its children have exited at least 10 minutes ago. Record a metric
when entries are removed.

Signed-off-by: Kevin Sheldrake <kevin.sheldrake@isovalent.com>
  • Loading branch information
kevsecurity committed Sep 25, 2024
1 parent 5c4652c commit 0812698
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 20 deletions.
63 changes: 56 additions & 7 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
)

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
staleIntervalLastTime time.Time
}

// garbage collection states
Expand All @@ -40,9 +41,15 @@ var colorStr = map[int]string{
deleted: "deleted",
}

// garbage collection run interval
const (
// garbage collection run interval
intervalGC = time.Second * 30
// garbage collection stale entry check interval.
// We will check for stale entries once per interval.
staleIntervalGC = time.Hour * 1
// garbage collection stale entry max time.
// If a stale entry is older than the max time, then it will be removed.
staleIntervalMaxTime = time.Minute * 10
)

func (pc *Cache) cacheGarbageCollector() {
Expand Down Expand Up @@ -92,6 +99,11 @@ func (pc *Cache) cacheGarbageCollector() {
}
}
deleteQueue = newQueue
// Check if it is time to clean stale entries.
if pc.staleIntervalLastTime.Add(staleIntervalGC).Before(time.Now()) {
pc.cleanStaleEntries()
pc.staleIntervalLastTime = time.Now()
}
case p := <-pc.deleteChan:
// duplicate deletes can happen, if they do reset
// color to pending and move along. This will cause
Expand All @@ -117,6 +129,37 @@ func (pc *Cache) cacheGarbageCollector() {
}()
}

func (pc *Cache) cleanStaleEntries() {
var deleteProcesses []*ProcessInternal
for _, v := range pc.cache.Values() {
v.refcntOpsLock.Lock()
if processAndChildrenHaveExited(v) && v.exitTime.Add(staleIntervalMaxTime).Before(time.Now()) {
deleteProcesses = append(deleteProcesses, v)
}
v.refcntOpsLock.Unlock()
}
for _, d := range deleteProcesses {
processCacheRemovedStale.Inc()
pc.remove(d.process)
}
}

// Must be called with a lock held on p.refcntOpsLock
func processAndChildrenHaveExited(p *ProcessInternal) bool {
if p.refcntOps["process++"] == p.refcntOps["process--"] &&
p.refcntOps["parent++"] == p.refcntOps["parent--"] {
return true
}
return false
}

func processOrChildExit(reason string) bool {
if reason == "process--" || reason == "parent--" {
return true
}
return false
}

func (pc *Cache) deletePending(process *ProcessInternal) {
pc.deleteChan <- process
}
Expand All @@ -125,6 +168,11 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) {
p.refcntOpsLock.Lock()
// count number of times refcnt is decremented for a specific reason (i.e. process, parent, etc.)
p.refcntOps[reason]++
// if this is a process/child exit, check if the process and all its children have exited.
// If so, store the exit time so we can tell if this process becomes a stale entry.
if processOrChildExit(reason) && processAndChildrenHaveExited(p) {
p.exitTime = time.Now()
}
p.refcntOpsLock.Unlock()
ref := atomic.AddUint32(&p.refcnt, ^uint32(0))
if ref == 0 {
Expand Down Expand Up @@ -158,8 +206,9 @@ func NewCache(
return nil, err
}
pm := &Cache{
cache: lruCache,
size: processCacheSize,
cache: lruCache,
size: processCacheSize,
staleIntervalLastTime: time.Now(),
}
pm.cacheGarbageCollector()
return pm, nil
Expand Down
165 changes: 152 additions & 13 deletions pkg/process/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,182 @@
package process

import (
"strings"
"testing"
"time"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestProcessCache(t *testing.T) {
// add a process to the cache.
cache, err := NewCache(10)
require.NoError(t, err)
pid := wrapperspb.UInt32Value{Value: 1234}
execID := "process1"
func createFakeProcess(pid uint32, execID string) *ProcessInternal {
proc := ProcessInternal{
process: &tetragon.Process{
ExecId: execID,
Pid: &pid,
Pid: &wrapperspb.UInt32Value{Value: pid},
},
capabilities: &tetragon.Capabilities{
Permitted: []tetragon.CapabilitiesType{
tetragon.CapabilitiesType_CAP_AUDIT_READ,
tetragon.CapabilitiesType_CAP_AUDIT_WRITE,
},
},
refcntOps: make(map[string]int32),
}
cache.add(&proc)
return &proc
}

func addFakeProcess(pc *Cache, pid uint32, execID string) {
proc := createFakeProcess(pid, execID)
pc.add(proc)
}

func TestProcessCacheAddAndRemove(t *testing.T) {
// add a process to the cache.
execId := "process1"
cache, err := NewCache(10)
require.NoError(t, err)
addFakeProcess(cache, 1234, execId)
assert.Equal(t, cache.len(), 1)

result, err := cache.get(proc.process.ExecId)
result, err := cache.get(execId)
assert.NoError(t, err)
assert.Equal(t, proc.process.ExecId, result.process.ExecId)
assert.Equal(t, proc.capabilities, result.capabilities)
assert.Equal(t, execId, result.process.ExecId)
assert.Equal(t, result.capabilities,
&tetragon.Capabilities{
Permitted: []tetragon.CapabilitiesType{
tetragon.CapabilitiesType_CAP_AUDIT_READ,
tetragon.CapabilitiesType_CAP_AUDIT_WRITE,
},
})

// remove the entry from cache.
assert.True(t, cache.remove(proc.process))
assert.True(t, cache.remove(result.process))
assert.Equal(t, cache.len(), 0)
_, err = cache.get(proc.process.ExecId)
_, err = cache.get(execId)
assert.Error(t, err)
}

func TestProcessCacheProcessOrChildExit(t *testing.T) {
assert.True(t, processOrChildExit("process--"))
assert.True(t, processOrChildExit("parent--"))
assert.False(t, processOrChildExit("process++"))
assert.False(t, processOrChildExit("parent++"))
assert.False(t, processOrChildExit("process-x--"))
}

func TestProcessCacheProcessAndChildrenHaveExited(t *testing.T) {
p := createFakeProcess(123, "myProc")
// empty (invalid) process should be counted as exited (this situation shouldn't occur)
assert.True(t, processAndChildrenHaveExited(p))
p.refcntOps["process++"] = 1
// active process has not exited
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["process--"] = 1
// completed process has exited
assert.True(t, processAndChildrenHaveExited(p))
// set the process as active again
p.refcntOps["process--"] = 0
p.refcntOps["parent++"] = 1
// active process with child has not exited
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["parent--"] = 1
// active process with exited child has not exited
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["process--"] = 1
// completed process with exited child has exited
assert.True(t, processAndChildrenHaveExited(p))
}

func TestProcessCacheRemoveStale(t *testing.T) {
// add some processes to the cache.
execId := []string{"process1", "process2", "process3", "process4", "process5", "process6", "process7"}
var p []*ProcessInternal
cache, err := NewCache(10)
require.NoError(t, err)

p = append(p, createFakeProcess(1234, execId[0]))
p[0].refcntOps["process++"] = 1 // process started (but not exited)
cache.add(p[0])
assert.Equal(t, cache.len(), 1)

p = append(p, createFakeProcess(1235, execId[1]))
p[1].refcntOps["process++"] = 1 // process started
p[1].refcntOps["process--"] = 1 // process exited
p[1].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale)
cache.add(p[1])
assert.Equal(t, cache.len(), 2)

p = append(p, createFakeProcess(1236, execId[2]))
p[2].refcntOps["process++"] = 1 // process started
p[2].refcntOps["process--"] = 1 // process exited
p[2].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[2])
assert.Equal(t, cache.len(), 3)

p = append(p, createFakeProcess(1237, execId[3]))
p[3].refcntOps["process++"] = 1 // process started
p[3].refcntOps["process--"] = 1 // process exited
p[3].refcntOps["parent++"] = 1 // child started
p[3].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (but process is not stale)
cache.add(p[3])
assert.Equal(t, cache.len(), 4)

p = append(p, createFakeProcess(1238, execId[4]))
p[4].refcntOps["process++"] = 1 // process started
p[4].refcntOps["process--"] = 1 // process exited
p[4].refcntOps["parent++"] = 1 // child started
p[4].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[4])
assert.Equal(t, cache.len(), 5)

p = append(p, createFakeProcess(1239, execId[5]))
p[5].refcntOps["process++"] = 1 // process started
p[5].refcntOps["process--"] = 1 // process exited
p[5].refcntOps["parent++"] = 1 // child started
p[5].refcntOps["parent--"] = 1 // child exited
p[5].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale)
cache.add(p[5])
assert.Equal(t, cache.len(), 6)

p = append(p, createFakeProcess(1240, execId[6]))
p[6].refcntOps["process++"] = 1 // process started
p[6].refcntOps["process--"] = 1 // process exited
p[6].refcntOps["parent++"] = 1 // child started
p[6].refcntOps["parent--"] = 1 // child exited
p[6].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[6])
assert.Equal(t, cache.len(), 7)

// confirm entries are in cache
for i := 0; i < 7; i++ {
result, err := cache.get(execId[i])
assert.NoError(t, err)
assert.Equal(t, execId[i], result.process.ExecId)
}

// remove stale entries from cache.
cache.cleanStaleEntries()

// two entries should have been removed
assert.Equal(t, cache.len(), 5)
assert.NoError(t, testutil.CollectAndCompare(processCacheRemovedStale, strings.NewReader(`# HELP tetragon_process_cache_removed_stale_total Number of process cache stale entries removed.
# TYPE tetragon_process_cache_removed_stale_total counter
tetragon_process_cache_removed_stale_total 2
`)))

// confirm entries are in cache
for i := 0; i < 7; i++ {
result, err := cache.get(execId[i])
switch i {
case 0, 2, 3, 4, 6:
assert.NoError(t, err)
assert.Equal(t, execId[i], result.process.ExecId)
case 1, 5:
assert.Error(t, err)
}
}
}
6 changes: 6 additions & 0 deletions pkg/process/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ var (
"Number of process cache misses.",
nil, []metrics.ConstrainedLabel{operationLabel}, nil,
), nil)
processCacheRemovedStale = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: consts.MetricsNamespace,
Name: "process_cache_removed_stale_total",
Help: "Number of process cache stale entries removed.",
})
)

func newCacheCollector() prometheus.Collector {
Expand All @@ -59,6 +64,7 @@ func RegisterMetrics(group metrics.Group) {
processCacheTotal,
processCacheEvictions,
processCacheMisses,
processCacheRemovedStale,
)
group.MustRegister(newCacheCollector())
}
3 changes: 3 additions & 0 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cilium/tetragon/pkg/cgidmap"
"github.com/cilium/tetragon/pkg/fieldfilters"
Expand Down Expand Up @@ -62,6 +63,8 @@ type ProcessInternal struct {
// protects the refcntOps map
refcntOpsLock sync.Mutex
cgID uint64
// the time the process and all children exited
exitTime time.Time
}

var (
Expand Down

0 comments on commit 0812698

Please sign in to comment.