Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pause and resume reconciliation of a cluster #7435

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6deb80e
wip
yipeng1030 May 27, 2024
a64cc39
support cluster pause and resume
yipeng1030 May 28, 2024
42d509e
merge from main
yipeng1030 May 28, 2024
94c6462
pause configuration as well
yipeng1030 May 28, 2024
4ed5dd5
Merge remote-tracking branch 'upstream/main'
yipeng1030 May 28, 2024
a89df33
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 May 31, 2024
25b85f6
merge from main and improve some implements
yipeng1030 Jun 4, 2024
e4e0740
tidy import order
yipeng1030 Jun 4, 2024
8466b1d
Merge remote-tracking branch 'upstream/main'
yipeng1030 Jun 18, 2024
46b47c8
Merge branch 'main' into featue/support_cluster_pause_and_resume
yipeng1030 Jun 18, 2024
1cc3982
merge from main
yipeng1030 Jun 18, 2024
a42ebcf
merge from main
yipeng1030 Jun 25, 2024
578967e
fix pr
yipeng1030 Jun 25, 2024
4aeae0f
fix pr
yipeng1030 Jun 25, 2024
5398f89
fix pr
yipeng1030 Jun 25, 2024
72f8e43
Merge branch 'featue/support_cluster_pause_and_resume' of github.com:…
yipeng1030 Jul 15, 2024
1a588d2
fix several reviews
yipeng1030 Jul 16, 2024
419c5ba
fix
yipeng1030 Jul 16, 2024
d730712
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 Jul 16, 2024
77908af
fix
yipeng1030 Jul 16, 2024
55b118d
fix
yipeng1030 Jul 16, 2024
ad1ac48
fix
yipeng1030 Jul 16, 2024
23b44dc
add ut
yipeng1030 Jul 17, 2024
59d0c45
fix
yipeng1030 Jul 17, 2024
cef893c
fix
yipeng1030 Jul 17, 2024
c5e8e34
pause back up as well
yipeng1030 Jul 18, 2024
5bc432a
fix typo
yipeng1030 Jul 19, 2024
f7b3b48
make code flatter
yipeng1030 Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
AddTransformer(
// handle cluster deletion first
&clusterDeletionTransformer{},
// handle cluster pause and resume
&clusterPauseTransformer{},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the pause and resume operations be executed before all transformers? According to your design, can a cluster that is being deleted be paused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deletion has a higher priority than pause in my design, which follows the design of rollout pause for Kubernetes Deployments.

// check is recovering from halted cluster
&clusterHaltRecoveryTransformer{},
// update finalizer and cd&cv labels
Expand Down
2 changes: 2 additions & 0 deletions controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
AddTransformer(
// handle component deletion and pre-terminate
&componentDeletionTransformer{},
// handle component pause and resume
&componentPauseTransformer{Client: r.Client},
// handle finalizers and referenced definition labels
&componentMetaTransformer{},
// validate referenced componentDefinition objects, and build synthesized component
Expand Down
2 changes: 2 additions & 0 deletions controllers/apps/configuration/configconstraint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type ConfigConstraintReconciler struct {
// +kubebuilder:rbac:groups=apps.kubeblocks.io,resources=configconstraints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.kubeblocks.io,resources=configconstraints/finalizers,verbs=update

// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
Expand Down
5 changes: 5 additions & 0 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -113,6 +114,10 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
reqCtx.Log.Info("cluster is deleting, skip reconcile")
return intctrlutil.Reconciled()
}
if model.IsReconciliationPaused(config) {
reqCtx.Log.Info(fmt.Sprintf("cluster is paused, skip reconcile"))
return intctrlutil.Reconciled()
}
if fetcherTask.ClusterComObj == nil || fetcherTask.ComponentObj == nil {
return r.failWithInvalidComponent(config, reqCtx)
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/apps/configuration/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"math"
"time"

Expand Down Expand Up @@ -200,6 +201,11 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
return intctrlutil.RequeueWithErrorAndRecordEvent(configMap, r.Recorder, err, reqCtx.Log)
}

if model.IsReconciliationPaused(configMap) {
reqCtx.Log.Info(fmt.Sprintf("reconfigure is paused beacuse cluster %s is paused", resources.clusterName))
return intctrlutil.Reconciled()
}

// Assumption: It is required that the cluster must have a component.
if reconcileContext.ClusterComObj == nil {
reqCtx.Log.Info("not found component.")
Expand Down
93 changes: 93 additions & 0 deletions controllers/apps/transformer_cluster_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd

This file is part of KubeBlocks project

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/controller/component"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

// clusterPauseTransformer propagates a cluster's pause/resume state to the
// components that belong to it. It carries no state of its own.
type clusterPauseTransformer struct{}

// Compile-time check that clusterPauseTransformer satisfies graph.Transformer.
var _ graph.Transformer = &clusterPauseTransformer{}

func (t *clusterPauseTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
transCtx, _ := ctx.(*clusterTransformContext)
cluster := transCtx.OrigCluster
graphCli, _ := transCtx.Client.(model.GraphClient)
if model.IsReconciliationPaused(cluster) {
// set paused for all components
compList, err := component.ListClusterComponents(transCtx.Context, transCtx.Client, cluster)
if err != nil {
return err
}
notPaused := false
for _, comp := range compList {
if !model.IsReconciliationPaused(&comp) {
annotations := comp.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[extensions.ControllerPaused] = trueVal
comp.SetAnnotations(annotations)
graphCli.Update(dag, nil, &comp)
notPaused = true
}

}
if notPaused {
transCtx.EventRecorder.Eventf(cluster, corev1.EventTypeNormal, "Paused",
"cluster is paused")
}
return graph.ErrPrematureStop

} else {
// set resumed for all components
compList := &appsv1alpha1.ComponentList{}
labels := constant.GetClusterWellKnownLabels(cluster.Name)
if err := transCtx.Client.List(transCtx.Context, compList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels)); err != nil {
return err
}
hasPaused := false

for _, comp := range compList.Items {
if model.IsReconciliationPaused(&comp) {
delete(comp.Annotations, extensions.ControllerPaused)
hasPaused = true
graphCli.Update(dag, nil, &comp)
}
}

if hasPaused {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasPaused is calculated based on component objects, so it should set the dependencies for components to those CM objects explicitly.

transCtx.EventRecorder.Eventf(cluster, corev1.EventTypeNormal, "Resumed",
"cluster is resumed")
}
return nil
}
}
125 changes: 125 additions & 0 deletions controllers/apps/transformer_component_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd

This file is part of KubeBlocks project

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

// componentPauseTransformer handles component pause and resume.
// The embedded client.Client is used by listConfigMaps to list the
// component's ConfigMaps.
type componentPauseTransformer struct {
	client.Client
}

// Compile-time check that componentPauseTransformer satisfies graph.Transformer.
var _ graph.Transformer = &componentPauseTransformer{}

// Transform pauses or resumes the objects that belong to a component.
//
// Paused component: the component's InstanceSet gets Spec.Paused set to true
// (if it is not already), every ConfigMap returned by listConfigMaps is
// annotated with extensions.ControllerPaused, and graph.ErrPrematureStop is
// returned (presumably so that the remaining transformers are skipped -
// confirm against the graph package).
//
// Non-paused component: if the InstanceSet is still reported as paused, its
// Spec.Paused is cleared and the method returns nil right away; otherwise the
// pause annotation is removed from every ConfigMap that still carries it.
func (t *componentPauseTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
	transCtx, _ := ctx.(*componentTransformContext)
	graphCli, _ := transCtx.Client.(model.GraphClient)
	comp := transCtx.Component

	if model.IsReconciliationPaused(comp) {
		// get instanceSet and set paused
		instanceSet, err := t.getInstanceSet(transCtx, comp)
		if err != nil {
			return err
		}
		if !instanceSet.Spec.Paused {
			instanceSet.Spec.Paused = true
			graphCli.Update(dag, nil, instanceSet)
		}
		// list configmaps and set paused
		configMapList, err := t.listConfigMaps(transCtx, comp)
		if err != nil {
			return err
		}
		// Iterate by index: the pointer handed to graphCli.Update must refer to
		// a distinct element per iteration, not to a shared range variable
		// (which it would before Go 1.22).
		for i := range configMapList.Items {
			configMap := &configMapList.Items[i]
			annotations := configMap.GetAnnotations()
			if annotations == nil {
				annotations = make(map[string]string)
			}
			annotations[extensions.ControllerPaused] = trueVal
			configMap.SetAnnotations(annotations)
			graphCli.Update(dag, nil, configMap)
		}
		return graph.ErrPrematureStop
	}

	// get instanceSet and cancel paused
	// A failed lookup is treated as "nothing to resume" rather than an error.
	// NOTE(review): this also hides errors other than NotFound - confirm that
	// best-effort behavior is intended here.
	if instanceSet, err := t.getInstanceSet(transCtx, comp); err == nil && model.IsReconciliationPaused(instanceSet) {
		instanceSet.Spec.Paused = false
		graphCli.Update(dag, nil, instanceSet)
		// NOTE(review): ConfigMaps are not resumed on this pass; they are only
		// reached once the InstanceSet is no longer paused - confirm intended.
		return nil
	}

	// list configmaps and cancel paused
	configMapList, err := t.listConfigMaps(transCtx, comp)
	if err != nil {
		return err
	}
	for i := range configMapList.Items {
		configMap := &configMapList.Items[i]
		if !model.IsReconciliationPaused(configMap) {
			continue
		}
		// Snapshot before mutating so the "old" object really is the old state.
		oldConfigMap := configMap.DeepCopy()
		delete(configMap.Annotations, extensions.ControllerPaused)
		graphCli.Update(dag, oldConfigMap, configMap)
	}
	return nil
}

// getInstanceSet fetches the InstanceSet that has the same name and namespace
// as comp, using the client from transCtx.
//
// On failure it returns a nil InstanceSet and an error whose message is
// "failed to get instanceSet <name>: <cause>". The cause is wrapped rather
// than flattened into text, so callers can still inspect it (for example to
// detect a NotFound error).
func (t *componentPauseTransformer) getInstanceSet(transCtx *componentTransformContext, comp *appsv1alpha1.Component) (*workloads.InstanceSet, error) {
	instanceName := comp.Name
	instanceSet := &workloads.InstanceSet{}
	key := types.NamespacedName{Name: instanceName, Namespace: comp.Namespace}
	if err := transCtx.Client.Get(transCtx.Context, key, instanceSet); err != nil {
		return nil, errors.Wrap(err, fmt.Sprintf("failed to get instanceSet %s", instanceName))
	}
	return instanceSet, nil
}

// listConfigMaps lists the ConfigMaps in the component's namespace whose labels
// match those built by constant.GetComponentWellKnownLabels from the
// component's constant.AppInstanceLabelKey and constant.KBAppComponentLabelKey
// label values. The transformer's own embedded client performs the List call,
// with transCtx passed as the context.
//
// It returns the populated list, or nil and the List error on failure.
func (t *componentPauseTransformer) listConfigMaps(transCtx *componentTransformContext, component *appsv1alpha1.Component) (*corev1.ConfigMapList, error) {
	clusterName := component.Labels[constant.AppInstanceLabelKey]
	compName := component.Labels[constant.KBAppComponentLabelKey]
	selector := constant.GetComponentWellKnownLabels(clusterName, compName)

	configMaps := &corev1.ConfigMapList{}
	if err := t.Client.List(transCtx, configMaps, client.InNamespace(component.Namespace), client.MatchingLabels(selector)); err != nil {
		return nil, err
	}
	return configMaps, nil
}
4 changes: 1 addition & 3 deletions pkg/controller/instanceset/reconciler_deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ func (r *deletionReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebu
if tree.GetRoot() == nil || !model.IsObjectDeleting(tree.GetRoot()) {
return kubebuilderx.ResultUnsatisfied
}
if model.IsReconciliationPaused(tree.GetRoot()) {
return kubebuilderx.ResultUnsatisfied
}
// deletion need to be handled while paused
return kubebuilderx.ResultSatisfied
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/model/transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
)

const trueval string = "true"

func FindRootVertex(dag *graph.DAG) (*ObjectVertex, error) {
root := dag.Root()
if root == nil {
Expand Down Expand Up @@ -176,6 +179,11 @@ func IsReconciliationPaused(object client.Object) bool {
if value.Kind() != reflect.Struct {
return false
}
if annotations := object.GetAnnotations(); annotations != nil {
if val, ok := annotations[extensions.ControllerPaused]; ok && val == trueval {
return true
}
}
spec := value.FieldByName("Spec")
if !spec.IsValid() {
return false
Expand Down
Loading