Skip to content


traffic: Refactor continous logic
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <>
  • Loading branch information
Funinu committed Jun 28, 2024
1 parent db761a9 commit 54cf862
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 19 deletions.
4 changes: 2 additions & 2 deletions api/v1beta1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,10 @@ const (
FinalisingStepTypeStableService FinalisingStepType = "RestoreStableService"
// Restore the GatewayAPI/Ingress/Istio
FinalisingStepTypeGateway FinalisingStepType = "RestoreGateway"
// Delete Canary Service
FinalisingStepTypeCanaryService FinalisingStepType = "DeleteCanayService"
// Delete Batch Release
FinalisingStepTypeDeleteBR FinalisingStepType = "DeleteBatchRelease"
// Delete Canary Service
FinalisingStepTypeDeleteCanaryService FinalisingStepType = "DeleteCanaryService"

// +genclient
Expand Down
73 changes: 73 additions & 0 deletions pkg/controller/rollout/rollout_canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,79 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro
return true, nil

// do Canary Reset for continuous release, eg, v1->v2(not complete)->v3
func (m *canaryReleaseManager) doCanaryReset(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
// when CanaryStatus is nil, which means canary action hasn't started yet, don't need doing cleanup
if canaryStatus == nil {
return true, nil
// To ensure respect for grace time between finalising steps, we set start time before the first step
if len(canaryStatus.FinalisingStep) == 0 {
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
tr := newTrafficRoutingContext(c)
klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
switch canaryStatus.FinalisingStep {
// start from FinalisingStepTypeGateway
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
// firstly, restore the gateway resources (ingress/gatewayAPI/Istio), that means
// only stable Service will accept the traffic
case v1beta1.FinalisingStepTypeGateway:
done, err := m.trafficRoutingManager.RestoreGateway(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
if canaryStatus.LastUpdateTime != nil && canaryStatus.LastUpdateTime.Add(time.Second*time.Duration(3)).After(time.Now()) {
klog.Infof("rollout(%s/%s) in step (%s), and wait 3 seconds", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
} else {
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR
// secondly, remove the batchRelease. For canary release, it means the immediate deletion of
// canary deployment, however, for partial style, the v2 pods won't be deleted right away
// in both cases, only the stable pods (v1) accept the traffic
case v1beta1.FinalisingStepTypeDeleteBR:
done, err := m.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) Finalize batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
if canaryStatus.LastUpdateTime != nil && canaryStatus.LastUpdateTime.Add(time.Second*time.Duration(3)).After(time.Now()) {
klog.Infof("rollout(%s/%s) in step (%s), and wait 3 seconds", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
} else {
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
// finally, remove the canary service. This step can swap with the last step.
NOTE: we only remove the canary service, with stable service unchanged, that means the
stable service may still has the selector of stable pods, which is intended. Consider
this senario: continuous release v1->v2->3, and currently we are in step x, which expects
to route 0% traffic to v2 (or simply A/B test), if we release v3 in step x and remove the
stable service selector, then the traffic will route to both v1 and v2 before executing the
first step of v3 release.
case v1beta1.FinalisingStepTypeDeleteCanaryService:
done, err := m.trafficRoutingManager.RemoveCanaryService(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
return true, nil

return false, nil

func (m *canaryReleaseManager) removeRolloutProgressingAnnotation(c *RolloutContext) error {
if c.Workload == nil {
return nil
Expand Down
27 changes: 13 additions & 14 deletions pkg/controller/rollout/rollout_progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,22 @@ func isRollingBackInBatches(rollout *v1beta1.Rollout, workload *util.Workload) b
// 1. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
// 2. remove batchRelease CR.
func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) {
if len(c.Rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 {
// modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
tr := newTrafficRoutingContext(c)
done, err := r.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
done, err := r.canaryManager.removeBatchRelease(c)
releaseManager, err := r.getReleaseManager(c.Rollout)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
return true, nil
// if no trafficRouting exists, simply remove batchRelease
if !c.Rollout.Spec.Strategy.HasTrafficRoutings() {
done, err := releaseManager.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
return true, nil
return releaseManager.doCanaryReset(c)

func (r *RolloutReconciler) recalculateCanaryStep(c *RolloutContext) (int32, error) {
Expand Down
68 changes: 65 additions & 3 deletions pkg/controller/rollout/rollout_progressing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func TestReconcileRolloutProgressing(t *testing.T) {
name: "ReconcileRolloutProgressing rolling -> continueRelease",
name: "ReconcileRolloutProgressing rolling -> continueRelease1",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
Expand Down Expand Up @@ -578,10 +578,72 @@ func TestReconcileRolloutProgressing(t *testing.T) {
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
s.CanaryStatus = nil
s.CanaryStatus.ObservedWorkloadGeneration = 2
s.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 3
s.CanaryStatus.CanaryReplicas = 5
s.CanaryStatus.CanaryReadyReplicas = 3
s.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
s.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInitializing
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(s, *cond)
s.CurrentStepIndex = s.CanaryStatus.CurrentStepIndex
s.CurrentStepState = s.CanaryStatus.CurrentStepState
return s
name: "ReconcileRolloutProgressing rolling -> continueRelease2",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
dep2 := deploymentDemo.DeepCopy()
dep2.UID = "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180"
dep2.Name = dep1.Name + "-canary"
dep2.Labels[util.CanaryDeploymentLabel] = dep1.Name
rs1 := rsDemo.DeepCopy()
rs2 := rsDemo.DeepCopy()
rs2.Name = "echoserver-canary-2"
rs2.OwnerReferences = []metav1.OwnerReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: dep2.Name,
UID: "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180",
Controller: utilpointer.BoolPtr(true),
rs2.Labels["pod-template-hash"] = "pod-template-hash-v2"
rs2.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return []*apps.Deployment{dep1, dep2}, []*apps.ReplicaSet{rs1, rs2}
getNetwork: func() ([]*corev1.Service, []*netv1.Ingress) {
return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{demoIngress.DeepCopy()}
getRollout: func() (*v1beta1.Rollout, *v1beta1.BatchRelease, *v1alpha1.TrafficRouting) {
obj := rolloutDemo.DeepCopy()
obj.Status.CanaryStatus.ObservedWorkloadGeneration = 2
obj.Status.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.CanaryReplicas = 5
obj.Status.CanaryStatus.CanaryReadyReplicas = 3
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(&obj.Status, *cond)
return obj, nil, nil
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
return s
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/rollout/rollout_releaseManager.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.

package rollout

import (

type ReleaseManager interface {
// execute the core logic of a release step by step
runCanary(c *RolloutContext) error
// check the NextStepIndex field in status, if modifies detected, jump to target step
doCanaryJump(c *RolloutContext) bool
// called when user accomplishes a release / does a rollback, or disables/removes the Rollout Resource
doCanaryFinalising(c *RolloutContext) (bool, error)
// fetch the BatchRelease object
fetchBatchRelease(ns, name string) (*v1beta1.BatchRelease, error)
// remove the BatchRelease object
removeBatchRelease(c *RolloutContext) (bool, error)
// called when user does continuous release
doCanaryReset(c *RolloutContext) (bool, error)
113 changes: 113 additions & 0 deletions pkg/trafficrouting/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,119 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
return true, nil

// RestoreGateway can be seen as route all traffic to stable service
func (m *Manager) RestoreGateway(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
if err != nil {
klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error())
return false, err
if err = trController.Finalise(context.TODO()); err != nil {
return false, err
return true, nil

// RemoveCanaryService find and delete canary Service. stable Service won't be modified
func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// end to end deployment, don't remove the canary service;
// because canary service is stable service
if !c.OnlyTrafficRouting && !c.DisableGenerateCanaryService {
// remove canary service
err := m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return false, err
klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name)

return true, nil

func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
if c.OnlyTrafficRouting {
return true, nil

//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
// not found, wait a moment, retry
if errors.IsNotFound(err) {
return false, nil
return false, err

if stableService.Spec.Selector[c.RevisionLabelKey] == c.StableRevision {
return true, nil

// patch stable service to only select the stable pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error())
return false, err
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
return true, nil

func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
// restore stable Service
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
return true, nil

func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) {
trafficRouting := con.ObjectRef[0]
if trafficRouting.CustomNetworkRefs != nil {
Expand Down

0 comments on commit 54cf862

Please sign in to comment.