Skip to content

Commit

Permalink
Add kubestash controller for changing kubeDB phase (#1076)
Browse files Browse the repository at this point in the history
Signed-off-by: Anisur Rahman <anisur@appscode.com>
  • Loading branch information
anisurrahman75 authored Dec 22, 2023
1 parent c99d3ab commit d6368a1
Show file tree
Hide file tree
Showing 13 changed files with 866 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stash
package restore

import (
"fmt"
"time"

amc "kubedb.dev/apimachinery/pkg/controller"
Expand All @@ -26,25 +27,31 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
kmapi "kmodules.xyz/client-go/api/v1"
dmcond "kmodules.xyz/client-go/dynamic/conditions"
"kmodules.xyz/client-go/tools/queue"
coreapi "kubestash.dev/apimachinery/apis/core/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
"stash.appscode.dev/apimachinery/apis/stash/v1beta1"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned"
stashinformer "stash.appscode.dev/apimachinery/client/informers/externalversions"
)

type Controller struct {
manager *manager.Manager
*amc.Controller
*amc.StashInitializer
restrictToNamespace string
}

func NewController(
mgr *manager.Manager,
ctrl *amc.Controller,
initializer *amc.StashInitializer,
restrictToNamespace string,
) *Controller {
return &Controller{
manager: mgr,
Controller: ctrl,
StashInitializer: initializer,
restrictToNamespace: restrictToNamespace,
Expand All @@ -53,12 +60,22 @@ func NewController(

type restoreInfo struct {
invoker core.TypedLocalObjectReference
target *v1beta1.RestoreTarget
phase v1beta1.RestorePhase
stash *stashInfo
kubestash *kubestashInfo
do dmcond.DynamicOptions
invokerUID types.UID
}

type stashInfo struct {
target *v1beta1.RestoreTarget
phase v1beta1.RestorePhase
}

type kubestashInfo struct {
target *kmapi.TypedObjectReference
phase coreapi.RestorePhase
}

func Configure(cfg *rest.Config, s *amc.StashInitializer, resyncPeriod time.Duration) error {
var err error
if s.StashClient, err = scs.NewForConfig(cfg); err != nil {
Expand All @@ -68,7 +85,7 @@ func Configure(cfg *rest.Config, s *amc.StashInitializer, resyncPeriod time.Dura
return nil
}

func (c *Controller) StartAfterStashInstalled(maxNumRequeues, numThreads int, selector metav1.LabelSelector, stopCh <-chan struct{}) {
func (c *Controller) StartAfterStashInstalled(stopCh <-chan struct{}, maxNumRequeues, numThreads int, selector metav1.LabelSelector) {
// Wait until Stash operator installed
if err := c.waitUntilStashInstalled(stopCh); err != nil {
klog.Errorln("error during waiting for RestoreSession crd. Reason: ", err)
Expand Down Expand Up @@ -125,3 +142,17 @@ func (c *Controller) startController(stopCh <-chan struct{}) {
c.RSQueue.Run(stopCh)
c.RBQueue.Run(stopCh)
}

func (c *Controller) StartAfterKubeStashInstalled(stopCh <-chan struct{}, selector metav1.LabelSelector) {
// Here Wait until KubeStash operator installed
if err := c.waitUntilKubeStashInstalled(stopCh); err != nil {
klog.Errorln("error during waiting for RestoreSession crd. Reason: ", err)
return
}
if err := (&RestoreSessionReconciler{
ctrl: c,
}).SetupWithManager(*c.manager, selector); err != nil {
klog.Info(fmt.Errorf("unable to create RestoreSession controller. Reason: %w", err))
return
}
}
77 changes: 77 additions & 0 deletions pkg/controller/initializer/restore/kubestash-restoresession.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package restore

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
coreapi "kubestash.dev/apimachinery/apis/core/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// RestoreSessionReconciler reconciles a RestoreSession object
type RestoreSessionReconciler struct {
ctrl *Controller
}

func (r *RestoreSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling: " + req.String())
c := r.ctrl.KBClient

rs := &coreapi.RestoreSession{}
if err := c.Get(ctx, req.NamespacedName, rs); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

ri, err := r.ctrl.extractRestoreInfo(rs)
if err != nil {
klog.Errorln("failed to extract kubeStash invoker info. Reason: ", err)
return ctrl.Result{}, err
}
if rs.DeletionTimestamp != nil {
return ctrl.Result{}, r.ctrl.handleTerminateEvent(ri)
}

return ctrl.Result{}, r.ctrl.handleRestoreInvokerEvent(ri)
}

// SetupWithManager sets up the controller with the Manager.
func (r *RestoreSessionReconciler) SetupWithManager(mgr ctrl.Manager, selector metav1.LabelSelector) error {
return ctrl.NewControllerManagedBy(mgr).
For(&coreapi.RestoreSession{}, builder.WithPredicates(
predicate.NewPredicateFuncs(func(object client.Object) bool {
return hasRequiredLabels(object.GetLabels(), selector.MatchLabels)
}),
)).
Complete(r)
}

func hasRequiredLabels(actualLabels, requiredLabels map[string]string) bool {
for key, value := range requiredLabels {
if actualValue, found := actualLabels[key]; !found || actualValue != value {
return false
}
}
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stash
package restore

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stash
package restore

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stash
package restore

import (
"context"
Expand All @@ -41,6 +41,9 @@ import (
dmcond "kmodules.xyz/client-go/dynamic/conditions"
appcat "kmodules.xyz/custom-resources/apis/appcatalog"
ab "kmodules.xyz/custom-resources/apis/appcatalog/v1alpha1"
addonapi "kubestash.dev/apimachinery/apis/addons/v1alpha1"
coreapi "kubestash.dev/apimachinery/apis/core/v1alpha1"
storageapi "kubestash.dev/apimachinery/apis/storage/v1alpha1"
sapis "stash.appscode.dev/apimachinery/apis"
"stash.appscode.dev/apimachinery/apis/stash"
"stash.appscode.dev/apimachinery/apis/stash/v1alpha1"
Expand All @@ -50,9 +53,6 @@ import (

func (c *Controller) extractRestoreInfo(inv interface{}) (*restoreInfo, error) {
ri := &restoreInfo{
invoker: core.TypedLocalObjectReference{
APIGroup: pointer.StringP(stash.GroupName),
},
do: dmcond.DynamicOptions{
Client: c.DynamicClient,
},
Expand All @@ -61,31 +61,55 @@ func (c *Controller) extractRestoreInfo(inv interface{}) (*restoreInfo, error) {
switch inv := inv.(type) {
case *v1beta1.RestoreSession:
// invoker information
ri.invoker.APIGroup = pointer.StringP(stash.GroupName)
ri.invoker.Kind = inv.Kind
ri.invoker.Name = inv.Name
// target information
ri.target = inv.Spec.Target
// restore status
ri.phase = inv.Status.Phase

// database information
ri.do.Namespace = inv.Namespace
ri.invokerUID = inv.UID

// stash information
ri.stash = &stashInfo{
target: ri.stash.target,
phase: ri.stash.phase,
}
case *v1beta1.RestoreBatch:
// invoker information
ri.invoker.APIGroup = pointer.StringP(stash.GroupName)
ri.invoker.Kind = inv.Kind
ri.invoker.Name = inv.Name
// target information

// database information
ri.do.Namespace = inv.Namespace
ri.invokerUID = inv.UID

// stash information
// RestoreBatch can have multiple targets. In this case, only the database related target's phase does matter.
ri.target, err = c.identifyTarget(inv.Spec.Members, ri.do.Namespace)
if err != nil {
return ri, err
info := &stashInfo{}
if info.target, err = c.identifyTarget(inv.Spec.Members, ri.do.Namespace); err != nil {
return nil, err
}

// restore status
// RestoreBatch can have multiple targets. In this case, finding the appropriate target is necessary.
ri.phase = getTargetPhase(inv.Status, ri.target)
info.phase = getTargetPhase(inv.Status, info.target)
ri.stash = info
case *coreapi.RestoreSession:
// invoker information
ri.invoker.APIGroup = pointer.StringP(storageapi.GroupVersion.Group)
ri.invoker.Kind = inv.Kind
ri.invoker.Name = inv.Name

// database information
ri.do.Namespace = inv.Namespace
ri.invokerUID = inv.UID

// kubestash information
ri.kubestash = &kubestashInfo{
target: inv.Spec.Target,
phase: inv.Status.Phase,
}
default:
return ri, fmt.Errorf("unknown restore invoker type")
}
Expand All @@ -102,7 +126,7 @@ func (c *Controller) handleTerminateEvent(ri *restoreInfo) error {
return fmt.Errorf("invalid restore information. it must not be nil")
}
// If the target could not be identified properly, we can't process further.
if ri.target == nil {
if ri.stash == nil && ri.kubestash == nil {
return fmt.Errorf("couldn't identify the restore target from invoker: %s/%s/%s", *ri.invoker.APIGroup, ri.invoker.Kind, ri.invoker.Name)
}

Expand Down Expand Up @@ -144,7 +168,7 @@ func (c *Controller) handleRestoreInvokerEvent(ri *restoreInfo) error {
return fmt.Errorf("invalid restore information. it must not be nil")
}
// If the target could not be identified properly, we can't process further.
if ri.target == nil {
if ri.stash == nil && ri.kubestash == nil {
return fmt.Errorf("couldn't identify the restore target from invoker: %s/%s/%s", *ri.invoker.APIGroup, ri.invoker.Kind, ri.invoker.Name)
}

Expand All @@ -155,7 +179,7 @@ func (c *Controller) handleRestoreInvokerEvent(ri *restoreInfo) error {
dbCond := kmapi.Condition{
Type: api.DatabaseDataRestored,
}
if ri.phase == v1beta1.RestoreSucceeded {
if (ri.stash != nil && ri.stash.phase == v1beta1.RestoreSucceeded) || (ri.kubestash != nil && ri.kubestash.phase == coreapi.RestoreSucceeded) {
dbCond.Status = metav1.ConditionTrue
dbCond.Reason = api.DatabaseSuccessfullyRestored
dbCond.Message = fmt.Sprintf("Successfully restored data by initializer %s %s/%s with UID %s",
Expand Down Expand Up @@ -281,27 +305,60 @@ func (c *Controller) waitUntilStashInstalled(stopCh <-chan struct{}) error {
}, stopCh)
}

// waitUntilKubeStashInstalled waits for KubeStash operator to be installed. It check whether all the CRDs that are necessary for backup KubeDB database,
// is present in the cluster or not. It wait until all the CRDs are found.
func (c *Controller) waitUntilKubeStashInstalled(stopCh <-chan struct{}) error {
klog.Infoln("Looking for the KubeStash operator.......")
return wait.PollImmediateUntil(time.Second*10, func() (bool, error) {
return discovery.ExistsGroupKinds(c.Client.Discovery(),
schema.GroupKind{Group: storageapi.GroupVersion.Group, Kind: storageapi.ResourceKindBackupStorage},
schema.GroupKind{Group: storageapi.GroupVersion.Group, Kind: storageapi.ResourceKindRepository},
schema.GroupKind{Group: storageapi.GroupVersion.Group, Kind: storageapi.ResourceKindSnapshot},
schema.GroupKind{Group: storageapi.GroupVersion.Group, Kind: storageapi.ResourceKindRetentionPolicy},
schema.GroupKind{Group: coreapi.GroupVersion.Group, Kind: coreapi.ResourceKindBackupBatch},
schema.GroupKind{Group: coreapi.GroupVersion.Group, Kind: coreapi.ResourceKindBackupBlueprint},
schema.GroupKind{Group: coreapi.GroupVersion.Group, Kind: coreapi.ResourceKindBackupConfiguration},
schema.GroupKind{Group: coreapi.GroupVersion.Group, Kind: coreapi.ResourceKindBackupSession},
schema.GroupKind{Group: coreapi.GroupVersion.Group, Kind: coreapi.ResourceKindRestoreSession},
schema.GroupKind{Group: addonapi.GroupVersion.Group, Kind: addonapi.ResourceKindAddon},
schema.GroupKind{Group: addonapi.GroupVersion.Group, Kind: addonapi.ResourceKindFunction},
), nil
}, stopCh)
}

func (c *Controller) extractDatabaseInfo(ri *restoreInfo) error {
if ri == nil {
return fmt.Errorf("invalid restoreInfo. It must not be nil")
}
if ri.target == nil {

// It is guaranteed that if either ri.stash or ri.kubestash is initialized, then 'target' field is also initialized.
if ri.stash == nil && ri.kubestash == nil {
return fmt.Errorf("invalid target. It must not be nil")
}

var owner *metav1.OwnerReference
if matched, err := targetOfGroupKind(ri.target.Ref, appcat.GroupName, ab.ResourceKindApp); err == nil && matched {
appBinding, err := c.AppCatalogClient.AppcatalogV1alpha1().AppBindings(ri.do.Namespace).Get(context.TODO(), ri.target.Ref.Name, metav1.GetOptions{})
if err != nil {
return err
if ri.stash != nil {
if matched, err := targetOfGroupKind(ri.stash.target.Ref, appcat.GroupName, ab.ResourceKindApp); err == nil && matched {
appBinding, err := c.AppCatalogClient.AppcatalogV1alpha1().AppBindings(ri.do.Namespace).Get(context.TODO(), ri.stash.target.Ref.Name, metav1.GetOptions{})
if err != nil {
return err
}
owner = metav1.GetControllerOf(appBinding)
} else if matched, err := targetOfGroupKind(ri.stash.target.Ref, apps.GroupName, sapis.KindStatefulSet); err == nil && matched {
sts, err := c.AppCatalogClient.AppcatalogV1alpha1().AppBindings(ri.do.Namespace).Get(context.TODO(), ri.stash.target.Ref.Name, metav1.GetOptions{})
if err != nil {
return err
}
owner = metav1.GetControllerOf(sts)
}
owner = metav1.GetControllerOf(appBinding)
} else if matched, err := targetOfGroupKind(ri.target.Ref, apps.GroupName, sapis.KindStatefulSet); err == nil && matched {
sts, err := c.AppCatalogClient.AppcatalogV1alpha1().AppBindings(ri.do.Namespace).Get(context.TODO(), ri.target.Ref.Name, metav1.GetOptions{})
} else {
appBinding, err := c.AppCatalogClient.AppcatalogV1alpha1().AppBindings(ri.kubestash.target.Namespace).Get(context.TODO(), ri.kubestash.target.Name, metav1.GetOptions{})
if err != nil {
return err
}
owner = metav1.GetControllerOf(sts)
owner = metav1.GetControllerOf(appBinding)
}

if owner == nil {
return fmt.Errorf("failed to extract database information from the target info. Reason: target does not have controlling owner")
}
Expand Down Expand Up @@ -346,7 +403,13 @@ func (c *Controller) writeRestoreCompletionEvent(do dmcond.DynamicOptions, cond
}

func isDataRestoreCompleted(ri *restoreInfo) bool {
return ri.phase == v1beta1.RestoreSucceeded ||
ri.phase == v1beta1.RestoreFailed ||
ri.phase == v1beta1.RestorePhaseUnknown
if ri.stash != nil {
return ri.stash.phase == v1beta1.RestoreSucceeded ||
ri.stash.phase == v1beta1.RestoreFailed ||
ri.stash.phase == v1beta1.RestorePhaseUnknown
} else {
return ri.kubestash.phase == coreapi.RestoreSucceeded ||
ri.kubestash.phase == coreapi.RestoreFailed ||
ri.kubestash.phase == coreapi.RestorePhaseUnknown
}
}
Loading

0 comments on commit d6368a1

Please sign in to comment.