diff --git a/images/sds-local-volume-controller/api/v1alpha1/sds_local_volume_config.go b/images/sds-local-volume-controller/api/v1alpha1/sds_local_volume_config.go
new file mode 100644
index 00000000..ad93d485
--- /dev/null
+++ b/images/sds-local-volume-controller/api/v1alpha1/sds_local_volume_config.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2024 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+type SdsLocalVolumeConfig struct {
+	NodeSelector map[string]string `yaml:"nodeSelector"`
+}
diff --git a/images/sds-local-volume-controller/cmd/main.go b/images/sds-local-volume-controller/cmd/main.go
index 0204fc55..e1d6fd73 100644
--- a/images/sds-local-volume-controller/cmd/main.go
+++ b/images/sds-local-volume-controller/cmd/main.go
@@ -63,7 +63,7 @@ func main() {
 	log.Info("[main] CfgParams has been successfully created")
 	log.Info(fmt.Sprintf("[main] %s = %s", config.LogLevel, cfgParams.Loglevel))
-	log.Info(fmt.Sprintf("[main] %s = %d", config.RequeueInterval, cfgParams.RequeueInterval))
+	log.Info(fmt.Sprintf("[main] %s = %d", config.RequeueInterval, cfgParams.RequeueStorageClassInterval))
 
 	kConfig, err := kubutils.KubernetesDefaultConfigCreate()
 	if err != nil {
@@ -97,7 +97,12 @@ func main() {
 	metrics := monitoring.GetMetrics("")
 
 	if _, err = controller.RunLocalStorageClassWatcherController(mgr, *cfgParams, *log, metrics); err != nil {
-		log.Error(err, "[main] unable to controller.RunBlockDeviceController")
+		log.Error(err, fmt.Sprintf("[main] unable to run %s", controller.LocalStorageClassCtrlName))
+		os.Exit(1)
+	}
+
+	if _, err = controller.RunLocalCSINodeWatcherController(mgr, *cfgParams, *log, metrics); err != nil {
+		log.Error(err, fmt.Sprintf("[main] unable to run %s", controller.LocalCSINodeWatcherCtrl))
 		os.Exit(1)
 	}
 
diff --git a/images/sds-local-volume-controller/pkg/config/config.go b/images/sds-local-volume-controller/pkg/config/config.go
index 1fdd70dd..f151dab8 100644
--- a/images/sds-local-volume-controller/pkg/config/config.go
+++ b/images/sds-local-volume-controller/pkg/config/config.go
@@ -20,13 +20,16 @@ import (
 )
 
 const (
-	LogLevel        = "LOG_LEVEL"
-	RequeueInterval = "REQUEUE_INTERVAL"
+	LogLevel         = "LOG_LEVEL"
+	RequeueInterval  = "REQUEUE_INTERVAL"
+	ConfigSecretName = "d8-sds-local-volume-controller-config"
 )
 
 type Options struct {
-	Loglevel        logger.Verbosity
-	RequeueInterval time.Duration
+	Loglevel                    logger.Verbosity
+	RequeueStorageClassInterval time.Duration
+	RequeueSecretInterval       time.Duration
+	ConfigSecretName            string
 }
 
 func NewConfig() *Options {
@@ -39,7 +42,9 @@ func NewConfig() *Options {
 		opts.Loglevel = logger.Verbosity(loglevel)
 	}
 
-	opts.RequeueInterval = 10
+	opts.RequeueStorageClassInterval = 10
+	opts.RequeueSecretInterval = 10
+	opts.ConfigSecretName = ConfigSecretName
 
 	return &opts
 }
diff --git a/images/sds-local-volume-controller/pkg/controller/local_csi_node_watcher.go b/images/sds-local-volume-controller/pkg/controller/local_csi_node_watcher.go
new file mode 100644
index 00000000..8c7f2f2a
--- /dev/null
+++ b/images/sds-local-volume-controller/pkg/controller/local_csi_node_watcher.go
@@ -0,0 +1,207 @@
+/*
+Copyright 2024 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"sds-local-volume-controller/api/v1alpha1"
+	"sds-local-volume-controller/pkg/config"
+	"sds-local-volume-controller/pkg/logger"
+	"sds-local-volume-controller/pkg/monitoring"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+	"sigs.k8s.io/yaml"
+)
+
+const (
+	LocalCSINodeWatcherCtrl   = "local-csi-node-watcher-controller"
+	localCsiNodeSelectorLabel = "storage.deckhouse.io/sds-local-volume-node"
+)
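+
+// RunLocalCSINodeWatcherController registers a controller that watches Secrets
+// and, whenever the module config secret (cfg.ConfigSecretName) is reconciled,
+// syncs the localCsiNodeSelectorLabel on cluster nodes with the nodeSelector
+// taken from that secret. Matching requests are requeued every
+// cfg.RequeueSecretInterval seconds; all other Secrets are ignored.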
+func RunLocalCSINodeWatcherController(
+	mgr manager.Manager,
+	cfg config.Options,
+	log logger.Logger,
+	metrics monitoring.Metrics,
+) (controller.Controller, error) {
+	cl := mgr.GetClient()
+
+	c, err := controller.New(LocalCSINodeWatcherCtrl, mgr, controller.Options{
+		Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
+			log.Info(fmt.Sprintf("[RunLocalCSINodeWatcherController] Reconciler func starts a reconciliation for the request: %s", request.NamespacedName.String()))
+			if request.Name == cfg.ConfigSecretName {
+				log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] request name %s matches the target config secret name %s. Start to reconcile", request.Name, cfg.ConfigSecretName))
+
+				log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to get a secret by the request %s", request.NamespacedName.String()))
+				secret, err := getSecret(ctx, cl, request.Namespace, request.Name)
+				if err != nil {
+					log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to get a secret by the request %s", request.NamespacedName.String()))
+					return reconcile.Result{}, err
+				}
+				log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully got a secret by the request %s", request.NamespacedName.String()))
+
+				log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))
+				err = reconcileLocalCSINodes(ctx, cl, log, secret)
+				if err != nil {
+					log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))
+					return reconcile.Result{}, err
+				}
+				log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully reconciled local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name))
+
+				return reconcile.Result{
+					RequeueAfter: cfg.RequeueSecretInterval * time.Second,
+				}, nil
+			}
+
+			return reconcile.Result{}, nil
+		}),
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	err = c.Watch(source.Kind(mgr.GetCache(), &v1.Secret{}), &handler.EnqueueRequestForObject{})
+
+	return c, err
+}
+
+func getSecret(ctx context.Context, cl client.Client, namespace, name string) (*v1.Secret, error) {
+	secret := &v1.Secret{}
+	err := cl.Get(ctx,
+		client.ObjectKey{
+			Namespace: namespace,
+			Name:      name,
+		}, secret)
+	return secret, err
+}
+
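+// reconcileLocalCSINodes labels every node matching the nodeSelector from the
+// config secret with localCsiNodeSelectorLabel, then walks all nodes and
+// removes that label from the ones that no longer match the selector.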
Start to reconcile", request.Name, cfg.ConfigSecretName)) + + log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to get a secret by the request %s", request.NamespacedName.String())) + secret, err := getSecret(ctx, cl, request.Namespace, request.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to get a secret by the request %s", request.NamespacedName.String())) + return reconcile.Result{}, err + } + log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully got a secret by the request %s", request.NamespacedName.String())) + + log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] tries to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name)) + err = reconcileLocalCSINodes(ctx, cl, log, secret) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLocalCSINodeWatcherController] unable to reconcile local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name)) + return reconcile.Result{}, err + } + log.Debug(fmt.Sprintf("[RunLocalCSINodeWatcherController] successfully reconciled local CSI nodes for the secret %s/%s", secret.Namespace, secret.Name)) + + return reconcile.Result{ + RequeueAfter: cfg.RequeueSecretInterval * time.Second, + }, nil + } + + return reconcile.Result{}, nil + }), + }) + if err != nil { + return nil, err + } + + err = c.Watch(source.Kind(mgr.GetCache(), &v1.Secret{}), &handler.EnqueueRequestForObject{}) + + return c, err +} + +func getSecret(ctx context.Context, cl client.Client, namespace, name string) (*v1.Secret, error) { + secret := &v1.Secret{} + err := cl.Get(ctx, + client.ObjectKey{ + Namespace: namespace, + Name: name, + }, secret) + return secret, err +} + +func reconcileLocalCSINodes(ctx context.Context, cl client.Client, log logger.Logger, secret *v1.Secret) error { + log.Debug("[reconcileLocalCSINodes] tries to get a selector from the config") + nodeSelector, err := getNodeSelectorFromConfig(secret) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get node selector from the secret %s/%s", secret.Namespace, secret.Name)) + return err + } + log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node selector from the config: %v", nodeSelector)) + log.Debug("[reconcileLocalCSINodes] successfully got a selector from the config") + + log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] tries to get kubernetes nodes by the selector %v", nodeSelector)) + nodesWithSelector, err := getKubernetesNodesBySelector(ctx, cl, nodeSelector) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get nodes by selector %v", nodeSelector)) + return err + } + for _, n := range nodesWithSelector.Items { + log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node %s has been got by selector %v", n.Name, nodeSelector)) + } + log.Debug("[reconcileLocalCSINodes] successfully got kubernetes nodes by the selector") + + labelNodesWithLocalCSIIfNeeded(ctx, cl, log, nodesWithSelector) + log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] finished labeling the selected nodes with a label %s", localCsiNodeSelectorLabel)) + + log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] start to clear the nodes without the selector %v", nodeSelector)) + log.Debug("[reconcileLocalCSINodes] tries to get all kubernetes nodes") + nodes, err := getKubeNodes(ctx, cl) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLocalCSINodes] unable to get nodes")) + return err + } + for _, n := range nodes.Items { + 
log.Trace(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] node %s has been got", n.Name)) + } + log.Debug("[reconcileLocalCSINodes] successfully got all kubernetes nodes") + + removeLocalCSILabelIfNeeded(ctx, cl, log, nodes, nodeSelector) + log.Debug(fmt.Sprintf("[reconcileLocalCSINodes] finished removing the label %s from the nodes without the selector %v", localCsiNodeSelectorLabel, nodeSelector)) + + return nil +} + +func removeLocalCSILabelIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, nodes *v1.NodeList, selector map[string]string) { + var err error + for _, node := range nodes.Items { + if labels.Set(selector).AsSelector().Matches(labels.Set(node.Labels)) { + log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] no need to remove a label %s from the node %s as its labels match the selector", localCsiNodeSelectorLabel, node.Name)) + continue + } + + if _, exist := node.Labels[localCsiNodeSelectorLabel]; !exist { + log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] no need to remove a label %s from the node %s as it does not has the label", localCsiNodeSelectorLabel, node.Name)) + continue + } + + delete(node.Labels, localCsiNodeSelectorLabel) + err = cl.Update(ctx, &node) + if err != nil { + log.Error(err, fmt.Sprintf("[removeLocalCSILabelIfNeeded] unable to update the node %s", node.Name)) + continue + } + + log.Debug(fmt.Sprintf("[removeLocalCSILabelIfNeeded] the label %s has been successfully removed from the node %s", localCsiNodeSelectorLabel, node.Name)) + } +} + +func getKubeNodes(ctx context.Context, cl client.Client) (*v1.NodeList, error) { + nodes := &v1.NodeList{} + err := cl.List(ctx, nodes) + return nodes, err +} + +func getNodeSelectorFromConfig(secret *v1.Secret) (map[string]string, error) { + var sdsConfig v1alpha1.SdsLocalVolumeConfig + err := yaml.Unmarshal(secret.Data["config"], &sdsConfig) + if err != nil { + return nil, err + } + + nodeSelector := sdsConfig.NodeSelector + return nodeSelector, nil +} + +func getKubernetesNodesBySelector(ctx context.Context, cl client.Client, selector map[string]string) (*v1.NodeList, error) { + nodes := &v1.NodeList{} + err := cl.List(ctx, nodes, client.MatchingLabels(selector)) + return nodes, err +} + +func labelNodesWithLocalCSIIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, nodes *v1.NodeList) { + var err error + for _, node := range nodes.Items { + if _, exist := node.Labels[localCsiNodeSelectorLabel]; exist { + log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] a node %s has already been labeled with label %s", node.Name, localCsiNodeSelectorLabel)) + continue + } + + node.Labels[localCsiNodeSelectorLabel] = "" + + err = cl.Update(ctx, &node) + if err != nil { + log.Error(err, fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] unable to update a node %s", node.Name)) + continue + } + + log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] successufully added label %s to the node %s", localCsiNodeSelectorLabel, node.Name)) + } +} diff --git a/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go b/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go index 9ed83d59..895907ac 100644 --- a/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go +++ b/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go @@ -117,7 +117,7 @@ func RunLocalStorageClassWatcherController( if shouldRequeue { log.Warning(fmt.Sprintf("[LocalStorageClassReconciler] Reconciler will requeue the request, name: %s", 
+func getNodeSelectorFromConfig(secret *v1.Secret) (map[string]string, error) {
+	var sdsConfig v1alpha1.SdsLocalVolumeConfig
+	err := yaml.Unmarshal(secret.Data["config"], &sdsConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	nodeSelector := sdsConfig.NodeSelector
+	return nodeSelector, nil
+}
+
+func getKubernetesNodesBySelector(ctx context.Context, cl client.Client, selector map[string]string) (*v1.NodeList, error) {
+	nodes := &v1.NodeList{}
+	err := cl.List(ctx, nodes, client.MatchingLabels(selector))
+	return nodes, err
+}
+
+func labelNodesWithLocalCSIIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, nodes *v1.NodeList) {
+	var err error
+	for _, node := range nodes.Items {
+		if _, exist := node.Labels[localCsiNodeSelectorLabel]; exist {
+			log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] a node %s has already been labeled with label %s", node.Name, localCsiNodeSelectorLabel))
+			continue
+		}
+
+		node.Labels[localCsiNodeSelectorLabel] = ""
+
+		err = cl.Update(ctx, &node)
+		if err != nil {
+			log.Error(err, fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] unable to update a node %s", node.Name))
+			continue
+		}
+
+		log.Debug(fmt.Sprintf("[labelNodesWithLocalCSIIfNeeded] successfully added label %s to the node %s", localCsiNodeSelectorLabel, node.Name))
+	}
+}
diff --git a/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go b/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go
index 9ed83d59..895907ac 100644
--- a/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go
+++ b/images/sds-local-volume-controller/pkg/controller/local_storage_class_watcher.go
@@ -117,7 +117,7 @@ func RunLocalStorageClassWatcherController(
 			if shouldRequeue {
 				log.Warning(fmt.Sprintf("[LocalStorageClassReconciler] Reconciler will requeue the request, name: %s", request.Name))
 				return reconcile.Result{
-					RequeueAfter: cfg.RequeueInterval * time.Second,
+					RequeueAfter: cfg.RequeueStorageClassInterval * time.Second,
 				}, nil
 			}
 
@@ -151,7 +151,7 @@ func RunLocalStorageClassWatcherController(
 						Namespace: lsc.Namespace,
 						Name:      lsc.Name,
 					},
-				}, cfg.RequeueInterval*time.Second)
+				}, cfg.RequeueStorageClassInterval*time.Second)
 
 				return
 			}
@@ -167,7 +167,7 @@ func RunLocalStorageClassWatcherController(
 						Namespace: lsc.Namespace,
 						Name:      lsc.Name,
 					},
-				}, cfg.RequeueInterval*time.Second)
+				}, cfg.RequeueStorageClassInterval*time.Second)
 			}
 			log.Info(fmt.Sprintf("[CreateFunc] ends the reconciliation for the LocalStorageClass %s", e.Object.GetName()))
 		},
@@ -202,7 +202,7 @@ func RunLocalStorageClassWatcherController(
 						Namespace: newLsc.Namespace,
 						Name:      newLsc.Name,
 					},
-				}, cfg.RequeueInterval*time.Second)
+				}, cfg.RequeueStorageClassInterval*time.Second)
 
 				return
 			}
@@ -218,7 +218,7 @@ func RunLocalStorageClassWatcherController(
 						Namespace: newLsc.Namespace,
 						Name:      newLsc.Name,
 					},
-				}, cfg.RequeueInterval*time.Second)
+				}, cfg.RequeueStorageClassInterval*time.Second)
 			}
 
 			log.Info(fmt.Sprintf("[UpdateFunc] ends the reconciliation for the LocalStorageClass %s", e.ObjectNew.GetName()))
diff --git a/templates/sds-local-volume-controller/rbac-for-us.yaml b/templates/sds-local-volume-controller/rbac-for-us.yaml
index 443c18a3..999e850d 100644
--- a/templates/sds-local-volume-controller/rbac-for-us.yaml
+++ b/templates/sds-local-volume-controller/rbac-for-us.yaml
@@ -55,6 +55,7 @@ rules:
   resources:
     - nodes
     - persistentvolumes
+    - secrets
   verbs:
     - get
    - list