From e079981215a131e95d81b18a263271ca49e98a29 Mon Sep 17 00:00:00 2001 From: "xinfan.wu" <13708123240@163.com> Date: Mon, 10 Mar 2025 10:38:07 +0800 Subject: [PATCH] feat: enhance SeataServer status with categorized error tracking (#26) --- api/v1alpha1/seataserver_types.go | 28 ++++- ...perator.seata.apache.org_seataservers.yaml | 17 +++ controllers/seataserver_controller.go | 112 ++++++++++++------ 3 files changed, 121 insertions(+), 36 deletions(-) diff --git a/api/v1alpha1/seataserver_types.go b/api/v1alpha1/seataserver_types.go index 6fc2ccf..50d2a92 100644 --- a/api/v1alpha1/seataserver_types.go +++ b/api/v1alpha1/seataserver_types.go @@ -76,11 +76,33 @@ func (s *SeataServerSpec) withDefaults() (changed bool) { return changed } +type ServerErrorType string + +const ( + ErrorTypeK8s_SeataServer ServerErrorType = "k8s-seata-server" + ErrorTypeK8s_HeadlessService ServerErrorType = "k8s-headless-service" + ErrorTypeK8s_Pvc ServerErrorType = "k8s-pvc" + ErrorTypeK8s_StatefulSet ServerErrorType = "k8s-statefulset" + ErrorTypeRuntime ServerErrorType = "runtime" +) + +func (e ServerErrorType) String() string { + return string(e) +} + +// SeataServerError defines the error of SeataServer +type SeataServerError struct { + Type string `json:"type"` + Message string `json:"message"` + Timestamp metav1.Time `json:"timestamp"` +} + // SeataServerStatus defines the observed state of SeataServer type SeataServerStatus struct { - Synchronized bool `json:"synchronized"` - Replicas int32 `json:"replicas"` - ReadyReplicas int32 `json:"readyReplicas,omitempty"` + Synchronized bool `json:"synchronized"` + Replicas int32 `json:"replicas"` + ReadyReplicas int32 `json:"readyReplicas,omitempty"` + Errors []SeataServerError `json:"errors,omitempty"` } //+kubebuilder:object:root=true diff --git a/config/crd/bases/operator.seata.apache.org_seataservers.yaml b/config/crd/bases/operator.seata.apache.org_seataservers.yaml index 92e51dc..a43d693 100644 --- a/config/crd/bases/operator.seata.apache.org_seataservers.yaml +++ b/config/crd/bases/operator.seata.apache.org_seataservers.yaml @@ -461,6 +461,23 @@ spec: status: description: SeataServerStatus defines the observed state of SeataServer properties: + errors: + items: + description: SeataServerError defines the error of SeataServer + properties: + message: + type: string + timestamp: + format: date-time + type: string + type: + type: string + required: + - message + - timestamp + - type + type: object + type: array readyReplicas: format: int32 type: integer diff --git a/controllers/seataserver_controller.go b/controllers/seataserver_controller.go index 0e83928..0e02335 100644 --- a/controllers/seataserver_controller.go +++ b/controllers/seataserver_controller.go @@ -20,34 +20,40 @@ package controllers import ( "context" "fmt" - "github.com/apache/seata-k8s/pkg/seata" - "github.com/apache/seata-k8s/pkg/utils" + "time" + + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "time" seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1" + "github.com/apache/seata-k8s/pkg/seata" + "github.com/apache/seata-k8s/pkg/utils" ) // SeataServerReconciler reconciles a SeataServer object type SeataServerReconciler struct { client.Client Scheme *runtime.Scheme + Log logr.Logger } type reconcileFun func(ctx context.Context, s *seatav1alpha1.SeataServer) error -const RequeueSeconds = 10 +const ( + RequeueSeconds = 10 + MaxRecentErrorRecords = 5 +) //+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch @@ -68,24 +74,19 @@ const RequeueSeconds = 10 // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - s := &seatav1alpha1.SeataServer{} if err := r.Get(ctx, req.NamespacedName, s); err != nil { if errors.IsNotFound(err) { - logger.Info(fmt.Sprintf("SeataServer(%v) resource not found", req.NamespacedName)) + r.recordError(ctx, req.NamespacedName, seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to get resource SeataServer(%v),err %s", req.NamespacedName, err.Error()), err) return ctrl.Result{}, nil } - - logger.Error(err, fmt.Sprintf("Failed to get resource SeataServer(%v)", req.NamespacedName)) - return ctrl.Result{}, err } changed := s.WithDefaults() if changed { - logger.Info(fmt.Sprintf("Setting default values for SeataServer(%v)", req.NamespacedName)) + r.Log.Info(fmt.Sprintf("Setting default values for SeataServer(%v)", req.NamespacedName)) if err := r.Client.Update(ctx, s); err != nil { - logger.Error(err, fmt.Sprintf("Failed to update resource SeataServer(%v)", req.NamespacedName)) + r.recordError(ctx, req.NamespacedName, seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v)", req.NamespacedName), err) return reconcile.Result{}, err } return reconcile.Result{Requeue: true}, nil @@ -102,7 +103,7 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if !s.Status.Synchronized { - logger.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds", + r.Log.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds", req.NamespacedName, RequeueSeconds)) return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds * time.Second}, nil } @@ -111,10 +112,10 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) { - logger := log.FromContext(ctx) - svc := seata.MakeHeadlessService(s) if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to set owner reference for SeataServer(%v)", s.Name), err) return err } foundSvc := &apiv1.Service{} @@ -123,22 +124,30 @@ func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s Namespace: svc.Namespace, }, foundSvc) if err != nil && errors.IsNotFound(err) { - logger.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}", + r.Log.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}", svc.Namespace, svc.Name)) err = r.Client.Create(ctx, svc) if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to create SeataServer Service {%s:%s}", + svc.Namespace, svc.Name), err) return err } return nil } else if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to get SeataServer Service {%s:%s}", + svc.Namespace, svc.Name), err) return err } else { - logger.Info(fmt.Sprintf("Updating existing SeataServer Service {%s:%s}", + r.Log.Info(fmt.Sprintf("Updating existing SeataServer Service {%s:%s}", foundSvc.Namespace, foundSvc.Name)) - seata.SyncService(foundSvc, svc) err = r.Client.Update(ctx, foundSvc) if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_HeadlessService, fmt.Sprintf("Failed to update SeataServer Service {%s:%s}", + foundSvc.Namespace, foundSvc.Name), err) return err } } @@ -146,8 +155,6 @@ func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s } func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) { - logger := log.FromContext(ctx) - sts := seata.MakeStatefulSet(s) if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err != nil { return err @@ -158,18 +165,27 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *sea Namespace: sts.Namespace, }, foundSts) if err != nil && errors.IsNotFound(err) { - logger.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}", + r.Log.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}", sts.Namespace, sts.Name)) err = r.Client.Create(ctx, sts) if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to create SeataServer StatefulSet {%s:%s}", + sts.Namespace, sts.Name), err) return err } return nil } else if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to get SeataServer StatefulSet {%s:%s}", + sts.Namespace, sts.Name), err) return err } else { err = r.updateStatefulSet(ctx, s, foundSts, sts) if err != nil { + r.recordError(ctx, types.NamespacedName{Name: s.Name, Namespace: s.Namespace}, + seatav1alpha1.ErrorTypeK8s_StatefulSet, fmt.Sprintf("Failed to update SeataServer StatefulSet {%s:%s}", + foundSts.Namespace, foundSts.Name), err) return err } } @@ -179,9 +195,8 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *sea func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer, foundSts *appsv1.StatefulSet, sts *appsv1.StatefulSet) (err error) { - logger := log.FromContext(ctx) - logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name)) + r.Log.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name)) seata.SyncStatefulSet(foundSts, sts) err = r.Client.Update(ctx, foundSts) @@ -202,21 +217,21 @@ func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav if env.Name == "console.user.username" { username, err = seata.FetchEnvVar(ctx, r.Client, s, env) if err != nil { - logger.Error(err, "Failed to fetch Env console.user.username") + r.Log.Error(err, "Failed to fetch Env console.user.username") } } if env.Name == "console.user.password" { password, err = seata.FetchEnvVar(ctx, r.Client, s, env) if err != nil { - logger.Error(err, "Failed to fetch Env console.user.password") + r.Log.Error(err, "Failed to fetch Env console.user.password") } } } if err = seata.SyncRaftCluster(ctx, s, username, password); err != nil { - logger.Error(err, "Failed to synchronize the raft cluster") + r.Log.Error(err, "Failed to synchronize the raft cluster") s.Status.Synchronized = false } else { - logger.Info("Successfully synchronized the raft cluster") + r.Log.Info("Successfully synchronized the raft cluster") s.Status.Synchronized = true } } @@ -231,6 +246,8 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx context.Context, instanc if !utils.ContainsString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) { instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) if err = r.Client.Update(ctx, instance); err != nil { + r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, + seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", instance.Name), err) return err } } @@ -238,10 +255,14 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx context.Context, instanc } else { if utils.ContainsString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) { if err = r.cleanUpAllPVCs(ctx, instance); err != nil { + r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, + seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to delete resource SeataServer(%v) PVCs", instance.Name), err) return err } instance.ObjectMeta.Finalizers = utils.RemoveString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer) if err = r.Client.Update(ctx, instance); err != nil { + r.recordError(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, + seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", instance.Name), err) return err } } @@ -250,14 +271,13 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx context.Context, instanc } func (r *SeataServerReconciler) cleanupOrphanPVCs(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) { - logger := log.FromContext(ctx) // this check should make sure we do not delete the PVCs before the STS has scaled down if s.Status.ReadyReplicas == s.Spec.Replicas { pvcCount, err := r.getPVCCount(ctx, s) if err != nil { return err } - logger.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d and ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas)) + r.Log.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d and ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas)) if pvcCount > int(s.Spec.Replicas) { pvcList, err := r.getPVCList(ctx, s) if err != nil { @@ -286,6 +306,9 @@ func (r *SeataServerReconciler) getPVCList(ctx context.Context, s *seatav1alpha1 selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: map[string]string{"app": s.GetName(), "uid": string(s.UID)}, }) + if err != nil { + return pvList, err + } pvclistOps := &client.ListOptions{ Namespace: s.Namespace, LabelSelector: selector, @@ -307,17 +330,16 @@ func (r *SeataServerReconciler) cleanUpAllPVCs(ctx context.Context, s *seatav1al } func (r *SeataServerReconciler) deletePVC(ctx context.Context, pvcItem apiv1.PersistentVolumeClaim) { - logger := log.FromContext(ctx) pvcDelete := &apiv1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: pvcItem.Name, Namespace: pvcItem.Namespace, }, } - logger.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name)) + r.Log.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name)) err := r.Client.Delete(ctx, pvcDelete) if err != nil { - logger.Error(err, fmt.Sprintf("Error deleting PVC with name %s", pvcDelete)) + r.Log.Error(err, fmt.Sprintf("Error deleting PVC with name %s", pvcDelete)) } } @@ -330,3 +352,27 @@ func (r *SeataServerReconciler) SetupWithManager(mgr ctrl.Manager) error { WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } + +// update SeataServer error status +func (r *SeataServerReconciler) recordError(ctx context.Context, prKey client.ObjectKey, errorType seatav1alpha1.ServerErrorType, errMsg string, err error) error { + r.Log.Error(err, errMsg) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + newError := seatav1alpha1.SeataServerError{ + Type: errorType.String(), + Message: errMsg, + Timestamp: metav1.Now(), + } + toUpdate := seatav1alpha1.SeataServer{} + err := r.Get(ctx, prKey, &toUpdate) + if err != nil { + r.Log.Error(err, "get seata server object error ", "prkey", prKey) + return err + } + // save recently `MaxErrorRecords` error + if len(toUpdate.Status.Errors) >= MaxRecentErrorRecords { + toUpdate.Status.Errors = toUpdate.Status.Errors[1:] + } + toUpdate.Status.Errors = append(toUpdate.Status.Errors, newError) + return r.Status().Update(ctx, &toUpdate) + }) +}