diff --git a/pkg/blob/azure.go b/pkg/blob/azure.go index c0bff4421c..a99a596535 100644 --- a/pkg/blob/azure.go +++ b/pkg/blob/azure.go @@ -17,7 +17,6 @@ limitations under the License. package blob import ( - "errors" "fmt" "os" "strings" @@ -27,9 +26,7 @@ import ( "github.com/Azure/azure-sdk-for-go/storage" "github.com/Azure/go-autorest/autorest" "golang.org/x/net/context" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/utils/pointer" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader" @@ -45,36 +42,20 @@ var ( // IsAzureStackCloud decides whether the driver is running on Azure Stack Cloud. func IsAzureStackCloud(cloud *azure.Cloud) bool { - return !cloud.Config.DisableAzureStackCloud && strings.EqualFold(cloud.Config.Cloud, "AZURESTACKCLOUD") + return !cloud.DisableAzureStackCloud && strings.EqualFold(cloud.Cloud, "AZURESTACKCLOUD") } // getCloudProvider get Azure Cloud Provider -func GetCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) { +func GetCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) { var ( config *azure.Config - kubeClient *clientset.Clientset fromSecret bool + err error ) az := &azure.Cloud{} az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL - kubeCfg, err := getKubeConfig(kubeconfig) - if err == nil && kubeCfg != nil { - // set QPS and QPS Burst as higher values - klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst) - kubeCfg.QPS = float32(kubeAPIQPS) - kubeCfg.Burst = kubeAPIBurst - if kubeClient, err = clientset.NewForConfig(kubeCfg); err != nil { - klog.Warningf("NewForConfig failed with error: %v", err) - } - } else { - klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err) - if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) { - return az, fmt.Errorf("failed to get KubeClient: %w", err) - } - } - if kubeClient != nil { az.KubeClient = kubeClient klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName) @@ -180,7 +161,7 @@ func (d *Driver) initializeKvClient() (*kv.BaseClient, error) { func (d *Driver) getKeyvaultToken() (authorizer autorest.Authorizer, err error) { env := d.cloud.Environment kvEndPoint := strings.TrimSuffix(env.KeyVaultEndpoint, "/") - servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.Config.AzureAuthConfig, &env, kvEndPoint) + servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.AzureAuthConfig, &env, kvEndPoint) if err != nil { return nil, err } @@ -255,16 +236,3 @@ func (d *Driver) updateSubnetServiceEndpoints(ctx context.Context, vnetResourceG return nil } - -func getKubeConfig(kubeconfig string) (config *rest.Config, err error) { - if kubeconfig != "" { - if config, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil { - return nil, err - } - } else { - if config, err = rest.InClusterConfig(); err != nil { - return nil, err - } - } - return config, err -} diff --git a/pkg/blob/azure_test.go b/pkg/blob/azure_test.go index ce093fad69..cc20dfe03b 100644 --- a/pkg/blob/azure_test.go +++ b/pkg/blob/azure_test.go @@ -31,6 +31,7 @@ import ( "github.com/Azure/go-autorest/autorest/azure" "github.com/stretchr/testify/assert" + "sigs.k8s.io/blob-csi-driver/pkg/util" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/subnetclient/mocksubnetclient" azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider" @@ -114,7 +115,7 @@ users: kubeconfig: emptyKubeConfig, nodeID: "", allowEmptyCloudConfig: true, - expectedErr: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"), + expectedErr: fmt.Errorf("invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"), }, { desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file", @@ -168,7 +169,15 @@ users: } os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile) } - cloud, err := GetCloudProvider(context.Background(), test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50) + kubeClient, err := util.GetKubeClient(test.kubeconfig, 25.0, 50, "") + if err != nil { + if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) { + t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr) + } + continue + kubeClient = nil + } + cloud, err := GetCloudProvider(context.Background(), kubeClient, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig) if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) { t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr) } @@ -374,86 +383,3 @@ func TestUpdateSubnetServiceEndpoints(t *testing.T) { t.Run(tc.name, tc.testFunc) } } - -func TestGetKubeConfig(t *testing.T) { - emptyKubeConfig := "empty-Kube-Config" - validKubeConfig := "valid-Kube-Config" - fakeContent := ` -apiVersion: v1 -clusters: -- cluster: - server: https://localhost:8080 - name: foo-cluster -contexts: -- context: - cluster: foo-cluster - user: foo-user - namespace: bar - name: foo-context -current-context: foo-context -kind: Config -users: -- name: foo-user - user: - exec: - apiVersion: client.authentication.k8s.io/v1beta1 - args: - - arg-1 - - arg-2 - command: foo-command -` - err := createTestFile(emptyKubeConfig) - if err != nil { - t.Error(err) - } - defer func() { - if err := os.Remove(emptyKubeConfig); err != nil { - t.Error(err) - } - }() - - err = createTestFile(validKubeConfig) - if err != nil { - t.Error(err) - } - defer func() { - if err := os.Remove(validKubeConfig); err != nil { - t.Error(err) - } - }() - - if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil { - t.Error(err) - } - - tests := []struct { - desc string - kubeconfig string - expectError bool - envVariableHasConfig bool - envVariableConfigIsValid bool - }{ - { - desc: "[success] valid kube config passed", - kubeconfig: validKubeConfig, - expectError: false, - envVariableHasConfig: false, - envVariableConfigIsValid: false, - }, - { - desc: "[failure] invalid kube config passed", - kubeconfig: emptyKubeConfig, - expectError: true, - envVariableHasConfig: false, - envVariableConfigIsValid: false, - }, - } - - for _, test := range tests { - _, err := getKubeConfig(test.kubeconfig) - receiveError := (err != nil) - if test.expectError != receiveError { - t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError) - } - } -} diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index 41c6515b21..0f6e693e12 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -17,6 +17,8 @@ limitations under the License. package blob import ( + "context" + "flag" "fmt" "os" "strconv" @@ -29,8 +31,6 @@ import ( az "github.com/Azure/go-autorest/autorest/azure" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/pborman/uuid" - "golang.org/x/net/context" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,6 +43,7 @@ import ( csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common" "sigs.k8s.io/blob-csi-driver/pkg/util" azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache" + "sigs.k8s.io/cloud-provider-azure/pkg/provider" azure "sigs.k8s.io/cloud-provider-azure/pkg/provider" ) @@ -167,11 +168,29 @@ type DriverOptions struct { SasTokenExpirationMinutes int } +func (option *DriverOptions) AddFlags() { + flag.StringVar(&option.BlobfuseProxyEndpoint, "blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint") + flag.StringVar(&option.NodeID, "nodeid", "", "node id") + flag.StringVar(&option.DriverName, "drivername", DefaultDriverName, "name of the driver") + flag.BoolVar(&option.EnableBlobfuseProxy, "enable-blobfuse-proxy", false, "using blobfuse proxy for mounts") + flag.IntVar(&option.BlobfuseProxyConnTimout, "blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)") + flag.BoolVar(&option.EnableBlobMockMount, "enable-blob-mock-mount", false, "enable mock mount(only for testing)") + flag.BoolVar(&option.EnableGetVolumeStats, "enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node") + flag.BoolVar(&option.AppendTimeStampInCacheDir, "append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node") + flag.Uint64Var(&option.MountPermissions, "mount-permissions", 0777, "mounted folder permissions") + flag.BoolVar(&option.AllowInlineVolumeKeyAccessWithIdentity, "allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume") + flag.BoolVar(&option.AppendMountErrorHelpLink, "append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.") + flag.BoolVar(&option.EnableAznfsMount, "enable-aznfs-mount", false, "replace nfs mount with aznfs mount") + flag.IntVar(&option.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache") + flag.IntVar(&option.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning") +} + // Driver implements all interfaces of CSI drivers type Driver struct { csicommon.CSIDriver cloud *azure.Cloud + KubeClient kubernetes.Interface blobfuseProxyEndpoint string // enableBlobMockMount is only for testing, DO NOT set as true in non-testing scenario enableBlobMockMount bool @@ -206,7 +225,8 @@ type Driver struct { // NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version & // does not support optional driver plugin info manifest field. Refer to CSI spec for more details. -func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver { +func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *provider.Cloud) *Driver { + var err error d := Driver{ volLockMap: util.NewLockMap(), subnetLockMap: util.NewLockMap(), @@ -222,12 +242,13 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver { enableAznfsMount: options.EnableAznfsMount, sasTokenExpirationMinutes: options.SasTokenExpirationMinutes, azcopy: &util.Azcopy{}, + KubeClient: kubeClient, + cloud: cloud, } d.Name = options.DriverName d.Version = driverVersion d.NodeID = options.NodeID - var err error getter := func(key string) (interface{}, error) { return nil, nil } if d.accountSearchCache, err = azcache.NewTimedCache(time.Minute, getter, false); err != nil { klog.Fatalf("%v", err) @@ -242,7 +263,6 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver { if d.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil { klog.Fatalf("%v", err) } - d.cloud = cloud d.mounter = &mount.SafeFormatAndMount{ Interface: mount.New(""), Exec: utilexec.New(), diff --git a/pkg/blob/blob_test.go b/pkg/blob/blob_test.go index 190f42eef5..96ec182c7b 100644 --- a/pkg/blob/blob_test.go +++ b/pkg/blob/blob_test.go @@ -56,7 +56,7 @@ func NewFakeDriver() *Driver { BlobfuseProxyConnTimout: 5, EnableBlobMockMount: false, } - driver := NewDriver(&driverOptions, &azure.Cloud{}) + driver := NewDriver(&driverOptions, nil, &azure.Cloud{}) driver.Name = fakeDriverName driver.Version = vendorVersion driver.subnetLockMap = util.NewLockMap() @@ -72,7 +72,7 @@ func TestNewFakeDriver(t *testing.T) { BlobfuseProxyConnTimout: 5, EnableBlobMockMount: false, } - d := NewDriver(&driverOptions, &azure.Cloud{}) + d := NewDriver(&driverOptions, nil, &azure.Cloud{}) assert.NotNil(t, d) } @@ -85,7 +85,7 @@ func TestNewDriver(t *testing.T) { BlobfuseProxyConnTimout: 5, EnableBlobMockMount: false, } - driver := NewDriver(&driverOptions, &azure.Cloud{}) + driver := NewDriver(&driverOptions, nil, &azure.Cloud{}) fakedriver := NewFakeDriver() fakedriver.Name = DefaultDriverName fakedriver.Version = driverVersion @@ -1231,7 +1231,7 @@ func TestGetSubnetResourceID(t *testing.T) { d.cloud.VnetResourceGroup = "foo" actualOutput := d.getSubnetResourceID("", "", "") expectedOutput := fmt.Sprintf(subnetTemplate, d.cloud.SubscriptionID, "foo", d.cloud.VnetName, d.cloud.SubnetName) - assert.Equal(t, actualOutput, expectedOutput, "cloud.SubscriptionID should be used as the SubID") + assert.Equal(t, actualOutput, expectedOutput, "config.SubscriptionID should be used as the SubID") }, }, { @@ -1259,7 +1259,7 @@ func TestGetSubnetResourceID(t *testing.T) { d.cloud.VnetResourceGroup = "" actualOutput := d.getSubnetResourceID("", "", "") expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.ResourceGroup, d.cloud.VnetName, d.cloud.SubnetName) - assert.Equal(t, actualOutput, expectedOutput, "cloud.Resourcegroup should be used as the rg") + assert.Equal(t, actualOutput, expectedOutput, "config.ResourceGroup should be used as the rg") }, }, { @@ -1273,7 +1273,7 @@ func TestGetSubnetResourceID(t *testing.T) { d.cloud.VnetResourceGroup = "fakeVnetResourceGroup" actualOutput := d.getSubnetResourceID("", "", "") expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.VnetResourceGroup, d.cloud.VnetName, d.cloud.SubnetName) - assert.Equal(t, actualOutput, expectedOutput, "cloud.VnetResourceGroup should be used as the rg") + assert.Equal(t, actualOutput, expectedOutput, "config.VnetResourceGroup should be used as the rg") }, }, { diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 0f7e037477..7446c20122 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -546,8 +546,8 @@ func TestCreateVolume(t *testing.T) { testFunc: func(t *testing.T) { d := NewFakeDriver() d.cloud = &azure.Cloud{} - d.cloud.Config.DisableAzureStackCloud = false - d.cloud.Config.Cloud = "AZURESTACKCLOUD" + d.cloud.DisableAzureStackCloud = false + d.cloud.Cloud = "AZURESTACKCLOUD" d.cloud.SubscriptionID = "subID" mp := make(map[string]string) mp[storeAccountKeyField] = falseValue diff --git a/pkg/blobplugin/main.go b/pkg/blobplugin/main.go index 474bb4a4ea..fe9934ea45 100644 --- a/pkg/blobplugin/main.go +++ b/pkg/blobplugin/main.go @@ -26,45 +26,37 @@ import ( "strings" "sigs.k8s.io/blob-csi-driver/pkg/blob" + "sigs.k8s.io/blob-csi-driver/pkg/util" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" ) +var driverOptions blob.DriverOptions var ( - endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") - blobfuseProxyEndpoint = flag.String("blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint") - nodeID = flag.String("nodeid", "", "node id") - version = flag.Bool("version", false, "Print the version and exit.") - metricsAddress = flag.String("metrics-address", "", "export the metrics") - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") - driverName = flag.String("drivername", blob.DefaultDriverName, "name of the driver") - enableBlobfuseProxy = flag.Bool("enable-blobfuse-proxy", false, "using blobfuse proxy for mounts") - blobfuseProxyConnTimout = flag.Int("blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)") - enableBlobMockMount = flag.Bool("enable-blob-mock-mount", false, "enable mock mount(only for testing)") - cloudConfigSecretName = flag.String("cloud-config-secret-name", "azure-cloud-provider", "secret name of cloud config") - cloudConfigSecretNamespace = flag.String("cloud-config-secret-namespace", "kube-system", "secret namespace of cloud config") - customUserAgent = flag.String("custom-user-agent", "", "custom userAgent") - userAgentSuffix = flag.String("user-agent-suffix", "", "userAgent suffix") - allowEmptyCloudConfig = flag.Bool("allow-empty-cloud-config", true, "allow running driver without cloud config") - enableGetVolumeStats = flag.Bool("enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node") - appendTimeStampInCacheDir = flag.Bool("append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node") - mountPermissions = flag.Uint64("mount-permissions", 0777, "mounted folder permissions") - allowInlineVolumeKeyAccessWithIdentity = flag.Bool("allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume") - kubeAPIQPS = flag.Float64("kube-api-qps", 25.0, "QPS to use while communicating with the kubernetes apiserver.") - kubeAPIBurst = flag.Int("kube-api-burst", 50, "Burst to use while communicating with the kubernetes apiserver.") - appendMountErrorHelpLink = flag.Bool("append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.") - enableAznfsMount = flag.Bool("enable-aznfs-mount", false, "replace nfs mount with aznfs mount") - volStatsCacheExpireInMinutes = flag.Int("vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache") - sasTokenExpirationMinutes = flag.Int("sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning") + metricsAddress = flag.String("metrics-address", "", "export the metrics") + version = flag.Bool("version", false, "Print the version and exit.") + endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") + kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") + cloudConfigSecretName = flag.String("cloud-config-secret-name", "azure-cloud-provider", "secret name of cloud config") + cloudConfigSecretNamespace = flag.String("cloud-config-secret-namespace", "kube-system", "secret namespace of cloud config") + customUserAgent = flag.String("custom-user-agent", "", "custom userAgent") + userAgentSuffix = flag.String("user-agent-suffix", "", "userAgent suffix") + allowEmptyCloudConfig = flag.Bool("allow-empty-cloud-config", true, "allow running driver without cloud config") + kubeAPIQPS = flag.Float64("kube-api-qps", 25.0, "QPS to use while communicating with the kubernetes apiserver.") + kubeAPIBurst = flag.Int("kube-api-burst", 50, "Burst to use while communicating with the kubernetes apiserver.") ) +func init() { + driverOptions.AddFlags() +} + func main() { klog.InitFlags(nil) _ = flag.Set("logtostderr", "true") flag.Parse() if *version { - info, err := blob.GetVersionYAML(*driverName) + info, err := blob.GetVersionYAML(driverOptions.DriverName) if err != nil { klog.Fatalln(err) } @@ -78,33 +70,21 @@ func main() { } func handle() { - driverOptions := blob.DriverOptions{ - NodeID: *nodeID, - DriverName: *driverName, - BlobfuseProxyEndpoint: *blobfuseProxyEndpoint, - EnableBlobfuseProxy: *enableBlobfuseProxy, - BlobfuseProxyConnTimout: *blobfuseProxyConnTimout, - EnableBlobMockMount: *enableBlobMockMount, - EnableGetVolumeStats: *enableGetVolumeStats, - AppendTimeStampInCacheDir: *appendTimeStampInCacheDir, - MountPermissions: *mountPermissions, - AllowInlineVolumeKeyAccessWithIdentity: *allowInlineVolumeKeyAccessWithIdentity, - AppendMountErrorHelpLink: *appendMountErrorHelpLink, - EnableAznfsMount: *enableAznfsMount, - VolStatsCacheExpireInMinutes: *volStatsCacheExpireInMinutes, - SasTokenExpirationMinutes: *sasTokenExpirationMinutes, - } - userAgent := blob.GetUserAgent(driverOptions.DriverName, *customUserAgent, *userAgentSuffix) klog.V(2).Infof("driver userAgent: %s", userAgent) - cloud, err := blob.GetCloudProvider(context.Background(), *kubeconfig, driverOptions.NodeID, *cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig, *kubeAPIQPS, *kubeAPIBurst) + kubeClient, err := util.GetKubeClient(*kubeconfig, *kubeAPIQPS, *kubeAPIBurst, userAgent) + if err != nil { + klog.Fatalf("failed to get kubeClient, error: %v", err) + } + + cloud, err := blob.GetCloudProvider(context.Background(), kubeClient, driverOptions.NodeID, *cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig) if err != nil { klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err) } klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VnetName: %s, VnetResourceGroup: %s, SubnetName: %s", cloud.Cloud, cloud.Location, cloud.ResourceGroup, cloud.VnetName, cloud.VnetResourceGroup, cloud.SubnetName) - driver := blob.NewDriver(&driverOptions, cloud) + driver := blob.NewDriver(&driverOptions, kubeClient, cloud) if driver == nil { klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver") } diff --git a/pkg/util/util.go b/pkg/util/util.go index 3159ad1e38..47e9bc3476 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -26,6 +26,9 @@ import ( "github.com/go-ini/ini" "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" ) @@ -305,3 +308,36 @@ func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) { } return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil } + +func GetKubeClient(kubeconfig string, kubeAPIQPS float64, kubeAPIBurst int, userAgent string) (kubernetes.Interface, error) { + var kubeClient kubernetes.Interface + var err error + var kubeCfg *rest.Config + if kubeconfig != "" { + if kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil { + return nil, err + } + } else { + if kubeCfg, err = rest.InClusterConfig(); err != nil { + return nil, err + } + } + + if err == nil && kubeCfg != nil { + // set QPS and QPS Burst as higher values + klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst) + kubeCfg.QPS = float32(kubeAPIQPS) + kubeCfg.Burst = kubeAPIBurst + kubeCfg.UserAgent = userAgent + if kubeClient, err = kubernetes.NewForConfig(kubeCfg); err != nil { + klog.Warningf("NewForConfig failed with error: %v", err) + return nil, err + } + } else { + klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err) + if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) { + return nil, fmt.Errorf("failed to get KubeClient: %w", err) + } + } + return kubeClient, nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index dabfd77e9c..3b8cb5dc03 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -524,3 +524,71 @@ func TestParseAzcopyJobShow(t *testing.T) { } } } + +func TestGetKubeConfig(t *testing.T) { + emptyKubeConfig := "empty-Kube-Config" + validKubeConfig := "valid-Kube-Config" + fakeContent := ` +apiVersion: v1 +clusters: +- cluster: + server: https://localhost:8080 + name: foo-cluster +contexts: +- context: + cluster: foo-cluster + user: foo-user + namespace: bar + name: foo-context +current-context: foo-context +kind: Config +users: +- name: foo-user + user: + exec: + apiVersion: client.authentication.k8s.io/v1beta1 + args: + - arg-1 + - arg-2 + command: foo-command +` + if err := os.WriteFile(validKubeConfig, []byte(""), 0666); err != nil { + t.Error(err) + } + defer os.Remove(emptyKubeConfig) + if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil { + t.Error(err) + } + defer os.Remove(validKubeConfig) + + tests := []struct { + desc string + kubeconfig string + expectError bool + envVariableHasConfig bool + envVariableConfigIsValid bool + }{ + { + desc: "[success] valid kube config passed", + kubeconfig: validKubeConfig, + expectError: false, + envVariableHasConfig: false, + envVariableConfigIsValid: false, + }, + { + desc: "[failure] invalid kube config passed", + kubeconfig: emptyKubeConfig, + expectError: true, + envVariableHasConfig: false, + envVariableConfigIsValid: false, + }, + } + + for _, test := range tests { + _, err := GetKubeClient(test.kubeconfig, 25.0, 50, "") + receiveError := (err != nil) + if test.expectError != receiveError { + t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError) + } + } +} diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 9b51476def..c4604f0a73 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework/config" _ "k8s.io/kubernetes/test/e2e/framework/debug/init" "sigs.k8s.io/blob-csi-driver/pkg/blob" + "sigs.k8s.io/blob-csi-driver/pkg/util" "sigs.k8s.io/blob-csi-driver/test/utils/azure" "sigs.k8s.io/blob-csi-driver/test/utils/credentials" "sigs.k8s.io/blob-csi-driver/test/utils/testutil" @@ -151,9 +152,11 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx ginkgo.SpecContext) []byte { BlobfuseProxyConnTimout: 5, EnableBlobMockMount: false, } - cloud, err := blob.GetCloudProvider(context.Background(), kubeconfig, driverOptions.NodeID, "", "", "", false, 0, 0) + kubeClient, err := util.GetKubeClient(kubeconfig, 25.0, 50, "") gomega.Expect(err).NotTo(gomega.HaveOccurred()) - blobDriver = blob.NewDriver(&driverOptions, cloud) + cloud, err := blob.GetCloudProvider(context.Background(), kubeClient, driverOptions.NodeID, "", "", "", false) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + blobDriver = blob.NewDriver(&driverOptions, kubeClient, cloud) go func() { blobDriver.Run(fmt.Sprintf("unix:///tmp/csi-%s.sock", uuid.NewUUID().String()), false) }()