Skip to content

Commit

Permalink
Feature: support batch-id labeling for blue-green strategy
Browse files Browse the repository at this point in the history
Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>
  • Loading branch information
AiRanthem committed Feb 12, 2025
1 parent 937c6b3 commit f5dcb21
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,20 @@ type NewInterfaceFunc func(cli client.Client, key types.NamespacedName, gvk sche

// NewControlPlane creates a new release controller with bluegreen-style to drive batch release state machine
func NewControlPlane(f NewInterfaceFunc, cli client.Client, recorder record.EventRecorder, release *v1beta1.BatchRelease, newStatus *v1beta1.BatchReleaseStatus, key types.NamespacedName, gvk schema.GroupVersionKind) *realBatchControlPlane {
return &realBatchControlPlane{
cp := &realBatchControlPlane{
Client: cli,
EventRecorder: recorder,
newStatus: newStatus,
Interface: f(cli, key, gvk),
release: release.DeepCopy(),
patcher: labelpatch.NewLabelPatcher(cli, klog.KObj(release), release.Spec.ReleasePlan.Batches),
}
switch gvk.Kind {
case "Deployment":
cp.patcher = labelpatch.NewDeploymentLabelPatcher(cli, klog.KObj(release), release.Spec.ReleasePlan.Batches)
default:
cp.patcher = labelpatch.NewLabelPatcher(cli, klog.KObj(release), release.Spec.ReleasePlan.Batches)
}
return cp
}

func (rc *realBatchControlPlane) Initialize() error {
Expand Down
73 changes: 55 additions & 18 deletions pkg/controller/batchrelease/labelpatch/patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/openkruise/rollouts/api/v1beta1"
batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
Expand All @@ -39,6 +41,12 @@ func NewLabelPatcher(cli client.Client, logKey klog.ObjectRef, batches []v1beta1
return &realPatcher{Client: cli, logKey: logKey, batches: batches}
}

func NewDeploymentLabelPatcher(cli client.Client, logKey klog.ObjectRef, batches []v1beta1.ReleaseBatch) *nativeDeploymentPatcher {
return &nativeDeploymentPatcher{
rp: &realPatcher{Client: cli, logKey: logKey, batches: batches},
}
}

type realPatcher struct {
client.Client
logKey klog.ObjectRef
Expand All @@ -61,36 +69,23 @@ func (r *realPatcher) patchPodBatchLabel(pods []*corev1.Pod, ctx *batchcontext.B
plannedUpdatedReplicasForBatches := r.calculatePlannedStepIncrements(r.batches, int(ctx.Replicas), int(ctx.CurrentBatch))
var updatedButUnpatchedPods []*corev1.Pod

// this map is used to map pod revision to rollout-id directly, preventing a new but out-dated pod to be considered as the latest revision
podRevisionToRolloutIdMap := make(map[string]string)
for _, pod := range pods {
revision := util.GetPodRevision(pod)
if podRevisionToRolloutIdMap[revision] == "" {
podRevisionToRolloutIdMap[revision] = pod.Labels[v1beta1.RolloutIDLabel]
}
}

for _, pod := range pods {
if !pod.DeletionTimestamp.IsZero() {
klog.InfoS("Pod is being deleted, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}
// we don't patch label for the active old revision pod
if util.IsConsistentWithRevision(pod, ctx.StableRevision) {
klog.InfoS("Pod is consistent with stable revision, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
if !util.IsConsistentWithRevision(pod, ctx.UpdateRevision) {
klog.InfoS("Pod is not consistent with revision, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}
if podRolloutId := podRevisionToRolloutIdMap[util.GetPodRevision(pod)]; podRolloutId != "" && podRolloutId != ctx.RolloutID {
klog.InfoS("Legacy pod found, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}
if pod.Labels[v1beta1.RolloutIDLabel] == "" {
// latest newly created pods, for example: new/recreated pods
if pod.Labels[v1beta1.RolloutIDLabel] != ctx.RolloutID {
// for example: new/recreated pods
updatedButUnpatchedPods = append(updatedButUnpatchedPods, pod)
klog.InfoS("Find a pod to add updatedButUnpatchedPods", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}
// existing labeled latest pods

podBatchID, err := strconv.Atoi(pod.Labels[v1beta1.RolloutBatchIDLabel])
if err != nil {
klog.InfoS("Pod batchID is not a number, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
Expand Down Expand Up @@ -154,3 +149,45 @@ func calculateBatchReplicas(batches []v1beta1.ReleaseBatch, workloadReplicas, cu
}
return batchSize
}

type nativeDeploymentPatcher struct {
rp *realPatcher
}

// PatchPodBatchLabel patches label controller-revision-hash to pods owned by native deployments for pod-template-hash calculated
// by rollouts differs from the one calculated by k8s. The hash calculated by rollouts is set as the value of controller-revision-hash
func (p *nativeDeploymentPatcher) PatchPodBatchLabel(ctx *batchcontext.BatchContext) error {
if ctx.RolloutID == "" || len(ctx.Pods) == 0 {
return nil
}
pods := ctx.Pods
if ctx.FilterFunc != nil {
pods = ctx.FilterFunc(pods, ctx)
}
for i, pod := range pods {
if pod.Labels[v1.ControllerRevisionHashLabelKey] != "" {
continue
}
owner := metav1.GetControllerOf(pod)
if owner == nil || owner.Kind != "ReplicaSet" {
klog.InfoS("Deployment Pod owner is not ReplicaSet", "pod", klog.KObj(pod), "rollout", p.rp.logKey)
continue
}
rs := &v1.ReplicaSet{}
if err := p.rp.Get(context.Background(), types.NamespacedName{Namespace: pod.Namespace, Name: owner.Name}, rs); err != nil {
klog.ErrorS(err, "Failed to get ReplicaSet", "pod", klog.KObj(pod), "rollout", p.rp.logKey)
return err
}
delete(rs.Spec.Template.ObjectMeta.Labels, v1.DefaultDeploymentUniqueLabelKey)
podClone := pod.DeepCopy()
hash := util.ComputeHash(&rs.Spec.Template, nil)
podClone.Labels[v1.ControllerRevisionHashLabelKey] = hash
by := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, v1.ControllerRevisionHashLabelKey, hash)
if err := p.rp.Patch(context.TODO(), podClone, client.RawPatch(types.StrategicMergePatchType, []byte(by))); err != nil {
klog.ErrorS(err, "Failed to patch Pod controller-revision-hash label", "pod", klog.KObj(pod), "rollout", p.rp.logKey)
}
pods[i] = podClone
klog.InfoS("Pod controller-revision-hash label patched", "pod", klog.KObj(pod), "rollout", p.rp.logKey, "label", hash)
}
return p.rp.patchPodBatchLabel(pods, ctx)
}

0 comments on commit f5dcb21

Please sign in to comment.