Skip to content

Commit

Permalink
Implement shard controller and resource reconciliation
Browse files Browse the repository at this point in the history
On-behalf-of: @SAP christoph.mewes@sap.com
  • Loading branch information
xrstf committed Jan 27, 2025
1 parent 602f8f7 commit 37602a0
Show file tree
Hide file tree
Showing 9 changed files with 871 additions and 35 deletions.
2 changes: 1 addition & 1 deletion config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ rules:
resources:
- cacheservers
- frontproxies
- shards
verbs:
- create
- delete
Expand Down Expand Up @@ -84,6 +83,7 @@ rules:
resources:
- kubeconfigs
- rootshards
- shards
verbs:
- get
- list
Expand Down
178 changes: 157 additions & 21 deletions internal/controller/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,25 @@ package controller

import (
"context"
"errors"
"fmt"

k8creconciling "k8c.io/reconciler/pkg/reconciling"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kcp-dev/kcp-operator/internal/reconciling"
"github.com/kcp-dev/kcp-operator/internal/resources"
"github.com/kcp-dev/kcp-operator/internal/resources/shard"
operatorv1alpha1 "github.com/kcp-dev/kcp-operator/sdk/apis/operator/v1alpha1"
)

Expand All @@ -33,30 +46,153 @@ type ShardReconciler struct {
Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Shard object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
func (r *ShardReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Request-scoped logger is wired up but intentionally unused in this scaffold.
	logger := log.FromContext(ctx)
	_ = logger

	// TODO(user): your logic here

	return ctrl.Result{}, nil
}

// SetupWithManager registers this reconciler with the given Manager: it
// reconciles Shard objects and also watches Deployments owned by a Shard.
func (r *ShardReconciler) SetupWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewControllerManagedBy(mgr)
	builder = builder.For(&operatorv1alpha1.Shard{})
	builder = builder.Owns(&appsv1.Deployment{})

	return builder.Complete(r)
}

// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=operator.kcp.io,resources=shards/finalizers,verbs=update
// +kubebuilder:rbac:groups=cert-manager.io,resources=certificates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile drives a single Shard object towards its desired state: it loads
// the Shard and its referenced RootShard, reconciles the owned resources and
// always updates the Shard's status, even when reconciliation failed.
func (r *ShardReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
	logger := log.FromContext(ctx)
	logger.V(4).Info("Reconciling Shard object")

	var s operatorv1alpha1.Shard
	if err := r.Client.Get(ctx, req.NamespacedName, &s); err != nil {
		// A missing object simply means there is nothing left to do.
		if client.IgnoreNotFound(err) == nil {
			return ctrl.Result{}, nil
		}

		return ctrl.Result{}, fmt.Errorf("failed to get shard: %w", err)
	}

	// Persist status changes regardless of how the reconciliation below ends;
	// a status error is aggregated with the main reconcile error.
	defer func() {
		if err := r.reconcileStatus(ctx, &s); err != nil {
			recErr = kerrors.NewAggregate([]error{recErr, err})
		}
	}()

	ref := s.Spec.RootShard.Reference
	if ref == nil {
		return ctrl.Result{}, errors.New("no RootShard reference specified in Shard spec")
	}

	// The RootShard is expected to live in the same namespace as the Shard.
	var rootShard operatorv1alpha1.RootShard
	rootShardRef := types.NamespacedName{Namespace: s.Namespace, Name: ref.Name}
	if err := r.Client.Get(ctx, rootShardRef, &rootShard); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to get root shard: %w", err)
	}

	return ctrl.Result{}, r.reconcile(ctx, &s, &rootShard)
}

// reconcile creates/updates all resources owned by a Shard (certificates,
// the root shard client kubeconfig Secret, the kcp Deployment and its
// Service), collecting all errors instead of aborting at the first one.
func (r *ShardReconciler) reconcile(ctx context.Context, s *operatorv1alpha1.Shard, rootShard *operatorv1alpha1.RootShard) error {
	var allErrs []error
	collect := func(err error) {
		if err != nil {
			allErrs = append(allErrs, err)
		}
	}

	// Every created object is owner-referenced to the Shard so it gets
	// garbage-collected together with it.
	ownerRef := k8creconciling.OwnerRefWrapper(*metav1.NewControllerRef(s, operatorv1alpha1.SchemeGroupVersion.WithKind("Shard")))

	collect(reconciling.ReconcileCertificates(ctx, []reconciling.NamedCertificateReconcilerFactory{
		shard.ServerCertificateReconciler(s, rootShard),
		shard.ServiceAccountCertificateReconciler(s, rootShard),
		shard.VirtualWorkspacesCertificateReconciler(s, rootShard),
		shard.RootShardClientCertificateReconciler(s, rootShard),
	}, s.Namespace, r.Client, ownerRef))

	collect(k8creconciling.ReconcileSecrets(ctx, []k8creconciling.NamedSecretReconcilerFactory{
		shard.RootShardClientKubeconfigReconciler(s, rootShard),
	}, s.Namespace, r.Client, ownerRef))

	collect(k8creconciling.ReconcileDeployments(ctx, []k8creconciling.NamedDeploymentReconcilerFactory{
		shard.DeploymentReconciler(s, rootShard),
	}, s.Namespace, r.Client, ownerRef))

	collect(k8creconciling.ReconcileServices(ctx, []k8creconciling.NamedServiceReconcilerFactory{
		shard.ServiceReconciler(s),
	}, s.Namespace, r.Client, ownerRef))

	return kerrors.NewAggregate(allErrs)
}

// reconcileStatus sets both phase and conditions on the reconciled Shard
// object and patches the status subresource only when it actually changed.
func (r *ShardReconciler) reconcileStatus(ctx context.Context, oldShard *operatorv1alpha1.Shard) error {
	newShard := oldShard.DeepCopy()
	var errs []error

	if newShard.Status.Phase == "" {
		newShard.Status.Phase = operatorv1alpha1.ShardPhaseProvisioning
	}

	if newShard.DeletionTimestamp != nil {
		newShard.Status.Phase = operatorv1alpha1.ShardPhaseDeleting
	}

	if err := r.setAvailableCondition(ctx, newShard); err != nil {
		errs = append(errs, err)
	}

	// FindStatusCondition returns nil when the condition is absent (e.g. when
	// setAvailableCondition failed before ever setting it), so guard against a
	// nil pointer dereference before inspecting the status.
	cond := apimeta.FindStatusCondition(newShard.Status.Conditions, string(operatorv1alpha1.ShardConditionTypeAvailable))
	if cond != nil && cond.Status == metav1.ConditionTrue {
		newShard.Status.Phase = operatorv1alpha1.ShardPhaseRunning
	}

	// only patch the status if there are actual changes.
	if !equality.Semantic.DeepEqual(oldShard.Status, newShard.Status) {
		if err := r.Client.Status().Patch(ctx, newShard, client.MergeFrom(oldShard)); err != nil {
			errs = append(errs, err)
		}
	}

	return kerrors.NewAggregate(errs)
}

// setAvailableCondition derives the Available condition for the Shard from
// the state of its kcp Deployment and stores it on the Shard's status.
func (r *ShardReconciler) setAvailableCondition(ctx context.Context, s *operatorv1alpha1.Shard) error {
	var dep appsv1.Deployment
	depKey := types.NamespacedName{Namespace: s.Namespace, Name: resources.GetShardDeploymentName(s)}

	// A missing Deployment is not an error; it simply means "unavailable".
	if err := r.Client.Get(ctx, depKey, &dep); client.IgnoreNotFound(err) != nil {
		return err
	}

	status := metav1.ConditionFalse
	reason := operatorv1alpha1.ShardConditionReasonDeploymentUnavailable
	msg := deploymentStatusString(dep, depKey)

	// An empty name means the Get above found nothing; keep the defaults then.
	if dep.Name != "" {
		switch {
		case deploymentReady(dep):
			status = metav1.ConditionTrue
			reason = operatorv1alpha1.ShardConditionReasonReplicasUp
		default:
			reason = operatorv1alpha1.ShardConditionReasonReplicasUnavailable
		}
	}

	s.Status.Conditions = updateCondition(s.Status.Conditions, metav1.Condition{
		Type:               string(operatorv1alpha1.ShardConditionTypeAvailable),
		Status:             status,
		ObservedGeneration: s.Generation,
		Reason:             string(reason),
		Message:            msg,
	})

	return nil
}
35 changes: 32 additions & 3 deletions internal/controller/shard_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -43,9 +44,32 @@ var _ = Describe("Shard Controller", func() {
shard := &operatorv1alpha1.Shard{}

BeforeEach(func() {
By("creating the custom resource for the Kind RootShard")
err := k8sClient.Get(ctx, typeNamespacedName, &operatorv1alpha1.RootShard{})
if err != nil && apierrors.IsNotFound(err) {
resource := &operatorv1alpha1.RootShard{
ObjectMeta: metav1.ObjectMeta{
Name: "my-root-shard",
Namespace: "default",
},
Spec: operatorv1alpha1.RootShardSpec{
External: operatorv1alpha1.ExternalConfig{
Hostname: "example.kcp.io",
Port: 6443,
},
CommonShardSpec: operatorv1alpha1.CommonShardSpec{
Etcd: operatorv1alpha1.EtcdConfig{
Endpoints: []string{"https://localhost:2379"},
},
},
},
}
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
}

By("creating the custom resource for the Kind Shard")
err := k8sClient.Get(ctx, typeNamespacedName, shard)
if err != nil && errors.IsNotFound(err) {
err = k8sClient.Get(ctx, typeNamespacedName, shard)
if err != nil && apierrors.IsNotFound(err) {
resource := &operatorv1alpha1.Shard{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Expand All @@ -57,6 +81,11 @@ var _ = Describe("Shard Controller", func() {
Endpoints: []string{"https://localhost:2379"},
},
},
RootShard: operatorv1alpha1.RootShardConfig{
Reference: &corev1.LocalObjectReference{
Name: "my-root-shard",
},
},
},
}
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
Expand Down
48 changes: 38 additions & 10 deletions internal/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,35 @@ func GetRootShardDeploymentName(r *operatorv1alpha1.RootShard) string {
return fmt.Sprintf("%s-kcp", r.Name)
}

// GetShardDeploymentName returns the name of the kcp Deployment for the given Shard.
func GetShardDeploymentName(s *operatorv1alpha1.Shard) string {
	return s.Name + "-shard-kcp"
}

// GetRootShardServiceName returns the name of the Service for the given RootShard.
func GetRootShardServiceName(r *operatorv1alpha1.RootShard) string {
	return r.Name + "-kcp"
}

func GetRootShardResourceLabels(r *operatorv1alpha1.RootShard) map[string]string {
// GetShardServiceName returns the name of the Service for the given Shard.
func GetShardServiceName(s *operatorv1alpha1.Shard) string {
	return s.Name + "-shard-kcp"
}

// getResourceLabels returns the canonical label set shared by all resources
// the operator creates: the app name/managed-by labels are fixed, while the
// instance and component labels identify the owning object and its role.
//
// NOTE(review): the scraped diff merged the removed and added bodies of this
// function; this is the reconstructed new version. It keeps appManagedByLabel,
// which every pre-refactor label map carried — confirm against the upstream
// commit.
func getResourceLabels(instance, component string) map[string]string {
	return map[string]string{
		appNameLabel:      "kcp",
		appInstanceLabel:  instance,
		appManagedByLabel: "kcp-operator",
		appComponentLabel: component,
	}
}

// GetRootShardResourceLabels returns the common labels for all resources
// belonging to the given RootShard.
func GetRootShardResourceLabels(r *operatorv1alpha1.RootShard) map[string]string {
	const component = "rootshard"
	return getResourceLabels(r.Name, component)
}

// GetShardResourceLabels returns the common labels for all resources
// belonging to the given Shard.
func GetShardResourceLabels(s *operatorv1alpha1.Shard) map[string]string {
	const component = "shard"
	return getResourceLabels(s.Name, component)
}

func GetRootShardBaseHost(r *operatorv1alpha1.RootShard) string {
clusterDomain := r.Spec.ClusterDomain
if clusterDomain == "" {
Expand All @@ -90,10 +106,27 @@ func GetRootShardBaseURL(r *operatorv1alpha1.RootShard) string {
return fmt.Sprintf("https://%s:6443", GetRootShardBaseHost(r))
}

// GetShardBaseHost returns the in-cluster DNS name of the Shard's kcp
// Service, falling back to the default "cluster.local" cluster domain when
// the Shard spec does not configure one.
func GetShardBaseHost(s *operatorv1alpha1.Shard) string {
	domain := s.Spec.ClusterDomain
	if domain == "" {
		domain = "cluster.local"
	}

	return s.Name + "-shard-kcp." + s.Namespace + ".svc." + domain
}

// GetShardBaseURL returns the HTTPS base URL of the Shard's kcp API endpoint
// on the standard port 6443.
func GetShardBaseURL(s *operatorv1alpha1.Shard) string {
	return "https://" + GetShardBaseHost(s) + ":6443"
}

// GetRootShardCertificateName returns the name of the given certificate for
// the RootShard, namespaced by the RootShard's own name.
func GetRootShardCertificateName(r *operatorv1alpha1.RootShard, certName operatorv1alpha1.Certificate) string {
	return r.Name + "-" + string(certName)
}

// GetShardCertificateName returns the name of the given certificate for the
// Shard, namespaced by the Shard's own name.
func GetShardCertificateName(s *operatorv1alpha1.Shard, certName operatorv1alpha1.Certificate) string {
	return s.Name + "-" + string(certName)
}

func GetRootShardCAName(r *operatorv1alpha1.RootShard, caName operatorv1alpha1.CA) string {
if caName == operatorv1alpha1.RootCA {
return fmt.Sprintf("%s-ca", r.Name)
Expand All @@ -102,12 +135,7 @@ func GetRootShardCAName(r *operatorv1alpha1.RootShard, caName operatorv1alpha1.C
}

// GetFrontProxyResourceLabels returns the common labels for all resources
// belonging to the given FrontProxy.
//
// The scraped diff left both the removed inline map-literal body and the new
// helper-based body inside this function (making the second return
// unreachable); only the new helper call is kept.
func GetFrontProxyResourceLabels(f *operatorv1alpha1.FrontProxy) map[string]string {
	return getResourceLabels(f.Name, "front-proxy")
}

func GetFrontProxyDeploymentName(f *operatorv1alpha1.FrontProxy) string {
Expand Down
Loading

0 comments on commit 37602a0

Please sign in to comment.