Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Extract kubeclient from cloud provider #1164

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 5 additions & 37 deletions pkg/blob/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package blob

import (
"errors"
"fmt"
"os"
"strings"
Expand All @@ -27,9 +26,7 @@ import (
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest"
"golang.org/x/net/context"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"
Expand All @@ -45,36 +42,20 @@ var (

// IsAzureStackCloud decides whether the driver is running on Azure Stack Cloud.
func IsAzureStackCloud(cloud *azure.Cloud) bool {
return !cloud.Config.DisableAzureStackCloud && strings.EqualFold(cloud.Config.Cloud, "AZURESTACKCLOUD")
return !cloud.DisableAzureStackCloud && strings.EqualFold(cloud.Cloud, "AZURESTACKCLOUD")
}

// getCloudProvider get Azure Cloud Provider
func GetCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func GetCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
var (
config *azure.Config
kubeClient *clientset.Clientset
fromSecret bool
err error
)

az := &azure.Cloud{}
az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL

kubeCfg, err := getKubeConfig(kubeconfig)
if err == nil && kubeCfg != nil {
// set QPS and QPS Burst as higher values
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
if kubeClient, err = clientset.NewForConfig(kubeCfg); err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
return az, fmt.Errorf("failed to get KubeClient: %w", err)
}
}

if kubeClient != nil {
az.KubeClient = kubeClient
klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
Expand Down Expand Up @@ -180,7 +161,7 @@ func (d *Driver) initializeKvClient() (*kv.BaseClient, error) {
func (d *Driver) getKeyvaultToken() (authorizer autorest.Authorizer, err error) {
env := d.cloud.Environment
kvEndPoint := strings.TrimSuffix(env.KeyVaultEndpoint, "/")
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.Config.AzureAuthConfig, &env, kvEndPoint)
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.AzureAuthConfig, &env, kvEndPoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,16 +236,3 @@ func (d *Driver) updateSubnetServiceEndpoints(ctx context.Context, vnetResourceG

return nil
}

func getKubeConfig(kubeconfig string) (config *rest.Config, err error) {
if kubeconfig != "" {
if config, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
return nil, err
}
} else {
if config, err = rest.InClusterConfig(); err != nil {
return nil, err
}
}
return config, err
}
95 changes: 10 additions & 85 deletions pkg/blob/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/assert"

"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/subnetclient/mocksubnetclient"
azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider"

Expand Down Expand Up @@ -114,7 +115,7 @@ users:
kubeconfig: emptyKubeConfig,
nodeID: "",
allowEmptyCloudConfig: true,
expectedErr: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
expectedErr: fmt.Errorf("invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
},
{
desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file",
Expand Down Expand Up @@ -168,7 +169,14 @@ users:
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
cloud, err := GetCloudProvider(context.Background(), test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50)
kubeClient, err := util.GetKubeClient(test.kubeconfig, 25.0, 50, "")
if err != nil {
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
continue
}
cloud, err := GetCloudProvider(context.Background(), kubeClient, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig)
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
Expand Down Expand Up @@ -374,86 +382,3 @@ func TestUpdateSubnetServiceEndpoints(t *testing.T) {
t.Run(tc.name, tc.testFunc)
}
}

func TestGetKubeConfig(t *testing.T) {
emptyKubeConfig := "empty-Kube-Config"
validKubeConfig := "valid-Kube-Config"
fakeContent := `
apiVersion: v1
clusters:
- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`
err := createTestFile(emptyKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(emptyKubeConfig); err != nil {
t.Error(err)
}
}()

err = createTestFile(validKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(validKubeConfig); err != nil {
t.Error(err)
}
}()

if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
}

tests := []struct {
desc string
kubeconfig string
expectError bool
envVariableHasConfig bool
envVariableConfigIsValid bool
}{
{
desc: "[success] valid kube config passed",
kubeconfig: validKubeConfig,
expectError: false,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
{
desc: "[failure] invalid kube config passed",
kubeconfig: emptyKubeConfig,
expectError: true,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
}

for _, test := range tests {
_, err := getKubeConfig(test.kubeconfig)
receiveError := (err != nil)
if test.expectError != receiveError {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError)
}
}
}
30 changes: 25 additions & 5 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package blob

import (
"context"
"flag"
"fmt"
"os"
"strconv"
Expand All @@ -29,8 +31,6 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
"golang.org/x/net/context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,6 +43,7 @@ import (
csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common"
"sigs.k8s.io/blob-csi-driver/pkg/util"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/provider"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

Expand Down Expand Up @@ -167,11 +168,29 @@ type DriverOptions struct {
SasTokenExpirationMinutes int
}

func (option *DriverOptions) AddFlags() {
andyzhangx marked this conversation as resolved.
Show resolved Hide resolved
flag.StringVar(&option.BlobfuseProxyEndpoint, "blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
flag.StringVar(&option.NodeID, "nodeid", "", "node id")
flag.StringVar(&option.DriverName, "drivername", DefaultDriverName, "name of the driver")
flag.BoolVar(&option.EnableBlobfuseProxy, "enable-blobfuse-proxy", false, "using blobfuse proxy for mounts")
flag.IntVar(&option.BlobfuseProxyConnTimout, "blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)")
flag.BoolVar(&option.EnableBlobMockMount, "enable-blob-mock-mount", false, "enable mock mount(only for testing)")
flag.BoolVar(&option.EnableGetVolumeStats, "enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node")
flag.BoolVar(&option.AppendTimeStampInCacheDir, "append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node")
flag.Uint64Var(&option.MountPermissions, "mount-permissions", 0777, "mounted folder permissions")
flag.BoolVar(&option.AllowInlineVolumeKeyAccessWithIdentity, "allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume")
flag.BoolVar(&option.AppendMountErrorHelpLink, "append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.")
flag.BoolVar(&option.EnableAznfsMount, "enable-aznfs-mount", false, "replace nfs mount with aznfs mount")
flag.IntVar(&option.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
flag.IntVar(&option.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning")
}

// Driver implements all interfaces of CSI drivers
type Driver struct {
csicommon.CSIDriver

cloud *azure.Cloud
KubeClient kubernetes.Interface
blobfuseProxyEndpoint string
// enableBlobMockMount is only for testing, DO NOT set as true in non-testing scenario
enableBlobMockMount bool
Expand Down Expand Up @@ -206,7 +225,8 @@ type Driver struct {

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *provider.Cloud) *Driver {
var err error
d := Driver{
volLockMap: util.NewLockMap(),
subnetLockMap: util.NewLockMap(),
Expand All @@ -222,12 +242,13 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
enableAznfsMount: options.EnableAznfsMount,
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
azcopy: &util.Azcopy{},
KubeClient: kubeClient,
cloud: cloud,
}
d.Name = options.DriverName
d.Version = driverVersion
d.NodeID = options.NodeID

var err error
getter := func(key string) (interface{}, error) { return nil, nil }
if d.accountSearchCache, err = azcache.NewTimedCache(time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
Expand All @@ -242,7 +263,6 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
if d.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}
d.cloud = cloud
d.mounter = &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
Expand Down
33 changes: 25 additions & 8 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package blob
import (
"context"
"errors"
"flag"
"fmt"
"os"
"reflect"
Expand All @@ -29,12 +30,10 @@ import (
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

v1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/storageaccountclient/mockstorageaccountclient"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
Expand All @@ -56,7 +55,7 @@ func NewFakeDriver() *Driver {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
driver.Name = fakeDriverName
driver.Version = vendorVersion
driver.subnetLockMap = util.NewLockMap()
Expand All @@ -72,7 +71,7 @@ func TestNewFakeDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
d := NewDriver(&driverOptions, &azure.Cloud{})
d := NewDriver(&driverOptions, nil, &azure.Cloud{})
assert.NotNil(t, d)
}

Expand All @@ -85,7 +84,7 @@ func TestNewDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
fakedriver := NewFakeDriver()
fakedriver.Name = DefaultDriverName
fakedriver.Version = driverVersion
Expand Down Expand Up @@ -1231,7 +1230,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "foo"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, d.cloud.SubscriptionID, "foo", d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.SubscriptionID should be used as the SubID")
assert.Equal(t, actualOutput, expectedOutput, "config.SubscriptionID should be used as the SubID")
},
},
{
Expand Down Expand Up @@ -1259,7 +1258,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = ""
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.ResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.Resourcegroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.ResourceGroup should be used as the rg")
},
},
{
Expand All @@ -1273,7 +1272,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "fakeVnetResourceGroup"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.VnetResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.VnetResourceGroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.VnetResourceGroup should be used as the rg")
},
},
{
Expand Down Expand Up @@ -1703,3 +1702,21 @@ func TestIsNFSProtocol(t *testing.T) {
}
}
}

func TestDriverOptions_AddFlags(t *testing.T) {
t.Run("test options", func(t *testing.T) {
option := DriverOptions{}
option.AddFlags()
typeInfo := reflect.TypeOf(option)
numOfExpectedOptions := typeInfo.NumField()
count := 0
flag.CommandLine.VisitAll(func(f *flag.Flag) {
if !strings.Contains(f.Name, "test") {
count++
}
})
if numOfExpectedOptions != count {
t.Errorf("expected %d flags, but found %d flag in DriverOptions", numOfExpectedOptions, count)
}
})
}
Loading
Loading