From 6f2a41c0c7d918045a6a5f906499c293b26aafa0 Mon Sep 17 00:00:00 2001
From: Sebastian Sch
Date: Thu, 29 Aug 2019 12:30:37 +0300
Subject: [PATCH] Before this PR, if VM creation failed because of a validation
 error from the virt-api, we did not clean up the MAC allocation. This PR
 introduces a cleanup loop into the system. When the create-VM mutating
 webhook is hit, we create the regular allocation but also record the MAC
 address in a waiting configmap. A waiting loop checks whether the entry has
 been removed from the map; if it has not been removed within the configured
 waiting time, we assume the object was never saved into etcd (no controller
 event) and we release the allocation. If we do get an event from the
 controller, we remove the MAC address from the configmap.

---
 cmd/manager/main.go                      |   4 +-
 config/default/manager/manager.yaml      |   1 +
 config/release/kubemacpool.yaml          |   1 +
 config/test/kubemacpool.yaml             |   1 +
 config/test/manager_image_patch.yaml     |   1 +
 .../virtualmachine_controller.go         |   3 +
 pkg/manager/manager.go                   |  20 +--
 pkg/pool-manager/pool.go                 |   5 +-
 pkg/pool-manager/pool_test.go            |   8 +-
 pkg/pool-manager/virtualmachine_pool.go  | 151 +++++++++++++++++-
 tests/virtual_machines_test.go           |   6 +-
 11 files changed, 176 insertions(+), 25 deletions(-)

diff --git a/cmd/manager/main.go b/cmd/manager/main.go
index e18beb3c4..19904bbd8 100644
--- a/cmd/manager/main.go
+++ b/cmd/manager/main.go
@@ -43,9 +43,11 @@ func loadMacAddressFromEnvVar(envName string) (net.HardwareAddr, error) {
 
 func main() {
 	var logType, metricsAddr string
+	var waitingTime int
 
 	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&logType, "v", "production", "Log type (debug/production).")
+	flag.IntVar(&waitingTime, "wait-time", 600, "waiting time in seconds to release the mac address if the VM object was not created")
 	flag.Parse()
 
 	if logType == "debug" {
@@ -80,7 +82,7 @@ func main() {
 		os.Exit(1)
 	}
 
-	kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr)
+	kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr, waitingTime)
 
 	err = kubemacpoolManager.Run(rangeStart, rangeEnd)
 	if err != nil {
diff --git a/config/default/manager/manager.yaml b/config/default/manager/manager.yaml
index 7dfde67a6..5fa0fd867 100644
--- a/config/default/manager/manager.yaml
+++ b/config/default/manager/manager.yaml
@@ -57,6 +57,7 @@ spec:
         - /manager
         args:
         - "--v=production"
+        - "--wait-time=600"
         image: quay.io/kubevirt/kubemacpool:latest
         imagePullPolicy: Always
         name: manager
diff --git a/config/release/kubemacpool.yaml b/config/release/kubemacpool.yaml
index 6dc4edec4..14a218aeb 100644
--- a/config/release/kubemacpool.yaml
+++ b/config/release/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
       containers:
       - args:
         - --v=production
+        - --wait-time=600
         command:
         - /manager
         env:
diff --git a/config/test/kubemacpool.yaml b/config/test/kubemacpool.yaml
index 77b8db04c..9b62f2d50 100644
--- a/config/test/kubemacpool.yaml
+++ b/config/test/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
       containers:
       - args:
         - --v=debug
+        - --wait-time=10
         command:
         - /manager
         env:
diff --git a/config/test/manager_image_patch.yaml b/config/test/manager_image_patch.yaml
index 7a60cc0d5..90b1fdd4a 100644
--- a/config/test/manager_image_patch.yaml
+++ b/config/test/manager_image_patch.yaml
@@ -10,3 +10,4 @@ spec:
         name: manager
         args:
         - "--v=debug"
+        - "--wait-time=10"
diff --git a/pkg/controller/virtualmachine/virtualmachine_controller.go b/pkg/controller/virtualmachine/virtualmachine_controller.go
index e3a44a817..c6ef6d7c8 100644
--- a/pkg/controller/virtualmachine/virtualmachine_controller.go
+++ b/pkg/controller/virtualmachine/virtualmachine_controller.go
@@ -113,6 +113,7 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
 	if helper.ContainsString(virtualMachine.ObjectMeta.Finalizers, pool_manager.RuntimeObjectFinalizerName) {
 		return nil
 	}
+
 	log.V(1).Info("The VM does not have a finalizer",
 		"virtualMachineName", request.Name,
 		"virtualMachineNamespace", request.Namespace)
@@ -130,6 +131,8 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
 		"virtualMachineName", request.Name,
 		"virtualMachineNamespace", request.Namespace)
+	r.poolManager.MarkVMAsReady(virtualMachine)
+
 	return nil
 }
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 2eddead3a..073f63123 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -45,15 +45,16 @@ type KubeMacPoolManager struct {
 	config               *rest.Config
 	metricsAddr          string
 	continueToRunManager bool
-	restartChannel           chan struct{}
-	kubevirtInstalledChannel chan struct{}
-	stopSignalChannel        chan os.Signal
-	podNamespace             string
-	podName                  string
-	resourceLock             resourcelock.Interface
+	restartChannel           chan struct{}          // Closed when we need to regenerate certs
+	kubevirtInstalledChannel chan struct{}          // Closed once kubevirt is found, so the manager reloads
+	stopSignalChannel        chan os.Signal         // Stop signal channel
+	podNamespace             string                 // Manager pod namespace
+	podName                  string                 // Manager pod name
+	waitingTime              int                    // Time in seconds to lock a mac address before the object is saved to etcd
+	resourceLock             resourcelock.Interface // Used for leader election
 }
 
-func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string) *KubeMacPoolManager {
+func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string, waitingTime int) *KubeMacPoolManager {
 	kubemacpoolManager := &KubeMacPoolManager{
 		continueToRunManager: true,
 		restartChannel:       make(chan struct{}),
@@ -61,7 +62,8 @@ func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string) *KubeMacPo
 		stopSignalChannel:    make(chan os.Signal, 1),
 		podNamespace:         podNamespace,
 		podName:              podName,
-		metricsAddr:          metricsAddr}
+		metricsAddr:          metricsAddr,
+		waitingTime:          waitingTime}
 
 	signal.Notify(kubemacpoolManager.stopSignalChannel, os.Interrupt, os.Kill)
 
@@ -120,7 +122,7 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
 	}
 
 	isKubevirtInstalled := checkForKubevirt(k.clientset)
-	poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled)
+	poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled, k.waitingTime)
 	if err != nil {
 		return fmt.Errorf("unable to create pool manager error %v", err)
 	}
diff --git a/pkg/pool-manager/pool.go b/pkg/pool-manager/pool.go
index a57f3a595..09c7775d1 100644
--- a/pkg/pool-manager/pool.go
+++ b/pkg/pool-manager/pool.go
@@ -31,6 +31,7 @@ const (
 	RuntimeObjectFinalizerName = "k8s.v1.cni.cncf.io/kubeMacPool"
 	networksAnnotation         = "k8s.v1.cni.cncf.io/networks"
 	networksStatusAnnotation   = "k8s.v1.cni.cncf.io/networks-status"
+	vmWaitConfigMapName        = "kubemacpool-vm-configmap"
 )
 
 var log = logf.Log.WithName("PoolManager")
@@ -54,7 +55,7 @@ const (
 	AllocationStatusWaitingForPod AllocationStatus = "WaitingForPod"
 )
 
-func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool) (*PoolManager, error) {
+func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool, waitTime int) (*PoolManager, error) {
 	err := checkRange(rangeStart, rangeEnd)
 	if err != nil {
 		return nil, err
 	}
@@ -86,6 +87,8 @@ func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.Ha
 		return nil, err
 	}
 
+	go poolManger.vmWaitingCleanupLook(waitTime)
+
 	return poolManger, nil
 }
diff --git a/pkg/pool-manager/pool_test.go b/pkg/pool-manager/pool_test.go
index 8cabbe50b..f32efe0ca 100644
--- a/pkg/pool-manager/pool_test.go
+++ b/pkg/pool-manager/pool_test.go
@@ -45,7 +45,7 @@ var _ = Describe("Pool", func() {
 		Expect(err).ToNot(HaveOccurred())
 		endPoolRangeEnv, err := net.ParseMAC(endMacAddr)
 		Expect(err).ToNot(HaveOccurred())
-		poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+		poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 		Expect(err).ToNot(HaveOccurred())
 
 		return poolManager
@@ -112,7 +112,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("02:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("Invalid range. rangeStart: 0a:00:00:00:00:00 rangeEnd: 02:00:00:00:00:00"))
 
@@ -124,7 +124,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("06:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("RangeStart is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X3"))
 
@@ -136,7 +136,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("05:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("RangeEnd is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X5"))
 		})
diff --git a/pkg/pool-manager/virtualmachine_pool.go b/pkg/pool-manager/virtualmachine_pool.go
index f799bae81..1ed8aa518 100644
--- a/pkg/pool-manager/virtualmachine_pool.go
+++ b/pkg/pool-manager/virtualmachine_pool.go
@@ -19,11 +19,15 @@ package pool_manager
 import (
 	"fmt"
 	"net"
-
-	"k8s.io/apimachinery/pkg/api/errors"
+	"strings"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubevirt "kubevirt.io/kubevirt/pkg/api/v1"
+
+	"github.com/K8sNetworkPlumbingWG/kubemacpool/pkg/names"
 )
 
 func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.VirtualMachine) error {
@@ -78,10 +82,17 @@ func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.Virtual
 				return err
 			}
 			copyVM.Spec.Template.Spec.Domain.Devices.Interfaces[idx].MacAddress = macAddr
-			allocations[iface.Name] = iface.MacAddress
+			allocations[iface.Name] = macAddr
 		}
 	}
+
+	err := p.AddMacToWaitingConfig(allocations)
+	if err != nil {
+		return err
+	}
+
 	virtualMachine.Spec.Template.Spec.Domain.Devices.Interfaces = copyVM.Spec.Template.Spec.Domain.Devices.Interfaces
+
 	return nil
 }
@@ -198,7 +209,7 @@ func (p *PoolManager) allocateFromPoolForVirtualMachine(virtualMachine *kubevirt
 		return "", err
 	}
 
-	p.macPoolMap[macAddr.String()] = AllocationStatusAllocated
+	p.macPoolMap[macAddr.String()] = AllocationStatusWaitingForPod
 	log.Info("mac from pool was allocated for virtual machine",
 		"allocatedMac", macAddr.String(),
 		"virtualMachineName", virtualMachine.Name,
@@ -219,7 +230,7 @@ func (p *PoolManager) allocateRequestedVirtualMachineInterfaceMac(virtualMachine
 		return err
 	}
 
-	p.macPoolMap[requestedMac] = AllocationStatusAllocated
+	p.macPoolMap[requestedMac] = AllocationStatusWaitingForPod
 	log.Info("requested mac was allocated for virtual machine",
 		"requestedMap", requestedMac,
 		"virtualMachineName", virtualMachine.Name,
@@ -232,13 +243,24 @@ func (p *PoolManager) initVirtualMachineMap() error {
 	if !p.isKubevirt {
 		return nil
 	}
+
+	waitingMac, err := p.getOrCreateVmMacWaitMap()
+	if err != nil {
+		return err
+	}
+
+	for macAddress := range waitingMac {
+		macAddress = strings.Replace(macAddress, "-", ":", 5)
+		p.macPoolMap[macAddress] = AllocationStatusWaitingForPod
+	}
+
 	result := p.kubeClient.ExtensionsV1beta1().RESTClient().Get().RequestURI("apis/kubevirt.io/v1alpha3/virtualmachines").Do()
 	if result.Error() != nil {
 		return result.Error()
 	}
 
 	vms := &kubevirt.VirtualMachineList{}
-	err := result.Into(vms)
+	err = result.Into(vms)
 	if err != nil {
 		return err
 	}
@@ -333,6 +355,123 @@ func (p *PoolManager) revertAllocationOnVm(vmName string, allocations map[string
 	p.releaseMacAddressesFromInterfaceMap(allocations)
 }
 
+// This function returns or creates the config map that maps each waiting mac address to its allocation time.
+func (p *PoolManager) getOrCreateVmMacWaitMap() (map[string]string, error) {
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			_, err = p.kubeClient.CoreV1().
+				ConfigMaps(names.MANAGER_NAMESPACE).
+				Create(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: vmWaitConfigMapName,
+					Namespace: names.MANAGER_NAMESPACE}})
+
+			return map[string]string{}, nil
+		}
+
+		return nil, err
+	}
+
+	return configMap.Data, nil
+}
+
+// Add all the allocated mac addresses to the waiting config map with the current time.
+func (p *PoolManager) AddMacToWaitingConfig(allocations map[string]string) error {
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	if configMap.Data == nil {
+		configMap.Data = map[string]string{}
+	}
+
+	for _, macAddress := range allocations {
+		log.V(1).Info("add mac address to waiting config", "macAddress", macAddress)
+		macAddress = strings.Replace(macAddress, ":", "-", 5)
+		configMap.Data[macAddress] = time.Now().Format(time.RFC3339)
+	}
+
+	_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+	return err
+}
+
+// Remove all the vm's mac addresses from the waiting configmap; this means the vm was saved to etcd and passed all validations.
+func (p *PoolManager) MarkVMAsReady(vm *kubevirt.VirtualMachine) error {
+	p.poolMutex.Lock()
+	defer p.poolMutex.Unlock()
+
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	if configMap.Data == nil {
+		log.Info("the configMap is empty")
+		return nil
+	}
+
+	for _, vmInterface := range vm.Spec.Template.Spec.Domain.Devices.Interfaces {
+		if vmInterface.MacAddress != "" {
+			p.macPoolMap[vmInterface.MacAddress] = AllocationStatusAllocated
+			macAddress := strings.Replace(vmInterface.MacAddress, ":", "-", 5)
+			delete(configMap.Data, macAddress)
+		}
+	}
+
+	_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+	log.V(1).Info("marked virtual machine as ready", "virtualMachineNamespace", vm.Namespace,
+		"virtualMachineName", vm.Name)
+	return err
+}
+
+// This function checks for virtual machines that hit the create
+// mutating webhook but for which we never got a creation event in the controller loop.
+// This means the create was rejected by some other mutating or validating webhook,
+// so we release the virtual machine's mac addresses.
+func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
+	c := time.Tick(3 * time.Second)
+	log.Info("starting cleanup loop for waiting mac addresses")
+	for _ = range c {
+		p.poolMutex.Lock()
+
+		configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+		if err != nil {
+			log.Error(err, "failed to get config map", "configMapName", vmWaitConfigMapName)
+			p.poolMutex.Unlock()
+			continue
+		}
+
+		if configMap.Data == nil {
+			log.Info("the configMap is empty", "configMapName", vmWaitConfigMapName)
+			p.poolMutex.Unlock()
+			continue
+		}
+
+		for macAddress, allocationTime := range configMap.Data {
+			t, err := time.Parse(time.RFC3339, allocationTime)
+			if err != nil {
+				// TODO: remove the mac from the wait map??
+				log.Error(err, "failed to parse allocation time")
+				continue
+			}
+
+			if time.Now().After(t.Add(time.Duration(waitTime) * time.Second)) {
+				delete(configMap.Data, macAddress)
+				macAddress = strings.Replace(macAddress, "-", ":", 5)
+				delete(p.macPoolMap, macAddress)
+				log.V(1).Info("released mac address in waiting loop", "macAddress", macAddress)
+			}
+		}
+
+		_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+		if err != nil {
+			log.Error(err, "failed to update config map", "configMapName", vmWaitConfigMapName)
+		}
+
+		p.poolMutex.Unlock()
+	}
+}
+
 func vmNamespaced(machine *kubevirt.VirtualMachine) string {
 	return fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
 }
diff --git a/tests/virtual_machines_test.go b/tests/virtual_machines_test.go
index 88b798533..ed1f61c4e 100644
--- a/tests/virtual_machines_test.go
+++ b/tests/virtual_machines_test.go
@@ -370,10 +370,8 @@ var _ = Describe("Virtual Machines", func() {
 				}
 			})
 		})
-		//TODO: remove the the pending annotation -"P"- from "PContext" when issue #44 is fixed :
-		//https://github.com/K8sNetworkPlumbingWG/kubemacpool/issues/44
 		//2633
-		PContext("When we re-apply a failed VM yaml", func() {
+		Context("When we re-apply a failed VM yaml", func() {
 			It("should allow to assign to the VM the same MAC addresses, with name as requested before and do not return an error", func() {
 				err := setRange(rangeStart, rangeEnd)
 				Expect(err).ToNot(HaveOccurred())
@@ -485,7 +483,7 @@ var _ = Describe("Virtual Machines", func() {
 
 				Eventually(func() error {
 					return testClient.VirtClient.Create(context.TODO(), anotherVm)
-				}, 40*time.Second, 5*time.Second).Should(Not(HaveOccurred()), "failed to apply the new vm object")
+				}, timeout, pollingInterval).Should(Not(HaveOccurred()), "failed to apply the new vm object")
 
 				_, err = net.ParseMAC(anotherVm.Spec.Template.Spec.Domain.Devices.Interfaces[0].MacAddress)
 				Expect(err).ToNot(HaveOccurred())
 			})
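
Editor's note (not part of the patch): the heart of the cleanup loop introduced above is the timeout check against the RFC3339 timestamps stored in the waiting ConfigMap. The standalone Go sketch below only illustrates that check in isolation; the helper name macsToRelease and the sample data are hypothetical and simply mirror the logic of vmWaitingCleanupLook under the same key format (MACs stored with "-" instead of ":").

package main

import (
	"fmt"
	"strings"
	"time"
)

// macsToRelease returns the MAC addresses whose allocation timestamps are older
// than waitTime, i.e. the entries the cleanup loop would delete from the
// ConfigMap and release back into the pool.
func macsToRelease(data map[string]string, waitTime time.Duration, now time.Time) []string {
	released := []string{}
	for macKey, allocationTime := range data {
		t, err := time.Parse(time.RFC3339, allocationTime)
		if err != nil {
			continue // malformed timestamp; the patch leaves such entries in place (see its TODO)
		}
		if now.After(t.Add(waitTime)) {
			// ConfigMap keys store MACs with "-" instead of ":".
			released = append(released, strings.Replace(macKey, "-", ":", 5))
		}
	}
	return released
}

func main() {
	now := time.Now()
	data := map[string]string{
		"02-00-00-00-00-01": now.Add(-15 * time.Second).Format(time.RFC3339),
		"02-00-00-00-00-02": now.Add(-2 * time.Second).Format(time.RFC3339),
	}
	// With a 10-second wait (the value used in config/test), only the first MAC is released.
	fmt.Println(macsToRelease(data, 10*time.Second, now))
}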