diff --git a/pkg/grpc/exec/exec_test_helper.go b/pkg/grpc/exec/exec_test_helper.go index eb2bbf3455a..49bba9e3023 100644 --- a/pkg/grpc/exec/exec_test_helper.go +++ b/pkg/grpc/exec/exec_test_helper.go @@ -59,7 +59,42 @@ var ( ) type DummyNotifier[EXEC notify.Message, EXIT notify.Message] struct { - t *testing.T + t *testing.T + ch chan bool +} + +func NewDummyNotifier[EXEC notify.Message, EXIT notify.Message](t *testing.T) DummyNotifier[EXEC, EXIT] { + ch := make(chan bool) + return DummyNotifier[EXEC, EXIT]{t: t, ch: ch} +} + +// Wait for specified number of events from notifier +func (n DummyNotifier[EXEC, EXIT]) WaitNotifier(events int) { + // Leave extra 100ms timeout for slow servers hiccups + ms := time.Duration((option.Config.EventCacheNumRetries + 100) * CacheTimerMs) + + ticker := time.NewTicker(time.Millisecond * ms) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + return + case <-n.ch: + events-- + if events == 0 { + return + } + } + } +} + +// Kick from notifier that unblocks one event for WaitNotifier +func (n DummyNotifier[EXEC, EXIT]) KickNotifier() { + select { + case n.ch <- true: + default: + } } func (n DummyNotifier[EXEC, EXIT]) AddListener(_ server.Listener) {} @@ -71,6 +106,7 @@ func (n DummyNotifier[EXEC, EXIT]) NotifyListener(original interface{}, processe case EXEC, EXIT: if processed != nil { AllEvents = append(AllEvents, processed) + n.KickNotifier() } else { n.t.Fatalf("Processed arg is nil in NotifyListener with type %T", v) } @@ -277,21 +313,23 @@ func CreateCloneEvents[CLONE notify.Message, EXIT notify.Message](Pid uint32, Kt return &cloneMsg, &exitMsg } -func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *sync.WaitGroup, watcher watcher.K8sResourceWatcher) context.CancelFunc { +func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *sync.WaitGroup, + watcher watcher.K8sResourceWatcher) (context.CancelFunc, DummyNotifier[EXEC, EXIT]) { + ctx, cancel := context.WithCancel(context.Background()) if err := process.InitCache(watcher, 65536); err != nil { t.Fatalf("failed to call process.InitCache %s", err) } - dn := DummyNotifier[EXEC, EXIT]{t} + dn := NewDummyNotifier[EXEC, EXIT](t) dr := rthooks.DummyHookRunner{} lServer := server.NewServer(ctx, cancelWg, dn, &server.FakeObserver{}, dr) // Exec cache is always needed to ensure events have an associated Process{} eventcache.NewWithTimer(lServer, time.Millisecond*CacheTimerMs) - return cancel + return cancel, dn } func GetProcessRefcntFromCache(t *testing.T, Pid uint32, Ktime uint64) uint32 { @@ -388,7 +426,7 @@ func GrpcExecOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -415,7 +453,7 @@ func GrpcExecOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(4) CheckExecEvents(t, AllEvents, parentPid, currentPid) } @@ -424,7 +462,7 @@ func GrpcExecInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) { AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -459,7 +497,7 @@ func GrpcExecMisingParent[EXEC notify.Message, EXIT notify.Message](t *testing.T AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -474,9 +512,11 @@ func GrpcExecMisingParent[EXEC notify.Message, EXIT notify.Message](t *testing.T AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(1) - assert.Equal(t, len(AllEvents), 1) + if !assert.Equal(t, 1, len(AllEvents)) { + t.FailNow() + } execEv := AllEvents[0].GetProcessExec() assert.NotNil(t, execEv) assert.Equal(t, GetProcessRefcntFromCache(t, currentPid, 21034975089403), uint32(1)) @@ -488,7 +528,7 @@ func GrpcMissingExec[EXEC notify.Message, EXIT notify.Message](t *testing.T) { AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -503,7 +543,7 @@ func GrpcMissingExec[EXEC notify.Message, EXIT notify.Message](t *testing.T) { AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) assert.Equal(t, len(AllEvents), 1) ev := AllEvents[0] @@ -522,7 +562,7 @@ func GrpcExecParentOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testi AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -589,7 +629,7 @@ func GrpcExecCloneInOrder[EXEC notify.Message, CLONE notify.Message, EXIT notify AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -628,7 +668,7 @@ func GrpcExecCloneOutOfOrder[EXEC notify.Message, CLONE notify.Message, EXIT not AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -659,7 +699,7 @@ func GrpcExecCloneOutOfOrder[EXEC notify.Message, CLONE notify.Message, EXIT not AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(3) CheckCloneEvents(t, AllEvents, currentPid, clonePid) } @@ -669,7 +709,7 @@ func GrpcParentInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) { AllEvents = nil watcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) + cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher) defer func() { cancel() cancelWg.Wait() @@ -737,7 +777,9 @@ func GrpcParentInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) { } func CheckPodEvents(t *testing.T, events []*tetragon.GetEventsResponse) { - assert.Equal(t, len(events), 2) + if !assert.Equal(t, 2, len(events)) { + t.FailNow() + } execEv, exitEv := GetEvents(t, events) @@ -764,7 +806,7 @@ func GrpcExecPodInfoInOrder[EXEC notify.Message, EXIT notify.Message](t *testing AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -793,8 +835,8 @@ func GrpcExecPodInfoInOrder[EXEC notify.Message, EXIT notify.Message](t *testing AllEvents = append(AllEvents, e) } - fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -808,7 +850,7 @@ func GrpcExecPodInfoOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *test AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -838,7 +880,7 @@ func GrpcExecPodInfoOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *test } fakeWatcher.AddPod(dummyPod) - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -855,7 +897,7 @@ func GrpcExecPodInfoInOrderAfter[EXEC notify.Message, EXIT notify.Message](t *te AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -886,7 +928,7 @@ func GrpcExecPodInfoInOrderAfter[EXEC notify.Message, EXIT notify.Message](t *te AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -903,7 +945,7 @@ func GrpcExecPodInfoOutOfOrderAfter[EXEC notify.Message, EXIT notify.Message](t AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -933,7 +975,7 @@ func GrpcExecPodInfoOutOfOrderAfter[EXEC notify.Message, EXIT notify.Message](t AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -948,7 +990,7 @@ func GrpcExecPodInfoDelayedOutOfOrder[EXEC notify.Message, EXIT notify.Message]( AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -979,11 +1021,13 @@ func GrpcExecPodInfoDelayedOutOfOrder[EXEC notify.Message, EXIT notify.Message]( time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations) - assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet + if !assert.Equal(t, 0, len(AllEvents)) { // here we should still not have any events as we don't have the podinfo yet + t.FailNow() + } fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -997,7 +1041,7 @@ func GrpcExecPodInfoDelayedInOrder[EXEC notify.Message, EXIT notify.Message](t * AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -1028,11 +1072,13 @@ func GrpcExecPodInfoDelayedInOrder[EXEC notify.Message, EXIT notify.Message](t * time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations) - assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet + if !assert.Equal(t, 0, len(AllEvents)) { // here we should still not have any events as we don't have the podinfo yet + t.FailNow() + } fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) } @@ -1045,7 +1091,7 @@ func GrpcDelayedExecK8sOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *t AllEvents = nil option.Config.EnableK8s = true // enable Kubernetes fakeWatcher := watcher.NewFakeK8sWatcher(nil) - cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) + cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher) defer func() { cancel() cancelWg.Wait() @@ -1073,13 +1119,15 @@ func GrpcDelayedExecK8sOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *t fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations) - assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet + if !assert.Equal(t, len(AllEvents), 0) { // here we should still not have any events as we don't have the podinfo yet + t.FailNow() + } if e := (*execMsg).HandleMessage(); e != nil { AllEvents = append(AllEvents, e) } - time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work + dn.WaitNotifier(2) // wait for cache to do it's work CheckPodEvents(t, AllEvents) }