Skip to content

Commit

Permalink
ProcessCache: Remove stale entries
Browse files Browse the repository at this point in the history
Add stale entry removal functionality to the process cache. This won't
affect normal operations. However, if a custom ref count has been placed
on an entry and not removed (for whatever reason: a logic error, a missed
event, etc.), the entry is now cleaned up: periodically remove entries
where the process and all its children exited at least 10 minutes ago.
Record a metric when entries are removed.

Signed-off-by: Kevin Sheldrake <kevin.sheldrake@isovalent.com>
  • Loading branch information
kevsecurity committed Oct 1, 2024
1 parent 5c4652c commit becaae5
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 21 deletions.
4 changes: 4 additions & 0 deletions docs/content/en/docs/reference/metrics.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docs/data/tetragon_flags.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package defaults

import "time"

const (
// DefaultMapRoot is the default path where BPFFS should be mounted
DefaultMapRoot = "/sys/fs/bpf"
Expand Down Expand Up @@ -49,6 +51,9 @@ const (
// defaults for the event cache
DefaultEventCacheNumRetries = 15
DefaultEventCacheRetryDelay = 2

// defaults for process cache
DefaultProcessCacheStaleInterval = time.Duration(60 * time.Minute)
)

var (
Expand Down
7 changes: 6 additions & 1 deletion pkg/option/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type config struct {

EventCacheNumRetries int
EventCacheRetryDelay int

ProcessCacheStaleInterval time.Duration
}

var (
Expand All @@ -122,10 +124,13 @@ var (
// Enable all metrics labels by default
MetricsLabelFilter: DefaultLabelFilter(),

// set default valus for the event cache
// set default values for the event cache
// mainly used in the case of testing
EventCacheNumRetries: defaults.DefaultEventCacheNumRetries,
EventCacheRetryDelay: defaults.DefaultEventCacheRetryDelay,

// set default values for the process cache
ProcessCacheStaleInterval: defaults.DefaultProcessCacheStaleInterval,
}
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/option/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ const (

KeyEventCacheRetries = "event-cache-retries"
KeyEventCacheRetryDelay = "event-cache-retry-delay"

KeyProcessCacheStaleInterval = "process-cache-stale-interval"
)

type UsernameMetadaCode int
Expand Down Expand Up @@ -245,6 +247,8 @@ func ReadAndSetFlags() error {
Config.EventCacheNumRetries = viper.GetInt(KeyEventCacheRetries)
Config.EventCacheRetryDelay = viper.GetInt(KeyEventCacheRetryDelay)

Config.ProcessCacheStaleInterval = viper.GetDuration(KeyProcessCacheStaleInterval)

return nil
}

Expand Down Expand Up @@ -411,4 +415,6 @@ func AddFlags(flags *pflag.FlagSet) {

flags.Int(KeyEventCacheRetries, defaults.DefaultEventCacheNumRetries, "Number of retries for event cache")
flags.Int(KeyEventCacheRetryDelay, defaults.DefaultEventCacheRetryDelay, "Delay in seconds between event cache retries")

flags.Duration(KeyProcessCacheStaleInterval, defaults.DefaultProcessCacheStaleInterval, "Interval between stale process cache checks")
}
56 changes: 49 additions & 7 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import (
"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/sensors/exec/execvemap"
lru "github.com/hashicorp/golang-lru/v2"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
staleIntervalLastTime time.Time
}

// garbage collection states
Expand All @@ -40,9 +42,12 @@ var colorStr = map[int]string{
deleted: "deleted",
}

// garbage collection run interval
const (
	// intervalGC is the period between garbage collection runs.
	intervalGC = 30 * time.Second
	// staleIntervalMaxTime is the maximum age a stale entry may reach:
	// a stale entry older than this is removed from the cache.
	staleIntervalMaxTime = 10 * time.Minute
)

func (pc *Cache) cacheGarbageCollector() {
Expand Down Expand Up @@ -92,6 +97,12 @@ func (pc *Cache) cacheGarbageCollector() {
}
}
deleteQueue = newQueue
// Check if it is time to clean stale entries.
if option.Config.ProcessCacheStaleInterval > 0 &&
pc.staleIntervalLastTime.Add(option.Config.ProcessCacheStaleInterval).Before(time.Now()) {
pc.cleanStaleEntries()
pc.staleIntervalLastTime = time.Now()
}
case p := <-pc.deleteChan:
// duplicate deletes can happen, if they do reset
// color to pending and move along. This will cause
Expand All @@ -117,6 +128,31 @@ func (pc *Cache) cacheGarbageCollector() {
}()
}

// cleanStaleEntries removes every cache entry whose process and children have
// all exited and whose recorded exit time is more than staleIntervalMaxTime in
// the past. Each entry that is actually removed increments the
// processCacheRemovedStale metric.
func (pc *Cache) cleanStaleEntries() {
	// Judge every entry in this pass against the same instant instead of
	// re-reading the clock per entry.
	cutoff := time.Now().Add(-staleIntervalMaxTime)

	var stale []*ProcessInternal
	for _, v := range pc.cache.Values() {
		// refcntOps and exitTime are written under refcntOpsLock in refDec,
		// so read them under the same lock.
		v.refcntOpsLock.Lock()
		if processAndChildrenHaveExited(v) && !v.exitTime.IsZero() && v.exitTime.Before(cutoff) {
			stale = append(stale, v)
		}
		v.refcntOpsLock.Unlock()
	}

	for _, d := range stale {
		// Only count entries that were really removed: the entry may have
		// left the cache between the scan above and this call, in which case
		// remove reports false and the metric must not be inflated.
		if pc.remove(d.process) {
			processCacheRemovedStale.Inc()
		}
	}
}

// processAndChildrenHaveExited reports whether the "process" and "parent"
// reference counters of p are balanced, i.e. every "process++" has a matching
// "process--" and every "parent++" has a matching "parent--".
//
// Must be called with a lock held on p.refcntOpsLock
func processAndChildrenHaveExited(p *ProcessInternal) bool {
	ops := p.refcntOps
	if ops["process++"] != ops["process--"] {
		return false
	}
	return ops["parent++"] == ops["parent--"]
}

// processOrChildExit reports whether reason names a reference count decrement
// for a process exit ("process--") or a child exit ("parent--").
func processOrChildExit(reason string) bool {
	switch reason {
	case "process--", "parent--":
		return true
	default:
		return false
	}
}

// deletePending queues process for deletion by sending it on pc.deleteChan,
// the channel that cacheGarbageCollector receives from.
// NOTE(review): the channel's creation is not visible here; if it is
// unbuffered this send blocks until the collector receives it — confirm.
func (pc *Cache) deletePending(process *ProcessInternal) {
pc.deleteChan <- process
}
Expand All @@ -125,6 +161,11 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) {
p.refcntOpsLock.Lock()
// count number of times refcnt is decremented for a specific reason (i.e. process, parent, etc.)
p.refcntOps[reason]++
// if this is a process/child exit, check if the process and all its children have exited.
// If so, store the exit time so we can tell if this process becomes a stale entry.
if processOrChildExit(reason) && processAndChildrenHaveExited(p) {
p.exitTime = time.Now()
}
p.refcntOpsLock.Unlock()
ref := atomic.AddUint32(&p.refcnt, ^uint32(0))
if ref == 0 {
Expand Down Expand Up @@ -158,8 +199,9 @@ func NewCache(
return nil, err
}
pm := &Cache{
cache: lruCache,
size: processCacheSize,
cache: lruCache,
size: processCacheSize,
staleIntervalLastTime: time.Now(),
}
pm.cacheGarbageCollector()
return pm, nil
Expand Down
165 changes: 152 additions & 13 deletions pkg/process/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,182 @@
package process

import (
"strings"
"testing"
"time"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestProcessCache(t *testing.T) {
// add a process to the cache.
cache, err := NewCache(10)
require.NoError(t, err)
pid := wrapperspb.UInt32Value{Value: 1234}
execID := "process1"
func createFakeProcess(pid uint32, execID string) *ProcessInternal {
proc := ProcessInternal{
process: &tetragon.Process{
ExecId: execID,
Pid: &pid,
Pid: &wrapperspb.UInt32Value{Value: pid},
},
capabilities: &tetragon.Capabilities{
Permitted: []tetragon.CapabilitiesType{
tetragon.CapabilitiesType_CAP_AUDIT_READ,
tetragon.CapabilitiesType_CAP_AUDIT_WRITE,
},
},
refcntOps: make(map[string]int32),
}
cache.add(&proc)
return &proc
}

// addFakeProcess builds a fake process with the given pid and execID via
// createFakeProcess and adds it to pc.
func addFakeProcess(pc *Cache, pid uint32, execID string) {
	pc.add(createFakeProcess(pid, execID))
}

func TestProcessCacheAddAndRemove(t *testing.T) {
// add a process to the cache.
execId := "process1"
cache, err := NewCache(10)
require.NoError(t, err)
addFakeProcess(cache, 1234, execId)
assert.Equal(t, cache.len(), 1)

result, err := cache.get(proc.process.ExecId)
result, err := cache.get(execId)
assert.NoError(t, err)
assert.Equal(t, proc.process.ExecId, result.process.ExecId)
assert.Equal(t, proc.capabilities, result.capabilities)
assert.Equal(t, execId, result.process.ExecId)
assert.Equal(t, result.capabilities,
&tetragon.Capabilities{
Permitted: []tetragon.CapabilitiesType{
tetragon.CapabilitiesType_CAP_AUDIT_READ,
tetragon.CapabilitiesType_CAP_AUDIT_WRITE,
},
})

// remove the entry from cache.
assert.True(t, cache.remove(proc.process))
assert.True(t, cache.remove(result.process))
assert.Equal(t, cache.len(), 0)
_, err = cache.get(proc.process.ExecId)
_, err = cache.get(execId)
assert.Error(t, err)
}

// TestProcessCacheProcessOrChildExit checks that only the "process--" and
// "parent--" reasons are recognised as process/child exits.
func TestProcessCacheProcessOrChildExit(t *testing.T) {
	// reasons that are exits
	for _, reason := range []string{"process--", "parent--"} {
		assert.True(t, processOrChildExit(reason))
	}
	// reasons that are not exits
	for _, reason := range []string{"process++", "parent++", "process-x--"} {
		assert.False(t, processOrChildExit(reason))
	}
}

// TestProcessCacheProcessAndChildrenHaveExited walks one fake process through
// a sequence of refcntOps states and checks what processAndChildrenHaveExited
// reports after each change. The steps build on each other, so their order
// matters.
func TestProcessCacheProcessAndChildrenHaveExited(t *testing.T) {
p := createFakeProcess(123, "myProc")
// empty (invalid) process should be counted as exited (this situation shouldn't occur):
// all counters are zero, so they are trivially balanced
assert.True(t, processAndChildrenHaveExited(p))
p.refcntOps["process++"] = 1
// active process has not exited: "process++" has no matching "process--"
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["process--"] = 1
// completed process has exited: "process++" and "process--" are balanced
assert.True(t, processAndChildrenHaveExited(p))
// set the process as active again, and give it a child
p.refcntOps["process--"] = 0
p.refcntOps["parent++"] = 1
// active process with child has not exited
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["parent--"] = 1
// active process with exited child has not exited: the process counters are still unbalanced
assert.False(t, processAndChildrenHaveExited(p))
p.refcntOps["process--"] = 1
// completed process with exited child has exited: both counter pairs are balanced
assert.True(t, processAndChildrenHaveExited(p))
}

func TestProcessCacheRemoveStale(t *testing.T) {
// add some processes to the cache.
execId := []string{"process1", "process2", "process3", "process4", "process5", "process6", "process7"}
var p []*ProcessInternal
cache, err := NewCache(10)
require.NoError(t, err)

p = append(p, createFakeProcess(1234, execId[0]))
p[0].refcntOps["process++"] = 1 // process started (but not exited)
cache.add(p[0])
assert.Equal(t, cache.len(), 1)

p = append(p, createFakeProcess(1235, execId[1]))
p[1].refcntOps["process++"] = 1 // process started
p[1].refcntOps["process--"] = 1 // process exited
p[1].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale)
cache.add(p[1])
assert.Equal(t, cache.len(), 2)

p = append(p, createFakeProcess(1236, execId[2]))
p[2].refcntOps["process++"] = 1 // process started
p[2].refcntOps["process--"] = 1 // process exited
p[2].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[2])
assert.Equal(t, cache.len(), 3)

p = append(p, createFakeProcess(1237, execId[3]))
p[3].refcntOps["process++"] = 1 // process started
p[3].refcntOps["process--"] = 1 // process exited
p[3].refcntOps["parent++"] = 1 // child started
p[3].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (but process is not stale)
cache.add(p[3])
assert.Equal(t, cache.len(), 4)

p = append(p, createFakeProcess(1238, execId[4]))
p[4].refcntOps["process++"] = 1 // process started
p[4].refcntOps["process--"] = 1 // process exited
p[4].refcntOps["parent++"] = 1 // child started
p[4].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[4])
assert.Equal(t, cache.len(), 5)

p = append(p, createFakeProcess(1239, execId[5]))
p[5].refcntOps["process++"] = 1 // process started
p[5].refcntOps["process--"] = 1 // process exited
p[5].refcntOps["parent++"] = 1 // child started
p[5].refcntOps["parent--"] = 1 // child exited
p[5].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale)
cache.add(p[5])
assert.Equal(t, cache.len(), 6)

p = append(p, createFakeProcess(1240, execId[6]))
p[6].refcntOps["process++"] = 1 // process started
p[6].refcntOps["process--"] = 1 // process exited
p[6].refcntOps["parent++"] = 1 // child started
p[6].refcntOps["parent--"] = 1 // child exited
p[6].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale)
cache.add(p[6])
assert.Equal(t, cache.len(), 7)

// confirm entries are in cache
for i := 0; i < 7; i++ {
result, err := cache.get(execId[i])
assert.NoError(t, err)
assert.Equal(t, execId[i], result.process.ExecId)
}

// remove stale entries from cache.
cache.cleanStaleEntries()

// two entries should have been removed
assert.Equal(t, cache.len(), 5)
assert.NoError(t, testutil.CollectAndCompare(processCacheRemovedStale, strings.NewReader(`# HELP tetragon_process_cache_removed_stale_total Number of process cache stale entries removed.
# TYPE tetragon_process_cache_removed_stale_total counter
tetragon_process_cache_removed_stale_total 2
`)))

// confirm entries are in cache
for i := 0; i < 7; i++ {
result, err := cache.get(execId[i])
switch i {
case 0, 2, 3, 4, 6:
assert.NoError(t, err)
assert.Equal(t, execId[i], result.process.ExecId)
case 1, 5:
assert.Error(t, err)
}
}
}
Loading

0 comments on commit becaae5

Please sign in to comment.