From 011c7d48ffd7fbab39dfe818eec5e8ebcff4e939 Mon Sep 17 00:00:00 2001 From: Allen Sun Date: Sat, 17 Sep 2022 12:54:21 +0800 Subject: [PATCH] refactor: replace kubectl operation with k8s apiclient call Signed-off-by: Allen Sun --- pkg/runtime/kubernetes/init.go | 17 +++---- pkg/runtime/kubernetes/join_masters.go | 48 ++++++++++---------- pkg/runtime/kubernetes/join_nodes.go | 14 ++---- pkg/runtime/kubernetes/runtime.go | 63 +++++++++++++++++--------- 4 files changed, 76 insertions(+), 66 deletions(-) diff --git a/pkg/runtime/kubernetes/init.go b/pkg/runtime/kubernetes/init.go index b3fe3cc50fe..0bb24c89f38 100644 --- a/pkg/runtime/kubernetes/init.go +++ b/pkg/runtime/kubernetes/init.go @@ -32,19 +32,16 @@ import ( ) const ( - RemoteCmdCopyStatic = "mkdir -p %s && cp -f %s %s" - RemoteApplyYaml = `echo '%s' | kubectl apply -f -` - RemoteCmdGetNetworkInterface = "ls /sys/class/net" - RemoteCmdExistNetworkInterface = "ip addr show %s | egrep \"%s\" || true" - WriteKubeadmConfigCmd = `cd %s && echo '%s' > etc/kubeadm.yml` - DefaultVIP = "10.103.97.2" - DefaultAPIserverDomain = "apiserver.cluster.local" - DefaultRegistryPort = 5000 - DockerCertDir = "/etc/docker/certs.d" + RemoteCmdCopyStatic = "mkdir -p %s && cp -f %s %s" + WriteKubeadmConfigCmd = `cd %s && echo '%s' > etc/kubeadm.yml` + DefaultVIP = "10.103.97.2" + DefaultAPIserverDomain = "apiserver.cluster.local" + DefaultRegistryPort = 5000 + DockerCertDir = "/etc/docker/certs.d" ) func (k *Runtime) ConfigKubeadmOnMaster0() error { - if err := k.LoadFromClusterfile(k.Config.ClusterFileKubeConfig); err != nil { + if err := k.LoadFromClusterfile(k.KubeadmConfigFromClusterfile); err != nil { return fmt.Errorf("failed to load kubeadm config from clusterfile: %v", err) } // TODO handle the kubeadm config, like kubeproxy config diff --git a/pkg/runtime/kubernetes/join_masters.go b/pkg/runtime/kubernetes/join_masters.go index c68f21943e2..d9c33f3d60c 100644 --- a/pkg/runtime/kubernetes/join_masters.go +++ b/pkg/runtime/kubernetes/join_masters.go @@ -23,7 +23,6 @@ import ( "strings" "sync" - "github.com/pkg/errors" "github.com/sealerio/sealer/common" "github.com/sealerio/sealer/pkg/clustercert" "github.com/sealerio/sealer/pkg/ipvs" @@ -32,6 +31,8 @@ import ( "github.com/sealerio/sealer/utils/ssh" versionUtils "github.com/sealerio/sealer/utils/version" "github.com/sealerio/sealer/utils/yaml" + + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) @@ -71,7 +72,6 @@ rm -rf /var/lib/etcd && rm -rf /var/etcd RemoteRemoveRegistryCerts = "rm -rf " + DockerCertDir + "/%s*" RemoveLvscareStaticPod = "rm -rf /etc/kubernetes/manifests/kube-sealyun-lvscare*" CreateLvscareStaticPod = "mkdir -p /etc/kubernetes/manifests && echo '%s' > /etc/kubernetes/manifests/kube-sealyun-lvscare.yaml" - KubeDeleteNode = "kubectl delete node %s" // TODO check kubernetes certs RemoteCheckCerts = "kubeadm alpha certs check-expiration" ) @@ -389,30 +389,32 @@ func (k *Runtime) deleteMasters(masters []net.IP) error { return eg.Wait() } -func (k *Runtime) isHostName(master, host net.IP) (string, error) { - hostString, err := k.CmdToString(master, "kubectl get nodes | grep -v NAME | awk '{print $1}'", ",") +// isHostInExistingCluster checks if the input host is contained in the existing cluster. +// It gets all nodes within existing cluster, +// judges each node name with the hostname of input host node +// and finially get the comparison result. +func (k *Runtime) isHostInExistingCluster(host net.IP) (string, error) { + nodes, err := k.ListNodes() if err != nil { return "", err } + if len(nodes.Items) == 0 { + return "", fmt.Errorf("no node gotten by k8s client") + } + hostName, err := k.CmdToString(host, "hostname", "") if err != nil { return "", err } - hosts := strings.Split(hostString, ",") - var name string - for _, h := range hosts { - if strings.TrimSpace(h) == "" { - continue - } else { - hh := strings.ToLower(h) - fromH := strings.ToLower(hostName) - if hh == fromH { - name = h - break - } + + for _, nodeItem := range nodes.Items { + hh := strings.ToLower(nodeItem.Name) + fromH := strings.ToLower(hostName) + if hh == fromH { + return hh, nil } } - return name, nil + return "", fmt.Errorf("failed to get node: no node's hostname matches hostname of host(%s)", host) } func (k *Runtime) deleteMaster(master net.IP) error { @@ -449,19 +451,15 @@ func (k *Runtime) deleteMaster(master net.IP) error { } if len(masterIPs) > 0 { - hostname, err := k.isHostName(k.cluster.GetMaster0IP(), master) + hostname, err := k.isHostInExistingCluster(master) if err != nil { return err } - master0SSH, err := k.getHostSSHClient(k.cluster.GetMaster0IP()) - if err != nil { - return fmt.Errorf("failed to get master0 ssh client: %v", err) - } - - if err := master0SSH.CmdAsync(k.cluster.GetMaster0IP(), fmt.Sprintf(KubeDeleteNode, strings.TrimSpace(hostname))); err != nil { - return fmt.Errorf("failed to delete node %s: %v", hostname, err) + if err := k.DeleteNode(strings.TrimSpace(hostname)); err != nil { + return err } } + lvsImage := k.RegConfig.Repo() + "/fanux/lvscare:latest" yaml := ipvs.LvsStaticPodYaml(k.getVIP(), masterIPs, lvsImage) eg, _ := errgroup.WithContext(context.Background()) diff --git a/pkg/runtime/kubernetes/join_nodes.go b/pkg/runtime/kubernetes/join_nodes.go index ca07c29677a..c7f72e464e4 100644 --- a/pkg/runtime/kubernetes/join_nodes.go +++ b/pkg/runtime/kubernetes/join_nodes.go @@ -166,21 +166,17 @@ func (k *Runtime) deleteNode(node net.IP) error { if err := ssh.CmdAsync(node, remoteCleanCmds...); err != nil { return err } - //remove node + + // remove node if len(k.cluster.GetMasterIPList()) > 0 { - hostname, err := k.isHostName(k.cluster.GetMaster0IP(), node) + hostname, err := k.isHostInExistingCluster(node) if err != nil { return err } - ssh, err := k.getHostSSHClient(k.cluster.GetMaster0IP()) - if err != nil { - return fmt.Errorf("failed to get master0 ssh client(%s): %v", k.cluster.GetMaster0IP(), err) - } - if err := ssh.CmdAsync(k.cluster.GetMaster0IP(), fmt.Sprintf(KubeDeleteNode, strings.TrimSpace(hostname))); err != nil { - return fmt.Errorf("failed to delete node %s: %v", hostname, err) + if err := k.DeleteNode(strings.TrimSpace(hostname)); err != nil { + return err } } - return nil } diff --git a/pkg/runtime/kubernetes/runtime.go b/pkg/runtime/kubernetes/runtime.go index dd71cceb6c9..ba8ac07c01c 100644 --- a/pkg/runtime/kubernetes/runtime.go +++ b/pkg/runtime/kubernetes/runtime.go @@ -17,45 +17,53 @@ package kubernetes import ( "context" "fmt" - "strings" - "sync" - - "github.com/sealerio/sealer/pkg/registry" - "github.com/sealerio/sealer/pkg/runtime/kubernetes/kubeadm" - "github.com/sealerio/sealer/utils" - versionUtils "github.com/sealerio/sealer/utils/version" - "net" "path/filepath" + "strings" + "sync" "time" "github.com/sealerio/sealer/common" + "github.com/sealerio/sealer/pkg/client/k8s" + "github.com/sealerio/sealer/pkg/registry" "github.com/sealerio/sealer/pkg/runtime" + "github.com/sealerio/sealer/pkg/runtime/kubernetes/kubeadm" v2 "github.com/sealerio/sealer/types/api/v2" + "github.com/sealerio/sealer/utils" "github.com/sealerio/sealer/utils/platform" "github.com/sealerio/sealer/utils/ssh" strUtils "github.com/sealerio/sealer/utils/strings" - "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2" + versionUtils "github.com/sealerio/sealer/utils/version" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2" ) type Config struct { - Vlog int - VIP string - RegConfig *registry.Config // Clusterfile: the absolute path, we need to read kubeadm config from Clusterfile ClusterFileKubeConfig *kubeadm.KubeadmConfig - APIServerDomain string } // Runtime struct is the runtime interface for kubernetes type Runtime struct { *sync.Mutex + // TODO: remove field cluster from runtime pkg + // just make runtime to use essential data from cluster, rather than the whole cluster scope. cluster *v2.Cluster + + // The KubeadmConfig used to setup the final cluster. + // Its data is from KubeadmConfig input from Clusterfile and KubeadmConfig in ClusterImage. *kubeadm.KubeadmConfig - *Config + // *Config + KubeadmConfigFromClusterfile *kubeadm.KubeadmConfig + APIServerDomain string + Vlog int + VIP string + + // RegConfig contains the embedded registry configuration of cluster + RegConfig *registry.Config + *k8s.Client } // NewDefaultRuntime arg "clusterfileKubeConfig" is the Clusterfile path/name, runtime need read kubeadm config from it @@ -67,13 +75,24 @@ func NewDefaultRuntime(cluster *v2.Cluster, clusterfileKubeConfig *kubeadm.Kubea func newKubernetesRuntime(cluster *v2.Cluster, clusterFileKubeConfig *kubeadm.KubeadmConfig) (runtime.Interface, error) { k := &Runtime{ cluster: cluster, - Config: &Config{ + /*Config: &Config{ ClusterFileKubeConfig: clusterFileKubeConfig, - APIServerDomain: DefaultAPIserverDomain, - }, - KubeadmConfig: &kubeadm.KubeadmConfig{}, + },*/ + KubeadmConfigFromClusterfile: clusterFileKubeConfig, + KubeadmConfig: &kubeadm.KubeadmConfig{}, + APIServerDomain: DefaultAPIserverDomain, } - k.Config.RegConfig = registry.GetConfig(k.getImageMountDir(), k.cluster.GetMaster0IP()) + + var err error + if k.Client, err = k8s.Newk8sClient(); err != nil { + // In current design, as runtime controls all cluster operations including run, join, delete + // and so on, then when executing run operation, it will definitely fail when creating k8s client + // since no k8s cluster is setup. While when join and delete operation, the cluster already exists, + // we can make it to create k8s client. Therefore just throw a warn log to move on. + logrus.Warnf("failed to create k8s client: %v", err) + } + + k.RegConfig = registry.GetConfig(k.getImageMountDir(), k.cluster.GetMaster0IP()) k.setCertSANS(append( []string{"127.0.0.1", k.getAPIServerDomain(), k.getVIP().String()}, k.cluster.GetMasterIPStrList()...), @@ -239,7 +258,7 @@ func (k *Runtime) getDNSDomain() string { } func (k *Runtime) getAPIServerDomain() string { - return k.Config.APIServerDomain + return k.APIServerDomain } func (k *Runtime) getKubeVersion() string { @@ -369,8 +388,8 @@ func (k *Runtime) MergeKubeadmConfig() error { if k.getKubeVersion() != "" { return nil } - if k.Config.ClusterFileKubeConfig != nil { - if err := k.LoadFromClusterfile(k.Config.ClusterFileKubeConfig); err != nil { + if k.KubeadmConfigFromClusterfile != nil { + if err := k.LoadFromClusterfile(k.KubeadmConfigFromClusterfile); err != nil { return fmt.Errorf("failed to load kubeadm config from clusterfile: %v", err) } }