diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index a683a8d84e0..2f62bc6b003 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -314,7 +314,9 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { return } - c.removeStaleEgressIPs() + if err := c.replaceEgressIPs(); err != nil { + klog.ErrorS(err, "failed to replace Egress IPs") + } go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh) @@ -324,10 +326,10 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { <-stopCh } -// removeStaleEgressIPs unassigns stale Egress IPs that shouldn't be present on this Node. -// These Egresses were either deleted from the Kubernetes API or migrated to other Nodes when the agent on this Node -// was not running. -func (c *EgressController) removeStaleEgressIPs() { +// replaceEgressIPs unassigns stale Egress IPs that shouldn't be present on this Node and assigns the missing IPs +// on this Node. The unassigned IPs are from Egresses that were either deleted from the Kubernetes API or migrated +// to other Nodes when the agent on this Node was not running. 
+func (c *EgressController) replaceEgressIPs() error { desiredLocalEgressIPs := sets.NewString() egresses, _ := c.egressLister.List(labels.Everything()) for _, egress := range egresses { @@ -335,12 +337,10 @@ func (c *EgressController) removeStaleEgressIPs() { desiredLocalEgressIPs.Insert(egress.Spec.EgressIP) } } - actualLocalEgressIPs := c.ipAssigner.AssignedIPs() - for ip := range actualLocalEgressIPs.Difference(desiredLocalEgressIPs) { - if err := c.ipAssigner.UnassignIP(ip); err != nil { - klog.ErrorS(err, "Failed to clean up stale Egress IP", "ip", ip) - } + if err := c.ipAssigner.InitIPs(desiredLocalEgressIPs); err != nil { + return err } + return nil } // worker is a long-running function that will continually call the processNextWorkItem function in diff --git a/pkg/agent/ipassigner/ip_assigner.go b/pkg/agent/ipassigner/ip_assigner.go index ac1e6862f45..581b68c712b 100644 --- a/pkg/agent/ipassigner/ip_assigner.go +++ b/pkg/agent/ipassigner/ip_assigner.go @@ -24,6 +24,8 @@ type IPAssigner interface { UnassignIP(ip string) error // AssignedIPs return the IPs that are assigned to the system by this IPAssigner. AssignedIPs() sets.String + // InitIPs ensures the IPs that are assigned to the system match the given IPs. + InitIPs(sets.String) error // Run starts the IP assigner. Run(<-chan struct{}) } diff --git a/pkg/agent/ipassigner/ip_assigner_linux.go b/pkg/agent/ipassigner/ip_assigner_linux.go index 80e976c8e2c..1ed1ab2f265 100644 --- a/pkg/agent/ipassigner/ip_assigner_linux.go +++ b/pkg/agent/ipassigner/ip_assigner_linux.go @@ -28,6 +28,7 @@ import ( "antrea.io/antrea/pkg/agent/ipassigner/responder" "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/agent/util/sysctl" ) // ipAssigner creates a dummy device and assigns IPs to it. 
@@ -60,11 +61,22 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs assignedIPs: sets.NewString(), } if ipv4 != nil { - arpResonder, err := responder.NewARPResponder(externalInterface) + // For the Egress scenario, the external IPs should always be present on the dummy + // interface as they are used as tunnel endpoints. If arp_ignore is set to a value + // other than 0, the host will not reply to ARP requests received on the transport + // interface when the target IPs are assigned on the dummy interface. So a userspace + // ARP responder is needed to handle ARP requests for the Egress IPs. + arpIgnore, err := getARPIgnoreForInterface(externalInterface.Name) if err != nil { - return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err) + return nil, err + } + if dummyDeviceName == "" || arpIgnore > 0 { + arpResonder, err := responder.NewARPResponder(externalInterface) + if err != nil { + return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err) + } + a.arpResponder = arpResonder } - a.arpResponder = arpResonder } if ipv6 != nil { ndpResponder, err := responder.NewNDPResponder(externalInterface) @@ -79,13 +91,27 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs return nil, fmt.Errorf("error when ensuring dummy device exists: %v", err) } a.dummyDevice = dummyDevice - if err := a.loadIPAddresses(); err != nil { - return nil, fmt.Errorf("error when loading IP addresses from the system: %v", err) - } } return a, nil } +// getARPIgnoreForInterface gets the max value of conf/{all,interface}/arp_ignore from sysctl. 
+func getARPIgnoreForInterface(iface string) (int, error) { + arpIgnoreAll, err := sysctl.GetSysctlNet("ipv4/conf/all/arp_ignore") + if err != nil { + return 0, fmt.Errorf("failed to get arp_ignore for all interfaces: %w", err) + } + arpIgnore, err := sysctl.GetSysctlNet(fmt.Sprintf("ipv4/conf/%s/arp_ignore", iface)) + if err != nil { + return 0, fmt.Errorf("failed to get arp_ignore for %s: %w", iface, err) + } + if arpIgnore > arpIgnoreAll { + return arpIgnore, nil + } + return arpIgnoreAll, nil + +} + // ensureDummyDevice creates the dummy device if it doesn't exist. func ensureDummyDevice(deviceName string) (netlink.Link, error) { link, err := netlink.LinkByName(deviceName) @@ -102,19 +128,16 @@ func ensureDummyDevice(deviceName string) (netlink.Link, error) { } // loadIPAddresses gets the IP addresses on the dummy device and caches them in memory. -func (a *ipAssigner) loadIPAddresses() error { +func (a *ipAssigner) loadIPAddresses() (sets.String, error) { addresses, err := netlink.AddrList(a.dummyDevice, netlink.FAMILY_ALL) if err != nil { - return err + return nil, err } newAssignIPs := sets.NewString() for _, address := range addresses { newAssignIPs.Insert(address.IP.String()) } - a.mutex.Lock() - defer a.mutex.Unlock() - a.assignedIPs = newAssignIPs - return nil + return newAssignIPs, nil } // AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders. @@ -208,13 +231,54 @@ func (a *ipAssigner) AssignedIPs() sets.String { return a.assignedIPs.Union(nil) } +// InitIPs loads the IPs from the dummy device and replaces the IPs that are assigned to it +// with the given ones. This function also adds the given IPs to the ARP/NDP responder if +// applicable. It can be used to recover the IP assigner to the desired state after Agent restarts. 
+func (a *ipAssigner) InitIPs(ips sets.String) error { + a.mutex.Lock() + defer a.mutex.Unlock() + if a.dummyDevice != nil { + assigned, err := a.loadIPAddresses() + if err != nil { + return fmt.Errorf("error when loading IP addresses from the system: %v", err) + } + for ip := range ips.Difference(assigned) { + addr := util.NewIPNet(net.ParseIP(ip)) + if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil { + if !errors.Is(err, unix.EEXIST) { + return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) + } + } + } + for ip := range assigned.Difference(ips) { + addr := util.NewIPNet(net.ParseIP(ip)) + if err := netlink.AddrDel(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil { + if !errors.Is(err, unix.EADDRNOTAVAIL) { + return fmt.Errorf("failed to delete IP %v from interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) + } + } + } + } + for ipStr := range ips { + ip := net.ParseIP(ipStr) + var err error + if utilnet.IsIPv4(ip) && a.arpResponder != nil { + err = a.arpResponder.AddIP(ip) + } + if utilnet.IsIPv6(ip) && a.ndpResponder != nil { + err = a.ndpResponder.AddIP(ip) + } + if err != nil { + return err + } + } + a.assignedIPs = ips.Union(nil) + return nil +} + // Run starts the ARP responder and NDP responder. func (a *ipAssigner) Run(ch <-chan struct{}) { - // Start the ARP responder only when the dummy device is not created. The kernel will handle ARP requests - // for IPs assigned to the dummy devices by default. - // TODO: Check the arp_ignore sysctl parameter of the transport interface to determine whether to start - // the ARP responder or not. 
- if a.dummyDevice == nil && a.arpResponder != nil { + if a.arpResponder != nil { go a.arpResponder.Run(ch) } if a.ndpResponder != nil { diff --git a/pkg/agent/ipassigner/ip_assigner_windows.go b/pkg/agent/ipassigner/ip_assigner_windows.go index bba7e0cc534..4b3db923f4c 100644 --- a/pkg/agent/ipassigner/ip_assigner_windows.go +++ b/pkg/agent/ipassigner/ip_assigner_windows.go @@ -37,5 +37,9 @@ func (a *ipAssigner) AssignedIPs() sets.String { return nil } +func (a *ipAssigner) InitIPs(ips sets.String) error { + return nil +} + func (a *ipAssigner) Run(ch <-chan struct{}) { } diff --git a/pkg/agent/ipassigner/testing/mock_ipassigner.go b/pkg/agent/ipassigner/testing/mock_ipassigner.go index d5268395185..3d2df119718 100644 --- a/pkg/agent/ipassigner/testing/mock_ipassigner.go +++ b/pkg/agent/ipassigner/testing/mock_ipassigner.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -76,6 +76,20 @@ func (mr *MockIPAssignerMockRecorder) AssignedIPs() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AssignedIPs", reflect.TypeOf((*MockIPAssigner)(nil).AssignedIPs)) } +// InitIPs mocks base method +func (m *MockIPAssigner) InitIPs(arg0 sets.String) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InitIPs", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InitIPs indicates an expected call of InitIPs +func (mr *MockIPAssignerMockRecorder) InitIPs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitIPs", reflect.TypeOf((*MockIPAssigner)(nil).InitIPs), arg0) +} + // Run mocks base method func (m *MockIPAssigner) Run(arg0 <-chan struct{}) { m.ctrl.T.Helper() diff --git a/test/integration/agent/ip_assigner_linux_test.go b/test/integration/agent/ip_assigner_linux_test.go index 8e308390c83..6e7650dc5bd 100644 --- a/test/integration/agent/ip_assigner_linux_test.go +++ b/test/integration/agent/ip_assigner_linux_test.go @@ -64,17 +64,25 @@ func TestIPAssigner(t *testing.T) { require.NoError(t, err, "Failed to list IP addresses") assert.Equal(t, desiredIPs, actualIPs, "Actual IPs don't match") - // NewIPAssigner should load existing IPs correctly. 
newIPAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName) require.NoError(t, err, "Initializing new IP assigner failed") - assert.Equal(t, desiredIPs, newIPAssigner.AssignedIPs(), "Assigned IPs don't match") + assert.Equal(t, sets.NewString(), newIPAssigner.AssignedIPs(), "Assigned IPs don't match") - for ip := range desiredIPs { - err = ipAssigner.UnassignIP(ip) + ip4 := "2021:124:6020:1006:250:56ff:fea7:36c4" + newDesiredIPs := sets.NewString(ip1, ip2, ip4) + err = newIPAssigner.InitIPs(newDesiredIPs) + require.NoError(t, err, "InitIPs failed") + assert.Equal(t, newDesiredIPs, newIPAssigner.AssignedIPs(), "Assigned IPs don't match") + + actualIPs, err = listIPAddresses(dummyDevice) + require.NoError(t, err, "Failed to list IP addresses") + assert.Equal(t, newDesiredIPs, actualIPs, "Actual IPs don't match") + + for ip := range newDesiredIPs { + err = newIPAssigner.UnassignIP(ip) assert.NoError(t, err, "Failed to unassign a valid IP") } - - assert.Equal(t, sets.NewString(), ipAssigner.AssignedIPs(), "Assigned IPs don't match") + assert.Equal(t, sets.NewString(), newIPAssigner.AssignedIPs(), "Assigned IPs don't match") actualIPs, err = listIPAddresses(dummyDevice) require.NoError(t, err, "Failed to list IP addresses")