[controller] Add a controller to label nodes for sds-local-volume-csi
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
ViktorKram committed Apr 23, 2024
1 parent 0c3955b commit b671f3a
Showing 6 changed files with 251 additions and 12 deletions.
@@ -0,0 +1,21 @@
/*
Copyright 2024 Flant JSC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

type SdsLocalVolumeConfig struct {
NodeSelector map[string]string `yaml:"nodeSelector"`
}
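
For context, a minimal standalone sketch of how this type gets populated from the "config" key of the controller's config secret; the selector key and value below are hypothetical, and the import path simply follows the module name used elsewhere in the controller code:

package main

import (
	"fmt"

	"sigs.k8s.io/yaml"

	"sds-local-volume-controller/api/v1alpha1"
)

func main() {
	// Hypothetical payload of the secret's "config" key.
	raw := []byte("nodeSelector:\n  kubernetes.io/os: linux\n")

	var cfg v1alpha1.SdsLocalVolumeConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// Prints: map[kubernetes.io/os:linux]
	fmt.Println(cfg.NodeSelector)
}

Note that sigs.k8s.io/yaml routes through encoding/json, so the field is matched by its name (case-insensitively) rather than by the yaml struct tag.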
9 changes: 7 additions & 2 deletions images/sds-local-volume-controller/cmd/main.go
@@ -63,7 +63,7 @@ func main() {

log.Info("[main] CfgParams has been successfully created")
log.Info(fmt.Sprintf("[main] %s = %s", config.LogLevel, cfgParams.Loglevel))
log.Info(fmt.Sprintf("[main] %s = %d", config.RequeueInterval, cfgParams.RequeueInterval))
log.Info(fmt.Sprintf("[main] %s = %d", config.RequeueInterval, cfgParams.RequeueStorageClassInterval))

kConfig, err := kubutils.KubernetesDefaultConfigCreate()
if err != nil {
@@ -97,7 +97,12 @@ func main() {
metrics := monitoring.GetMetrics("")

if _, err = controller.RunLocalStorageClassWatcherController(mgr, *cfgParams, *log, metrics); err != nil {
log.Error(err, "[main] unable to controller.RunBlockDeviceController")
log.Error(err, fmt.Sprintf("[main] unable to run %s", controller.LocalStorageClassCtrlName))
os.Exit(1)
}

if _, err = controller.RunLocalCSINodeWatcherController(mgr, *cfgParams, *log, metrics); err != nil {
log.Error(err, fmt.Sprintf("[main] unable to run %s", controller.LocalCSINodeWatcherCtrl))
os.Exit(1)
}

15 changes: 10 additions & 5 deletions images/sds-local-volume-controller/pkg/config/config.go
@@ -20,13 +20,16 @@ import (
)

const (
LogLevel = "LOG_LEVEL"
RequeueInterval = "REQUEUE_INTERVAL"
LogLevel = "LOG_LEVEL"
RequeueInterval = "REQUEUE_INTERVAL"
ConfigSecretName = "d8-sds-local-volume-controller-config"
)

type Options struct {
- Loglevel        logger.Verbosity
- RequeueInterval time.Duration
+ Loglevel                    logger.Verbosity
+ RequeueStorageClassInterval time.Duration
+ RequeueSecretInterval       time.Duration
+ ConfigSecretName            string
}

func NewConfig() *Options {
@@ -39,7 +42,9 @@ func NewConfig() *Options {
opts.Loglevel = logger.Verbosity(loglevel)
}

- opts.RequeueInterval = 10
+ opts.RequeueStorageClassInterval = 10
+ opts.RequeueSecretInterval = 10
+ opts.ConfigSecretName = ConfigSecretName

return &opts
}
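
One detail worth calling out (a sketch, not part of the diff): the intervals above are stored as bare counts, and the reconcilers below multiply them by time.Second at the call site to obtain the actual requeue delay:

package main

import (
	"fmt"
	"time"
)

func main() {
	// opts.RequeueSecretInterval = 10 stores time.Duration(10), i.e. 10 nanoseconds on its own.
	requeueSecretInterval := time.Duration(10)

	// The controllers compute the delay as interval * time.Second; time.Duration is an
	// integer type, so the multiplication yields 10 seconds.
	fmt.Println(requeueSecretInterval * time.Second) // prints "10s"
}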
@@ -0,0 +1,207 @@
/*
Copyright 2024 Flant JSC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sds-local-volume-controller/api/v1alpha1"
"sds-local-volume-controller/pkg/config"
"sds-local-volume-controller/pkg/logger"
"sds-local-volume-controller/pkg/monitoring"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"
"time"
)

const (
LocalCSINodeWatcherCtrl = "local-csi-node-watcher-controller"
localCsiNodeSelectorLabel = "storage.deckhouse.io/sds-local-volume-node"
)

func RunLocalCSINodeWatcherController(
mgr manager.Manager,
cfg config.Options,
log logger.Logger,
metrics monitoring.Metrics,
) (controller.Controller, error) {
cl := mgr.GetClient()

c, err := controller.New(LocalCSINodeWatcherCtrl, mgr, controller.Options{
Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log.Info(fmt.Sprintf("[RunLocalCSINodeWatcherController] Reconciler func starts a reconciliation for the request: %s", request.NamespacedName.String()))
if request.Name == cfg.ConfigSecretName {
log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] request name %s matches the target config secret name %s. Start to reconcile", request.Name, cfg.ConfigSecretName))

log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to get a secret by the request %s", request.NamespacedName.String()))
secret, err := getSecret(ctx, cl, request.Namespace, request.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to get a secret by the request %s", request.NamespacedName.String()))
return reconcile.Result{}, err
}
log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully got a secret by the request %s", request.NamespacedName.String()))

log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))
err = reconcileLocalCSINodes(ctx, cl, log, secret)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))
return reconcile.Result{}, err
}
log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully reconciled local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))

return reconcile.Result{
RequeueAfter: cfg.RequeueSecretInterval * time.Second,
}, nil
}

return reconcile.Result{}, nil
}),
})
if err != nil {
return nil, err
}

err = c.Watch(source.Kind(mgr.GetCache(), &v1.Secret{}), &handler.EnqueueRequestForObject{})

return c, err
}

func getSecret(ctx context.Context, cl client.Client, namespace, name string) (*v1.Secret, error) {
secret := &v1.Secret{}
err := cl.Get(ctx,
client.ObjectKey{
Namespace: namespace,
Name: name,
}, secret)
return secret, err
}

func reconcileLocalCSINodes(ctx context.Context, cl client.Client, log logger.Logger, secret *v1.Secret) error {
log.Debug("[reconcileLocalCSINodes] tries to get a selector from the config")
nodeSelector, err := getNodeSelectorFromConfig(secret)
if err != nil {
log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get node selector from the secret %s/%s", secret.Namespace, secret.Name))
return err
}
log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node selector from the config: %v", nodeSelector))
log.Debug("[reconcileLocalCSINodes] successfully got a selector from the config")

log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] tries to get kubernetes nodes by the selector %v", nodeSelector))
nodesWithSelector, err := getKubernetesNodesBySelector(ctx, cl, nodeSelector)
if err != nil {
log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get nodes by selector %v", nodeSelector))
return err
}
for _, n := range nodesWithSelector.Items {
log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node %s has been got by selector %v", n.Name, nodeSelector))
}
log.Debug("[reconcileLocalCSINodes] successfully got kubernetes nodes by the selector")

labelNodesWithLocalCSIIfNeeded(ctx, cl, log, nodesWithSelector)
log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] finished labeling the selected nodes with a label %s", localCsiNodeSelectorLabel))

log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] start to clear the nodes without the selector %v", nodeSelector))
log.Debug("[reconcileLocalCSINodes] tries to get all kubernetes nodes")
nodes, err := getKubeNodes(ctx, cl)
if err != nil {
log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get nodes"))
return err
}
for _, n := range nodes.Items {
log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node %s has been got", n.Name))
}
log.Debug("[reconcileLocalCSINodes] successfully got all kubernetes nodes")

removeLocalCSILabelIfNeeded(ctx, cl, log, nodes, nodeSelector)
log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] finished removing the label %s from the nodes without the selector %v", localCsiNodeSelectorLabel, nodeSelector))

return nil
}

func removeLocalCSILabelIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, nodes *v1.NodeList, selector map[string]string) {
var err error
for _, node := range nodes.Items {
if labels.Set(selector).AsSelector().Matches(labels.Set(node.Labels)) {
log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] no need to remove a label %s from the node %s as its labels match the selector", localCsiNodeSelectorLabel, node.Name))
continue
}

if _, exist := node.Labels[localCsiNodeSelectorLabel]; !exist {
log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] no need to remove a label %s from the node %s as it does not has the label", localCsiNodeSelectorLabel, node.Name))
continue
}

delete(node.Labels, localCsiNodeSelectorLabel)
err = cl.Update(ctx, &node)
if err != nil {
log.Error(err, fmt.Sprintf("[removeLocalCSILabelIfNeeded] unable to update the node %s", node.Name))
continue
}

log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] the label %s has been successfully removed from the node %s", localCsiNodeSelectorLabel, node.Name))
}
}

func getKubeNodes(ctx context.Context, cl client.Client) (*v1.NodeList, error) {
nodes := &v1.NodeList{}
err := cl.List(ctx, nodes)
return nodes, err
}

func getNodeSelectorFromConfig(secret *v1.Secret) (map[string]string, error) {
var sdsConfig v1alpha1.SdsLocalVolumeConfig
err := yaml.Unmarshal(secret.Data["config"], &sdsConfig)
if err != nil {
return nil, err
}

nodeSelector := sdsConfig.NodeSelector
return nodeSelector, nil
}

func getKubernetesNodesBySelector(ctx context.Context, cl client.Client, selector map[string]string) (*v1.NodeList, error) {
nodes := &v1.NodeList{}
err := cl.List(ctx, nodes, client.MatchingLabels(selector))
return nodes, err
}

func labelNodesWithLocalCSIIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, nodes *v1.NodeList) {
var err error
for _, node := range nodes.Items {
if _, exist := node.Labels[localCsiNodeSelectorLabel]; exist {
log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] a node %s has already been labeled with label %s", node.Name, localCsiNodeSelectorLabel))
continue
}

node.Labels[localCsiNodeSelectorLabel] = ""

err = cl.Update(ctx, &node)
if err != nil {
log.Error(err, fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] unable to update a node %s", node.Name))
continue
}

log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] successufully added label %s to the node %s", localCsiNodeSelectorLabel, node.Name))
}
}
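
To make the labeling rule concrete, a small standalone sketch (the selector and node labels below are hypothetical) of the labels.Set(...).AsSelector().Matches(...) check that both helpers above rely on to decide whether a node should carry the storage.deckhouse.io/sds-local-volume-node label:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Hypothetical nodeSelector taken from the module config secret.
	selector := map[string]string{"node-role.deckhouse.io/worker": ""}

	matchingNode := map[string]string{
		"node-role.deckhouse.io/worker":              "",
		"storage.deckhouse.io/sds-local-volume-node": "",
	}
	nonMatchingNode := map[string]string{
		"node-role.deckhouse.io/system": "",
	}

	sel := labels.Set(selector).AsSelector()

	// A node whose labels satisfy the selector receives (or keeps) the CSI node label.
	fmt.Println(sel.Matches(labels.Set(matchingNode))) // true

	// A node that no longer matches the selector has the label removed.
	fmt.Println(sel.Matches(labels.Set(nonMatchingNode))) // false
}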
@@ -117,7 +117,7 @@ func RunLocalStorageClassWatcherController(
if shouldRequeue {
log.Warning(fmt.Sprintf("[LocalStorageClassReconciler] Reconciler will requeue the request, name: %s", request.Name))
return reconcile.Result{
- RequeueAfter: cfg.RequeueInterval * time.Second,
+ RequeueAfter: cfg.RequeueStorageClassInterval * time.Second,
}, nil
}

@@ -151,7 +151,7 @@ func RunLocalStorageClassWatcherController(
Namespace: lsc.Namespace,
Name: lsc.Name,
},
- }, cfg.RequeueInterval*time.Second)
+ }, cfg.RequeueStorageClassInterval*time.Second)
return
}

@@ -167,7 +167,7 @@ func RunLocalStorageClassWatcherController(
Namespace: lsc.Namespace,
Name: lsc.Name,
},
- }, cfg.RequeueInterval*time.Second)
+ }, cfg.RequeueStorageClassInterval*time.Second)
}
log.Info(fmt.Sprintf("[CreateFunc] ends the reconciliation for the LocalStorageClass %s", e.Object.GetName()))
},
@@ -202,7 +202,7 @@ func RunLocalStorageClassWatcherController(
Namespace: newLsc.Namespace,
Name: newLsc.Name,
},
- }, cfg.RequeueInterval*time.Second)
+ }, cfg.RequeueStorageClassInterval*time.Second)
return
}

@@ -218,7 +218,7 @@ func RunLocalStorageClassWatcherController(
Namespace: newLsc.Namespace,
Name: newLsc.Name,
},
- }, cfg.RequeueInterval*time.Second)
+ }, cfg.RequeueStorageClassInterval*time.Second)
}

log.Info(fmt.Sprintf("[UpdateFunc] ends the reconciliation for the LocalStorageClass %s", e.ObjectNew.GetName()))
1 change: 1 addition & 0 deletions templates/sds-local-volume-controller/rbac-for-us.yaml
@@ -55,6 +55,7 @@ rules:
resources:
- nodes
- persistentvolumes
- secrets
verbs:
- get
- list
