Commit

feat: reconcile pod ephemeral metadata in parallel (#4130)
* perf: reconcile pod ephemeral metadata in parallel

Signed-off-by: Jordan Rodgers <jrodgers@figma.com>

* update tests for multiple pods

Signed-off-by: Jordan Rodgers <jrodgers@figma.com>

---------

Signed-off-by: Jordan Rodgers <jrodgers@figma.com>
com6056 authored Feb 13, 2025
1 parent a312e28 commit d8994c0
Showing 8 changed files with 58 additions and 23 deletions.
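The core of the change is in rollout/ephemeralmetadata.go: the per-pod metadata updates that previously ran one at a time are now issued from a bounded pool of goroutines via golang.org/x/sync/errgroup, with the pool size exposed through a new --ephemeral-metadata-threads controller flag (default 10). Below is a minimal, self-contained sketch of that pattern; the names (updateAll, items) are illustrative and not taken from the Argo Rollouts code.

```go
// Minimal sketch of the bounded-concurrency pattern adopted here:
// errgroup.SetLimit caps how many goroutines run at once, Go blocks once the
// limit is reached, and Wait returns the first error from any worker.
// Names (updateAll, items) are illustrative, not the project's.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func updateAll(items []string, limit int) error {
	var eg errgroup.Group
	eg.SetLimit(limit) // at most `limit` updates in flight concurrently

	for _, item := range items {
		item := item // per-iteration copy; unnecessary on Go 1.22+, harmless earlier
		eg.Go(func() error {
			// Stand-in for the per-pod Update API call.
			fmt.Println("updating", item)
			return nil
		})
	}
	// Wait blocks until every goroutine has returned.
	return eg.Wait()
}

func main() {
	if err := updateAll([]string{"pod-a", "pod-b", "pod-c"}, 2); err != nil {
		fmt.Println("error:", err)
	}
}
```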
6 changes: 5 additions & 1 deletion cmd/rollouts-controller/main.go
@@ -23,6 +23,7 @@ import (
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-rollouts/metricproviders"
"github.com/argoproj/argo-rollouts/rollout"
"github.com/argoproj/argo-rollouts/utils/record"

"github.com/argoproj/argo-rollouts/controller"
@@ -67,6 +68,7 @@ func newCommand() *cobra.Command {
analysisThreads int
serviceThreads int
ingressThreads int
ephemeralMetadataThreads int
istioVersion string
trafficSplitVersion string
traefikAPIGroup string
@@ -271,7 +273,8 @@ func newCommand() *cobra.Command {
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
jobInformerFactory,
ephemeralMetadataThreads)
}
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
@@ -299,6 +302,7 @@ func newCommand() *cobra.Command {
command.Flags().IntVar(&analysisThreads, "analysis-threads", controller.DefaultAnalysisThreads, "Set the number of worker threads for the Experiment controller")
command.Flags().IntVar(&serviceThreads, "service-threads", controller.DefaultServiceThreads, "Set the number of worker threads for the Service controller")
command.Flags().IntVar(&ingressThreads, "ingress-threads", controller.DefaultIngressThreads, "Set the number of worker threads for the Ingress controller")
command.Flags().IntVar(&ephemeralMetadataThreads, "ephemeral-metadata-threads", rollout.DefaultEphemeralMetadataThreads, "Set the number of worker threads for the Ephemeral Metadata reconciler")
command.Flags().StringVar(&istioVersion, "istio-api-version", defaults.DefaultIstioVersion, "Set the default Istio apiVersion that controller should look when manipulating VirtualServices.")
command.Flags().StringVar(&ambassadorVersion, "ambassador-api-version", defaults.DefaultAmbassadorVersion, "Set the Ambassador apiVersion that controller should look when manipulating Ambassador Mappings.")
command.Flags().StringVar(&trafficSplitVersion, "traffic-split-api-version", defaults.DefaultSMITrafficSplitVersion, "Set the default TrafficSplit apiVersion that controller uses when creating TrafficSplits.")
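For reference, this is the general shape of how such a worker-count flag is registered on a cobra command and read at startup; a stripped-down sketch with illustrative names, not the project's exact wiring (the real flag is --ephemeral-metadata-threads with rollout.DefaultEphemeralMetadataThreads as its default, as shown above).

```go
// Stripped-down sketch of registering an integer worker-count flag on a
// cobra command. Names and the default value are illustrative stand-ins.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

const defaultWorkerThreads = 10 // stand-in for rollout.DefaultEphemeralMetadataThreads

func main() {
	var workerThreads int

	cmd := &cobra.Command{
		Use: "example-controller",
		RunE: func(_ *cobra.Command, _ []string) error {
			// In the real controller this value is passed through
			// controller.NewManager into the rollout controller's config.
			fmt.Printf("ephemeral metadata workers: %d\n", workerThreads)
			return nil
		},
	}
	cmd.Flags().IntVar(&workerThreads, "ephemeral-metadata-threads",
		defaultWorkerThreads,
		"Set the number of worker threads for the ephemeral metadata reconciler")

	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
```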
2 changes: 2 additions & 0 deletions controller/controller.go
@@ -282,6 +282,7 @@ func NewManager(
namespaced bool,
kubeInformerFactory kubeinformers.SharedInformerFactory,
jobInformerFactory kubeinformers.SharedInformerFactory,
ephemeralMetadataThreads int,
) *Manager {
runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")
@@ -346,6 +347,7 @@
IngressWorkQueue: ingressWorkqueue,
MetricsServer: metricsServer,
Recorder: recorder,
EphemeralMetadataThreads: ephemeralMetadataThreads,
})

experimentController := experiments.NewController(experiments.ControllerConfig{
1 change: 1 addition & 0 deletions controller/controller_test.go
@@ -269,6 +269,7 @@ func TestNewManager(t *testing.T) {
false,
nil,
nil,
rolloutController.DefaultEphemeralMetadataThreads,
)

assert.NotNil(t, cm)
2 changes: 1 addition & 1 deletion go.mod
@@ -39,6 +39,7 @@ require (
github.com/tj/assert v0.0.3
github.com/valyala/fasttemplate v1.2.2
golang.org/x/oauth2 v0.26.0
golang.org/x/sync v0.10.0
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.5
@@ -202,7 +203,6 @@ require (
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
8 changes: 5 additions & 3 deletions rollout/controller.go
@@ -114,6 +114,7 @@ type ControllerConfig struct {
IngressWorkQueue workqueue.RateLimitingInterface
MetricsServer *metrics.MetricsServer
Recorder record.EventRecorder
EphemeralMetadataThreads int
}

// reconcilerBase is a shared datastructure containing all clients and configuration necessary to
@@ -153,8 +154,9 @@ type reconcilerBase struct {
newTrafficRoutingReconciler func(roCtx *rolloutContext) ([]trafficrouting.TrafficRoutingReconciler, error) //nolint:structcheck

// recorder is an event recorder for recording Event resources to the Kubernetes API.
recorder record.EventRecorder
resyncPeriod time.Duration
recorder record.EventRecorder
resyncPeriod time.Duration
ephemeralMetadataThreads int
}

type IngressWrapper interface {
@@ -166,7 +168,6 @@

// NewController returns a new rollout controller
func NewController(cfg ControllerConfig) *Controller {

replicaSetControl := controller.RealRSControl{
KubeClient: cfg.KubeClientSet,
Recorder: cfg.Recorder.K8sRecorder(),
@@ -201,6 +202,7 @@ func NewController(cfg ControllerConfig) *Controller {
resyncPeriod: cfg.ResyncPeriod,
podRestarter: podRestarter,
refResolver: cfg.RefResolver,
ephemeralMetadataThreads: cfg.EphemeralMetadataThreads,
}

controller := &Controller{
1 change: 1 addition & 0 deletions rollout/controller_test.go
@@ -602,6 +602,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
MetricsServer: metricsServer,
Recorder: record.NewFakeEventRecorder(),
RefResolver: &FakeWorkloadRefResolver{},
EphemeralMetadataThreads: DefaultEphemeralMetadataThreads,
})

c.enqueueRollout = func(obj any) {
31 changes: 23 additions & 8 deletions rollout/ephemeralmetadata.go
@@ -4,13 +4,17 @@ import (
"context"
"fmt"

"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
)

// DefaultEphemeralMetadataThreads is the default number of worker threads to run when reconciling ephemeral metadata
const DefaultEphemeralMetadataThreads = 10

// reconcileEphemeralMetadata syncs canary/stable ephemeral metadata to ReplicaSets and pods
func (c *rolloutContext) reconcileEphemeralMetadata() error {
ctx := context.TODO()
@@ -84,16 +88,27 @@ func (c *rolloutContext) syncEphemeralMetadata(ctx context.Context, rs *appsv1.R
return err
}
existingPodMetadata := replicasetutil.ParseExistingPodMetadata(originalRSCopy)

var eg errgroup.Group
eg.SetLimit(c.ephemeralMetadataThreads)

for _, pod := range pods {
newPodObjectMeta, podModified := replicasetutil.SyncEphemeralPodMetadata(&pod.ObjectMeta, existingPodMetadata, podMetadata)
if podModified {
pod.ObjectMeta = *newPodObjectMeta
_, err = c.kubeclientset.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
if err != nil {
return err
eg.Go(func() error {
newPodObjectMeta, podModified := replicasetutil.SyncEphemeralPodMetadata(&pod.ObjectMeta, existingPodMetadata, podMetadata)
if podModified {
pod.ObjectMeta = *newPodObjectMeta
_, err = c.kubeclientset.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
if err != nil {
return err
}
c.log.Infof("synced ephemeral metadata %v to Pod %s", podMetadata, pod.Name)
}
c.log.Infof("synced ephemeral metadata %v to Pod %s", podMetadata, pod.Name)
}
return nil
})
}

if err := eg.Wait(); err != nil {
return err
}

return nil
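One property of errgroup worth keeping in mind when reading the new loop (a general fact about golang.org/x/sync/errgroup, not something specific to this commit): Wait returns only the first non-nil error produced by any of the goroutines, so if several pod updates fail in the same reconcile pass, only one error surfaces to the caller; the per-pod Infof line is logged only for pods that were updated successfully. A small illustration:

```go
// Illustration of errgroup's error semantics: Wait reports only the first
// non-nil error returned by the goroutines; the rest are dropped.
// Everything here is illustrative and unrelated to the Argo Rollouts code.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	eg.SetLimit(2) // at most two workers in flight at once

	for i := 0; i < 3; i++ {
		i := i // per-iteration copy; unnecessary on Go 1.22+, harmless earlier
		eg.Go(func() error {
			return fmt.Errorf("update %d failed", i) // every worker fails
		})
	}

	// Prints a single "update N failed" error even though three occurred.
	fmt.Println(eg.Wait())
}
```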
30 changes: 20 additions & 10 deletions rollout/ephemeralmetadata_test.go
@@ -114,7 +114,7 @@ func TestSyncCanaryEphemeralMetadataSecondRevision(t *testing.T) {
r2.Status.StableRS = r1.Status.CurrentPodHash
rs2 := newReplicaSetWithStatus(r2, 3, 3)
rsGVK := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}
pod := corev1.Pod{
pod1 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-abc123",
Namespace: r1.Namespace,
@@ -125,17 +125,20 @@
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rs1, rsGVK)},
},
}
pod2 := pod1.DeepCopy()
pod2.Name = "foo-abc456"

f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)
f.kubeobjects = append(f.kubeobjects, rs1, &pod)
f.kubeobjects = append(f.kubeobjects, rs1, &pod1, pod2)
f.replicaSetLister = append(f.replicaSetLister, rs1)

f.expectUpdateRolloutStatusAction(r2) // Update Rollout conditions
rs2idx := f.expectCreateReplicaSetAction(rs2) // Create revision 2 ReplicaSet
rs1idx := f.expectUpdateReplicaSetAction(rs1) // update stable replicaset with stable metadata
f.expectListPodAction(r1.Namespace) // list pods to patch ephemeral data on revision 1 ReplicaSets pods
podIdx := f.expectUpdatePodAction(&pod) // Update pod with ephemeral data
pod1Idx := f.expectUpdatePodAction(&pod1) // Update pod1 with ephemeral data
pod2Idx := f.expectUpdatePodAction(pod2) // Update pod2 with ephemeral data
f.expectUpdateReplicaSetAction(rs1) // scale revision 1 ReplicaSet down
f.expectPatchRolloutAction(r2) // Patch Rollout status

@@ -158,8 +161,10 @@
}
assert.Equal(t, expectedStableLabels, updatedRS1.Spec.Template.Labels)
// also it's pods
updatedPod := f.getUpdatedPod(podIdx)
assert.Equal(t, expectedStableLabels, updatedPod.Labels)
updatedPod1 := f.getUpdatedPod(pod1Idx)
assert.Equal(t, expectedStableLabels, updatedPod1.Labels)
updatedPod2 := f.getUpdatedPod(pod2Idx)
assert.Equal(t, expectedStableLabels, updatedPod2.Labels)
}

// TestSyncBlueGreenEphemeralMetadataSecondRevision verifies when we deploy a canary ReplicaSet, the canary
@@ -188,7 +193,7 @@ func TestSyncBlueGreenEphemeralMetadataSecondRevision(t *testing.T) {
rs2 := newReplicaSetWithStatus(r2, 3, 3)
rs2PodHash := rs2.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
rsGVK := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}
pod := corev1.Pod{
pod1 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-abc123",
Namespace: r1.Namespace,
@@ -199,13 +204,15 @@
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rs1, rsGVK)},
},
}
pod2 := pod1.DeepCopy()
pod2.Name = "foo-abc456"

previewSvc := newService("preview", 80, nil, r1)
activeSvc := newService("active", 80, nil, r1)

f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)
f.kubeobjects = append(f.kubeobjects, rs1, &pod, previewSvc, activeSvc)
f.kubeobjects = append(f.kubeobjects, rs1, &pod1, pod2, previewSvc, activeSvc)
f.replicaSetLister = append(f.replicaSetLister, rs1)
f.serviceLister = append(f.serviceLister, activeSvc, previewSvc)

@@ -215,7 +222,8 @@
f.expectUpdateReplicaSetAction(rs2) // scale revision 2 ReplicaSet up
rs1idx := f.expectUpdateReplicaSetAction(rs1) // update stable replicaset with stable metadata
f.expectListPodAction(r1.Namespace) // list pods to patch ephemeral data on revision 1 ReplicaSets pods`
podIdx := f.expectUpdatePodAction(&pod) // Update pod with ephemeral data
pod1Idx := f.expectUpdatePodAction(&pod1) // Update pod1 with ephemeral data
pod2Idx := f.expectUpdatePodAction(pod2) // Update pod2 with ephemeral data
f.expectPatchRolloutAction(r2) // Patch Rollout status

f.run(getKey(r2, t))
Expand All @@ -237,8 +245,10 @@ func TestSyncBlueGreenEphemeralMetadataSecondRevision(t *testing.T) {
}
assert.Equal(t, expectedStableLabels, updatedRS1.Spec.Template.Labels)
// also it's pods
updatedPod := f.getUpdatedPod(podIdx)
assert.Equal(t, expectedStableLabels, updatedPod.Labels)
updatedPod1 := f.getUpdatedPod(pod1Idx)
assert.Equal(t, expectedStableLabels, updatedPod1.Labels)
updatedPod2 := f.getUpdatedPod(pod2Idx)
assert.Equal(t, expectedStableLabels, updatedPod2.Labels)
}

func TestReconcileEphemeralMetadata(t *testing.T) {
