From 4a80363177a36a5e087143fe6f53a878a8f63295 Mon Sep 17 00:00:00 2001 From: Hongliang Liu <75655411+hongliangl@users.noreply.github.com> Date: Thu, 16 Mar 2023 14:46:28 +0800 Subject: [PATCH] Keep the cached Endpoints the same as those installed in OVS (#4691) The main purpose of this PR is to avoid potential inconsistencies between the cached Endpoints and those installed in OVS, like #4681, #4692. This PR also updates: - Method UninstallEndpointFlows of ofClient, support deleting flows of multiple Endpoints. - Remove possible groups when a Service is deleted. - Log something when a group for a Service is not created. - Optimize and unify log. Signed-off-by: Hongliang Liu --- pkg/agent/openflow/client.go | 57 +++-- pkg/agent/openflow/client_test.go | 4 +- pkg/agent/openflow/testing/mock_openflow.go | 2 +- pkg/agent/proxy/proxier.go | 250 ++++++++++++-------- pkg/agent/proxy/proxier_test.go | 49 +++- test/integration/agent/openflow_test.go | 5 +- 6 files changed, 242 insertions(+), 125 deletions(-) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 16033a3dc98..092c12d55e5 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -95,7 +95,7 @@ type Client interface { InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error // UninstallEndpointFlows removes flows of the Endpoint installed by // InstallEndpointFlows. - UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error + UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error // InstallServiceFlows installs flows for accessing Service NodePort, LoadBalancer and ClusterIP. It installs the // flow that uses the group/bucket to do service LB. If the affinityTimeout is not zero, it also installs the flow @@ -452,21 +452,36 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow // deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey. func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error { - fCacheI, ok := cache.Load(flowCacheKey) - if !ok { - // no matching flows found in the cache - return nil + return c.deleteFlowsWithMultipleKeys(cache, []string{flowCacheKey}) +} + +// deleteFlowsWithMultipleKeys uninstalls the flows with different flowCache keys and remove them from the cache on success. +// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle. +func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error { + // allFlows keeps the flows we will delete via a bundle. + var allFlows []binding.Flow + for _, key := range keys { + flows, ok := cache.Load(key) + // If a flow cache entry of the key does not exist, skip it. + if !ok { + klog.V(2).InfoS("Cached flow with provided key was not found", "key", key) + continue + } + for _, flow := range flows.(flowCache) { + allFlows = append(allFlows, flow) + } } - fCache := fCacheI.(flowCache) - // Delete flows from OVS. - delFlows := make([]binding.Flow, 0, len(fCache)) - for _, flow := range fCache { - delFlows = append(delFlows, flow) + if len(allFlows) == 0 { + return nil } - if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil { + err := c.ofEntryOperations.DeleteAll(allFlows) + if err != nil { return err } - cache.Delete(flowCacheKey) + // Delete the keys and corresponding flows from the flow cache. + for _, key := range keys { + cache.Delete(key) + } return nil } @@ -684,16 +699,22 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro return c.addFlowsWithMultipleKeys(c.featureService.cachedFlows, keyToFlows) } -func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error { +func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - port, err := endpoint.Port() - if err != nil { - return fmt.Errorf("error when getting port: %w", err) + // keyToFlows is a map from the flows' cache key to the flows. + flowCacheKeys := make([]string, 0, len(endpoints)) + + for _, endpoint := range endpoints { + port, err := endpoint.Port() + if err != nil { + return fmt.Errorf("error when getting port: %w", err) + } + flowCacheKeys = append(flowCacheKeys, generateEndpointFlowCacheKey(endpoint.IP(), port, protocol)) } - cacheKey := generateEndpointFlowCacheKey(endpoint.IP(), port, protocol) - return c.deleteFlows(c.featureService.cachedFlows, cacheKey) + + return c.deleteFlowsWithMultipleKeys(c.featureService.cachedFlows, flowCacheKeys) } func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nested bool) error { diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 16d3265c52d..98d8e41ee75 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1037,7 +1037,7 @@ func Test_client_InstallEndpointFlows(t *testing.T) { defer resetPipelines() m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) - m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(len(tc.endpoints)) + m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1) assert.NoError(t, fc.InstallEndpointFlows(tc.protocol, tc.endpoints)) var flows []string @@ -1050,8 +1050,8 @@ func Test_client_InstallEndpointFlows(t *testing.T) { } assert.ElementsMatch(t, tc.expectedFlows, flows) + assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, tc.endpoints)) for _, ep := range tc.endpoints { - assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, ep)) endpointPort, _ := ep.Port() cacheKey := generateEndpointFlowCacheKey(ep.IP(), endpointPort, tc.protocol) _, ok := fc.featureService.cachedFlows.Load(cacheKey) diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index dfe41ba8398..31549ad080b 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -782,7 +782,7 @@ func (mr *MockClientMockRecorder) SubscribePacketIn(arg0, arg1 interface{}) *gom } // UninstallEndpointFlows mocks base method -func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 proxy.Endpoint) error { +func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UninstallEndpointFlows", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index edc141d0668..cf809f77c3d 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -136,65 +136,66 @@ func (p *proxier) isInitialized() bool { return p.endpointsChanges.Synced() && p.serviceChanges.Synced() } -// removeStaleServices removes all expired Services. Once a Service is deleted, all -// its Endpoints will be expired, and the removeStaleEndpoints method takes -// responsibility for cleaning up, thus we don't need to call removeEndpoint in this -// function. +// removeStaleServices removes all the configurations of expired Services and their associated Endpoints. func (p *proxier) removeStaleServices() { for svcPortName, svcPort := range p.serviceInstalledMap { if _, ok := p.serviceMap[svcPortName]; ok { continue } svcInfo := svcPort.(*types.ServiceInfo) - klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String()) + svcInfoStr := svcInfo.String() + klog.V(2).InfoS("Removing stale Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } - - if p.proxyAll { - // Remove NodePort flows and configurations. - if svcInfo.NodePort() > 0 { - if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) - continue - } + // Remove associated Endpoints flows. + if endpoints, ok := p.endpointsInstalledMap[svcPortName]; ok { + if err := p.removeStaleEndpoints(endpoints, svcInfo.Protocol()); err != nil { + klog.ErrorS(err, "Error when removing Endpoints flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue + } + delete(p.endpointsInstalledMap, svcPortName) + } + // Remove NodePort flows and configurations. + if p.proxyAll && svcInfo.NodePort() > 0 { + if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } - // Remove Service group whose Endpoints are local. - if svcInfo.ExternalPolicyLocal() { - if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { - klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) - continue - } - p.groupCounter.Recycle(svcPortName, true) + // Remove Service group which has only local Endpoints. + if groupID, exist := p.groupCounter.Get(svcPortName, true); exist { + if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + klog.ErrorS(err, "Error when uninstalling group of local Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue } + p.groupCounter.Recycle(svcPortName, true) } // Remove Service group which has all Endpoints. if groupID, exist := p.groupCounter.Get(svcPortName, false); exist { if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { - klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling group of all Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } p.groupCounter.Recycle(svcPortName, false) } delete(p.serviceInstalledMap, svcPortName) - p.deleteServiceByIP(svcInfo.String()) + p.deleteServiceByIP(svcInfoStr) } } -func getBindingProtoForIPProto(endpointIP string, protocol corev1.Protocol) binding.Protocol { +func getBindingProtoForIPProto(isIPv6 bool, protocol corev1.Protocol) binding.Protocol { var bindingProtocol binding.Protocol - if utilnet.IsIPv6String(endpointIP) { + if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 if protocol == corev1.ProtocolUDP { bindingProtocol = binding.ProtocolUDPv6 @@ -212,47 +213,47 @@ func getBindingProtoForIPProto(endpointIP string, protocol corev1.Protocol) bind return bindingProtocol } -// removeEndpoint removes flows for the given Endpoint from the data path if these flows are no longer +// removeStaleEndpoints removes flows for the given Endpoints from the data path if these flows are no longer // needed by any Service. Endpoints from different Services can have the same characteristics and thus -// can share the same flows. removeEndpoint must be called whenever an Endpoint is no longer used by a -// given Service. If the Endpoint is still referenced by any other Services, no flow will be removed. +// can share the same flows. removeStaleEndpoints must be called whenever Endpoints are no longer used by a +// given Service. If the Endpoints are still referenced by any other Services, no flow will be removed. // The method only returns an error if a data path operation fails. If the flows are successfully -// removed from the data path, the method returns true. Otherwise, if the flows are still needed for -// other Services, it returns false. -func (p *proxier) removeEndpoint(endpoint k8sproxy.Endpoint, protocol binding.Protocol) (bool, error) { - key := endpointKey(endpoint, protocol) - count := p.endpointReferenceCounter[key] - if count == 1 { - if err := p.ofClient.UninstallEndpointFlows(protocol, endpoint); err != nil { - return false, err +// removed from the data path, the method returns nil. +func (p *proxier) removeStaleEndpoints(staleEndpoints map[string]k8sproxy.Endpoint, ipProtocol corev1.Protocol) error { + var endpointsToRemove []k8sproxy.Endpoint + bindingProtocol := getBindingProtoForIPProto(p.isIPv6, ipProtocol) + + // Get all Endpoints whose reference counter is 1, and these Endpoints should be removed. + for _, endpoint := range staleEndpoints { + key := endpointKey(endpoint, bindingProtocol) + count := p.endpointReferenceCounter[key] + if count == 1 { + endpointsToRemove = append(endpointsToRemove, endpoint) + klog.V(2).InfoS("Endpoint will be removed", "Endpoint", endpoint.String(), "Protocol", bindingProtocol) } - delete(p.endpointReferenceCounter, key) - klog.V(2).Infof("Endpoint %s/%s removed", endpoint.String(), protocol) - } else if count > 1 { - p.endpointReferenceCounter[key] = count - 1 - klog.V(2).Infof("Stale Endpoint %s/%s is still referenced by other Services, decrementing reference count by 1", endpoint.String(), protocol) - return false, nil } - return true, nil -} -// removeStaleEndpoints compares Endpoints we installed with Endpoints we expected. All installed but unexpected Endpoints -// will be deleted by using removeEndpoint. -func (p *proxier) removeStaleEndpoints() { - for svcPortName, installedEps := range p.endpointsInstalledMap { - for installedEpName, installedEp := range installedEps { - if _, ok := p.endpointsMap[svcPortName][installedEpName]; !ok { - if _, err := p.removeEndpoint(installedEp, getBindingProtoForIPProto(installedEp.IP(), svcPortName.Protocol)); err != nil { - klog.Errorf("Error when removing Endpoint %v for %v", installedEp, svcPortName) - continue - } - delete(installedEps, installedEpName) - } + // Remove flows for these Endpoints. + if len(endpointsToRemove) != 0 { + if err := p.ofClient.UninstallEndpointFlows(bindingProtocol, endpointsToRemove); err != nil { + return err } - if len(installedEps) == 0 { - delete(p.endpointsInstalledMap, svcPortName) + } + + // Update the reference counter of Endpoints. + for _, endpoint := range staleEndpoints { + key := endpointKey(endpoint, bindingProtocol) + count := p.endpointReferenceCounter[key] + if count == 1 { + delete(p.endpointReferenceCounter, key) + klog.V(2).InfoS("Endpoint was removed", "Endpoint", endpoint.String(), "Protocol", bindingProtocol) + } else { + p.endpointReferenceCounter[key] = count - 1 + klog.V(2).InfoS("Stale Endpoint is still referenced by other Services, decrementing reference count by 1", "Endpoint", endpoint.String(), "Protocol", bindingProtocol) } } + + return nil } func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { @@ -290,10 +291,10 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui svcIP = agentconfig.VirtualNodePortDNATIPv6 } if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort, false); err != nil { - return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err) + return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) } if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to install Service NodePort traffic redirecting flows: %w", err) + return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err) } return nil } @@ -304,10 +305,10 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot svcIP = agentconfig.VirtualNodePortDNATIPv6 } if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err) + return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err) } if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort traffic redirecting flows: %w", err) + return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) } return nil } @@ -316,13 +317,13 @@ func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBa for _, ingress := range loadBalancerIPStrings { if ingress != "" { if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } } } if p.proxyAll { if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer traffic redirecting flows: %w", err) + return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) } } @@ -333,13 +334,13 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s for _, ingress := range loadBalancerIPStrings { if ingress != "" { if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) } } } if p.proxyAll { if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer traffic redirecting flows: %w", err) + return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) } } @@ -349,6 +350,7 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) + svcInfoStr := svcInfo.String() endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { endpointsInstalled = map[string]k8sproxy.Endpoint{} @@ -409,12 +411,13 @@ func (p *proxier) installServices() { for _, endpoint := range allReachableEndpoints { if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. needUpdateEndpoints = true + klog.V(2).InfoS("At least one Endpoint of Service is not installed, updating Endpoints", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) break } } // If there are expired Endpoints, Endpoints installed should be updated. if len(allReachableEndpoints) < len(endpointsInstalled) { - klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String()) + klog.V(2).InfoS("Some Endpoints of Service was removed, updating Endpoints", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) needUpdateEndpoints = true } @@ -438,16 +441,16 @@ func (p *proxier) installServices() { } if pSvcInfo != nil { - klog.V(2).Infof("Updating Service %s %s", svcPortName.Name, svcInfo.String()) + klog.V(2).InfoS("Updating Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) } else { - klog.V(2).Infof("Installing Service %s %s", svcPortName.Name, svcInfo.String()) + klog.V(2).InfoS("Installing Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) } if needUpdateEndpoints { // Install Endpoints. err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, allReachableEndpoints) if err != nil { - klog.ErrorS(err, "Error when installing Endpoints flows") + klog.ErrorS(err, "Error when installing Endpoints flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } if internalPolicyLocal != externalPolicyLocal { @@ -457,19 +460,19 @@ func (p *proxier) installServices() { // local Endpoints. groupID := p.groupCounter.AllocateIfNotExist(svcPortName, true) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, localEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of local Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } groupID = p.groupCounter.AllocateIfNotExist(svcPortName, false) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, clusterEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of all Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } else { // If the type of the Service is ClusterIP, install a group according to internalTrafficPolicy. groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, allReachableEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -485,26 +488,40 @@ func (p *proxier) installServices() { bothPolicyLocal := internalPolicyLocal groupID := p.groupCounter.AllocateIfNotExist(svcPortName, bothPolicyLocal) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, allReachableEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "BothTrafficPolicies", bothPolicyLocal) continue } if groupID, exist := p.groupCounter.Get(svcPortName, !bothPolicyLocal); exist { if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { - klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling group of Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "BothTrafficPolicies", !bothPolicyLocal) continue } p.groupCounter.Recycle(svcPortName, !bothPolicyLocal) } } - for _, e := range allReachableEndpoints { - // If the Endpoint is newly installed, add a reference. - if _, ok := endpointsInstalled[e.String()]; !ok { - key := endpointKey(e, svcInfo.OFProtocol) - p.endpointReferenceCounter[key] = p.endpointReferenceCounter[key] + 1 - endpointsInstalled[e.String()] = e + // Map endpointsInstalled stores the Endpoints actually installed in last syncProxyRules call. Slice + // allReachableEndpoints stores the Endpoints actually installed in this syncProxyRules call. We call compareEndpoints + // to get: + // - Map updatedEndpointsInstalled, stores the Endpoints actually installed in this syncProxyRules call, and + // it is used to replace the old cache endpointsInstalled. + // - Map staleEndpoints, stores the Endpoints that should be removed in this syncProxyRules call. + // - Slice newEndpoints, stores the Endpoints newly installed in this syncProxyRules call. + updatedEndpointsInstalled, staleEndpoints, newEndpoints := compareEndpoints(endpointsInstalled, allReachableEndpoints) + // Remove stale Endpoints. + if len(staleEndpoints) != 0 { + if err = p.removeStaleEndpoints(staleEndpoints, svcPortName.Protocol); err != nil { + klog.ErrorS(err, "Error when removing flows of stale Endpoints for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue } } + // Cache the Endpoints actually installed this time. + p.endpointsInstalledMap[svcPortName] = updatedEndpointsInstalled + // Update reference counter of Endpoints newly install. + for _, endpoint := range newEndpoints { + key := endpointKey(endpoint, svcInfo.OFProtocol) + p.endpointReferenceCounter[key] = p.endpointReferenceCounter[key] + 1 + } } if needUpdateService { @@ -512,7 +529,7 @@ func (p *proxier) installServices() { if needRemoval { // If previous Service should be removed, remove ClusterIP flows of previous Service. if err := p.ofClient.UninstallServiceFlows(pSvcInfo.ClusterIP(), uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -520,14 +537,14 @@ func (p *proxier) installServices() { // If previous Service which has NodePort should be removed, remove NodePort flows and configurations of previous Service. if pSvcInfo.NodePort() > 0 { if err := p.uninstallNodePortService(uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. if svcInfo.ClusterIP() != nil { if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -542,35 +559,47 @@ func (p *proxier) installServices() { } // Install ClusterIP flows for the Service. - groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal) + groupID, exists := p.groupCounter.Get(svcPortName, internalPolicyLocal) + if !exists { + klog.ErrorS(nil, "Group for Service internalTrafficPolicy was not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "internalTrafficPolicy", internalPolicyLocal) + continue + } if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalPolicyLocal, corev1.ServiceTypeClusterIP, isNestedService); err != nil { klog.Errorf("Error when installing Service flows: %v", err) continue } if p.proxyAll { - nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal) // Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR // can be finally calculated after creating enough ClusterIPs. if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to install ClusterIP route of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } // If previous Service is nil or NodePort flows and configurations of previous Service have been removed, // install NodePort flows and configurations for current Service. if svcInfo.NodePort() > 0 && (pSvcInfo == nil || needRemoval) { - if err := p.installNodePortService(nGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { - klog.ErrorS(err, "Failed to install NodePort flows and configurations of Service", "Service", svcPortName) + groupID, exists = p.groupCounter.Get(svcPortName, externalPolicyLocal) + if !exists { + klog.ErrorS(nil, "Group for Service externalTrafficPolicy was not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "externalTrafficPolicy", externalPolicyLocal) + continue + } + if err := p.installNodePortService(groupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + klog.ErrorS(err, "Error when installing NodePort flows and configurations of Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } } if p.proxyLoadBalancerIPs { - nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal) + groupID, exists = p.groupCounter.Get(svcPortName, externalPolicyLocal) + if !exists { + klog.ErrorS(nil, "Group for Service externalTrafficPolicy is not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, svcPortName, "externalTrafficPolicy", externalPolicyLocal) + continue + } // Service LoadBalancer flows can be partially updated. var toDelete, toAdd []string if needRemoval { @@ -583,14 +612,14 @@ func (p *proxier) installServices() { // Remove LoadBalancer flows and configurations. if len(toDelete) > 0 { if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } // Install LoadBalancer flows and configurations. if len(toAdd) > 0 { - if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { - klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName) + if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -598,8 +627,36 @@ func (p *proxier) installServices() { } p.serviceInstalledMap[svcPortName] = svcPort - p.addServiceByIP(svcInfo.String(), svcPortName) + p.addServiceByIP(svcInfoStr, svcPortName) + } +} + +func compareEndpoints(endpointsCached map[string]k8sproxy.Endpoint, endpointsInstalled []k8sproxy.Endpoint) (map[string]k8sproxy.Endpoint, map[string]k8sproxy.Endpoint, []k8sproxy.Endpoint) { + // Map endpointsToCache is used to store the Endpoints actually installed. + endpointsToCache := map[string]k8sproxy.Endpoint{} + // Map endpointsToRemove is used to store the Endpoints that should be removed. + endpointsToRemove := map[string]k8sproxy.Endpoint{} + // Slice newEndpoints is used to store the Endpoints that are newly installed. + var newEndpoints []k8sproxy.Endpoint + + // Copy every Endpoint in endpointsCached to endpointsToRemove. After removing all actually installed Endpoints, + // only stale Endpoints are left. + for endpointString, endpoint := range endpointsCached { + endpointsToRemove[endpointString] = endpoint + } + + for _, endpoint := range endpointsInstalled { + // Add the Endpoint to map endpointsToCache since Endpoints in endpointsInstalled are actually installed Endpoints. + endpointsToCache[endpoint.String()] = endpoint + // If the Endpoint is in the map endpointsCached, then it is not newly installed, remove it from map endpointsToRemove; + // otherwise, add it to slice newEndpoints. + if _, exists := endpointsCached[endpoint.String()]; exists { + delete(endpointsToRemove, endpoint.String()) + } else { + newEndpoints = append(newEndpoints, endpoint) + } } + return endpointsToCache, endpointsToRemove, newEndpoints } // syncProxyRules applies current changes in change trackers and then updates @@ -636,7 +693,6 @@ func (p *proxier) syncProxyRules() { p.removeStaleServices() p.installServices() - p.removeStaleEndpoints() if p.serviceHealthServer != nil { if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 442c689d457..37ee96a62b1 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -918,9 +918,15 @@ func testClusterIPRemove(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) fp.syncProxyRules() + _, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + fp.serviceChanges.OnServiceUpdate(svc, nil) fp.endpointsChanges.OnEndpointUpdate(ep, nil) fp.syncProxyRules() + + _, ok = fp.endpointsInstalledMap[svcPortName] + assert.False(t, ok) } func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, epIP net.IP, isIPv6 bool) { @@ -962,16 +968,22 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedEp).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) fp.syncProxyRules() + _, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + fp.serviceChanges.OnServiceUpdate(svc, nil) fp.endpointsChanges.OnEndpointUpdate(ep, nil) fp.syncProxyRules() + + _, ok = fp.endpointsInstalledMap[svcPortName] + assert.False(t, ok) } func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, epIP net.IP, loadBalancerIP net.IP, isIPv6 bool) { @@ -1020,7 +1032,7 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedEp).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) @@ -1029,9 +1041,15 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) fp.syncProxyRules() + _, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + fp.serviceChanges.OnServiceUpdate(svc, nil) fp.endpointsChanges.OnEndpointUpdate(ep, nil) fp.syncProxyRules() + + _, ok = fp.endpointsInstalledMap[svcPortName] + assert.False(t, ok) } func TestClusterIPRemove(t *testing.T) { @@ -1161,8 +1179,15 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() + _, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + fp.endpointsChanges.OnEndpointUpdate(ep, nil) fp.syncProxyRules() + + endpointsMap, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + assert.Equal(t, 0, len(endpointsMap)) } func TestClusterIPRemoveEndpoints(t *testing.T) { @@ -1705,7 +1730,8 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, makeEndpointsMap(fp, eps) expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} - expectedAllEps := append(expectedLocalEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) + expectedRemoteEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)} + expectedAllEps := append(expectedLocalEps, expectedRemoteEps...) bindingProtocol := binding.ProtocolTCP if isIPv6 { @@ -1719,12 +1745,29 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() + assertEndpoints := func(t *testing.T, expectedEndpoints []k8sproxy.Endpoint, gotEndpoints map[string]k8sproxy.Endpoint) { + var endpoints []k8sproxy.Endpoint + for _, e := range gotEndpoints { + endpoints = append(endpoints, e) + } + assert.ElementsMatch(t, expectedEndpoints, endpoints) + } + + svcEndpointsMap, ok := fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + assertEndpoints(t, expectedAllEps, svcEndpointsMap) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedLocalEps)).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedRemoteEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, expectedLocalEps).Times(1) fp.syncProxyRules() + + svcEndpointsMap, ok = fp.endpointsInstalledMap[svcPortName] + assert.True(t, ok) + assertEndpoints(t, expectedLocalEps, svcEndpointsMap) } func TestServiceInternalTrafficPolicyUpdate(t *testing.T) { diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index d614c50c347..3e4ddeca89b 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -795,10 +795,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint assert.Nil(t, err) err = c.UninstallServiceGroup(groupID) assert.Nil(t, err) - for _, ep := range endpointList { - err := c.UninstallEndpointFlows(svc.protocol, ep) - assert.Nil(t, err) - } + assert.NoError(t, c.UninstallEndpointFlows(svc.protocol, endpointList)) } func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {