Skip to content

Commit

Permalink
Refactor: Extract kubeclient from cloud provider
Browse files Browse the repository at this point in the history
Signed-off-by: Fan Shang Xiang <shafan@microsoft.com>
  • Loading branch information
MartinForReal committed Dec 11, 2023
1 parent 3d09e17 commit 5ba5f74
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 182 deletions.
42 changes: 5 additions & 37 deletions pkg/blob/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package blob

import (
"errors"
"fmt"
"os"
"strings"
Expand All @@ -27,9 +26,7 @@ import (
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest"
"golang.org/x/net/context"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"
Expand All @@ -45,36 +42,20 @@ var (

// IsAzureStackCloud decides whether the driver is running on Azure Stack Cloud.
func IsAzureStackCloud(cloud *azure.Cloud) bool {
return !cloud.Config.DisableAzureStackCloud && strings.EqualFold(cloud.Config.Cloud, "AZURESTACKCLOUD")
return !cloud.DisableAzureStackCloud && strings.EqualFold(cloud.Cloud, "AZURESTACKCLOUD")
}

// getCloudProvider get Azure Cloud Provider
func GetCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func GetCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
var (
config *azure.Config
kubeClient *clientset.Clientset
fromSecret bool
err error
)

az := &azure.Cloud{}
az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL

kubeCfg, err := getKubeConfig(kubeconfig)
if err == nil && kubeCfg != nil {
// set QPS and QPS Burst as higher values
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
if kubeClient, err = clientset.NewForConfig(kubeCfg); err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
return az, fmt.Errorf("failed to get KubeClient: %w", err)
}
}

if kubeClient != nil {
az.KubeClient = kubeClient
klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
Expand Down Expand Up @@ -180,7 +161,7 @@ func (d *Driver) initializeKvClient() (*kv.BaseClient, error) {
func (d *Driver) getKeyvaultToken() (authorizer autorest.Authorizer, err error) {
env := d.cloud.Environment
kvEndPoint := strings.TrimSuffix(env.KeyVaultEndpoint, "/")
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.Config.AzureAuthConfig, &env, kvEndPoint)
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.AzureAuthConfig, &env, kvEndPoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,16 +236,3 @@ func (d *Driver) updateSubnetServiceEndpoints(ctx context.Context, vnetResourceG

return nil
}

func getKubeConfig(kubeconfig string) (config *rest.Config, err error) {
if kubeconfig != "" {
if config, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
return nil, err
}
} else {
if config, err = rest.InClusterConfig(); err != nil {
return nil, err
}
}
return config, err
}
96 changes: 11 additions & 85 deletions pkg/blob/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/assert"

"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/subnetclient/mocksubnetclient"
azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider"

Expand Down Expand Up @@ -114,7 +115,7 @@ users:
kubeconfig: emptyKubeConfig,
nodeID: "",
allowEmptyCloudConfig: true,
expectedErr: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
expectedErr: fmt.Errorf("invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
},
{
desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file",
Expand Down Expand Up @@ -168,7 +169,15 @@ users:
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
cloud, err := GetCloudProvider(context.Background(), test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50)
kubeClient, err := util.GetKubeClient(test.kubeconfig, 25.0, 50, "")
if err != nil {
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
continue

Check warning on line 177 in pkg/blob/azure_test.go

View workflow job for this annotation

GitHub Actions / Go Lint

unreachable-code: unreachable code after this statement (revive)
kubeClient = nil

Check failure on line 178 in pkg/blob/azure_test.go

View workflow job for this annotation

GitHub Actions / Go Lint

unreachable: unreachable code (govet)
}
cloud, err := GetCloudProvider(context.Background(), kubeClient, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig)
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
Expand Down Expand Up @@ -374,86 +383,3 @@ func TestUpdateSubnetServiceEndpoints(t *testing.T) {
t.Run(tc.name, tc.testFunc)
}
}

func TestGetKubeConfig(t *testing.T) {
emptyKubeConfig := "empty-Kube-Config"
validKubeConfig := "valid-Kube-Config"
fakeContent := `
apiVersion: v1
clusters:
- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`
err := createTestFile(emptyKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(emptyKubeConfig); err != nil {
t.Error(err)
}
}()

err = createTestFile(validKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(validKubeConfig); err != nil {
t.Error(err)
}
}()

if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
}

tests := []struct {
desc string
kubeconfig string
expectError bool
envVariableHasConfig bool
envVariableConfigIsValid bool
}{
{
desc: "[success] valid kube config passed",
kubeconfig: validKubeConfig,
expectError: false,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
{
desc: "[failure] invalid kube config passed",
kubeconfig: emptyKubeConfig,
expectError: true,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
}

for _, test := range tests {
_, err := getKubeConfig(test.kubeconfig)
receiveError := (err != nil)
if test.expectError != receiveError {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError)
}
}
}
30 changes: 25 additions & 5 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package blob

import (
"context"
"flag"
"fmt"
"os"
"strconv"
Expand All @@ -29,8 +31,6 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
"golang.org/x/net/context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,6 +43,7 @@ import (
csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common"
"sigs.k8s.io/blob-csi-driver/pkg/util"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/provider"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

Expand Down Expand Up @@ -167,11 +168,29 @@ type DriverOptions struct {
SasTokenExpirationMinutes int
}

func (option *DriverOptions) AddFlags() {
flag.StringVar(&option.BlobfuseProxyEndpoint, "blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
flag.StringVar(&option.NodeID, "nodeid", "", "node id")
flag.StringVar(&option.DriverName, "drivername", DefaultDriverName, "name of the driver")
flag.BoolVar(&option.EnableBlobfuseProxy, "enable-blobfuse-proxy", false, "using blobfuse proxy for mounts")
flag.IntVar(&option.BlobfuseProxyConnTimout, "blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)")
flag.BoolVar(&option.EnableBlobMockMount, "enable-blob-mock-mount", false, "enable mock mount(only for testing)")
flag.BoolVar(&option.EnableGetVolumeStats, "enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node")
flag.BoolVar(&option.AppendTimeStampInCacheDir, "append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node")
flag.Uint64Var(&option.MountPermissions, "mount-permissions", 0777, "mounted folder permissions")
flag.BoolVar(&option.AllowInlineVolumeKeyAccessWithIdentity, "allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume")
flag.BoolVar(&option.AppendMountErrorHelpLink, "append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.")
flag.BoolVar(&option.EnableAznfsMount, "enable-aznfs-mount", false, "replace nfs mount with aznfs mount")
flag.IntVar(&option.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
flag.IntVar(&option.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning")
}

// Driver implements all interfaces of CSI drivers
type Driver struct {
csicommon.CSIDriver

cloud *azure.Cloud
KubeClient kubernetes.Interface
blobfuseProxyEndpoint string
// enableBlobMockMount is only for testing, DO NOT set as true in non-testing scenario
enableBlobMockMount bool
Expand Down Expand Up @@ -206,7 +225,8 @@ type Driver struct {

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *provider.Cloud) *Driver {
var err error
d := Driver{
volLockMap: util.NewLockMap(),
subnetLockMap: util.NewLockMap(),
Expand All @@ -222,12 +242,13 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
enableAznfsMount: options.EnableAznfsMount,
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
azcopy: &util.Azcopy{},
KubeClient: kubeClient,
cloud: cloud,
}
d.Name = options.DriverName
d.Version = driverVersion
d.NodeID = options.NodeID

var err error
getter := func(key string) (interface{}, error) { return nil, nil }
if d.accountSearchCache, err = azcache.NewTimedCache(time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
Expand All @@ -242,7 +263,6 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
if d.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}
d.cloud = cloud
d.mounter = &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewFakeDriver() *Driver {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
driver.Name = fakeDriverName
driver.Version = vendorVersion
driver.subnetLockMap = util.NewLockMap()
Expand All @@ -72,7 +72,7 @@ func TestNewFakeDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
d := NewDriver(&driverOptions, &azure.Cloud{})
d := NewDriver(&driverOptions, nil, &azure.Cloud{})
assert.NotNil(t, d)
}

Expand All @@ -85,7 +85,7 @@ func TestNewDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
fakedriver := NewFakeDriver()
fakedriver.Name = DefaultDriverName
fakedriver.Version = driverVersion
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "foo"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, d.cloud.SubscriptionID, "foo", d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.SubscriptionID should be used as the SubID")
assert.Equal(t, actualOutput, expectedOutput, "config.SubscriptionID should be used as the SubID")
},
},
{
Expand Down Expand Up @@ -1259,7 +1259,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = ""
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.ResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.Resourcegroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.ResourceGroup should be used as the rg")
},
},
{
Expand All @@ -1273,7 +1273,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "fakeVnetResourceGroup"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.VnetResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.VnetResourceGroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.VnetResourceGroup should be used as the rg")
},
},
{
Expand Down
4 changes: 2 additions & 2 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func TestCreateVolume(t *testing.T) {
testFunc: func(t *testing.T) {
d := NewFakeDriver()
d.cloud = &azure.Cloud{}
d.cloud.Config.DisableAzureStackCloud = false
d.cloud.Config.Cloud = "AZURESTACKCLOUD"
d.cloud.DisableAzureStackCloud = false
d.cloud.Cloud = "AZURESTACKCLOUD"
d.cloud.SubscriptionID = "subID"
mp := make(map[string]string)
mp[storeAccountKeyField] = falseValue
Expand Down
Loading

0 comments on commit 5ba5f74

Please sign in to comment.