Skip to content

Commit

Permalink
Manual informer
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
matzew committed Mar 4, 2025
1 parent 04c41de commit ace33a7
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions pkg/reconciler/integration/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@ package sink

import (
"context"
"fmt"
"os"
"time"

pkgreconciler "knative.dev/pkg/reconciler"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
certmanagerclient "knative.dev/eventing/pkg/client/certmanager/clientset/versioned"
certmanagerinformers "knative.dev/eventing/pkg/client/certmanager/informers/externalversions"

"knative.dev/eventing/pkg/apis/feature"
v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"knative.dev/eventing/pkg/client/injection/informers/sinks/v1alpha1/integrationsink"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/client/injection/kube/informers/core/v1/service"

cmclient "knative.dev/eventing/pkg/client/certmanager/injection/client"
cmcertinformer "knative.dev/eventing/pkg/client/certmanager/injection/informers/certmanager/v1/certificate"
pkgreconciler "knative.dev/pkg/reconciler"

integrationsinkreconciler "knative.dev/eventing/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand All @@ -41,7 +45,6 @@ import (
"knative.dev/pkg/logging"
)

// NewController creates a Reconciler for IntegrationSource and returns the result of NewImpl.
func NewController(
ctx context.Context,
cmw configmap.Watcher,
Expand All @@ -50,10 +53,27 @@ func NewController(
secretInformer := secretinformer.Get(ctx, "app.kubernetes.io/name")
eventPolicyInformer := eventpolicy.Get(ctx)
deploymentInformer := deploymentinformer.Get(ctx)
serviceInformer := service.Get(ctx)

cmCertificateInformer := cmcertinformer.Get(ctx)
config, err := rest.InClusterConfig()
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting cluster config: %v\n", err)
os.Exit(1)
}

serviceInformer := service.Get(ctx)
certManagerClient := certmanagerclient.NewForConfigOrDie(config)

resyncPeriod := 30 * time.Second

factory := certmanagerinformers.NewSharedInformerFactoryWithOptions(
certManagerClient,
resyncPeriod,
certmanagerinformers.WithTweakListOptions(func(options *v1.ListOptions) {
options.LabelSelector = "app.kubernetes.io/name"
}),
)

cmCertificateInformer := factory.Certmanager().V1().Certificates()

r := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
Expand All @@ -64,7 +84,7 @@ func NewController(
secretLister: secretInformer.Lister(),
eventPolicyLister: eventPolicyInformer.Lister(),
cmCertificateLister: cmCertificateInformer.Lister(),
certManagerClient: cmclient.Get(ctx),
certManagerClient: certManagerClient,
}

var globalResync func(obj interface{})
Expand Down Expand Up @@ -99,14 +119,14 @@ func NewController(
})

integrationSinkGK := v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink").GroupKind()

// Enqueue the JobSink, if we have an EventPolicy which was referencing
// or got updated and now is referencing the JobSink.
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
integrationSinkInformer.Informer().GetIndexer(),
integrationSinkGK,
impl.EnqueueKey,
))

factory.Start(ctx.Done())
factory.WaitForCacheSync(ctx.Done())

return impl
}

0 comments on commit ace33a7

Please sign in to comment.