From ea0f0558c0cbf69a7d923d563efc0c4ea5f2f4fe Mon Sep 17 00:00:00 2001 From: Rahul Sawra Date: Mon, 2 Sep 2024 15:36:39 +0530 Subject: [PATCH] List applying policies in KafkaChannel (#4089) * list applying policies in kafka channel Signed-off-by: rahulii * watch eventpolicies for kafkachannel controller Signed-off-by: rahulii * update kafka channel status Signed-off-by: rahulii * add testing helper funcs for eventpolicies for kafkachannel Signed-off-by: rahulii * add unit tests Signed-off-by: rahulii * minor fix Signed-off-by: rahulii * fix review comments from creydr Signed-off-by: rahulii --------- Signed-off-by: rahulii --- .../v1beta1/kafka_channel_lifecycle.go | 18 ++ .../pkg/reconciler/channel/channel.go | 12 +- .../pkg/reconciler/channel/channel_test.go | 240 +++++++++++++++++- .../pkg/reconciler/channel/controller.go | 5 + .../pkg/reconciler/testing/objects_channel.go | 42 +++ 5 files changed, 315 insertions(+), 2 deletions(-) diff --git a/control-plane/pkg/apis/messaging/v1beta1/kafka_channel_lifecycle.go b/control-plane/pkg/apis/messaging/v1beta1/kafka_channel_lifecycle.go index daa3d03e14..43d3124438 100644 --- a/control-plane/pkg/apis/messaging/v1beta1/kafka_channel_lifecycle.go +++ b/control-plane/pkg/apis/messaging/v1beta1/kafka_channel_lifecycle.go @@ -51,6 +51,8 @@ const ( // KafkaChannelConditionChannelServiceReady has status True when the K8S Service representing the channel // is ready. Because this uses ExternalName, there are no endpoints to check. KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady" + + ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) // RegisterAlternateKafkaChannelConditionSet register a different apis.ConditionSet. @@ -129,3 +131,19 @@ func (kcs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat st func (kcs *KafkaChannelStatus) MarkChannelServiceTrue() { kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionChannelServiceReady) } + +func (kcs *KafkaChannelStatus) MarkEventPoliciesTrue() { + kcs.GetConditionSet().Manage(kcs).MarkTrue(ConditionEventPoliciesReady) +} + +func (kcs *KafkaChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + kcs.GetConditionSet().Manage(kcs).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (kcs *KafkaChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + kcs.GetConditionSet().Manage(kcs).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (kcs *KafkaChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + kcs.GetConditionSet().Manage(kcs).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...) +} diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index cc9e8d8f53..49634855f6 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -234,8 +234,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta audience = nil } + applyingEventPolicies, err := auth.GetEventPoliciesForResource(r.EventPolicyLister, messagingv1beta1.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta) + if err != nil { + return fmt.Errorf("could not get applying eventpolicies for kafkaChannel: %v", err) + } + // Get resource configuration - channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, nil) + channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, applyingEventPolicies) if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -265,6 +270,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta logger.Debug("Contract config map updated") statusConditionManager.ConfigMapUpdated() + err = auth.UpdateStatusWithProvidedEventPolicies(featureFlags, &channel.Status.AppliedEventPoliciesStatus, &channel.Status, applyingEventPolicies) + if err != nil { + return fmt.Errorf("could not update KafkaChannel status with EventPolicies: %v", err) + } + // We update receiver pods annotation regardless of our contract changed or not due to the fact // that in a previous reconciliation we might have failed to update one of our data plane pod annotation, so we want // to anyway update remaining annotations with the contract generation that was saved in the CM. diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 67faf037eb..fc2da3c328 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -66,6 +66,7 @@ import ( kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1" + reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" ) const ( @@ -76,6 +77,9 @@ const ( TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" + kafkaFeatureFlags = "kafka-feature-flags" ) @@ -98,6 +102,12 @@ var DefaultEnv = &config.Env{ var ( testCaCerts = string(eventingtlstesting.CA) + + channelGVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1beta1", + Kind: "KafkaChannel", + } ) func TestReconcileKind(t *testing.T) { @@ -240,6 +250,7 @@ func TestReconcileKind(t *testing.T) { ChannelAddressable(&env), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -315,6 +326,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, StatusChannelSubscribers(), WithChannelDeadLetterSinkURI(ServiceURL), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -367,6 +379,7 @@ func TestReconcileKind(t *testing.T) { StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusNotReady), StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -422,6 +435,7 @@ func TestReconcileKind(t *testing.T) { StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusUnknown), StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -497,6 +511,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -570,6 +585,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -642,6 +658,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -724,6 +741,7 @@ func TestReconcileKind(t *testing.T) { WithSubscribers(Subscriber1(WithFreshSubscriber)), StatusChannelSubscribers(), StatusProbeSucceeded, + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -779,6 +797,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnreadySubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -890,6 +909,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber), Subscriber2(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -980,6 +1000,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1148,6 +1169,7 @@ func TestReconcileKind(t *testing.T) { ChannelAddressable(&env), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1251,6 +1273,7 @@ func TestReconcileKind(t *testing.T) { WithSubscribers(Subscriber1(WithFreshSubscriber)), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1356,6 +1379,7 @@ func TestReconcileKind(t *testing.T) { WithSubscribers(Subscriber1(WithFreshSubscriber)), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1457,6 +1481,7 @@ func TestReconcileKind(t *testing.T) { WithSubscribers(Subscriber1(WithFreshSubscriber)), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1536,6 +1561,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, WithSubscribers(Subscriber1(WithUnknownSubscriber)), StatusChannelSubscribersUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1589,6 +1615,7 @@ func TestReconcileKind(t *testing.T) { ChannelAddressable(&env), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1641,6 +1668,7 @@ func TestReconcileKind(t *testing.T) { ChannelAddressable(&env), StatusProbeSucceeded, StatusChannelSubscribers(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1754,6 +1782,7 @@ func TestReconcileKind(t *testing.T) { StatusProbeSucceeded, StatusChannelSubscribers(), WithChannelDeadLetterSinkURI(ServiceURL), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1856,6 +1885,7 @@ func TestReconcileKind(t *testing.T) { URL: ChannelAddress(), }), WithChannelAddessable(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1948,6 +1978,7 @@ func TestReconcileKind(t *testing.T) { CACerts: pointer.String(testCaCerts), }), WithChannelAddessable(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1973,7 +2004,95 @@ func TestReconcileKind(t *testing.T) { }, Key: testKey, Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationDenyAll, + }), + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{ChannelTopic()}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + Path: receiver.Path(ChannelNamespace, ChannelName), + Audience: ChannelAudience, + }, + FeatureFlags: FeatureFlagsETAutocreate(false), + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(&env), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), + StatusTopicReadyWithName(ChannelTopic()), + ChannelAddressable(&env), + StatusProbeSucceeded, + StatusChannelSubscribers(), + WithChannelAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }, + }), + WithChannelAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }), + WithChannelAddessable(), + WithChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(feature.AuthorizationDenyAll), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + }, + { + Name: "Should list applying EventPolicies", + Objects: []runtime.Object{ + NewChannel(), + NewConfigMapWithTextData(env.SystemNamespace, DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + reconcilertesting.NewEventPolicy(readyEventPolicyName, ChannelNamespace, + reconcilertesting.WithReadyEventPolicyCondition, + reconcilertesting.WithEventPolicyToRef(channelGVK, ChannelName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{ + "sub", + }), + ), + }, + Key: testKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ @@ -1988,6 +2107,123 @@ func TestReconcileKind(t *testing.T) { Host: receiver.Host(ChannelNamespace, ChannelName), Path: receiver.Path(ChannelNamespace, ChannelName), Audience: ChannelAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Exact{ + Exact: &contract.Exact{ + Attributes: map[string]string{ + "sub": "sub", + }, + }, + }, + }, + }, + }, + }, + }, + FeatureFlags: FeatureFlagsETAutocreate(false), + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(&env), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), + StatusTopicReadyWithName(ChannelTopic()), + ChannelAddressable(&env), + StatusProbeSucceeded, + StatusChannelSubscribers(), + WithChannelAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }, + }), + WithChannelAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }), + WithChannelAddessable(), + WithChannelEventPoliciesReady(), + WithChannelEventPoliciesListed(readyEventPolicyName), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + }, { + Name: "Should mark as NotReady on unready EventPolicies", + Objects: []runtime.Object{ + NewChannel(), + NewConfigMapWithTextData(env.SystemNamespace, DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + reconcilertesting.NewEventPolicy(unreadyEventPolicyName, ChannelNamespace, + reconcilertesting.WithUnreadyEventPolicyCondition("", ""), + reconcilertesting.WithEventPolicyToRef(channelGVK, ChannelName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{ + "sub", + }), + ), + }, + Key: testKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{ChannelTopic()}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + Path: receiver.Path(ChannelNamespace, ChannelName), + Audience: ChannelAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "system:serviceaccount:" + ChannelNamespace + ":", + }, + }, + }, + }, + }, + }, + }, }, FeatureFlags: FeatureFlagsETAutocreate(false), }, @@ -2027,6 +2263,8 @@ func TestReconcileKind(t *testing.T) { Audience: pointer.String(ChannelAudience), }), WithChannelAddessable(), + WithChannelEventPoliciesReady(), + WithChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), ), }, }, diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index d383051edf..e02ef9210a 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -58,6 +58,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" + "knative.dev/eventing/pkg/auth" ) func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl { @@ -176,5 +177,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)), }) + channelGK := messagingv1beta.SchemeGroupVersion.WithKind("KafkaChannel").GroupKind() + // Enqueue KafkaChannel, if we have an EventPolicy which was referencing + // or got updated and now is referencing the KafkaSink + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(channelInformer.Informer().GetIndexer(), channelGK, impl.EnqueueKey)) return impl } diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 76d3c5850c..9657c8821d 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -43,6 +43,8 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" subscriptionv1 "knative.dev/eventing/pkg/reconciler/testing/v1" ) @@ -446,3 +448,43 @@ func ChannelAddress() *apis.URL { Host: fmt.Sprintf("%s.%s.svc.%s", ChannelServiceName, ChannelNamespace, network.GetClusterDomainName()), } } + +func WithChannelEventPoliciesReady() KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*messagingv1beta1.KafkaChannel) + ks.Status.MarkEventPoliciesTrue() + } +} + +func WithChannelEventPoliciesNotReady(reason, message string) KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*messagingv1beta1.KafkaChannel) + ks.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithChannelEventPoliciesListed(policyNames ...string) KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*messagingv1beta1.KafkaChannel) + for _, name := range policyNames { + ks.Status.Policies = append(ks.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Name: name, + }) + } + } +} + +func WithChannelEventPoliciesReadyBecauseOIDCDisabled() KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*messagingv1beta1.KafkaChannel) + ks.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(authzMode feature.Flag) KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*messagingv1beta1.KafkaChannel) + ks.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", authzMode) + } +}