diff --git a/cmd/manager/main.go b/cmd/manager/main.go
index e18beb3c4..19904bbd8 100644
--- a/cmd/manager/main.go
+++ b/cmd/manager/main.go
@@ -43,9 +43,11 @@ func loadMacAddressFromEnvVar(envName string) (net.HardwareAddr, error) {
 
 func main() {
 	var logType, metricsAddr string
+	var waitingTime int
 
 	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&logType, "v", "production", "Log type (debug/production).")
+	flag.IntVar(&waitingTime, "wait-time", 600, "Waiting time in seconds before releasing an allocated MAC address if the VM object was not created")
 	flag.Parse()
 
 	if logType == "debug" {
@@ -80,7 +82,7 @@ func main() {
 		os.Exit(1)
 	}
 
-	kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr)
+	kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr, waitingTime)
 
 	err = kubemacpoolManager.Run(rangeStart, rangeEnd)
 	if err != nil {
diff --git a/config/default/manager/manager.yaml b/config/default/manager/manager.yaml
index 7dfde67a6..5fa0fd867 100644
--- a/config/default/manager/manager.yaml
+++ b/config/default/manager/manager.yaml
@@ -57,6 +57,7 @@ spec:
         - /manager
         args:
         - "--v=production"
+        - "--wait-time=600"
         image: quay.io/kubevirt/kubemacpool:latest
         imagePullPolicy: Always
         name: manager
diff --git a/config/release/kubemacpool.yaml b/config/release/kubemacpool.yaml
index 6dc4edec4..14a218aeb 100644
--- a/config/release/kubemacpool.yaml
+++ b/config/release/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
       containers:
       - args:
         - --v=production
+        - --wait-time=600
         command:
         - /manager
         env:
diff --git a/config/test/kubemacpool.yaml b/config/test/kubemacpool.yaml
index 77b8db04c..9b62f2d50 100644
--- a/config/test/kubemacpool.yaml
+++ b/config/test/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
      containers:
      - args:
        - --v=debug
+        - --wait-time=10
        command:
        - /manager
        env:
diff --git a/config/test/manager_image_patch.yaml b/config/test/manager_image_patch.yaml
index 7a60cc0d5..90b1fdd4a 100644
--- a/config/test/manager_image_patch.yaml
+++ b/config/test/manager_image_patch.yaml
@@ -10,3 +10,4 @@ spec:
         name: manager
         args:
         - "--v=debug"
+        - "--wait-time=10"
diff --git a/pkg/controller/virtualmachine/virtualmachine_controller.go b/pkg/controller/virtualmachine/virtualmachine_controller.go
index e3a44a817..c6ef6d7c8 100644
--- a/pkg/controller/virtualmachine/virtualmachine_controller.go
+++ b/pkg/controller/virtualmachine/virtualmachine_controller.go
@@ -113,6 +113,7 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
 	if helper.ContainsString(virtualMachine.ObjectMeta.Finalizers, pool_manager.RuntimeObjectFinalizerName) {
 		return nil
 	}
+
 	log.V(1).Info("The VM does not have a finalizer",
 		"virtualMachineName", request.Name,
 		"virtualMachineNamespace", request.Namespace)
@@ -130,6 +131,8 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
 		"virtualMachineName", request.Name,
 		"virtualMachineNamespace", request.Namespace)
+
+	r.poolManager.MarkVMAsReady(virtualMachine)
+
 	return nil
 }
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 2eddead3a..073f63123 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -45,15 +45,16 @@ type KubeMacPoolManager struct {
 	config               *rest.Config
 	metricsAddr          string
 	continueToRunManager bool
-	restartChannel           chan struct{}
-	kubevirtInstalledChannel chan struct{}
-	stopSignalChannel        chan os.Signal
-	podNamespace             string
-	podName                  string
-	resourceLock             resourcelock.Interface
+	restartChannel           chan struct{}          // Closed when the certificates need to be regenerated
+	kubevirtInstalledChannel chan struct{}          // Closed once kubevirt is detected, so the manager can be reloaded
+	stopSignalChannel        chan os.Signal         // Receives OS signals to stop the manager
+	podNamespace             string                 // Manager pod namespace
+	podName                  string                 // Manager pod name
+	waitingTime              int                    // Duration in seconds to hold an allocated MAC address before the VM object is saved to etcd
+	resourceLock             resourcelock.Interface // Used for the leader election
 }
 
-func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string) *KubeMacPoolManager {
+func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string, waitingTime int) *KubeMacPoolManager {
 	kubemacpoolManager := &KubeMacPoolManager{
 		continueToRunManager: true,
 		restartChannel:       make(chan struct{}),
@@ -61,7 +62,8 @@ func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string) *KubeMacPo
 		stopSignalChannel:    make(chan os.Signal, 1),
 		podNamespace:         podNamespace,
 		podName:              podName,
-		metricsAddr:          metricsAddr}
+		metricsAddr:          metricsAddr,
+		waitingTime:          waitingTime}
 
 	signal.Notify(kubemacpoolManager.stopSignalChannel, os.Interrupt, os.Kill)
 
@@ -120,7 +122,7 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
 	}
 
 	isKubevirtInstalled := checkForKubevirt(k.clientset)
-	poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled)
+	poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled, k.waitingTime)
 	if err != nil {
 		return fmt.Errorf("unable to create pool manager error %v", err)
 	}
diff --git a/pkg/pool-manager/pool.go b/pkg/pool-manager/pool.go
index a57f3a595..09c7775d1 100644
--- a/pkg/pool-manager/pool.go
+++ b/pkg/pool-manager/pool.go
@@ -31,6 +31,7 @@ const (
 	RuntimeObjectFinalizerName = "k8s.v1.cni.cncf.io/kubeMacPool"
 	networksAnnotation         = "k8s.v1.cni.cncf.io/networks"
 	networksStatusAnnotation   = "k8s.v1.cni.cncf.io/networks-status"
+	vmWaitConfigMapName        = "kubemacpool-vm-configmap"
 )
 
 var log = logf.Log.WithName("PoolManager")
@@ -54,7 +55,7 @@ const (
 	AllocationStatusWaitingForPod AllocationStatus = "WaitingForPod"
 )
 
-func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool) (*PoolManager, error) {
+func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool, waitTime int) (*PoolManager, error) {
 	err := checkRange(rangeStart, rangeEnd)
 	if err != nil {
 		return nil, err
@@ -86,6 +87,8 @@ func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.Ha
 		return nil, err
 	}
 
+	go poolManger.vmWaitingCleanupLook(waitTime)
+
 	return poolManger, nil
 }
diff --git a/pkg/pool-manager/pool_test.go b/pkg/pool-manager/pool_test.go
index 8cabbe50b..f32efe0ca 100644
--- a/pkg/pool-manager/pool_test.go
+++ b/pkg/pool-manager/pool_test.go
@@ -45,7 +45,7 @@ var _ = Describe("Pool", func() {
 		Expect(err).ToNot(HaveOccurred())
 		endPoolRangeEnv, err := net.ParseMAC(endMacAddr)
 		Expect(err).ToNot(HaveOccurred())
-		poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+		poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 		Expect(err).ToNot(HaveOccurred())
 
 		return poolManager
@@ -112,7 +112,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("02:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("Invalid range. rangeStart: 0a:00:00:00:00:00 rangeEnd: 02:00:00:00:00:00"))
 
@@ -124,7 +124,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("06:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("RangeStart is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X3"))
 
@@ -136,7 +136,7 @@ var _ = Describe("Pool", func() {
 			Expect(err).ToNot(HaveOccurred())
 			endPoolRangeEnv, err := net.ParseMAC("05:00:00:00:00:00")
 			Expect(err).ToNot(HaveOccurred())
-			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
+			_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("RangeEnd is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X5"))
 		})
diff --git a/pkg/pool-manager/virtualmachine_pool.go b/pkg/pool-manager/virtualmachine_pool.go
index f799bae81..1ed8aa518 100644
--- a/pkg/pool-manager/virtualmachine_pool.go
+++ b/pkg/pool-manager/virtualmachine_pool.go
@@ -19,11 +19,15 @@ package pool_manager
 import (
 	"fmt"
 	"net"
-
-	"k8s.io/apimachinery/pkg/api/errors"
+	"strings"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubevirt "kubevirt.io/kubevirt/pkg/api/v1"
+
+	"github.com/K8sNetworkPlumbingWG/kubemacpool/pkg/names"
 )
 
 func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.VirtualMachine) error {
@@ -78,10 +82,17 @@ func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.Virtual
 				return err
 			}
 			copyVM.Spec.Template.Spec.Domain.Devices.Interfaces[idx].MacAddress = macAddr
-			allocations[iface.Name] = iface.MacAddress
+			allocations[iface.Name] = macAddr
 		}
 	}
+
+	err := p.AddMacToWaitingConfig(allocations)
+	if err != nil {
+		return err
+	}
+	virtualMachine.Spec.Template.Spec.Domain.Devices.Interfaces = copyVM.Spec.Template.Spec.Domain.Devices.Interfaces
+
 	return nil
 }
 
@@ -198,7 +209,7 @@ func (p *PoolManager) allocateFromPoolForVirtualMachine(virtualMachine *kubevirt
 		return "", err
 	}
 
-	p.macPoolMap[macAddr.String()] = AllocationStatusAllocated
+	p.macPoolMap[macAddr.String()] = AllocationStatusWaitingForPod
 	log.Info("mac from pool was allocated for virtual machine",
 		"allocatedMac", macAddr.String(),
 		"virtualMachineName", virtualMachine.Name,
@@ -219,7 +230,7 @@ func (p *PoolManager) allocateRequestedVirtualMachineInterfaceMac(virtualMachine
 		return err
 	}
 
-	p.macPoolMap[requestedMac] = AllocationStatusAllocated
+	p.macPoolMap[requestedMac] = AllocationStatusWaitingForPod
 	log.Info("requested mac was allocated for virtual machine",
 		"requestedMap", requestedMac,
 		"virtualMachineName", virtualMachine.Name,
@@ -232,13 +243,24 @@ func (p *PoolManager) initVirtualMachineMap() error {
 	if !p.isKubevirt {
 		return nil
 	}
+
+	waitingMac, err := p.getOrCreateVmMacWaitMap()
+	if err != nil {
+		return err
+	}
+
+	for macAddress := range waitingMac {
+		macAddress = strings.Replace(macAddress, "-", ":", 5)
+		p.macPoolMap[macAddress] = AllocationStatusWaitingForPod
+	}
+
 	result := p.kubeClient.ExtensionsV1beta1().RESTClient().Get().RequestURI("apis/kubevirt.io/v1alpha3/virtualmachines").Do()
 	if result.Error() != nil {
 		return result.Error()
 	}
 	vms := &kubevirt.VirtualMachineList{}
-	err := result.Into(vms)
+	err = result.Into(vms)
 	if err != nil {
 		return err
 	}
@@ -333,6 +355,123 @@ func (p *PoolManager) revertAllocationOnVm(vmName string, allocations map[string
 	p.releaseMacAddressesFromInterfaceMap(allocations)
 }
 
+// This function returns (or creates, if it does not exist) the config map that maps each waiting MAC address to its allocation time.
+func (p *PoolManager) getOrCreateVmMacWaitMap() (map[string]string, error) {
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			_, err = p.kubeClient.CoreV1().
+				ConfigMaps(names.MANAGER_NAMESPACE).
+				Create(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: vmWaitConfigMapName,
+					Namespace: names.MANAGER_NAMESPACE}})
+
+			return map[string]string{}, nil
+		}
+
+		return nil, err
+	}
+
+	return configMap.Data, nil
+}
+
+// Add all the allocated MAC addresses to the waiting config map with the current timestamp.
+func (p *PoolManager) AddMacToWaitingConfig(allocations map[string]string) error {
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	if configMap.Data == nil {
+		configMap.Data = map[string]string{}
+	}
+
+	for _, macAddress := range allocations {
+		log.V(1).Info("add mac address to waiting config", "macAddress", macAddress)
+		macAddress = strings.Replace(macAddress, ":", "-", 5)
+		configMap.Data[macAddress] = time.Now().Format(time.RFC3339)
+	}
+
+	_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+	return err
+}
+
+// Remove all of the VM's MAC addresses from the waiting configmap; this means the VM was saved to etcd and passed all validations.
+func (p *PoolManager) MarkVMAsReady(vm *kubevirt.VirtualMachine) error {
+	p.poolMutex.Lock()
+	defer p.poolMutex.Unlock()
+
+	configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	if configMap.Data == nil {
+		log.Info("the configMap is empty")
+		return nil
+	}
+
+	for _, vmInterface := range vm.Spec.Template.Spec.Domain.Devices.Interfaces {
+		if vmInterface.MacAddress != "" {
+			p.macPoolMap[vmInterface.MacAddress] = AllocationStatusAllocated
+			macAddress := strings.Replace(vmInterface.MacAddress, ":", "-", 5)
+			delete(configMap.Data, macAddress)
+		}
+	}
+
+	_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+	log.V(1).Info("marked virtual machine as ready", "virtualMachineNamespace", vm.Namespace,
+		"virtualMachineName", vm.Name)
+	return err
+}
+
+// This function checks for virtual machines that hit the create
+// mutating webhook but whose creation event never reached the controller loop.
+// This means the create request was rejected by some other mutating or validating
+// webhook, so we release the MAC addresses allocated for that virtual machine.
+func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
+	c := time.Tick(3 * time.Second)
+	log.Info("starting cleanup loop for waiting mac addresses")
+	for range c {
+		p.poolMutex.Lock()
+
+		configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
+		if err != nil {
+			log.Error(err, "failed to get config map", "configMapName", vmWaitConfigMapName)
+			p.poolMutex.Unlock()
+			continue
+		}
+
+		if configMap.Data == nil {
+			log.Info("the configMap is empty", "configMapName", vmWaitConfigMapName)
+			p.poolMutex.Unlock()
+			continue
+		}
+
+		for macAddress, allocationTime := range configMap.Data {
+			t, err := time.Parse(time.RFC3339, allocationTime)
+			if err != nil {
+				// TODO: remove the mac from the wait map??
+				log.Error(err, "failed to parse allocation time")
+				continue
+			}
+
+			if time.Now().After(t.Add(time.Duration(waitTime) * time.Second)) {
+				delete(configMap.Data, macAddress)
+				macAddress = strings.Replace(macAddress, "-", ":", 5)
+				delete(p.macPoolMap, macAddress)
+				log.V(1).Info("released mac address in waiting loop", "macAddress", macAddress)
+			}
+		}
+
+		_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
+		if err != nil {
+			log.Error(err, "failed to update config map", "configMapName", vmWaitConfigMapName)
+		}
+
+		p.poolMutex.Unlock()
+	}
+}
+
 func vmNamespaced(machine *kubevirt.VirtualMachine) string {
 	return fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
 }
diff --git a/tests/virtual_machines_test.go b/tests/virtual_machines_test.go
index 88b798533..ed1f61c4e 100644
--- a/tests/virtual_machines_test.go
+++ b/tests/virtual_machines_test.go
@@ -370,10 +370,8 @@ var _ = Describe("Virtual Machines", func() {
 				}
 			})
 		})
-		//TODO: remove the the pending annotation -"P"- from "PContext" when issue #44 is fixed :
-		//https://github.com/K8sNetworkPlumbingWG/kubemacpool/issues/44 //2633
-		PContext("When we re-apply a failed VM yaml", func() {
+		Context("When we re-apply a failed VM yaml", func() {
 			It("should allow to assign to the VM the same MAC addresses, with name as requested before and do not return an error", func() {
 				err := setRange(rangeStart, rangeEnd)
 				Expect(err).ToNot(HaveOccurred())
@@ -485,7 +483,7 @@ var _ = Describe("Virtual Machines", func() {
 
 			Eventually(func() error {
 				return testClient.VirtClient.Create(context.TODO(), anotherVm)
-			}, 40*time.Second, 5*time.Second).Should(Not(HaveOccurred()), "failed to apply the new vm object")
+			}, timeout, pollingInterval).Should(Not(HaveOccurred()), "failed to apply the new vm object")
 
 			_, err = net.ParseMAC(anotherVm.Spec.Template.Spec.Domain.Devices.Interfaces[0].MacAddress)
 			Expect(err).ToNot(HaveOccurred())
 		})
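
For reference only (not part of the patch): the waiting-list bookkeeping above stores each newly allocated MAC address in a ConfigMap, using a key where ":" is replaced with "-" (colons are not valid characters in ConfigMap data keys) and an RFC3339 timestamp as the value; the cleanup loop later releases any entry older than the configured --wait-time. The following is a minimal standalone Go sketch of that key encoding and expiry check. The helper names (toConfigMapKey, fromConfigMapKey, expired) are illustrative and do not exist in the codebase.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// toConfigMapKey converts a MAC address into a form that is legal as a
// ConfigMap data key, mirroring the strings.Replace calls in the patch.
func toConfigMapKey(mac string) string {
	return strings.Replace(mac, ":", "-", 5)
}

// fromConfigMapKey converts the key back to the canonical MAC notation.
func fromConfigMapKey(key string) string {
	return strings.Replace(key, "-", ":", 5)
}

// expired reports whether a waiting entry recorded at RFC3339 time
// `allocated` has outlived the configured wait time.
func expired(allocated string, waitTime time.Duration, now time.Time) (bool, error) {
	t, err := time.Parse(time.RFC3339, allocated)
	if err != nil {
		return false, err
	}
	return now.After(t.Add(waitTime)), nil
}

func main() {
	// Simulated ConfigMap data: key is the escaped MAC, value the allocation time.
	waiting := map[string]string{
		toConfigMapKey("02:00:00:00:00:01"): time.Now().Add(-15 * time.Second).Format(time.RFC3339),
		toConfigMapKey("02:00:00:00:00:02"): time.Now().Format(time.RFC3339),
	}

	waitTime := 10 * time.Second // corresponds to --wait-time=10 in the test config
	for key, allocated := range waiting {
		ok, err := expired(allocated, waitTime, time.Now())
		if err != nil {
			continue // a bad timestamp is skipped, as in the patch's cleanup loop
		}
		if ok {
			fmt.Printf("would release %s\n", fromConfigMapKey(key))
		}
	}
}
```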
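Also for reference: the cleanup goroutine started from NewPoolManager follows a ticker-plus-mutex pattern — wake up periodically, take the pool lock, drop expired waiting entries, release the lock. Below is a self-contained sketch of that shape under stated assumptions: the pool type here is a stand-in for the real PoolManager state, and unlike the patch (which uses time.Tick and runs for the lifetime of the manager) this version adds a stop channel purely so the example terminates.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is a stand-in for the PoolManager state touched by the cleanup loop:
// a shared waiting map guarded by a mutex. Everything here is illustrative only.
type pool struct {
	mu      sync.Mutex
	waiting map[string]time.Time // MAC address -> allocation time
}

// cleanupLoop mimics the shape of the patch's cleanup goroutine: on every tick,
// lock the shared state, delete entries older than waitTime, then unlock.
func (p *pool) cleanupLoop(interval, waitTime time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case now := <-ticker.C:
			p.mu.Lock()
			for mac, allocated := range p.waiting {
				if now.After(allocated.Add(waitTime)) {
					delete(p.waiting, mac)
					fmt.Println("released waiting mac", mac)
				}
			}
			p.mu.Unlock()
		}
	}
}

func main() {
	p := &pool{waiting: map[string]time.Time{
		"02:00:00:00:00:01": time.Now().Add(-time.Minute), // already older than the wait time
	}}

	stop := make(chan struct{})
	go p.cleanupLoop(100*time.Millisecond, 10*time.Second, stop)

	time.Sleep(300 * time.Millisecond) // let the loop run a few ticks
	close(stop)
}
```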