Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve CM Cert reconcile and deletion if feature is turned off #8519

Merged
merged 1 commit into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 54 additions & 15 deletions pkg/reconciler/integration/sink/integrationsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,35 @@ func newReconciledNormal(namespace, name string) reconciler.Event {

func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationSink) reconciler.Event {
featureFlags := feature.FromContext(ctx)
logger := logging.FromContext(ctx)

if featureFlags.IsPermissiveTransportEncryption() || featureFlags.IsStrictTransportEncryption() {
_, err := r.reconcileCMCertificate(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Certificate", zap.Error(err))
return err
}
logger.Debugw("Reconciling IntegrationSink Certificate")
_, err := r.reconcileIntegrationSinkCertificate(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Certificate", zap.Error(err))
return err
}

_, err := r.reconcileDeployment(ctx, sink, featureFlags)
logger.Debugw("Reconciling IntegrationSink Deployment")
_, err = r.reconcileDeployment(ctx, sink, featureFlags)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Pod", zap.Error(err))
return err
}

logger.Debugw("Reconciling IntegrationSink Service")
_, err = r.reconcileService(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Service", zap.Error(err))
return err
}

logger.Debugw("Reconciling IntegrationSink address")
if err := r.reconcileAddress(ctx, sink); err != nil {
return fmt.Errorf("failed to reconcile address: %w", err)
}

logger.Debugw("Updating IntegrationSink status with EventPolicies")
err = auth.UpdateStatusWithEventPolicies(featureFlags, &sink.Status.AppliedEventPoliciesStatus, &sink.Status, r.eventPolicyLister, sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update IntegrationSink status with EventPolicies: %v", err)
Expand Down Expand Up @@ -172,19 +176,20 @@ func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.Integrati
return svc, nil
}

func (r *Reconciler) reconcileCMCertificate(ctx context.Context, sink *sinks.IntegrationSink) (*cmv1.Certificate, error) {
func (r *Reconciler) reconcileIntegrationSinkCertificate(ctx context.Context, sink *sinks.IntegrationSink) (*cmv1.Certificate, error) {

expected := certificates.MakeCertificate(sink, certificates.WithDNSNames(
network.GetServiceHostname(resources.DeploymentName(sink.GetName()), sink.GetNamespace()),
fmt.Sprintf("%s.%s.svc", resources.DeploymentName(sink.GetName()), sink.GetNamespace()),
))
if f := feature.FromContext(ctx); !f.IsStrictTransportEncryption() && !f.IsPermissiveTransportEncryption() {
return nil, r.deleteIntegrationSinkCertificate(ctx, sink)
}

lister := r.cmCertificateLister.Load()
if lister == nil || *lister == nil {
expected := integrationSinkCertificate(sink)

cmCertificateLister := r.cmCertificateLister.Load()
if cmCertificateLister == nil || *cmCertificateLister == nil {
return nil, fmt.Errorf("no cert-manager certificate lister created yet, this should rarely happen and recover")
}

cert, err := (*lister).Certificates(sink.Namespace).Get(expected.Name)
cert, err := (*cmCertificateLister).Certificates(sink.Namespace).Get(expected.Name)
if apierrors.IsNotFound(err) {
cert, err := r.certManagerClient.CertmanagerV1().Certificates(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
Expand All @@ -202,6 +207,31 @@ func (r *Reconciler) reconcileCMCertificate(ctx context.Context, sink *sinks.Int
return cert, nil
}

func (r *Reconciler) deleteIntegrationSinkCertificate(ctx context.Context, sink *sinks.IntegrationSink) error {
certificate := integrationSinkCertificate(sink)

cmCertificateLister := r.cmCertificateLister.Load()
if cmCertificateLister != nil && *cmCertificateLister != nil {
_, err := (*cmCertificateLister).Certificates(certificate.GetNamespace()).Get(certificate.GetName())
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to get certificate %s/%s: %w", certificate.GetNamespace(), certificate.GetName(), err)
}
}

err := r.certManagerClient.CertmanagerV1().Certificates(certificate.GetNamespace()).Delete(ctx, certificate.GetName(), metav1.DeleteOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to delete certificate %s/%s: %w", certificate.GetNamespace(), certificate.GetName(), err)
}
controller.GetEventRecorder(ctx).Event(sink, corev1.EventTypeNormal, "IntegrationSinkCertificateDeleted", certificate.GetName())
return nil
}

func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.IntegrationSink) error {

featureFlags := feature.FromContext(ctx)
Expand Down Expand Up @@ -304,3 +334,12 @@ func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1
}
return false
}

func integrationSinkCertificate(sink *sinks.IntegrationSink) *cmv1.Certificate {
return certificates.MakeCertificate(sink,
certificates.WithDNSNames(
network.GetServiceHostname(resources.DeploymentName(sink.Name), sink.Namespace),
fmt.Sprintf("%s.%s.svc", resources.DeploymentName(sink.Name), sink.Namespace),
),
)
}
19 changes: 14 additions & 5 deletions pkg/reconciler/integration/sink/integrationsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package sink

import (
"fmt"
"sync/atomic"

cmlisters "knative.dev/eventing/pkg/client/certmanager/listers/certmanager/v1"

"knative.dev/eventing/pkg/certificates"

Expand Down Expand Up @@ -151,12 +154,18 @@ func TestReconcile(t *testing.T) {
logger := logtesting.TestLogger(t)
table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
ctx = addressable.WithDuck(ctx)

cmCertificatesListerAtomic := &atomic.Pointer[cmlisters.CertificateLister]{}
cmCertificatesLister := listers.GetCertificateLister()
cmCertificatesListerAtomic.Store(&cmCertificatesLister)

r := &Reconciler{
kubeClientSet: fakekubeclient.Get(ctx),
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
secretLister: listers.GetSecretLister(),
eventPolicyLister: listers.GetEventPolicyLister(),
kubeClientSet: fakekubeclient.Get(ctx),
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
secretLister: listers.GetSecretLister(),
cmCertificateLister: cmCertificatesListerAtomic,
eventPolicyLister: listers.GetEventPolicyLister(),
}

return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r)
Expand Down
Loading