Skip to content

Commit

Permalink
Add SDK caching & Refactor (#75)
Browse files — browse the repository at this point in the history
* Add caching

* add handler

* refactor

* remove region from spec

* cleanup region references

* derive region

* fix tests

* move flushing up
  • Loading branch information
eytan-avisror authored Mar 11, 2020
1 parent e2d94b5 commit 9ba3ab6
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 53 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ RollingUpgrade provides a Kubernetes native mechanism for doing rolling-updates

- The RollingUpgrade Kubernetes custom resource has the following options in the spec:
- `asgName`: Name of the autoscaling group to perform the rolling-update.
- `region`: Name of the AWS region in which the ASG exists.
- `preDrain.script`: The script to run before draining a node.
- `postDrain.script`: The script to run after draining a node. This allows for performing actions such as quiescing network traffic, adding labels, etc.
- `postDrain.waitSeconds`: The seconds to wait after a node is drained.
Expand Down
1 change: 0 additions & 1 deletion api/v1alpha1/rollingupgrade_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type PostTerminateSpec struct {
type RollingUpgradeSpec struct {
PostDrainDelaySeconds int `json:"postDrainDelaySeconds,omitempty"`
NodeIntervalSeconds int `json:"nodeIntervalSeconds,omitempty"`
Region string `json:"region,omitempty"`
AsgName string `json:"asgName,omitempty"`
PreDrain PreDrainSpec `json:"preDrain,omitempty"`
PostDrain PostDrainSpec `json:"postDrain,omitempty"`
Expand Down
2 changes: 0 additions & 2 deletions config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ spec:
script:
type: string
type: object
region:
type: string
strategy:
description: UpdateStrategy holds the information needed to perform
update based on different update strategies
Expand Down
42 changes: 9 additions & 33 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
awsclient "github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/go-logr/logr"
"github.com/keikoproj/aws-sdk-go-cache/cache"
iebackoff "github.com/keikoproj/inverse-exp-backoff"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -95,14 +92,6 @@ var (
WaiterMaxAttempts = uint32(32)
)

var DefaultRetryer = awsclient.DefaultRetryer{
NumMaxRetries: 250,
MinThrottleDelay: time.Second * 5,
MaxThrottleDelay: time.Second * 20,
MinRetryDelay: time.Second * 1,
MaxRetryDelay: time.Second * 5,
}

// RollingUpgradeReconciler reconciles a RollingUpgrade object
type RollingUpgradeReconciler struct {
client.Client
Expand All @@ -115,6 +104,7 @@ type RollingUpgradeReconciler struct {
ruObjNameToASG sync.Map
ClusterState ClusterState
maxParallel int
CacheConfig *cache.Config
}

func (r *RollingUpgradeReconciler) SetMaxParallel(max int) {
Expand Down Expand Up @@ -289,6 +279,11 @@ func (r *RollingUpgradeReconciler) WaitForDesiredNodes(ruObj *upgrademgrv1alpha1
var err error
var ieb *iebackoff.IEBackoff
for ieb, err = iebackoff.NewIEBackoff(WaiterMaxDelay, WaiterMinDelay, 0.5, WaiterMaxAttempts); err == nil; err = ieb.Next() {
err = r.populateAsg(ruObj)
if err != nil {
return err
}

err = r.populateNodeList(ruObj, r.generatedClient.CoreV1().Nodes())
if err != nil {
log.Infof("%s: unable to populate node list: %v", ruObj.Name, err)
Expand Down Expand Up @@ -434,12 +429,6 @@ func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.Rolli
return nil
}

func (r *RollingUpgradeReconciler) setDefaults(ruObj *upgrademgrv1alpha1.RollingUpgrade) {
if ruObj.Spec.Region == "" {
ruObj.Spec.Region = "us-west-2"
}
}

func (r *RollingUpgradeReconciler) getNodeName(i *autoscaling.Instance, nodeList *corev1.NodeList, ruObj *upgrademgrv1alpha1.RollingUpgrade) string {
node := r.getNodeFromAsg(i, nodeList, ruObj)
if node == nil {
Expand All @@ -464,8 +453,6 @@ func (r *RollingUpgradeReconciler) getNodeFromAsg(i *autoscaling.Instance, nodeL
}

func (r *RollingUpgradeReconciler) populateAsg(ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
// Initialize a session in the given region that the SDK will use to load
// credentials.
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []*string{
aws.String(ruObj.Spec.AsgName),
Expand Down Expand Up @@ -653,19 +640,8 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context,
return reconcile.Result{}, nil
}

r.setDefaults(ruObj)

config := aws.NewConfig().WithRegion(ruObj.Spec.Region)
config = config.WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, log.NewRetryLogger(DefaultRetryer))
sess, err := session.NewSession(config)
if err != nil {
log.Fatalf("failed to create asg client, %v", err)
}
r.ASGClient = autoscaling.New(sess)
r.EC2Client = ec2.New(sess)

err = r.populateAsg(ruObj)
r.CacheConfig.FlushCache("autoscaling")
err := r.populateAsg(ruObj)
if err != nil {
return r.finishExecution(StatusError, 0, ctx, ruObj)
}
Expand Down
25 changes: 15 additions & 10 deletions controllers/rollingupgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"k8s.io/client-go/kubernetes/scheme"

"github.com/keikoproj/aws-sdk-go-cache/cache"
"github.com/keikoproj/upgrade-manager/pkg/log"

"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -535,16 +536,6 @@ func TestLoadEnvironmentVariables(t *testing.T) {
g.Expect(os.Getenv(instanceNameKey)).To(gomega.Equal("instance-name-foo"))
}

func TestSetDefaults(t *testing.T) {
g := gomega.NewGomegaWithT(t)
ruObj := &upgrademgrv1alpha1.RollingUpgrade{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}
rcRollingUpgrade := &RollingUpgradeReconciler{ClusterState: NewClusterState()}

g.Expect(ruObj.Spec.Region).To(gomega.Equal(""))
rcRollingUpgrade.setDefaults(ruObj)
g.Expect(ruObj.Spec.Region).To(gomega.Equal("us-west-2"))
}

func TestGetNodeNameFoundNode(t *testing.T) {
g := gomega.NewGomegaWithT(t)

Expand Down Expand Up @@ -876,6 +867,7 @@ func TestRunRestackSuccessOneNode(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -925,6 +917,7 @@ func TestRunRestackSuccessMultipleNodes(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -962,6 +955,7 @@ func TestRunRestackSameLaunchConfig(t *testing.T) {
admissionMap: sync.Map{},
ruObjNameToASG: sync.Map{},
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1021,6 +1015,7 @@ func TestRunRestackRollingUpgradeNodeNameNotFound(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &emptyNodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1067,6 +1062,7 @@ func TestRunRestackNoNodeName(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1130,6 +1126,7 @@ func TestRunRestackDrainNodeFail(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1189,6 +1186,7 @@ func TestRunRestackTerminateNodeFail(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1280,6 +1278,7 @@ func TestUniformAcrossAzUpdateSuccessMultipleNodes(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1336,6 +1335,7 @@ func TestUpdateInstances(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1399,6 +1399,7 @@ func TestUpdateInstancesError(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1469,6 +1470,7 @@ func TestUpdateInstancesPartialError(t *testing.T) {
ruObjNameToASG: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -1498,6 +1500,7 @@ func TestUpdateInstancesWithZeroInstances(t *testing.T) {
admissionMap: sync.Map{},
ruObjNameToASG: sync.Map{},
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}

ctx := context.TODO()
Expand Down Expand Up @@ -2114,6 +2117,7 @@ func TestRunRestackNoNodeInAsg(t *testing.T) {
admissionMap: sync.Map{},
NodeList: &nodeList,
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)

Expand Down Expand Up @@ -2213,6 +2217,7 @@ func TestRunRestackWithNodesLessThanMaxUnavailable(t *testing.T) {
admissionMap: sync.Map{},
ruObjNameToASG: sync.Map{},
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
}
rcRollingUpgrade.ruObjNameToASG.Store(ruObj.Name, &mockAsg)
rcRollingUpgrade.ClusterState.deleteEntryOfAsg(someAsg)
Expand Down
1 change: 0 additions & 1 deletion docs/step-by-step-example.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ metadata:
generateName: rollingupgrade-sample-
spec:
asgName: nodes.test-cluster-kops.cluster.k8s.local
region: us-west-2
nodeIntervalSeconds: 300
preDrain:
script: |
Expand Down
1 change: 0 additions & 1 deletion examples/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
generateName: rollingupgrade-sample-
spec:
asgName: my-asg
region: us-west-2
nodeIntervalSeconds: 300
postDrain:
waitSeconds: 90
1 change: 0 additions & 1 deletion examples/pre_post_drain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
generateName: rollingupgrade-sample-
spec:
asgName: my-asg-1
region: us-west-2
nodeIntervalSeconds: 300
preDrain:
script: |
Expand Down
1 change: 0 additions & 1 deletion examples/random_update_strategy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
generateName: rollingupgrade-sample-
spec:
asgName: my-asg-1
region: us-west-2
nodeIntervalSeconds: 300
preDrain:
script: |
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ require (
github.com/aws/aws-sdk-go v1.25.0
github.com/go-logr/logr v0.1.0
github.com/gogo/protobuf v1.3.1 // indirect
github.com/keikoproj/aws-sdk-go-cache v0.0.0-20200124200058-ab3c8c94044a
github.com/keikoproj/inverse-exp-backoff v0.0.0-20191216014651-04523236b6ca
github.com/keikoproj/upgrade-manager/pkg/log v0.0.0
github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.1
github.com/pkg/errors v0.8.1
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8 // indirect
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect
gopkg.in/yaml.v2 v2.2.7
Expand Down
Loading

0 comments on commit 9ba3ab6

Please sign in to comment.