From b42ab6586f9c64652518b64733987f95ce86f561 Mon Sep 17 00:00:00 2001 From: Kevin Sheldrake Date: Thu, 26 Sep 2024 16:53:21 +0100 Subject: [PATCH] ProcessCache: Delay removal for custom refcnts The process cache uses reference counting to ensure we keep hold of entries that we still need to refer to, but allows us to remove those that we no longer need. Beyond "process" and "parent", messages could set custom refcnts on processes. For these we need to keep the process in the cache until all its descendants have exited (to allow events from any of the descendants to decrement the reference count on the process). This is to prevent the stale entry garbage collector from removing the process while it still has active descendants. This commit adds a flag on the internal process that specifies that the process has custom refcnts. It avoids removing the process or its descendants until all have exited by not decrementing the parent's refcnt until the process' refcnt is 0. Signed-off-by: Kevin Sheldrake --- pkg/grpc/exec/exec.go | 18 ++++++++++++++---- pkg/process/cache.go | 20 ++++++++++++++++++++ pkg/process/process.go | 16 ++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/pkg/grpc/exec/exec.go b/pkg/grpc/exec/exec.go index 1e24f86e1fe..8aba1416e15 100644 --- a/pkg/grpc/exec/exec.go +++ b/pkg/grpc/exec/exec.go @@ -406,7 +406,7 @@ func GetProcessExit(event *MsgExitEventUnix) *tetragon.ProcessExit { ec.Add(nil, tetragonEvent, event.Common.Ktime, event.ProcessKey.Ktime, event) return nil } - if parent != nil { + if proc != nil && !proc.HasCustomRefCnt() && parent != nil { parent.RefDec("parent") } if proc != nil { @@ -427,10 +427,14 @@ func (msg *MsgExitEventUnix) Notify() bool { func (msg *MsgExitEventUnix) RetryInternal(ev notify.Event, timestamp uint64) (*process.ProcessInternal, error) { internal, parent := process.GetParentProcessInternal(msg.ProcessKey.Pid, timestamp) var err error + hasCustomRefCnt := false + if internal != nil && internal.HasCustomRefCnt() { + hasCustomRefCnt = true + } if parent != nil { ev.SetParent(parent.UnsafeGetProcess()) - if !msg.RefCntDone[ParentRefCnt] { + if !hasCustomRefCnt && !msg.RefCntDone[ParentRefCnt] { parent.RefDec("parent") msg.RefCntDone[ParentRefCnt] = true } @@ -494,9 +498,13 @@ func (msg *MsgProcessCleanupEventUnix) Notify() bool { func (msg *MsgProcessCleanupEventUnix) RetryInternal(_ notify.Event, timestamp uint64) (*process.ProcessInternal, error) { internal, parent := process.GetParentProcessInternal(msg.PID, timestamp) var err error + hasCustomRefCnt := false + if internal != nil && internal.HasCustomRefCnt() { + hasCustomRefCnt = true + } if parent != nil { - if !msg.RefCntDone[ParentRefCnt] { + if !hasCustomRefCnt && !msg.RefCntDone[ParentRefCnt] { parent.RefDec("parent") msg.RefCntDone[ParentRefCnt] = true } @@ -528,7 +536,9 @@ func (msg *MsgProcessCleanupEventUnix) Retry(_ *process.ProcessInternal, _ notif func (msg *MsgProcessCleanupEventUnix) HandleMessage() *tetragon.GetEventsResponse { msg.RefCntDone = [2]bool{false, false} if process, parent := process.GetParentProcessInternal(msg.PID, msg.Ktime); process != nil && parent != nil { - parent.RefDec("parent") + if !process.HasCustomRefCnt() { + parent.RefDec("parent") + } process.RefDec("process") } else { if ec := eventcache.Get(); ec != nil { diff --git a/pkg/process/cache.go b/pkg/process/cache.go index 3eb8c3ecbca..f94bd2d7e40 100644 --- a/pkg/process/cache.go +++ b/pkg/process/cache.go @@ -139,6 +139,10 @@ func (pc *Cache) cleanStaleEntries() { } for _, d := range deleteProcesses { processCacheRemovedStale.Inc() + parent, err := pc.get(d.process.ParentExecId) + if err == nil { + parent.RefDec("parent") + } pc.remove(d.process) } } @@ -153,6 +157,10 @@ func processOrChildExit(reason string) bool { return reason == "process--" || reason == "parent--" } +func processOrChildExec(reason string) bool { + return reason == "process++" || reason == "parent++" +} + func (pc *Cache) deletePending(process *ProcessInternal) { pc.deleteChan <- process } @@ -170,6 +178,14 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) { ref := atomic.AddUint32(&p.refcnt, ^uint32(0)) if ref == 0 { pc.deletePending(p) + // If the process has a custom refcnt then we need to reduce the parent's + // refcnt now we're finally deleting the process. + if p.HasCustomRefCnt() { + parent, err := pc.get(p.process.ParentExecId) + if err == nil { + parent.RefDec("parent") + } + } } } @@ -177,6 +193,10 @@ func (pc *Cache) refInc(p *ProcessInternal, reason string) { p.refcntOpsLock.Lock() // count number of times refcnt is increamented for a specific reason (i.e. process, parent, etc.) p.refcntOps[reason]++ + // Check if this is a custom refcnt. If so, set the flag. + if !processOrChildExec(reason) { + p.customRefcnt = true + } p.refcntOpsLock.Unlock() atomic.AddUint32(&p.refcnt, 1) } diff --git a/pkg/process/process.go b/pkg/process/process.go index 4524c0bb7ab..50262c2c967 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -65,6 +65,16 @@ type ProcessInternal struct { cgID uint64 // the time the process and all children exited exitTime time.Time + // Whether this process has custom refcnts (not "process" or "parent"). + // This cascades on fork, but is reset on exec. The purpose is to delay reducing the + // refcnt on a processes' parent on exit, to when the process itself has a refcnt of 0. + // The result is that any process that has this flag set will keep itself and all its + // forked (but not exec) descendants in the cache after they have exited, until all the + // (non-exec) descendants have exited (as any forked descendant could potentially cause + // an action that is attributed to the process and we need it in the cache in order to + // report it). This serves to prevent entries being removed by the stale entry GC until + // all (non-exec) descendants have exited. + customRefcnt bool } var ( @@ -123,6 +133,7 @@ func (pi *ProcessInternal) cloneInternalProcessCopy() *ProcessInternal { namespaces: pi.namespaces, refcnt: 1, // Explicitly initialize refcnt to 1 refcntOps: map[string]int32{"process++": 1}, + customRefcnt: pi.customRefcnt, } } @@ -149,6 +160,10 @@ func (pi *ProcessInternal) GetCgID() uint64 { return pi.cgID } +func (pi *ProcessInternal) HasCustomRefCnt() bool { + return pi.customRefcnt +} + // UpdateExecOutsideCache() checks if we must augment the ProcessExec.Process // with more fields without propagating again those fields into the process // cache. This means that those added fields will only show up for the @@ -401,6 +416,7 @@ func initProcessInternalExec( refcnt: 1, cgID: event.Kube.Cgrpid, refcntOps: map[string]int32{"process++": 1}, + customRefcnt: false, } }