Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StatefulSet test #1118

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ GO_CMD ?= go
TEMPLATE_KUSTOMIZE ?= "deploy-kustomize.yaml"

# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.28
ENVTEST_K8S_VERSION = 1.31

# Image URL to use all building/pushing image targets
ifeq ($(findstring -minikube,${MAKECMDGOALS}), -minikube)
Expand Down
6 changes: 6 additions & 0 deletions apis/cloud.redhat.com/v1alpha1/clowdapp_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ type Deployment struct {
DeploymentStrategy *DeploymentStrategy `json:"deploymentStrategy,omitempty"`

Metadata DeploymentMetadata `json:"metadata,omitempty"`

Stateful StatefulSpec `json:"statefulSpec,omitempty"`
}

type StatefulSpec struct {
Enabled bool `json:"enabled,omitempty"`
}

func (d *Deployment) GetReplicaCount() *int32 {
Expand Down
1 change: 1 addition & 0 deletions controllers/cloud.redhat.com/clowdapp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (r *ClowdAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
watchers := []Watcher{
{obj: &apps.Deployment{}, filter: deploymentFilter},
{obj: &core.Service{}, filter: generationOnlyFilter},
{obj: &apps.StatefulSet{}, filter: statefulSetFilter},
{obj: &core.ConfigMap{}, filter: generationOnlyFilter},
{obj: &core.Secret{}, filter: alwaysFilter},
}
Expand Down
1 change: 1 addition & 0 deletions controllers/cloud.redhat.com/clowdapp_reconciliation.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ var applyOrder = []string{
"Service",
"Secret",
"Deployment",
"StatefulSet",
"Job",
"CronJob",
"ScaledObject",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (r *ClowdEnvironmentReconciler) SetupWithManager(mgr ctrl.Manager) error {
{obj: &apps.Deployment{}, filter: deploymentFilter},
{obj: &core.Service{}, filter: alwaysFilter},
{obj: &core.Secret{}, filter: alwaysFilter},
{obj: &apps.StatefulSet{}, filter: statefulSetFilter},
}

if clowderconfig.LoadedConfig.Features.WatchStrimziResources {
Expand Down
19 changes: 19 additions & 0 deletions controllers/cloud.redhat.com/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func deploymentUpdateFunc(e event.UpdateEvent) bool {
return false
}

func statefulSetUpdateFunc(e event.UpdateEvent) bool {
objOld := e.ObjectOld.(*apps.StatefulSet)
objNew := e.ObjectNew.(*apps.StatefulSet)
if objNew.GetGeneration() != objOld.GetGeneration() {
return true
}
if (objOld.Status.AvailableReplicas != objNew.Status.AvailableReplicas) && (objNew.Status.AvailableReplicas == objNew.Status.ReadyReplicas) {
return true
}
if (objOld.Status.AvailableReplicas == objOld.Status.ReadyReplicas) && (objNew.Status.AvailableReplicas != objNew.Status.ReadyReplicas) {
return true
}
return false
}

func kafkaUpdateFunc(e event.UpdateEvent) bool {
objOld := e.ObjectOld.(*strimzi.Kafka)
objNew := e.ObjectNew.(*strimzi.Kafka)
Expand Down Expand Up @@ -114,6 +129,10 @@ func deploymentFilter(logr logr.Logger, ctrlName string) HandlerFuncs {
return genFilterFunc(deploymentUpdateFunc, logr, ctrlName)
}

func statefulSetFilter(logr logr.Logger, ctrlName string) HandlerFuncs {
return genFilterFunc(statefulSetUpdateFunc, logr, ctrlName)
}

func kafkaFilter(logr logr.Logger, ctrlName string) HandlerFuncs {
return genFilterFunc(kafkaUpdateFunc, logr, ctrlName)
}
Expand Down
13 changes: 7 additions & 6 deletions controllers/cloud.redhat.com/providers/autoscaler/keda.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
deployProvider "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers/deployment"
keda "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func makeAutoScalers(deployment *crd.Deployment, app *crd.ClowdApp, c *config.AppConfig, asp *providers.Provider) error {
Expand All @@ -19,12 +19,13 @@ func makeAutoScalers(deployment *crd.Deployment, app *crd.ClowdApp, c *config.Ap
return err
}

d := &apps.Deployment{}
if err := asp.Cache.Get(deployProvider.CoreDeployment, d, nn); err != nil {
obj, err := deployProvider.GetClientObject(deployment, asp.Cache, nn)

if err != nil {
return err
}

initAutoScaler(asp.Env, app, d, s, nn, deployment, c)
initAutoScaler(asp.Env, app, obj, s, nn, deployment, c)

return asp.Cache.Update(CoreAutoScaler, s)
}
Expand All @@ -34,14 +35,14 @@ func ProvideKedaAutoScaler(app *crd.ClowdApp, c *config.AppConfig, asp *provider
return err
}

func initAutoScaler(env *crd.ClowdEnvironment, app *crd.ClowdApp, d *apps.Deployment, s *keda.ScaledObject, nn types.NamespacedName, deployment *crd.Deployment, c *config.AppConfig) {
func initAutoScaler(env *crd.ClowdEnvironment, app *crd.ClowdApp, obj client.Object, s *keda.ScaledObject, nn types.NamespacedName, deployment *crd.Deployment, c *config.AppConfig) {
labels := app.GetLabels()
labels["pod"] = nn.Name
app.SetObjectMeta(s, crd.Name(nn.Name), crd.Labels(labels))

// Set up the watcher to watch the Deployment we created earlier.
scalerSpec := keda.ScaledObjectSpec{
ScaleTargetRef: &keda.ScaleTarget{Name: d.Name, Kind: d.Kind, APIVersion: d.APIVersion},
ScaleTargetRef: &keda.ScaleTarget{Name: obj.GetName(), Kind: obj.GetObjectKind().GroupVersionKind().Kind, APIVersion: obj.GetObjectKind().GroupVersionKind().GroupVersion().String()},
PollingInterval: deployment.AutoScaler.PollingInterval,
CooldownPeriod: deployment.AutoScaler.CooldownPeriod,
Advanced: deployment.AutoScaler.Advanced,
Expand Down
55 changes: 28 additions & 27 deletions controllers/cloud.redhat.com/providers/autoscaler/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"fmt"

res "k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"

crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
deployProvider "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers/deployment"
apps "k8s.io/api/apps/v1"
v2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -25,11 +25,11 @@ const (

// Creates a simple HPA in the resource cache for the deployment and ClowdApp
func ProvideSimpleAutoScaler(app *crd.ClowdApp, appConfig *config.AppConfig, sp *providers.Provider, deployment crd.Deployment) error {
cachedDeployment, err := getDeploymentFromCache(&deployment, app, sp)
coreObject, err := getcoreObjectFromCache(&deployment, app, sp)
if err != nil {
return errors.Wrap("Could not get deployment from resource cache", err)
}
hpaMaker := newSimpleHPAMaker(&deployment, app, appConfig, cachedDeployment)
hpaMaker := newSimpleHPAMaker(&deployment, app, appConfig, coreObject)
hpaResource := hpaMaker.getResource()

err = cacheAutoscaler(app, sp, deployment, hpaResource)
Expand All @@ -47,32 +47,33 @@ func cacheAutoscaler(app *crd.ClowdApp, sp *providers.Provider, deployment crd.D
}

// Get the core apps.Deployment from the provider cache
func getDeploymentFromCache(clowdDeployment *crd.Deployment, app *crd.ClowdApp, sp *providers.Provider) (*apps.Deployment, error) {
func getcoreObjectFromCache(clowdDeployment *crd.Deployment, app *crd.ClowdApp, sp *providers.Provider) (client.Object, error) {
nn := app.GetDeploymentNamespacedName(clowdDeployment)
d := &apps.Deployment{}
if err := sp.Cache.Get(deployProvider.CoreDeployment, d, nn); err != nil {
return d, err

obj, err := deployProvider.GetClientObject(clowdDeployment, sp.Cache, nn)
if err != nil {
return nil, err
}
return d, nil
return obj, nil
}

// Factory for the simpleHPAMaker
func newSimpleHPAMaker(deployment *crd.Deployment, app *crd.ClowdApp, appConfig *config.AppConfig, coreDeployment *apps.Deployment) simpleHPAMaker {
func newSimpleHPAMaker(deployment *crd.Deployment, app *crd.ClowdApp, appConfig *config.AppConfig, coreObject client.Object) simpleHPAMaker {
return simpleHPAMaker{
deployment: deployment,
app: app,
appConfig: appConfig,
coreDeployment: coreDeployment,
deployment: deployment,
app: app,
appConfig: appConfig,
coreObject: coreObject,
}
}

// Creates a simple HPA and stores references
// to the resources and dependencies it requires
type simpleHPAMaker struct {
deployment *crd.Deployment
app *crd.ClowdApp
appConfig *config.AppConfig
coreDeployment *apps.Deployment
deployment *crd.Deployment
app *crd.ClowdApp
appConfig *config.AppConfig
coreObject client.Object
}

// Constructs the HPA in 2 parts: the HPA itself and the metric spec
Expand All @@ -96,13 +97,13 @@ func (d *simpleHPAMaker) makeHPA() v2.HorizontalPodAutoscaler {
UID: d.app.UID,
}},
Name: name,
Namespace: d.coreDeployment.Namespace,
Namespace: d.coreObject.GetNamespace(),
},
Spec: v2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: v2.CrossVersionObjectReference{
APIVersion: DeploymentAPIVersion,
Kind: DeploymentKind,
Name: d.coreDeployment.Name,
Name: d.coreObject.GetName(),
},
MinReplicas: &d.deployment.AutoScalerSimple.Replicas.Min,
MaxReplicas: d.deployment.AutoScalerSimple.Replicas.Max,
Expand All @@ -116,43 +117,43 @@ func (d *simpleHPAMaker) makeMetricsSpecs() []v2.MetricSpec {
metricsSpecs := []v2.MetricSpec{}

if d.deployment.AutoScalerSimple.RAM.ScaleAtUtilization != 0 {
metricsSpec := d.makeAverageUtilizationMetricSpec(v1.ResourceMemory, d.deployment.AutoScalerSimple.RAM.ScaleAtUtilization)
metricsSpec := d.makeAverageUtilizationMetricSpec(core.ResourceMemory, d.deployment.AutoScalerSimple.RAM.ScaleAtUtilization)
metricsSpecs = append(metricsSpecs, metricsSpec)
}
if d.deployment.AutoScalerSimple.RAM.ScaleAtValue != "" {
threshold := res.MustParse(d.deployment.AutoScalerSimple.RAM.ScaleAtValue)
metricsSpec := d.makeAverageValueMetricSpec(v1.ResourceMemory, threshold)
metricsSpec := d.makeAverageValueMetricSpec(core.ResourceMemory, threshold)
metricsSpecs = append(metricsSpecs, metricsSpec)
}

if d.deployment.AutoScalerSimple.CPU.ScaleAtUtilization != 0 {
metricsSpec := d.makeAverageUtilizationMetricSpec(v1.ResourceCPU, d.deployment.AutoScalerSimple.CPU.ScaleAtUtilization)
metricsSpec := d.makeAverageUtilizationMetricSpec(core.ResourceCPU, d.deployment.AutoScalerSimple.CPU.ScaleAtUtilization)
metricsSpecs = append(metricsSpecs, metricsSpec)
}
if d.deployment.AutoScalerSimple.CPU.ScaleAtValue != "" {
threshold := res.MustParse(d.deployment.AutoScalerSimple.CPU.ScaleAtValue)
metricsSpec := d.makeAverageValueMetricSpec(v1.ResourceCPU, threshold)
metricsSpec := d.makeAverageValueMetricSpec(core.ResourceCPU, threshold)
metricsSpecs = append(metricsSpecs, metricsSpec)
}

return metricsSpecs
}

func (d *simpleHPAMaker) makeAverageValueMetricSpec(resource v1.ResourceName, threshold res.Quantity) v2.MetricSpec {
func (d *simpleHPAMaker) makeAverageValueMetricSpec(resource core.ResourceName, threshold res.Quantity) v2.MetricSpec {
ms := d.makeBasicMetricSpec(resource)
ms.Resource.Target.Type = v2.AverageValueMetricType
ms.Resource.Target.AverageValue = &threshold
return ms
}

func (d *simpleHPAMaker) makeAverageUtilizationMetricSpec(resource v1.ResourceName, threshold int32) v2.MetricSpec {
func (d *simpleHPAMaker) makeAverageUtilizationMetricSpec(resource core.ResourceName, threshold int32) v2.MetricSpec {
ms := d.makeBasicMetricSpec(resource)
ms.Resource.Target.Type = v2.UtilizationMetricType
ms.Resource.Target.AverageUtilization = &threshold
return ms
}

func (d *simpleHPAMaker) makeBasicMetricSpec(resource v1.ResourceName) v2.MetricSpec {
func (d *simpleHPAMaker) makeBasicMetricSpec(resource core.ResourceName) v2.MetricSpec {
ms := v2.MetricSpec{
Type: v2.MetricSourceType("Resource"),
Resource: &v2.ResourceMetricSource{
Expand Down
34 changes: 24 additions & 10 deletions controllers/cloud.redhat.com/providers/confighash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func (ch *confighashProvider) volSecret(app *crd.ClowdApp, volume core.Volume) e
return ch.HashCache.AddClowdObjectToObject(app, sec)
}

func (ch *confighashProvider) iterateEnvVars(app *crd.ClowdApp, deployment apps.Deployment) error {
for _, cont := range deployment.Spec.Template.Spec.Containers {
func (ch *confighashProvider) iterateEnvVars(app *crd.ClowdApp, podTemplate core.PodTemplateSpec) error {
for _, cont := range podTemplate.Spec.Containers {
for _, env := range cont.Env {
if err := ch.envConfigMap(app, env); err != nil {
return err
Expand All @@ -120,8 +120,8 @@ func (ch *confighashProvider) iterateEnvVars(app *crd.ClowdApp, deployment apps.
return nil
}

func (ch *confighashProvider) iterateVolumes(app *crd.ClowdApp, deployment apps.Deployment) error {
for _, volume := range deployment.Spec.Template.Spec.Volumes {
func (ch *confighashProvider) iterateVolumes(app *crd.ClowdApp, podTemplate core.PodTemplateSpec) error {
for _, volume := range podTemplate.Spec.Volumes {
if err := ch.volConfigMap(app, volume); err != nil {
return err
}
Expand All @@ -133,13 +133,13 @@ func (ch *confighashProvider) iterateVolumes(app *crd.ClowdApp, deployment apps.
return nil
}

func (ch *confighashProvider) updateHashCache(dList *apps.DeploymentList, app *crd.ClowdApp) error {
for _, deployment := range dList.Items {
deploy := deployment
if err := ch.iterateEnvVars(app, deploy); err != nil {
func (ch *confighashProvider) updateHashCache(podTemplateSpecList *[]core.PodTemplateSpec, app *crd.ClowdApp) error {
for _, podTemplate := range *podTemplateSpecList {
pt := podTemplate
if err := ch.iterateEnvVars(app, pt); err != nil {
return err
}
if err := ch.iterateVolumes(app, deploy); err != nil {
if err := ch.iterateVolumes(app, pt); err != nil {
return err
}
}
Expand All @@ -161,7 +161,21 @@ func (ch *confighashProvider) persistConfig(app *crd.ClowdApp) (string, error) {
return "", err
}

if err := ch.updateHashCache(&dList, app); err != nil {
ssList := apps.StatefulSetList{}
if err := ch.Cache.List(deployProvider.CoreStatefulSet, &ssList); err != nil {
return "", err
}

podTemplateList := []core.PodTemplateSpec{}
for _, item := range dList.Items {
podTemplateList = append(podTemplateList, item.Spec.Template)
}

for _, item := range ssList.Items {
podTemplateList = append(podTemplateList, item.Spec.Template)
}

if err := ch.updateHashCache(&podTemplateList, app); err != nil {
return "", err
}

Expand Down
15 changes: 15 additions & 0 deletions controllers/cloud.redhat.com/providers/confighash/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func (ch *confighashProvider) Provide(app *crd.ClowdApp) error {
}
}

ssList := apps.StatefulSetList{}
if err := ch.Cache.List(deployProvider.CoreStatefulSet, &ssList); err != nil {
return err
}

for _, statefulset := range ssList.Items {
ssInner := statefulset
annotations := map[string]string{"configHash": hash}
utils.UpdateAnnotations(&ssInner.Spec.Template, annotations)

if err := ch.Cache.Update(deployProvider.CoreStatefulSet, &ssInner); err != nil {
return err
}
}

jList := batch.CronJobList{}
if err := ch.Cache.List(cronjobProvider.CoreCronJob, &jList); err != nil {
return err
Expand Down
15 changes: 12 additions & 3 deletions controllers/cloud.redhat.com/providers/deployment/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ type deploymentProvider struct {
// CoreDeployment is the deployment for the apps deployments.
var CoreDeployment = rc.NewMultiResourceIdent(ProvName, "core_deployment", &apps.Deployment{})

// CoreStatefulSet is the statefulSet for the apps deployments.
var CoreStatefulSet = rc.NewMultiResourceIdent(ProvName, "core_statefulset", &apps.StatefulSet{})

func NewDeploymentProvider(p *providers.Provider) (providers.ClowderProvider, error) {
p.Cache.AddPossibleGVKFromIdent(CoreDeployment)
p.Cache.AddPossibleGVKFromIdent(CoreStatefulSet)
return &deploymentProvider{Provider: *p}, nil
}

Expand All @@ -26,9 +30,14 @@ func (dp *deploymentProvider) EnvProvide() error {
func (dp *deploymentProvider) Provide(app *crd.ClowdApp) error {

for _, deployment := range app.Spec.Deployments {

if err := dp.makeDeployment(deployment, app); err != nil {
return err
if deployment.Stateful.Enabled {
if err := dp.makeStatefulSet(deployment, app); err != nil {
return err
}
} else {
if err := dp.makeDeployment(deployment, app); err != nil {
return err
}
}
}
return nil
Expand Down
Loading
Loading