diff --git a/pkg/cilium/monitor.go b/pkg/cilium/monitor.go index 503bd7df229..41409bb9ee7 100644 --- a/pkg/cilium/monitor.go +++ b/pkg/cilium/monitor.go @@ -67,8 +67,6 @@ func consumeMonitorEvents(ctx context.Context, conn net.Conn, ciliumState *ciliu dnsAdd := ciliumState.GetLogRecordNotifyChannel() ipCacheEvents := make(chan monitorAPI.AgentNotify, 100) ciliumState.StartMirroringIPCache(ipCacheEvents) - serviceEvents := make(chan monitorAPI.AgentNotify, 100) - ciliumState.StartMirroringServiceCache(serviceEvents) for { if err := pl.DecodeBinary(dec); err != nil { return err @@ -90,9 +88,6 @@ func consumeMonitorEvents(ctx context.Context, conn net.Conn, ciliumState *ciliu case monitorAPI.AgentNotifyIPCacheUpserted, monitorAPI.AgentNotifyIPCacheDeleted: ipCacheEvents <- an - case monitorAPI.AgentNotifyServiceUpserted, - monitorAPI.AgentNotifyServiceDeleted: - serviceEvents <- an } case monitorAPI.MessageTypeAccessLog: // TODO re-think the way this is being done. We are dissecting/ diff --git a/pkg/cilium/state.go b/pkg/cilium/state.go index 467019f7256..5b6e302a3c1 100644 --- a/pkg/cilium/state.go +++ b/pkg/cilium/state.go @@ -14,7 +14,6 @@ import ( "github.com/cilium/tetragon/pkg/oldhubble/cilium/client" "github.com/cilium/tetragon/pkg/oldhubble/fqdncache" "github.com/cilium/tetragon/pkg/oldhubble/ipcache" - "github.com/cilium/tetragon/pkg/oldhubble/servicecache" ) var ( @@ -43,7 +42,6 @@ func InitCiliumState(ctx context.Context, enableCiliumAPI bool) (*cilium.State, v1.NewEndpoints(), ipcache.New(), fqdncache.New(), - servicecache.New(), logger.GetLogger().WithField("subsystem", "cilium")) go ciliumState.Start() go HandleMonitorSocket(ctx, ciliumState) @@ -57,7 +55,6 @@ func GetFakeCiliumState() *cilium.State { v1.NewEndpoints(), ipcache.New(), fqdncache.New(), - servicecache.New(), logger.GetLogger().WithField("subsystem", "cilium")) } diff --git a/pkg/oldhubble/cilium/client/client.go b/pkg/oldhubble/cilium/client/client.go index 61c4b34e6fc..2c18b2dac07 100644 --- a/pkg/oldhubble/cilium/client/client.go +++ b/pkg/oldhubble/cilium/client/client.go @@ -20,7 +20,6 @@ type Client interface { GetIdentity(id uint64) (*models.Identity, error) GetFqdnCache() ([]*models.DNSLookup, error) GetIPCache() ([]*models.IPListEntry, error) - GetServiceCache() ([]*models.Service, error) } // Cilium is an abstraction to communicate with the cilium-agent. @@ -84,15 +83,6 @@ func (c *Cilium) GetIPCache() ([]*models.IPListEntry, error) { return ips.Payload, nil } -// GetServiceCache retrieves the contents of the Cilium service cache. -func (c *Cilium) GetServiceCache() ([]*models.Service, error) { - svcs, err := c.Client.Service.GetService(nil) - if err != nil { - return nil, err - } - return svcs.Payload, nil -} - // IsIPCacheNotFoundErr is true if the IPCache fetch error was a 404 func IsIPCacheNotFoundErr(err error) bool { _, ok := err.(*ciliumPolicy.GetIPNotFound) diff --git a/pkg/oldhubble/cilium/service.go b/pkg/oldhubble/cilium/service.go deleted file mode 100644 index 22789665e6e..00000000000 --- a/pkg/oldhubble/cilium/service.go +++ /dev/null @@ -1,104 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Hubble - -package cilium - -import ( - "encoding/json" - "time" - - monitorAPI "github.com/cilium/cilium/pkg/monitor/api" - "github.com/sirupsen/logrus" -) - -const ( - serviceCacheInitRetryInterval = 5 * time.Second - serviceCacheRefreshInterval = 5 * time.Minute -) - -// fetchServiceCache fetches the service cache from cilium and initializes the -// local service cache. -func (s *State) fetchServiceCache() error { - entries, err := s.ciliumClient.GetServiceCache() - if err != nil { - return err - } - if err := s.serviceCache.InitializeFrom(entries); err != nil { - return err - } - s.log.WithField("entries", len(entries)).Debug("Fetched service cache from cilium") - return nil -} - -// processServiceEvent decodes and applies a service update. It returns true -// when successful. -func (s *State) processServiceEvent(an monitorAPI.AgentNotify) bool { - switch an.Type { - case monitorAPI.AgentNotifyServiceUpserted: - n := monitorAPI.ServiceUpsertNotification{} - if err := json.Unmarshal([]byte(an.Text), &n); err != nil { - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "ServiceUpsertNotification": an.Text, - }).Error("Unable to unmarshal service upsert notification") - return false - } - return s.serviceCache.Upsert(int64(n.ID), n.Name, n.Namespace, n.Frontend.IP, n.Frontend.Port) - case monitorAPI.AgentNotifyServiceDeleted: - n := monitorAPI.ServiceDeleteNotification{} - if err := json.Unmarshal([]byte(an.Text), &n); err != nil { - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "ServiceDeleteNotification": an.Text, - }).Error("Unable to unmarshal service delete notification") - return false - } - return s.serviceCache.DeleteByID(int64(n.ID)) - default: - s.log.WithField("type", int(an.Type)).Warn("Received unknown service notification type") - return false - } -} - -func (s *State) syncServiceCache(serviceEvents <-chan monitorAPI.AgentNotify) { - for err := s.fetchServiceCache(); err != nil; err = s.fetchServiceCache() { - s.log.WithError(err).Error("Failed to fetch service cache from Cilium") - time.Sleep(serviceCacheInitRetryInterval) - } - - refresh := time.NewTimer(serviceCacheInitRetryInterval) - inSync := false - - for serviceEvents != nil { - select { - case <-refresh.C: - if err := s.fetchServiceCache(); err != nil { - s.log.WithError(err).Error("Failed to fetch service cache from Cilium") - } - refresh.Reset(serviceCacheInitRetryInterval) - case an, ok := <-serviceEvents: - if !ok { - return - } - // Initially we might see stale updates that were enqued before we - // initialized the service cache. - // Once we see the first applicable update though, all subsequent - // updates must be applicable as well. - updated := s.processServiceEvent(an) - switch { - case !updated && !inSync: - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "AgentNotification": an.Text, - }).Debug("Received stale service update") - case !updated && inSync: - s.log.WithFields(logrus.Fields{ - "type": int(an.Type), - "AgentNotification": an.Text, - }).Warn("Received unapplicable service update") - case updated && !inSync: - inSync = true - } - } - } -} diff --git a/pkg/oldhubble/cilium/state.go b/pkg/oldhubble/cilium/state.go index a8f2c1a4cfe..d3f94e98ff2 100644 --- a/pkg/oldhubble/cilium/state.go +++ b/pkg/oldhubble/cilium/state.go @@ -9,7 +9,6 @@ import ( v1 "github.com/cilium/tetragon/pkg/oldhubble/api/v1" "github.com/cilium/tetragon/pkg/oldhubble/cilium/client" "github.com/cilium/tetragon/pkg/oldhubble/ipcache" - "github.com/cilium/tetragon/pkg/oldhubble/servicecache" "github.com/sirupsen/logrus" ) @@ -30,9 +29,6 @@ type State struct { // ipcache is a mirror of Cilium's IPCache ipcache *ipcache.IPCache - // serviceCache is a cache that contains information about services. - serviceCache *servicecache.ServiceCache - // logRecord is a channel used to exchange L7 DNS requests seens from the // monitor logRecord chan monitor.LogRecordNotify @@ -48,14 +44,13 @@ func NewCiliumState( endpoints v1.EndpointsHandler, ipCache *ipcache.IPCache, fqdnCache FqdnCache, - serviceCache *servicecache.ServiceCache, logger *logrus.Entry, ) *State { return &State{ - ciliumClient: ciliumClient, - endpoints: endpoints, - ipcache: ipCache, - fqdnCache: fqdnCache, serviceCache: serviceCache, + ciliumClient: ciliumClient, + endpoints: endpoints, + ipcache: ipCache, + fqdnCache: fqdnCache, logRecord: make(chan monitor.LogRecordNotify, 100), endpointEvents: make(chan monitorAPI.AgentNotify, 100), log: logger, @@ -82,17 +77,6 @@ func (s *State) StartMirroringIPCache(ipCacheEvents <-chan monitorAPI.AgentNotif go s.syncIPCache(ipCacheEvents) } -// StartMirroringServiceCache initially caches service information from Cilium -// and then starts to mirror service information based on events that are sent -// to the serviceEvents channel. Only messages of type -// `AgentNotifyServiceUpserted` and `AgentNotifyServiceDeleted` should be sent -// to this channel. This function assumes that the caller is already connected -// to Cilium Monitor, i.e. no Service notification must be lost after calling -// this method. -func (s *State) StartMirroringServiceCache(serviceEvents <-chan monitorAPI.AgentNotify) { - go s.syncServiceCache(serviceEvents) -} - // GetLogRecordNotifyChannel returns the event channel to receive // monitorAPI.LogRecordNotify events. func (s *State) GetLogRecordNotifyChannel() chan<- monitor.LogRecordNotify { @@ -125,8 +109,3 @@ func (s *State) GetFQDNCache() FqdnCache { func (s *State) GetIPCache() *ipcache.IPCache { return s.ipcache } - -// GetServiceCache returns serviceCache. -func (s *State) GetServiceCache() *servicecache.ServiceCache { - return s.serviceCache -} diff --git a/pkg/oldhubble/parser/getters/getters.go b/pkg/oldhubble/parser/getters/getters.go index 151b4900021..94969d7dced 100644 --- a/pkg/oldhubble/parser/getters/getters.go +++ b/pkg/oldhubble/parser/getters/getters.go @@ -6,7 +6,6 @@ package getters import ( "net" - pb "github.com/cilium/cilium/api/v1/flow" v1 "github.com/cilium/tetragon/pkg/oldhubble/api/v1" "github.com/cilium/tetragon/pkg/oldhubble/ipcache" @@ -37,8 +36,3 @@ type IPGetter interface { // GetIPIdentity fetches information known about a remote IP. GetIPIdentity(ip net.IP) (identity ipcache.IPIdentity, ok bool) } - -// ServiceGetter fetches service metadata. -type ServiceGetter interface { - GetServiceByAddr(ip net.IP, port uint16) (service pb.Service, ok bool) -} diff --git a/pkg/oldhubble/servicecache/service_cache.go b/pkg/oldhubble/servicecache/service_cache.go deleted file mode 100644 index 21e961dedb0..00000000000 --- a/pkg/oldhubble/servicecache/service_cache.go +++ /dev/null @@ -1,148 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Hubble - -package servicecache - -import ( - "fmt" - "net" - "strconv" - "sync" - - pb "github.com/cilium/cilium/api/v1/flow" - - "github.com/cilium/cilium/api/v1/models" -) - -// ServiceCache is a cache of existing services. -type ServiceCache struct { - mu sync.RWMutex - cache map[string]*entry -} - -// New creates a new empty ServiceCache. -func New() *ServiceCache { - return &ServiceCache{ - cache: map[string]*entry{}, - } -} - -type entry struct { - ID int64 - Name string - Namespace string - FrontendIP net.IP - FrontendPort uint16 -} - -// GetServiceByAddr retrieves a service from the cache given its frontend IP -// and port. If the service was found in the cache, ok is true. -func (svcc *ServiceCache) GetServiceByAddr(ip net.IP, port uint16) (service pb.Service, ok bool) { - svcc.mu.RLock() - defer svcc.mu.RUnlock() - - if e, ok := svcc.cache[genAddrKey(ip, port)]; ok { - return pb.Service{ - Name: e.Name, - Namespace: e.Namespace, - }, true - } - return pb.Service{}, false -} - -// InitializeFrom initializes the cache with the given list of services. -func (svcc *ServiceCache) InitializeFrom(entries []*models.Service) error { - cache := map[string]*entry{} - for _, e := range entries { - if e == nil || e.Spec == nil || e.Spec.FrontendAddress == nil || e.Spec.Flags == nil { - return fmt.Errorf("received invalid service entry from cilium: %+v", e) - } - frontendIP := net.ParseIP(e.Spec.FrontendAddress.IP) - if frontendIP == nil { - return fmt.Errorf("received service entry with invalid address: %s", e.Spec.FrontendAddress.IP) - } - frontendPort := e.Spec.FrontendAddress.Port - ce := &entry{ - ID: e.Spec.ID, - Name: e.Spec.Flags.Name, - Namespace: e.Spec.Flags.Namespace, - FrontendIP: frontendIP, - FrontendPort: frontendPort, - } - cache[genAddrKey(frontendIP, frontendPort)] = ce - cache[genIDKey(e.Spec.ID)] = ce - } - svcc.mu.Lock() - svcc.cache = cache - svcc.mu.Unlock() - return nil -} - -// Upsert updates or inserts a cache entry and returns true if the update was -// performed. -func (svcc *ServiceCache) Upsert(id int64, name, ns string, frontendIP net.IP, frontendPort uint16) bool { - svcc.mu.Lock() - defer svcc.mu.Unlock() - - idKey := genIDKey(id) - // the ID of a service should never change and should be unique - // thus, it acts as the reference key when unsure which entry to update - if old, exist := svcc.cache[idKey]; exist { - // make sure to remove the old addr reference - delete(svcc.cache, genAddrKey(old.FrontendIP, old.FrontendPort)) - } - e := &entry{ - ID: id, - Name: name, - Namespace: ns, - FrontendIP: frontendIP, - FrontendPort: frontendPort, - } - svcc.cache[genAddrKey(frontendIP, frontendPort)] = e - svcc.cache[idKey] = e - return true -} - -// DeleteByID removes the cache entry identified by the given id. It returns -// true if an entry was deleted. -func (svcc *ServiceCache) DeleteByID(id int64) bool { - svcc.mu.Lock() - defer svcc.mu.Unlock() - - idKey := genIDKey(id) - e, found := svcc.cache[idKey] - if found { - delete(svcc.cache, idKey) - delete(svcc.cache, genAddrKey(e.FrontendIP, e.FrontendPort)) - } - return found -} - -// DeleteByAddr removes the cache entry identified by the given service -// frontend ip and port. It returns true if an entry was deleted. -func (svcc *ServiceCache) DeleteByAddr(ip net.IP, port uint16) bool { - svcc.mu.Lock() - defer svcc.mu.Unlock() - - addrKey := genAddrKey(ip, port) - e, found := svcc.cache[addrKey] - if found { - delete(svcc.cache, addrKey) - delete(svcc.cache, genIDKey(e.ID)) - } - return found -} - -// genAddrKey generates an address key in the form addr:. -func genAddrKey(ip net.IP, port uint16) string { - var ipStr string - if len(ip) > 0 { // an empty IP is usually represented as , we prefer having an empty string - ipStr = ip.String() - } - return fmt.Sprintf("addr:%s", net.JoinHostPort(ipStr, strconv.FormatUint(uint64(port), 10))) -} - -// genIDKey generates an id key in the form id:. -func genIDKey(id int64) string { - return fmt.Sprintf("id:%d", id) -}