Skip to content

Commit

Permalink
feat: enhance SeataServer status with categorized error tracking (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
No-SilverBullet authored Mar 10, 2025
1 parent 3f8ed0f commit e079981
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 36 deletions.
28 changes: 25 additions & 3 deletions api/v1alpha1/seataserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,33 @@ func (s *SeataServerSpec) withDefaults() (changed bool) {
return changed
}

type ServerErrorType string

const (
ErrorTypeK8s_SeataServer ServerErrorType = "k8s-seata-server"
ErrorTypeK8s_HeadlessService ServerErrorType = "k8s-headless-service"
ErrorTypeK8s_Pvc ServerErrorType = "k8s-pvc"
ErrorTypeK8s_StatefulSet ServerErrorType = "k8s-statefulset"
ErrorTypeRuntime ServerErrorType = "runtime"
)

func (e ServerErrorType) String() string {
return string(e)
}

// SeataServerError defines the error of SeataServer
type SeataServerError struct {
Type string `json:"type"`
Message string `json:"message"`
Timestamp metav1.Time `json:"timestamp"`
}

// SeataServerStatus defines the observed state of SeataServer
type SeataServerStatus struct {
Synchronized bool `json:"synchronized"`
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
Synchronized bool `json:"synchronized"`
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
Errors []SeataServerError `json:"errors,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
17 changes: 17 additions & 0 deletions config/crd/bases/operator.seata.apache.org_seataservers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,23 @@ spec:
status:
description: SeataServerStatus defines the observed state of SeataServer
properties:
errors:
items:
description: SeataServerError defines the error of SeataServer
properties:
message:
type: string
timestamp:
format: date-time
type: string
type:
type: string
required:
- message
- timestamp
- type
type: object
type: array
readyReplicas:
format: int32
type: integer
Expand Down
112 changes: 79 additions & 33 deletions controllers/seataserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,40 @@ package controllers
import (
"context"
"fmt"
"github.com/apache/seata-k8s/pkg/seata"
"github.com/apache/seata-k8s/pkg/utils"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
"github.com/apache/seata-k8s/pkg/seata"
"github.com/apache/seata-k8s/pkg/utils"
)

// SeataServerReconciler reconciles a SeataServer object
type SeataServerReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger
}

type reconcileFun func(ctx context.Context, s *seatav1alpha1.SeataServer) error

const RequeueSeconds = 10
const (
RequeueSeconds = 10
MaxRecentErrorRecords = 5
)

//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
Expand All @@ -68,24 +74,19 @@ const RequeueSeconds = 10
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

s := &seatav1alpha1.SeataServer{}
if err := r.Get(ctx, req.NamespacedName, s); err != nil {
if errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("SeataServer(%v) resource not found", req.NamespacedName))
r.recordError(ctx, req.NamespacedName, seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to get resource SeataServer(%v),err %s", req.NamespacedName, err.Error()), err)
return ctrl.Result{}, nil
}

logger.Error(err, fmt.Sprintf("Failed to get resource SeataServer(%v)", req.NamespacedName))
return ctrl.Result{}, err
}

changed := s.WithDefaults()
if changed {
logger.Info(fmt.Sprintf("Setting default values for SeataServer(%v)", req.NamespacedName))
r.Log.Info(fmt.Sprintf("Setting default values for SeataServer(%v)", req.NamespacedName))
if err := r.Client.Update(ctx, s); err != nil {
logger.Error(err, fmt.Sprintf("Failed to update resource SeataServer(%v)", req.NamespacedName))
r.recordError(ctx, req.NamespacedName, seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v)", req.NamespacedName), err)
return reconcile.Result{}, err
}
return reconcile.Result{Requeue: true}, nil
Expand All @@ -102,7 +103,7 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

if !s.Status.Synchronized {
logger.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds",
r.Log.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds",
req.NamespacedName, RequeueSeconds))
return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds * time.Second}, nil
}
Expand All @@ -111,10 +112,10 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

svc := seata.MakeHeadlessService(s)
if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to set owner reference for SeataServer(%v)", s.Name), err)
return err
}
foundSvc := &apiv1.Service{}
Expand All @@ -123,31 +124,37 @@ func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s
Namespace: svc.Namespace,
}, foundSvc)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}",
r.Log.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}",
svc.Namespace, svc.Name))
err = r.Client.Create(ctx, svc)
if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to create SeataServer Service {%s:%s}",
svc.Namespace, svc.Name), err)
return err
}
return nil
} else if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to get SeataServer Service {%s:%s}",
svc.Namespace, svc.Name), err)
return err
} else {
logger.Info(fmt.Sprintf("Updating existing SeataServer Service {%s:%s}",
r.Log.Info(fmt.Sprintf("Updating existing SeataServer Service {%s:%s}",
foundSvc.Namespace, foundSvc.Name))

seata.SyncService(foundSvc, svc)
err = r.Client.Update(ctx, foundSvc)
if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to update SeataServer Service {%s:%s}",
foundSvc.Namespace, foundSvc.Name), err)
return err
}
}
return nil
}

func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

sts := seata.MakeStatefulSet(s)
if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err != nil {
return err
Expand All @@ -158,18 +165,27 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *sea
Namespace: sts.Namespace,
}, foundSts)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}",
r.Log.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}",
sts.Namespace, sts.Name))
err = r.Client.Create(ctx, sts)
if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to create SeataServer StatefulSet {%s:%s}",
sts.Namespace, sts.Name), err)
return err
}
return nil
} else if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to get SeataServer StatefulSet {%s:%s}",
sts.Namespace, sts.Name), err)
return err
} else {
err = r.updateStatefulSet(ctx, s, foundSts, sts)
if err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace},
seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to update SeataServer StatefulSet {%s:%s}",
foundSts.Namespace, foundSts.Name), err)
return err
}
}
Expand All @@ -179,9 +195,8 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *sea

func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer,
foundSts *appsv1.StatefulSet, sts *appsv1.StatefulSet) (err error) {
logger := log.FromContext(ctx)

logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name))
r.Log.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name))
seata.SyncStatefulSet(foundSts, sts)

err = r.Client.Update(ctx, foundSts)
Expand All @@ -202,21 +217,21 @@ func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav
if env.Name == "console.user.username" {
username, err = seata.FetchEnvVar(ctx, r.Client, s, env)
if err != nil {
logger.Error(err, "Failed to fetch Env console.user.username")
r.Log.Error(err, "Failed to fetch Env console.user.username")
}
}
if env.Name == "console.user.password" {
password, err = seata.FetchEnvVar(ctx, r.Client, s, env)
if err != nil {
logger.Error(err, "Failed to fetch Env console.user.password")
r.Log.Error(err, "Failed to fetch Env console.user.password")
}
}
}
if err = seata.SyncRaftCluster(ctx, s, username, password); err != nil {
logger.Error(err, "Failed to synchronize the raft cluster")
r.Log.Error(err, "Failed to synchronize the raft cluster")
s.Status.Synchronized = false
} else {
logger.Info("Successfully synchronized the raft cluster")
r.Log.Info("Successfully synchronized the raft cluster")
s.Status.Synchronized = true
}
}
Expand All @@ -231,17 +246,23 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx context.Context, instanc
if !utils.ContainsString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) {
instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, utils.SeataFinalizer)
if err = r.Client.Update(ctx, instance); err != nil {
r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace},
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", instance.Name), err)
return err
}
}
return r.cleanupOrphanPVCs(ctx, instance)
} else {
if utils.ContainsString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) {
if err = r.cleanUpAllPVCs(ctx, instance); err != nil {
r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace},
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to delete resource SeataServer(%v) PVCs", instance.Name), err)
return err
}
instance.ObjectMeta.Finalizers = utils.RemoveString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer)
if err = r.Client.Update(ctx, instance); err != nil {
r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace},
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", instance.Name), err)
return err
}
}
Expand All @@ -250,14 +271,13 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx context.Context, instanc
}

func (r *SeataServerReconciler) cleanupOrphanPVCs(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)
// this check should make sure we do not delete the PVCs before the STS has scaled down
if s.Status.ReadyReplicas == s.Spec.Replicas {
pvcCount, err := r.getPVCCount(ctx, s)
if err != nil {
return err
}
logger.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d and ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas))
r.Log.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d and ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas))
if pvcCount > int(s.Spec.Replicas) {
pvcList, err := r.getPVCList(ctx, s)
if err != nil {
Expand Down Expand Up @@ -286,6 +306,9 @@ func (r *SeataServerReconciler) getPVCList(ctx context.Context, s *seatav1alpha1
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{"app": s.GetName(), "uid": string(s.UID)},
})
if err != nil {
return pvList, err
}
pvclistOps := &client.ListOptions{
Namespace: s.Namespace,
LabelSelector: selector,
Expand All @@ -307,17 +330,16 @@ func (r *SeataServerReconciler) cleanUpAllPVCs(ctx context.Context, s *seatav1al
}

func (r *SeataServerReconciler) deletePVC(ctx context.Context, pvcItem apiv1.PersistentVolumeClaim) {
logger := log.FromContext(ctx)
pvcDelete := &apiv1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcItem.Name,
Namespace: pvcItem.Namespace,
},
}
logger.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
r.Log.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
err := r.Client.Delete(ctx, pvcDelete)
if err != nil {
logger.Error(err, fmt.Sprintf("Error deleting PVC with name %s", pvcDelete))
r.Log.Error(err, fmt.Sprintf("Error deleting PVC with name %s", pvcDelete))
}
}

Expand All @@ -330,3 +352,27 @@ func (r *SeataServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}

// update SeataServer error status
func (r *SeataServerReconciler) recordError(ctx context.Context, prKey client.ObjectKey, errorType seatav1alpha1.ServerErrorType, errMsg string, err error) error {
r.Log.Error(err, errMsg)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
newError := seatav1alpha1.SeataServerError{
Type: errorType.String(),
Message: errMsg,
Timestamp: metav1.Now(),
}
toUpdate := seatav1alpha1.SeataServer{}
err := r.Get(ctx, prKey, &toUpdate)
if err != nil {
r.Log.Error(err, "get seata server object error ", "prkey", prKey)
return err
}
// save recently `MaxErrorRecords` error
if len(toUpdate.Status.Errors) >= MaxRecentErrorRecords {
toUpdate.Status.Errors = toUpdate.Status.Errors[1:]
}
toUpdate.Status.Errors = append(toUpdate.Status.Errors, newError)
return r.Status().Update(ctx, &toUpdate)
})
}

0 comments on commit e079981

Please sign in to comment.