Skip to content

Commit

Permalink
move object states Secrets to the namespace where the agent is running
Browse files Browse the repository at this point in the history
On-behalf-of: @SAP christoph.mewes@sap.com
  • Loading branch information
xrstf committed Jan 13, 2025
1 parent 4107481 commit c5fe344
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cmd/api-syncagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
return fmt.Errorf("failed to add apiexport controller: %w", err)
}

if err := syncmanager.Add(ctx, mgr, platformCluster, platformRestConfig, log, apiExport, opts.PublishedResourceSelector); err != nil {
if err := syncmanager.Add(ctx, mgr, platformCluster, platformRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil {
return fmt.Errorf("failed to add syncmanager controller: %w", err)
}

Expand Down
3 changes: 2 additions & 1 deletion internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func Create(
pubRes *syncagentv1alpha1.PublishedResource,
discoveryClient *discovery.Client,
apiExportName string,
stateNamespace string,
log *zap.SugaredLogger,
numWorkers int,
) (controller.Controller, error) {
Expand All @@ -86,7 +87,7 @@ func Create(

// create the syncer that holds the meat&potatoes of the synchronization logic
mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator)
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace)
if err != nil {
return nil, fmt.Errorf("failed to create syncer: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/syncmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Reconciler struct {
recorder record.EventRecorder
discoveryClient *discovery.Client
prFilter labels.Selector
stateNamespace string

apiExport *kcpdevv1alpha1.APIExport

Expand All @@ -93,6 +94,7 @@ func Add(
log *zap.SugaredLogger,
apiExport *kcpdevv1alpha1.APIExport,
prFilter labels.Selector,
stateNamespace string,
) error {
reconciler := &Reconciler{
ctx: ctx,
Expand All @@ -105,6 +107,7 @@ func Add(
syncWorkers: map[string]lifecycle.Controller{},
discoveryClient: discovery.NewClient(localManager.GetClient()),
prFilter: prFilter,
stateNamespace: stateNamespace,
}

_, err := builder.ControllerManagedBy(localManager).
Expand Down Expand Up @@ -276,6 +279,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
&pubRes,
r.discoveryClient,
r.apiExport.Name,
r.stateNamespace,
r.log,
numSyncWorkers,
)
Expand Down
16 changes: 10 additions & 6 deletions internal/sync/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ type objectStateStore struct {
backend backend
}

func newObjectStateStore(primaryObject, stateCluster syncSide) ObjectStateStore {
kubernetes := newKubernetesBackend(primaryObject, stateCluster)

func newObjectStateStore(backend backend) ObjectStateStore {
return &objectStateStore{
backend: kubernetes,
backend: backend,
}
}

func newKubernetesStateStoreCreator(namespace string) newObjectStateStoreFunc {
return func(primaryObject, stateCluster syncSide) ObjectStateStore {
return newObjectStateStore(newKubernetesBackend(namespace, primaryObject, stateCluster))
}
}

Expand Down Expand Up @@ -124,7 +128,7 @@ func hashObject(obj *unstructured.Unstructured) string {
return hex.EncodeToString(hash.Sum(nil))
}

func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBackend {
func newKubernetesBackend(namespace string, primaryObject, stateCluster syncSide) *kubernetesBackend {
keyHash := hashObject(primaryObject.object)

secretLabels := newObjectKey(primaryObject.object, primaryObject.clusterName).Labels()
Expand All @@ -134,7 +138,7 @@ func newKubernetesBackend(primaryObject, stateCluster syncSide) *kubernetesBacke
secretName: types.NamespacedName{
// trim hash down; 20 was chosen at random
Name: fmt.Sprintf("obj-state-%s-%s", primaryObject.clusterName, keyHash[:20]),
Namespace: "kcp-system",
Namespace: namespace,
},
labels: secretLabels,
stateCluster: stateCluster,
Expand Down
4 changes: 3 additions & 1 deletion internal/sync/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestStateStoreBasics(t *testing.T) {

serviceClusterClient := buildFakeClient()
ctx := context.Background()
stateNamespace := "kcp-system"

primaryObjectSide := syncSide{
object: primaryObject,
Expand All @@ -48,7 +49,8 @@ func TestStateStoreBasics(t *testing.T) {
client: serviceClusterClient,
}

store := newObjectStateStore(primaryObjectSide, stateSide)
storeCreator := newKubernetesStateStoreCreator(stateNamespace)
store := storeCreator(primaryObjectSide, stateSide)

///////////////////////////////////////
// get nil from empty store
Expand Down
3 changes: 2 additions & 1 deletion internal/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewResourceSyncer(
localCRD *apiextensionsv1.CustomResourceDefinition,
remoteAPIGroup string,
mutator mutation.Mutator,
stateNamespace string,
) (*ResourceSyncer, error) {
// create a dummy that represents the type used on the local service cluster
localGVK := projection.PublishedResourceSourceGVK(pubRes)
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewResourceSyncer(
subresources: subresources,
destDummy: localDummy,
mutator: mutator,
newObjectStateStore: newObjectStateStore,
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
}, nil
}

Expand Down
10 changes: 8 additions & 2 deletions internal/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
}

const stateNamespace = "kcp-system"

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
localClient := buildFakeClient(testcase.localObject)
Expand All @@ -806,6 +808,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
testcase.localCRD,
testcase.remoteAPIGroup,
nil,
stateNamespace,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand All @@ -820,7 +823,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore {
// .Process() is called multiple times, but we want the state to persist between reconciles.
if backend == nil {
backend = newKubernetesBackend(primaryObject, stateCluster)
backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster)
if testcase.existingState != "" {
if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil {
t.Fatalf("Failed to prime state store: %v", err)
Expand Down Expand Up @@ -1086,6 +1089,8 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
},
}

const stateNamespace = "kcp-system"

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
localClient := buildFakeClientWithStatus(testcase.localObject)
Expand All @@ -1100,6 +1105,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
testcase.localCRD,
testcase.remoteAPIGroup,
nil,
stateNamespace,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand All @@ -1114,7 +1120,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
syncer.newObjectStateStore = func(primaryObject, stateCluster syncSide) ObjectStateStore {
// .Process() is called multiple times, but we want the state to persist between reconciles.
if backend == nil {
backend = newKubernetesBackend(primaryObject, stateCluster)
backend = newKubernetesBackend(stateNamespace, primaryObject, stateCluster)
if testcase.existingState != "" {
if err := backend.Put(testcase.remoteObject, clusterName.String(), []byte(testcase.existingState)); err != nil {
t.Fatalf("Failed to prime state store: %v", err)
Expand Down

0 comments on commit c5fe344

Please sign in to comment.