Skip to content

Commit

Permalink
ProcessCache: Remove stale entries
Browse files Browse the repository at this point in the history
Add stale entry removal functionality to the process cache. This won't
affect normal operations, but in the event that a custom ref count has been
placed on an entry and not removed (for whatever reason, maybe logic
error, maybe missed event, etc), remove entries where the process and
all its children have exited at least 10 minutes ago. Record a metric
when entries are removed.

Signed-off-by: Kevin Sheldrake <kevin.sheldrake@isovalent.com>
  • Loading branch information
kevsecurity committed Sep 25, 2024
1 parent 5c4652c commit 83eff54
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 21 deletions.
4 changes: 4 additions & 0 deletions docs/content/en/docs/reference/metrics.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions docs/content/en/docs/use-cases/network-observability/_index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
title: "Network observability"
weight: 3
icon: "overview"
description: "Monitor TCP connect using kprobe hooks"
---

To view TCP connect events, apply the example TCP connect `TracingPolicy`:

```bash
kubectl apply -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml
```

To start monitoring events in the `xwing` pod run the Tetragon CLI:

```bash
kubectl logs -n kube-system -l app.kubernetes.io/name=tetragon -c export-stdout -f | tetra getevents -o compact --namespace default --pod xwing
```

In another terminal, start generate a TCP connection. Here we use
curl.
```bash
kubectl exec -it xwing -- curl http://cilium.io
```
The output in the first terminal will capture the new connect and write,
```bash
🚀 process default/xwing /usr/bin/curl http://cilium.io
🔌 connect default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80
📤 sendmsg default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 bytes 73
🧹 close default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80
💥 exit default/xwing /usr/bin/curl http://cilium.io 0
```

To disable the TracingPolicy run:
```bash
kubectl delete -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
title: "Network observability"
weight: 3
icon: "overview"
description: "Monitor TCP connect using kprobe hooks"
---

To view TCP connect events, apply the example TCP connect `TracingPolicy`:

```bash
kubectl apply -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml
```

To start monitoring events in the `xwing` pod run the Tetragon CLI:

```bash
kubectl logs -n kube-system -l app.kubernetes.io/name=tetragon -c export-stdout -f | tetra getevents -o compact --namespace default --pod xwing
```

In another terminal, start generate a TCP connection. Here we use
curl.
```bash
kubectl exec -it xwing -- curl http://cilium.io
```
The output in the first terminal will capture the new connect and write,
```bash
🚀 process default/xwing /usr/bin/curl http://cilium.io
🔌 connect default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80
📤 sendmsg default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80 bytes 73
🧹 close default/xwing /usr/bin/curl tcp 10.244.0.6:34965 -> 104.198.14.52:80
💥 exit default/xwing /usr/bin/curl http://cilium.io 0
```

To disable the TracingPolicy run:
```bash
kubectl delete -f https://raw.githubusercontent.com/cilium/tetragon/main/examples/tracingpolicy/tcp-connect.yaml
```
3 changes: 3 additions & 0 deletions docs/data/tetragon_flags.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// defaults for the event cache
DefaultEventCacheNumRetries = 15
DefaultEventCacheRetryDelay = 2

// defaults for process cache
DefaultProcessCacheStaleInterval = 60
)

var (
Expand Down
7 changes: 6 additions & 1 deletion pkg/option/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type config struct {

EventCacheNumRetries int
EventCacheRetryDelay int

ProcessCacheStaleInterval uint
}

var (
Expand All @@ -122,10 +124,13 @@ var (
// Enable all metrics labels by default
MetricsLabelFilter: DefaultLabelFilter(),

// set default valus for the event cache
// set default values for the event cache
// mainly used in the case of testing
EventCacheNumRetries: defaults.DefaultEventCacheNumRetries,
EventCacheRetryDelay: defaults.DefaultEventCacheRetryDelay,

// set default values for the process cache
ProcessCacheStaleInterval: defaults.DefaultProcessCacheStaleInterval,
}
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/option/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ const (

KeyEventCacheRetries = "event-cache-retries"
KeyEventCacheRetryDelay = "event-cache-retry-delay"

KeyProcessCacheStaleInterval = "process-cache-stale-interval"
)

type UsernameMetadaCode int
Expand Down Expand Up @@ -245,6 +247,8 @@ func ReadAndSetFlags() error {
Config.EventCacheNumRetries = viper.GetInt(KeyEventCacheRetries)
Config.EventCacheRetryDelay = viper.GetInt(KeyEventCacheRetryDelay)

Config.ProcessCacheStaleInterval = viper.GetUint(KeyProcessCacheStaleInterval)

return nil
}

Expand Down Expand Up @@ -411,4 +415,6 @@ func AddFlags(flags *pflag.FlagSet) {

flags.Int(KeyEventCacheRetries, defaults.DefaultEventCacheNumRetries, "Number of retries for event cache")
flags.Int(KeyEventCacheRetryDelay, defaults.DefaultEventCacheRetryDelay, "Delay in seconds between event cache retries")

flags.Uint(KeyProcessCacheStaleInterval, defaults.DefaultProcessCacheStaleInterval, "Interval in minutes between stale process cache checks")
}
62 changes: 55 additions & 7 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import (
"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/sensors/exec/execvemap"
lru "github.com/hashicorp/golang-lru/v2"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
staleIntervalLastTime time.Time
}

// garbage collection states
Expand All @@ -40,9 +42,12 @@ var colorStr = map[int]string{
deleted: "deleted",
}

// garbage collection run interval
const (
// garbage collection run interval
intervalGC = time.Second * 30
// garbage collection stale entry max time.
// If a stale entry is older than the max time, then it will be removed.
staleIntervalMaxTime = time.Minute * 10
)

func (pc *Cache) cacheGarbageCollector() {
Expand Down Expand Up @@ -92,6 +97,12 @@ func (pc *Cache) cacheGarbageCollector() {
}
}
deleteQueue = newQueue
// Check if it is time to clean stale entries.
if option.Config.ProcessCacheStaleInterval > 0 &&
pc.staleIntervalLastTime.Add(time.Duration(option.Config.ProcessCacheStaleInterval)*time.Minute).Before(time.Now()) {
pc.cleanStaleEntries()
pc.staleIntervalLastTime = time.Now()
}
case p := <-pc.deleteChan:
// duplicate deletes can happen, if they do reset
// color to pending and move along. This will cause
Expand All @@ -117,6 +128,37 @@ func (pc *Cache) cacheGarbageCollector() {
}()
}

func (pc *Cache) cleanStaleEntries() {
var deleteProcesses []*ProcessInternal
for _, v := range pc.cache.Values() {
v.refcntOpsLock.Lock()
if processAndChildrenHaveExited(v) && v.exitTime.Add(staleIntervalMaxTime).Before(time.Now()) {
deleteProcesses = append(deleteProcesses, v)
}
v.refcntOpsLock.Unlock()
}
for _, d := range deleteProcesses {
processCacheRemovedStale.Inc()
pc.remove(d.process)
}
}

// Must be called with a lock held on p.refcntOpsLock
func processAndChildrenHaveExited(p *ProcessInternal) bool {
if p.refcntOps["process++"] == p.refcntOps["process--"] &&
p.refcntOps["parent++"] == p.refcntOps["parent--"] {
return true
}
return false
}

func processOrChildExit(reason string) bool {
if reason == "process--" || reason == "parent--" {
return true
}
return false
}

func (pc *Cache) deletePending(process *ProcessInternal) {
pc.deleteChan <- process
}
Expand All @@ -125,6 +167,11 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) {
p.refcntOpsLock.Lock()
// count number of times refcnt is decremented for a specific reason (i.e. process, parent, etc.)
p.refcntOps[reason]++
// if this is a process/child exit, check if the process and all its children have exited.
// If so, store the exit time so we can tell if this process becomes a stale entry.
if processOrChildExit(reason) && processAndChildrenHaveExited(p) {
p.exitTime = time.Now()
}
p.refcntOpsLock.Unlock()
ref := atomic.AddUint32(&p.refcnt, ^uint32(0))
if ref == 0 {
Expand Down Expand Up @@ -158,8 +205,9 @@ func NewCache(
return nil, err
}
pm := &Cache{
cache: lruCache,
size: processCacheSize,
cache: lruCache,
size: processCacheSize,
staleIntervalLastTime: time.Now(),
}
pm.cacheGarbageCollector()
return pm, nil
Expand Down
Loading

0 comments on commit 83eff54

Please sign in to comment.