From c639b842e69b6b7652ef8681740b2fa786ccecfe Mon Sep 17 00:00:00 2001 From: Michi Mutsuzaki Date: Thu, 6 Jun 2024 01:31:08 +0000 Subject: [PATCH] Remove Cilium state service cache Tetragon doesn't use the service cache anymore. Signed-off-by: Michi Mutsuzaki --- pkg/cilium/monitor.go | 5 -- pkg/cilium/state.go | 3 - pkg/oldhubble/cilium/client/client.go | 10 --- pkg/oldhubble/cilium/service.go | 104 -------------------------- pkg/oldhubble/cilium/state.go | 29 +------ 5 files changed, 4 insertions(+), 147 deletions(-) delete mode 100644 pkg/oldhubble/cilium/service.go diff --git a/pkg/cilium/monitor.go b/pkg/cilium/monitor.go index 503bd7df229..41409bb9ee7 100644 --- a/pkg/cilium/monitor.go +++ b/pkg/cilium/monitor.go @@ -67,8 +67,6 @@ func consumeMonitorEvents(ctx context.Context, conn net.Conn, ciliumState *ciliu dnsAdd := ciliumState.GetLogRecordNotifyChannel() ipCacheEvents := make(chan monitorAPI.AgentNotify, 100) ciliumState.StartMirroringIPCache(ipCacheEvents) - serviceEvents := make(chan monitorAPI.AgentNotify, 100) - ciliumState.StartMirroringServiceCache(serviceEvents) for { if err := pl.DecodeBinary(dec); err != nil { return err @@ -90,9 +88,6 @@ func consumeMonitorEvents(ctx context.Context, conn net.Conn, ciliumState *ciliu case monitorAPI.AgentNotifyIPCacheUpserted, monitorAPI.AgentNotifyIPCacheDeleted: ipCacheEvents <- an - case monitorAPI.AgentNotifyServiceUpserted, - monitorAPI.AgentNotifyServiceDeleted: - serviceEvents <- an } case monitorAPI.MessageTypeAccessLog: // TODO re-think the way this is being done. We are dissecting/ diff --git a/pkg/cilium/state.go b/pkg/cilium/state.go index 467019f7256..5b6e302a3c1 100644 --- a/pkg/cilium/state.go +++ b/pkg/cilium/state.go @@ -14,7 +14,6 @@ import ( "github.com/cilium/tetragon/pkg/oldhubble/cilium/client" "github.com/cilium/tetragon/pkg/oldhubble/fqdncache" "github.com/cilium/tetragon/pkg/oldhubble/ipcache" - "github.com/cilium/tetragon/pkg/oldhubble/servicecache" ) var ( @@ -43,7 +42,6 @@ func InitCiliumState(ctx context.Context, enableCiliumAPI bool) (*cilium.State, v1.NewEndpoints(), ipcache.New(), fqdncache.New(), - servicecache.New(), logger.GetLogger().WithField("subsystem", "cilium")) go ciliumState.Start() go HandleMonitorSocket(ctx, ciliumState) @@ -57,7 +55,6 @@ func GetFakeCiliumState() *cilium.State { v1.NewEndpoints(), ipcache.New(), fqdncache.New(), - servicecache.New(), logger.GetLogger().WithField("subsystem", "cilium")) } diff --git a/pkg/oldhubble/cilium/client/client.go b/pkg/oldhubble/cilium/client/client.go index 61c4b34e6fc..2c18b2dac07 100644 --- a/pkg/oldhubble/cilium/client/client.go +++ b/pkg/oldhubble/cilium/client/client.go @@ -20,7 +20,6 @@ type Client interface { GetIdentity(id uint64) (*models.Identity, error) GetFqdnCache() ([]*models.DNSLookup, error) GetIPCache() ([]*models.IPListEntry, error) - GetServiceCache() ([]*models.Service, error) } // Cilium is an abstraction to communicate with the cilium-agent. @@ -84,15 +83,6 @@ func (c *Cilium) GetIPCache() ([]*models.IPListEntry, error) { return ips.Payload, nil } -// GetServiceCache retrieves the contents of the Cilium service cache. -func (c *Cilium) GetServiceCache() ([]*models.Service, error) { - svcs, err := c.Client.Service.GetService(nil) - if err != nil { - return nil, err - } - return svcs.Payload, nil -} - // IsIPCacheNotFoundErr is true if the IPCache fetch error was a 404 func IsIPCacheNotFoundErr(err error) bool { _, ok := err.(*ciliumPolicy.GetIPNotFound) diff --git a/pkg/oldhubble/cilium/service.go b/pkg/oldhubble/cilium/service.go deleted file mode 100644 index 22789665e6e..00000000000 --- a/pkg/oldhubble/cilium/service.go +++ /dev/null @@ -1,104 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Hubble - -package cilium - -import ( - "encoding/json" - "time" - - monitorAPI "github.com/cilium/cilium/pkg/monitor/api" - "github.com/sirupsen/logrus" -) - -const ( - serviceCacheInitRetryInterval = 5 * time.Second - serviceCacheRefreshInterval = 5 * time.Minute -) - -// fetchServiceCache fetches the service cache from cilium and initializes the -// local service cache. -func (s *State) fetchServiceCache() error { - entries, err := s.ciliumClient.GetServiceCache() - if err != nil { - return err - } - if err := s.serviceCache.InitializeFrom(entries); err != nil { - return err - } - s.log.WithField("entries", len(entries)).Debug("Fetched service cache from cilium") - return nil -} - -// processServiceEvent decodes and applies a service update. It returns true -// when successful. -func (s *State) processServiceEvent(an monitorAPI.AgentNotify) bool { - switch an.Type { - case monitorAPI.AgentNotifyServiceUpserted: - n := monitorAPI.ServiceUpsertNotification{} - if err := json.Unmarshal([]byte(an.Text), &n); err != nil { - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "ServiceUpsertNotification": an.Text, - }).Error("Unable to unmarshal service upsert notification") - return false - } - return s.serviceCache.Upsert(int64(n.ID), n.Name, n.Namespace, n.Frontend.IP, n.Frontend.Port) - case monitorAPI.AgentNotifyServiceDeleted: - n := monitorAPI.ServiceDeleteNotification{} - if err := json.Unmarshal([]byte(an.Text), &n); err != nil { - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "ServiceDeleteNotification": an.Text, - }).Error("Unable to unmarshal service delete notification") - return false - } - return s.serviceCache.DeleteByID(int64(n.ID)) - default: - s.log.WithField("type", int(an.Type)).Warn("Received unknown service notification type") - return false - } -} - -func (s *State) syncServiceCache(serviceEvents <-chan monitorAPI.AgentNotify) { - for err := s.fetchServiceCache(); err != nil; err = s.fetchServiceCache() { - s.log.WithError(err).Error("Failed to fetch service cache from Cilium") - time.Sleep(serviceCacheInitRetryInterval) - } - - refresh := time.NewTimer(serviceCacheInitRetryInterval) - inSync := false - - for serviceEvents != nil { - select { - case <-refresh.C: - if err := s.fetchServiceCache(); err != nil { - s.log.WithError(err).Error("Failed to fetch service cache from Cilium") - } - refresh.Reset(serviceCacheInitRetryInterval) - case an, ok := <-serviceEvents: - if !ok { - return - } - // Initially we might see stale updates that were enqued before we - // initialized the service cache. - // Once we see the first applicable update though, all subsequent - // updates must be applicable as well. - updated := s.processServiceEvent(an) - switch { - case !updated && !inSync: - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "AgentNotification": an.Text, - }).Debug("Received stale service update") - case !updated && inSync: - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "AgentNotification": an.Text, - }).Warn("Received unapplicable service update") - case updated && !inSync: - inSync = true - } - } - } -} diff --git a/pkg/oldhubble/cilium/state.go b/pkg/oldhubble/cilium/state.go index a8f2c1a4cfe..d3f94e98ff2 100644 --- a/pkg/oldhubble/cilium/state.go +++ b/pkg/oldhubble/cilium/state.go @@ -9,7 +9,6 @@ import ( v1 "github.com/cilium/tetragon/pkg/oldhubble/api/v1" "github.com/cilium/tetragon/pkg/oldhubble/cilium/client" "github.com/cilium/tetragon/pkg/oldhubble/ipcache" - "github.com/cilium/tetragon/pkg/oldhubble/servicecache" "github.com/sirupsen/logrus" ) @@ -30,9 +29,6 @@ type State struct { // ipcache is a mirror of Cilium's IPCache ipcache *ipcache.IPCache - // serviceCache is a cache that contains information about services. - serviceCache *servicecache.ServiceCache - // logRecord is a channel used to exchange L7 DNS requests seens from the // monitor logRecord chan monitor.LogRecordNotify @@ -48,14 +44,13 @@ func NewCiliumState( endpoints v1.EndpointsHandler, ipCache *ipcache.IPCache, fqdnCache FqdnCache, - serviceCache *servicecache.ServiceCache, logger *logrus.Entry, ) *State { return &State{ - ciliumClient: ciliumClient, - endpoints: endpoints, - ipcache: ipCache, - fqdnCache: fqdnCache, serviceCache: serviceCache, + ciliumClient: ciliumClient, + endpoints: endpoints, + ipcache: ipCache, + fqdnCache: fqdnCache, logRecord: make(chan monitor.LogRecordNotify, 100), endpointEvents: make(chan monitorAPI.AgentNotify, 100), log: logger, @@ -82,17 +77,6 @@ func (s *State) StartMirroringIPCache(ipCacheEvents <-chan monitorAPI.AgentNotif go s.syncIPCache(ipCacheEvents) } -// StartMirroringServiceCache initially caches service information from Cilium -// and then starts to mirror service information based on events that are sent -// to the serviceEvents channel. Only messages of type -// `AgentNotifyServiceUpserted` and `AgentNotifyServiceDeleted` should be sent -// to this channel. This function assumes that the caller is already connected -// to Cilium Monitor, i.e. no Service notification must be lost after calling -// this method. -func (s *State) StartMirroringServiceCache(serviceEvents <-chan monitorAPI.AgentNotify) { - go s.syncServiceCache(serviceEvents) -} - // GetLogRecordNotifyChannel returns the event channel to receive // monitorAPI.LogRecordNotify events. func (s *State) GetLogRecordNotifyChannel() chan<- monitor.LogRecordNotify { @@ -125,8 +109,3 @@ func (s *State) GetFQDNCache() FqdnCache { func (s *State) GetIPCache() *ipcache.IPCache { return s.ipcache } - -// GetServiceCache returns serviceCache. -func (s *State) GetServiceCache() *servicecache.ServiceCache { - return s.serviceCache -}