Before this PR we had an issue: if VM creation failed because of a validation error from virt-api, we didn't clean up the allocation.

This PR introduces a cleanup loop into the system.
When we hit the create-VM mutating webhook we create the regular allocation, but we also mark the mac address in a waiting configmap.

A waiting loop checks whether the entry has been removed from the map; if it has not been removed after 30 seconds, we assume the object was never saved into etcd (no controller event), so we release the allocation.

If we do get an event from the controller, we remove the mac address from the configmap.
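In essence, each entry in the waiting configmap stores its allocation time as an RFC3339 timestamp, and the cleanup loop releases any mac whose entry has outlived the configured wait time. Below is a minimal, hypothetical sketch of that expiry check (the shouldRelease helper and its parameters are illustrative, not part of this PR; the real logic lives in the vmWaitingCleanupLook function added in the diff):

package main

import (
	"fmt"
	"time"
)

// shouldRelease reports whether a waiting mac allocation has expired.
// allocationTime is the RFC3339 timestamp stored in the waiting configmap;
// waitTime is the --wait-time value in seconds.
func shouldRelease(allocationTime string, waitTime int, now time.Time) (bool, error) {
	t, err := time.Parse(time.RFC3339, allocationTime)
	if err != nil {
		return false, err
	}
	return now.After(t.Add(time.Duration(waitTime) * time.Second)), nil
}

func main() {
	// An entry written 45 seconds ago with a 30-second wait window has expired.
	stamp := time.Now().Add(-45 * time.Second).Format(time.RFC3339)
	expired, _ := shouldRelease(stamp, 30, time.Now())
	fmt.Println(expired) // true: release the mac and drop the configmap entry
}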
SchSeba committed Aug 29, 2019
1 parent 719dc55 commit 6f2a41c
Showing 11 changed files with 176 additions and 25 deletions.
4 changes: 3 additions & 1 deletion cmd/manager/main.go
@@ -43,9 +43,11 @@ func loadMacAddressFromEnvVar(envName string) (net.HardwareAddr, error) {

func main() {
var logType, metricsAddr string
var waitingTime int

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&logType, "v", "production", "Log type (debug/production).")
flag.IntVar(&waitingTime, "wait-time", 600, "waiting time to release the mac if object was not created")
flag.Parse()

if logType == "debug" {
@@ -80,7 +82,7 @@ func main() {
os.Exit(1)
}

kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr)
kubemacpoolManager := manager.NewKubeMacPoolManager(podNamespace, podName, metricsAddr, waitingTime)

err = kubemacpoolManager.Run(rangeStart, rangeEnd)
if err != nil {
1 change: 1 addition & 0 deletions config/default/manager/manager.yaml
@@ -57,6 +57,7 @@ spec:
- /manager
args:
- "--v=production"
- "--wait-time=600"
image: quay.io/kubevirt/kubemacpool:latest
imagePullPolicy: Always
name: manager
1 change: 1 addition & 0 deletions config/release/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
containers:
- args:
- --v=production
- --wait-time=600
command:
- /manager
env:
1 change: 1 addition & 0 deletions config/test/kubemacpool.yaml
@@ -172,6 +172,7 @@ spec:
containers:
- args:
- --v=debug
- --wait-time=10
command:
- /manager
env:
1 change: 1 addition & 0 deletions config/test/manager_image_patch.yaml
@@ -10,3 +10,4 @@ spec:
name: manager
args:
- "--v=debug"
- "--wait-time=10"
3 changes: 3 additions & 0 deletions pkg/controller/virtualmachine/virtualmachine_controller.go
@@ -113,6 +113,7 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
if helper.ContainsString(virtualMachine.ObjectMeta.Finalizers, pool_manager.RuntimeObjectFinalizerName) {
return nil
}

log.V(1).Info("The VM does not have a finalizer",
"virtualMachineName", request.Name,
"virtualMachineNamespace", request.Namespace)
@@ -130,6 +131,8 @@ func (r *ReconcilePolicy) addFinalizerAndUpdate(virtualMachine *kubevirt.Virtual
"virtualMachineName", request.Name,
"virtualMachineNamespace", request.Namespace)

r.poolManager.MarkVMAsReady(virtualMachine)

return nil
}

20 changes: 11 additions & 9 deletions pkg/manager/manager.go
@@ -45,23 +45,25 @@ type KubeMacPoolManager struct {
config *rest.Config
metricsAddr string
continueToRunManager bool
restartChannel chan struct{}
kubevirtInstalledChannel chan struct{}
stopSignalChannel chan os.Signal
podNamespace string
podName string
resourceLock resourcelock.Interface
restartChannel chan struct{} // Close the channel if we need to regenerate certs
kubevirtInstalledChannel chan struct{} // This channel is closed after we find kubevirt, so the manager can be reloaded
stopSignalChannel chan os.Signal // stop signal channel
podNamespace string // manager pod namespace
podName string // manager pod name
waitingTime int // Duration in seconds to hold a mac address before the object is saved to etcd
resourceLock resourcelock.Interface // Used for the leader election
}

func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string) *KubeMacPoolManager {
func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string, waitingTime int) *KubeMacPoolManager {
kubemacpoolManager := &KubeMacPoolManager{
continueToRunManager: true,
restartChannel: make(chan struct{}),
kubevirtInstalledChannel: make(chan struct{}),
stopSignalChannel: make(chan os.Signal, 1),
podNamespace: podNamespace,
podName: podName,
metricsAddr: metricsAddr}
metricsAddr: metricsAddr,
waitingTime: waitingTime}

signal.Notify(kubemacpoolManager.stopSignalChannel, os.Interrupt, os.Kill)

@@ -120,7 +122,7 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
}

isKubevirtInstalled := checkForKubevirt(k.clientset)
poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled)
poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, isKubevirtInstalled, k.waitingTime)
if err != nil {
return fmt.Errorf("unable to create pool manager error %v", err)
}
5 changes: 4 additions & 1 deletion pkg/pool-manager/pool.go
@@ -31,6 +31,7 @@ const (
RuntimeObjectFinalizerName = "k8s.v1.cni.cncf.io/kubeMacPool"
networksAnnotation = "k8s.v1.cni.cncf.io/networks"
networksStatusAnnotation = "k8s.v1.cni.cncf.io/networks-status"
vmWaitConfigMapName = "kubemacpool-vm-configmap"
)

var log = logf.Log.WithName("PoolManager")
@@ -54,7 +55,7 @@ const (
AllocationStatusWaitingForPod AllocationStatus = "WaitingForPod"
)

func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool) (*PoolManager, error) {
func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.HardwareAddr, kubevirtExist bool, waitTime int) (*PoolManager, error) {
err := checkRange(rangeStart, rangeEnd)
if err != nil {
return nil, err
@@ -86,6 +87,8 @@ func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.Ha
return nil, err
}

go poolManger.vmWaitingCleanupLook(waitTime)

return poolManger, nil
}

8 changes: 4 additions & 4 deletions pkg/pool-manager/pool_test.go
@@ -45,7 +45,7 @@ var _ = Describe("Pool", func() {
Expect(err).ToNot(HaveOccurred())
endPoolRangeEnv, err := net.ParseMAC(endMacAddr)
Expect(err).ToNot(HaveOccurred())
poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
Expect(err).ToNot(HaveOccurred())

return poolManager
@@ -112,7 +112,7 @@ var _ = Describe("Pool", func() {
Expect(err).ToNot(HaveOccurred())
endPoolRangeEnv, err := net.ParseMAC("02:00:00:00:00:00")
Expect(err).ToNot(HaveOccurred())
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("Invalid range. rangeStart: 0a:00:00:00:00:00 rangeEnd: 02:00:00:00:00:00"))

@@ -124,7 +124,7 @@ var _ = Describe("Pool", func() {
Expect(err).ToNot(HaveOccurred())
endPoolRangeEnv, err := net.ParseMAC("06:00:00:00:00:00")
Expect(err).ToNot(HaveOccurred())
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("RangeStart is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X3"))

@@ -136,7 +136,7 @@ var _ = Describe("Pool", func() {
Expect(err).ToNot(HaveOccurred())
endPoolRangeEnv, err := net.ParseMAC("05:00:00:00:00:00")
Expect(err).ToNot(HaveOccurred())
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false)
_, err = NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, false, 10)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("RangeEnd is invalid: invalid mac address. Multicast addressing is not supported. Unicast addressing must be used. The first octet is 0X5"))
})
151 changes: 145 additions & 6 deletions pkg/pool-manager/virtualmachine_pool.go
@@ -19,11 +19,15 @@ package pool_manager
import (
"fmt"
"net"

"k8s.io/apimachinery/pkg/api/errors"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubevirt "kubevirt.io/kubevirt/pkg/api/v1"

"github.com/K8sNetworkPlumbingWG/kubemacpool/pkg/names"
)

func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.VirtualMachine) error {
@@ -78,10 +82,17 @@ func (p *PoolManager) AllocateVirtualMachineMac(virtualMachine *kubevirt.Virtual
return err
}
copyVM.Spec.Template.Spec.Domain.Devices.Interfaces[idx].MacAddress = macAddr
allocations[iface.Name] = iface.MacAddress
allocations[iface.Name] = macAddr
}
}

err := p.AddMacToWaitingConfig(allocations)
if err != nil {
return err
}

virtualMachine.Spec.Template.Spec.Domain.Devices.Interfaces = copyVM.Spec.Template.Spec.Domain.Devices.Interfaces

return nil
}

@@ -198,7 +209,7 @@ func (p *PoolManager) allocateFromPoolForVirtualMachine(virtualMachine *kubevirt
return "", err
}

p.macPoolMap[macAddr.String()] = AllocationStatusAllocated
p.macPoolMap[macAddr.String()] = AllocationStatusWaitingForPod
log.Info("mac from pool was allocated for virtual machine",
"allocatedMac", macAddr.String(),
"virtualMachineName", virtualMachine.Name,
@@ -219,7 +230,7 @@ func (p *PoolManager) allocateRequestedVirtualMachineInterfaceMac(virtualMachine
return err
}

p.macPoolMap[requestedMac] = AllocationStatusAllocated
p.macPoolMap[requestedMac] = AllocationStatusWaitingForPod
log.Info("requested mac was allocated for virtual machine",
"requestedMap", requestedMac,
"virtualMachineName", virtualMachine.Name,
@@ -232,13 +243,24 @@ func (p *PoolManager) initVirtualMachineMap() error {
if !p.isKubevirt {
return nil
}

waitingMac, err := p.getOrCreateVmMacWaitMap()
if err != nil {
return err
}

for macAddress := range waitingMac {
macAddress = strings.Replace(macAddress, "-", ":", 5)
p.macPoolMap[macAddress] = AllocationStatusWaitingForPod
}

result := p.kubeClient.ExtensionsV1beta1().RESTClient().Get().RequestURI("apis/kubevirt.io/v1alpha3/virtualmachines").Do()
if result.Error() != nil {
return result.Error()
}

vms := &kubevirt.VirtualMachineList{}
err := result.Into(vms)
err = result.Into(vms)
if err != nil {
return err
}
@@ -333,6 +355,123 @@ func (p *PoolManager) revertAllocationOnVm(vmName string, allocations map[string
p.releaseMacAddressesFromInterfaceMap(allocations)
}

// This function returns (or creates, if missing) the config map that holds each waiting mac address and its allocation time.
func (p *PoolManager) getOrCreateVmMacWaitMap() (map[string]string, error) {
configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
_, err = p.kubeClient.CoreV1().
ConfigMaps(names.MANAGER_NAMESPACE).
Create(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: vmWaitConfigMapName,
Namespace: names.MANAGER_NAMESPACE}})

return map[string]string{}, nil
}

return nil, err
}

return configMap.Data, nil
}

// Add all the allocated mac addresses to the waiting config map with the current time.
func (p *PoolManager) AddMacToWaitingConfig(allocations map[string]string) error {
configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
if err != nil {
return err
}

if configMap.Data == nil {
configMap.Data = map[string]string{}
}

for _, macAddress := range allocations {
log.V(1).Info("add mac address to waiting config", "macAddress", macAddress)
macAddress = strings.Replace(macAddress, ":", "-", 5)
configMap.Data[macAddress] = time.Now().Format(time.RFC3339)
}

_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
return err
}

// Remove all the mac addresses from the waiting configmap; this means the vm was saved in etcd and passed validations.
func (p *PoolManager) MarkVMAsReady(vm *kubevirt.VirtualMachine) error {
p.poolMutex.Lock()
defer p.poolMutex.Unlock()

configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
if err != nil {
return err
}

if configMap.Data == nil {
log.Info("the configMap is empty")
return nil
}

for _, vmInterface := range vm.Spec.Template.Spec.Domain.Devices.Interfaces {
if vmInterface.MacAddress != "" {
p.macPoolMap[vmInterface.MacAddress] = AllocationStatusAllocated
macAddress := strings.Replace(vmInterface.MacAddress, ":", "-", 5)
delete(configMap.Data, macAddress)
}
}

_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
log.V(1).Info("marked virtual machine as ready", "virtualMachineNamespace", vm.Namespace,
"virtualMachineName", vm.Name)
return err
}

// This function checks if there are virtual machines that hit the create
// mutating webhook but for which we never got the creation event in the controller loop.
// This means the create was rejected by some other mutating or validating webhook,
// so we release the virtual machine's mac addresses.
func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
c := time.Tick(3 * time.Second)
log.Info("starting cleanup loop for waiting mac addresses")
for range c {
p.poolMutex.Lock()

configMap, err := p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Get(vmWaitConfigMapName, metav1.GetOptions{})
if err != nil {
log.Error(err, "failed to get config map", "configMapName", vmWaitConfigMapName)
p.poolMutex.Unlock()
continue
}

if configMap.Data == nil {
log.Info("the configMap is empty", "configMapName", vmWaitConfigMapName)
p.poolMutex.Unlock()
continue
}

for macAddress, allocationTime := range configMap.Data {
t, err := time.Parse(time.RFC3339, allocationTime)
if err != nil {
// TODO: remove the mac from the wait map??
log.Error(err, "failed to parse allocation time")
continue
}

if time.Now().After(t.Add(time.Duration(waitTime) * time.Second)) {
delete(configMap.Data, macAddress)
macAddress = strings.Replace(macAddress, "-", ":", 5)
delete(p.macPoolMap, macAddress)
log.V(1).Info("released mac address in waiting loop", "macAddress", macAddress)
}
}

_, err = p.kubeClient.CoreV1().ConfigMaps(names.MANAGER_NAMESPACE).Update(configMap)
if err != nil {
log.Error(err, "failed to update config map", "configMapName", vmWaitConfigMapName)
}

p.poolMutex.Unlock()
}
}

func vmNamespaced(machine *kubevirt.VirtualMachine) string {
return fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
}
6 changes: 2 additions & 4 deletions tests/virtual_machines_test.go
@@ -370,10 +370,8 @@ var _ = Describe("Virtual Machines", func() {
}
})
})
//TODO: remove the the pending annotation -"P"- from "PContext" when issue #44 is fixed :
//https://github.com/K8sNetworkPlumbingWG/kubemacpool/issues/44
//2633
PContext("When we re-apply a failed VM yaml", func() {
Context("When we re-apply a failed VM yaml", func() {
It("should allow to assign to the VM the same MAC addresses, with name as requested before and do not return an error", func() {
err := setRange(rangeStart, rangeEnd)
Expect(err).ToNot(HaveOccurred())
@@ -485,7 +483,7 @@ var _ = Describe("Virtual Machines", func() {
Eventually(func() error {
return testClient.VirtClient.Create(context.TODO(), anotherVm)

}, 40*time.Second, 5*time.Second).Should(Not(HaveOccurred()), "failed to apply the new vm object")
}, timeout, pollingInterval).Should(Not(HaveOccurred()), "failed to apply the new vm object")
_, err = net.ParseMAC(anotherVm.Spec.Template.Spec.Domain.Devices.Interfaces[0].MacAddress)
Expect(err).ToNot(HaveOccurred())
})
