From 83eff54497506efc595eadda9c1f8673e6bd7600 Mon Sep 17 00:00:00 2001 From: Kevin Sheldrake Date: Wed, 25 Sep 2024 14:55:23 +0100 Subject: [PATCH] ProcessCache: Remove stale entries Add stale entry removal functionality to the process cache. This won't affect normal operations, but in the event that a custom ref count has been placed on an entry and not removed (for whatever reason, maybe logic error, maybe missed event, etc), remove entries where the process and all its children have exited at least 10 minutes ago. Record a metric when entries are removed. Signed-off-by: Kevin Sheldrake --- docs/content/en/docs/reference/metrics.md | 4 + .../use-cases/network-observability/_index.md | 37 ++++ .../approved-dns-servers.md | 37 ++++ docs/data/tetragon_flags.yaml | 3 + pkg/defaults/defaults.go | 3 + pkg/option/config.go | 7 +- pkg/option/flags.go | 6 + pkg/process/cache.go | 62 ++++++- pkg/process/cache_test.go | 165 ++++++++++++++++-- pkg/process/metrics.go | 6 + pkg/process/process.go | 3 + 11 files changed, 312 insertions(+), 21 deletions(-) create mode 100644 docs/content/en/docs/use-cases/network-observability/_index.md create mode 100644 docs/content/en/docs/use-cases/network-observability/approved-dns-servers.md diff --git a/docs/content/en/docs/reference/metrics.md b/docs/content/en/docs/reference/metrics.md index bec127bc64b..79f35dcfb33 100644 --- a/docs/content/en/docs/reference/metrics.md +++ b/docs/content/en/docs/reference/metrics.md @@ -275,6 +275,10 @@ Number of process cache misses. | ----- | ------ | | `operation` | `get, remove` | +### `tetragon_process_cache_removed_stale_total` + +Number of process cache stale entries removed. + ### `tetragon_process_cache_size` The size of the process cache diff --git a/docs/content/en/docs/use-cases/network-observability/_index.md b/docs/content/en/docs/use-cases/network-observability/_index.md new file mode 100644 index 00000000000..22a6c1b80ce --- /dev/null +++ b/docs/content/en/docs/use-cases/network-observability/_index.md @@ -0,0 +1,37 @@ +--- +title: "Network observability" +weight: 3 +icon: "overview" +description: "Monitor TCP connect using kprobe hooks" +--- + +To view TCP connect events, apply the example TCP connect `TracingPolicy`: + +```bash +kubectl apply -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml +``` + +To start monitoring events in the `xwing` pod run the Tetragon CLI: + +```bash +kubectl logs -n kube-system -l app.kubernetes.io/name=tetragon -c export-stdout -f | tetra getevents -o compact --namespace default --pod xwing +``` + +In another terminal, start generate a TCP connection. Here we use +curl. +```bash +kubectl exec -it xwing -- curl http://cilium.io +``` +The output in the first terminal will capture the new connect and write, +```bash +๐Ÿš€ process default/xwing /usr/bin/curl http://cilium.io +๐Ÿ”Œ connect default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 +๐Ÿ“ค sendmsg default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 bytes 73 +๐Ÿงน close default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 +๐Ÿ’ฅ exit default/xwing /usr/bin/curl http://cilium.io 0 +``` + +To disable the TracingPolicy run: +```bash +kubectl delete -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml +``` diff --git a/docs/content/en/docs/use-cases/network-observability/approved-dns-servers.md b/docs/content/en/docs/use-cases/network-observability/approved-dns-servers.md new file mode 100644 index 00000000000..22a6c1b80ce --- /dev/null +++ b/docs/content/en/docs/use-cases/network-observability/approved-dns-servers.md @@ -0,0 +1,37 @@ +--- +title: "Network observability" +weight: 3 +icon: "overview" +description: "Monitor TCP connect using kprobe hooks" +--- + +To view TCP connect events, apply the example TCP connect `TracingPolicy`: + +```bash +kubectl apply -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml +``` + +To start monitoring events in the `xwing` pod run the Tetragon CLI: + +```bash +kubectl logs -n kube-system -l app.kubernetes.io/name=tetragon -c export-stdout -f | tetra getevents -o compact --namespace default --pod xwing +``` + +In another terminal, start generate a TCP connection. Here we use +curl. +```bash +kubectl exec -it xwing -- curl http://cilium.io +``` +The output in the first terminal will capture the new connect and write, +```bash +๐Ÿš€ process default/xwing /usr/bin/curl http://cilium.io +๐Ÿ”Œ connect default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 +๐Ÿ“ค sendmsg default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 bytes 73 +๐Ÿงน close default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 +๐Ÿ’ฅ exit default/xwing /usr/bin/curl http://cilium.io 0 +``` + +To disable the TracingPolicy run: +```bash +kubectl delete -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml +``` diff --git a/docs/data/tetragon_flags.yaml b/docs/data/tetragon_flags.yaml index a023cbc756f..330329ffa14 100644 --- a/docs/data/tetragon_flags.yaml +++ b/docs/data/tetragon_flags.yaml @@ -180,6 +180,9 @@ options: - name: process-cache-size default_value: "65536" usage: Size of the process cache + - name: process-cache-stale-interval + default_value: "60" + usage: Interval in minutes between stale process cache checks - name: procfs default_value: /proc/ usage: Location of procfs to consume existing PIDs diff --git a/pkg/defaults/defaults.go b/pkg/defaults/defaults.go index b9920c42d53..dd8610f3c86 100644 --- a/pkg/defaults/defaults.go +++ b/pkg/defaults/defaults.go @@ -49,6 +49,9 @@ const ( // defaults for the event cache DefaultEventCacheNumRetries = 15 DefaultEventCacheRetryDelay = 2 + + // defaults for process cache + DefaultProcessCacheStaleInterval = 60 ) var ( diff --git a/pkg/option/config.go b/pkg/option/config.go index 924a588c7a2..20c17a04769 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -104,6 +104,8 @@ type config struct { EventCacheNumRetries int EventCacheRetryDelay int + + ProcessCacheStaleInterval uint } var ( @@ -122,10 +124,13 @@ var ( // Enable all metrics labels by default MetricsLabelFilter: DefaultLabelFilter(), - // set default valus for the event cache + // set default values for the event cache // mainly used in the case of testing EventCacheNumRetries: defaults.DefaultEventCacheNumRetries, EventCacheRetryDelay: defaults.DefaultEventCacheRetryDelay, + + // set default values for the process cache + ProcessCacheStaleInterval: defaults.DefaultProcessCacheStaleInterval, } ) diff --git a/pkg/option/flags.go b/pkg/option/flags.go index b2a6b9e1aae..66e45dbcd69 100644 --- a/pkg/option/flags.go +++ b/pkg/option/flags.go @@ -115,6 +115,8 @@ const ( KeyEventCacheRetries = "event-cache-retries" KeyEventCacheRetryDelay = "event-cache-retry-delay" + + KeyProcessCacheStaleInterval = "process-cache-stale-interval" ) type UsernameMetadaCode int @@ -245,6 +247,8 @@ func ReadAndSetFlags() error { Config.EventCacheNumRetries = viper.GetInt(KeyEventCacheRetries) Config.EventCacheRetryDelay = viper.GetInt(KeyEventCacheRetryDelay) + Config.ProcessCacheStaleInterval = viper.GetUint(KeyProcessCacheStaleInterval) + return nil } @@ -411,4 +415,6 @@ func AddFlags(flags *pflag.FlagSet) { flags.Int(KeyEventCacheRetries, defaults.DefaultEventCacheNumRetries, "Number of retries for event cache") flags.Int(KeyEventCacheRetryDelay, defaults.DefaultEventCacheRetryDelay, "Delay in seconds between event cache retries") + + flags.Uint(KeyProcessCacheStaleInterval, defaults.DefaultProcessCacheStaleInterval, "Interval in minutes between stale process cache checks") } diff --git a/pkg/process/cache.go b/pkg/process/cache.go index ecb503a443c..aac917355e4 100644 --- a/pkg/process/cache.go +++ b/pkg/process/cache.go @@ -13,16 +13,18 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/defaults" "github.com/cilium/tetragon/pkg/logger" + "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/sensors/exec/execvemap" lru "github.com/hashicorp/golang-lru/v2" "google.golang.org/protobuf/types/known/wrapperspb" ) type Cache struct { - cache *lru.Cache[string, *ProcessInternal] - size int - deleteChan chan *ProcessInternal - stopChan chan bool + cache *lru.Cache[string, *ProcessInternal] + size int + deleteChan chan *ProcessInternal + stopChan chan bool + staleIntervalLastTime time.Time } // garbage collection states @@ -40,9 +42,12 @@ var colorStr = map[int]string{ deleted: "deleted", } -// garbage collection run interval const ( + // garbage collection run interval intervalGC = time.Second * 30 + // garbage collection stale entry max time. + // If a stale entry is older than the max time, then it will be removed. + staleIntervalMaxTime = time.Minute * 10 ) func (pc *Cache) cacheGarbageCollector() { @@ -92,6 +97,12 @@ func (pc *Cache) cacheGarbageCollector() { } } deleteQueue = newQueue + // Check if it is time to clean stale entries. + if option.Config.ProcessCacheStaleInterval > 0 && + pc.staleIntervalLastTime.Add(time.Duration(option.Config.ProcessCacheStaleInterval)*time.Minute).Before(time.Now()) { + pc.cleanStaleEntries() + pc.staleIntervalLastTime = time.Now() + } case p := <-pc.deleteChan: // duplicate deletes can happen, if they do reset // color to pending and move along. This will cause @@ -117,6 +128,37 @@ func (pc *Cache) cacheGarbageCollector() { }() } +func (pc *Cache) cleanStaleEntries() { + var deleteProcesses []*ProcessInternal + for _, v := range pc.cache.Values() { + v.refcntOpsLock.Lock() + if processAndChildrenHaveExited(v) && v.exitTime.Add(staleIntervalMaxTime).Before(time.Now()) { + deleteProcesses = append(deleteProcesses, v) + } + v.refcntOpsLock.Unlock() + } + for _, d := range deleteProcesses { + processCacheRemovedStale.Inc() + pc.remove(d.process) + } +} + +// Must be called with a lock held on p.refcntOpsLock +func processAndChildrenHaveExited(p *ProcessInternal) bool { + if p.refcntOps["process++"] == p.refcntOps["process--"] && + p.refcntOps["parent++"] == p.refcntOps["parent--"] { + return true + } + return false +} + +func processOrChildExit(reason string) bool { + if reason == "process--" || reason == "parent--" { + return true + } + return false +} + func (pc *Cache) deletePending(process *ProcessInternal) { pc.deleteChan <- process } @@ -125,6 +167,11 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) { p.refcntOpsLock.Lock() // count number of times refcnt is decremented for a specific reason (i.e. process, parent, etc.) p.refcntOps[reason]++ + // if this is a process/child exit, check if the process and all its children have exited. + // If so, store the exit time so we can tell if this process becomes a stale entry. + if processOrChildExit(reason) && processAndChildrenHaveExited(p) { + p.exitTime = time.Now() + } p.refcntOpsLock.Unlock() ref := atomic.AddUint32(&p.refcnt, ^uint32(0)) if ref == 0 { @@ -158,8 +205,9 @@ func NewCache( return nil, err } pm := &Cache{ - cache: lruCache, - size: processCacheSize, + cache: lruCache, + size: processCacheSize, + staleIntervalLastTime: time.Now(), } pm.cacheGarbageCollector() return pm, nil diff --git a/pkg/process/cache_test.go b/pkg/process/cache_test.go index c4630daaad2..9a6719f0cf2 100644 --- a/pkg/process/cache_test.go +++ b/pkg/process/cache_test.go @@ -4,24 +4,22 @@ package process import ( + "strings" "testing" + "time" "github.com/cilium/tetragon/api/v1/tetragon" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/wrapperspb" ) -func TestProcessCache(t *testing.T) { - // add a process to the cache. - cache, err := NewCache(10) - require.NoError(t, err) - pid := wrapperspb.UInt32Value{Value: 1234} - execID := "process1" +func createFakeProcess(pid uint32, execID string) *ProcessInternal { proc := ProcessInternal{ process: &tetragon.Process{ ExecId: execID, - Pid: &pid, + Pid: &wrapperspb.UInt32Value{Value: pid}, }, capabilities: &tetragon.Capabilities{ Permitted: []tetragon.CapabilitiesType{ @@ -29,18 +27,159 @@ func TestProcessCache(t *testing.T) { tetragon.CapabilitiesType_CAP_AUDIT_WRITE, }, }, + refcntOps: make(map[string]int32), } - cache.add(&proc) + return &proc +} + +func addFakeProcess(pc *Cache, pid uint32, execID string) { + proc := createFakeProcess(pid, execID) + pc.add(proc) +} + +func TestProcessCacheAddAndRemove(t *testing.T) { + // add a process to the cache. + execId := "process1" + cache, err := NewCache(10) + require.NoError(t, err) + addFakeProcess(cache, 1234, execId) assert.Equal(t, cache.len(), 1) - result, err := cache.get(proc.process.ExecId) + result, err := cache.get(execId) assert.NoError(t, err) - assert.Equal(t, proc.process.ExecId, result.process.ExecId) - assert.Equal(t, proc.capabilities, result.capabilities) + assert.Equal(t, execId, result.process.ExecId) + assert.Equal(t, result.capabilities, + &tetragon.Capabilities{ + Permitted: []tetragon.CapabilitiesType{ + tetragon.CapabilitiesType_CAP_AUDIT_READ, + tetragon.CapabilitiesType_CAP_AUDIT_WRITE, + }, + }) // remove the entry from cache. - assert.True(t, cache.remove(proc.process)) + assert.True(t, cache.remove(result.process)) assert.Equal(t, cache.len(), 0) - _, err = cache.get(proc.process.ExecId) + _, err = cache.get(execId) assert.Error(t, err) } + +func TestProcessCacheProcessOrChildExit(t *testing.T) { + assert.True(t, processOrChildExit("process--")) + assert.True(t, processOrChildExit("parent--")) + assert.False(t, processOrChildExit("process++")) + assert.False(t, processOrChildExit("parent++")) + assert.False(t, processOrChildExit("process-x--")) +} + +func TestProcessCacheProcessAndChildrenHaveExited(t *testing.T) { + p := createFakeProcess(123, "myProc") + // empty (invalid) process should be counted as exited (this situation shouldn't occur) + assert.True(t, processAndChildrenHaveExited(p)) + p.refcntOps["process++"] = 1 + // active process has not exited + assert.False(t, processAndChildrenHaveExited(p)) + p.refcntOps["process--"] = 1 + // completed process has exited + assert.True(t, processAndChildrenHaveExited(p)) + // set the process as active again + p.refcntOps["process--"] = 0 + p.refcntOps["parent++"] = 1 + // active process with child has not exited + assert.False(t, processAndChildrenHaveExited(p)) + p.refcntOps["parent--"] = 1 + // active process with exited child has not exited + assert.False(t, processAndChildrenHaveExited(p)) + p.refcntOps["process--"] = 1 + // completed process with exited child has exited + assert.True(t, processAndChildrenHaveExited(p)) +} + +func TestProcessCacheRemoveStale(t *testing.T) { + // add some processes to the cache. + execId := []string{"process1", "process2", "process3", "process4", "process5", "process6", "process7"} + var p []*ProcessInternal + cache, err := NewCache(10) + require.NoError(t, err) + + p = append(p, createFakeProcess(1234, execId[0])) + p[0].refcntOps["process++"] = 1 // process started (but not exited) + cache.add(p[0]) + assert.Equal(t, cache.len(), 1) + + p = append(p, createFakeProcess(1235, execId[1])) + p[1].refcntOps["process++"] = 1 // process started + p[1].refcntOps["process--"] = 1 // process exited + p[1].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale) + cache.add(p[1]) + assert.Equal(t, cache.len(), 2) + + p = append(p, createFakeProcess(1236, execId[2])) + p[2].refcntOps["process++"] = 1 // process started + p[2].refcntOps["process--"] = 1 // process exited + p[2].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale) + cache.add(p[2]) + assert.Equal(t, cache.len(), 3) + + p = append(p, createFakeProcess(1237, execId[3])) + p[3].refcntOps["process++"] = 1 // process started + p[3].refcntOps["process--"] = 1 // process exited + p[3].refcntOps["parent++"] = 1 // child started + p[3].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (but process is not stale) + cache.add(p[3]) + assert.Equal(t, cache.len(), 4) + + p = append(p, createFakeProcess(1238, execId[4])) + p[4].refcntOps["process++"] = 1 // process started + p[4].refcntOps["process--"] = 1 // process exited + p[4].refcntOps["parent++"] = 1 // child started + p[4].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale) + cache.add(p[4]) + assert.Equal(t, cache.len(), 5) + + p = append(p, createFakeProcess(1239, execId[5])) + p[5].refcntOps["process++"] = 1 // process started + p[5].refcntOps["process--"] = 1 // process exited + p[5].refcntOps["parent++"] = 1 // child started + p[5].refcntOps["parent--"] = 1 // child exited + p[5].exitTime = time.Now().Add(-20 * time.Minute) // exitTime is 20 minutes ago (stale) + cache.add(p[5]) + assert.Equal(t, cache.len(), 6) + + p = append(p, createFakeProcess(1240, execId[6])) + p[6].refcntOps["process++"] = 1 // process started + p[6].refcntOps["process--"] = 1 // process exited + p[6].refcntOps["parent++"] = 1 // child started + p[6].refcntOps["parent--"] = 1 // child exited + p[6].exitTime = time.Now().Add(-2 * time.Minute) // exitTime is 2 minutes ago (not stale) + cache.add(p[6]) + assert.Equal(t, cache.len(), 7) + + // confirm entries are in cache + for i := 0; i < 7; i++ { + result, err := cache.get(execId[i]) + assert.NoError(t, err) + assert.Equal(t, execId[i], result.process.ExecId) + } + + // remove stale entries from cache. + cache.cleanStaleEntries() + + // two entries should have been removed + assert.Equal(t, cache.len(), 5) + assert.NoError(t, testutil.CollectAndCompare(processCacheRemovedStale, strings.NewReader(`# HELP tetragon_process_cache_removed_stale_total Number of process cache stale entries removed. +# TYPE tetragon_process_cache_removed_stale_total counter +tetragon_process_cache_removed_stale_total 2 +`))) + + // confirm entries are in cache + for i := 0; i < 7; i++ { + result, err := cache.get(execId[i]) + switch i { + case 0, 2, 3, 4, 6: + assert.NoError(t, err) + assert.Equal(t, execId[i], result.process.ExecId) + case 1, 5: + assert.Error(t, err) + } + } +} diff --git a/pkg/process/metrics.go b/pkg/process/metrics.go index 7bd2004292c..ff5d52b3d19 100644 --- a/pkg/process/metrics.go +++ b/pkg/process/metrics.go @@ -38,6 +38,11 @@ var ( "Number of process cache misses.", nil, []metrics.ConstrainedLabel{operationLabel}, nil, ), nil) + processCacheRemovedStale = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: consts.MetricsNamespace, + Name: "process_cache_removed_stale_total", + Help: "Number of process cache stale entries removed.", + }) ) func newCacheCollector() prometheus.Collector { @@ -59,6 +64,7 @@ func RegisterMetrics(group metrics.Group) { processCacheTotal, processCacheEvictions, processCacheMisses, + processCacheRemovedStale, ) group.MustRegister(newCacheCollector()) } diff --git a/pkg/process/process.go b/pkg/process/process.go index 1f2c4610005..4524c0bb7ab 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -10,6 +10,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cilium/tetragon/pkg/cgidmap" "github.com/cilium/tetragon/pkg/fieldfilters" @@ -62,6 +63,8 @@ type ProcessInternal struct { // protects the refcntOps map refcntOpsLock sync.Mutex cgID uint64 + // the time the process and all children exited + exitTime time.Time } var (