Skip to content

Commit 5825486

Browse files
authored
List applying EventPolicies in KafkaSink (#4084)
* List applying EventPolicies in KafkaSink * Update policies in status after contract update * Only fallback to default authz mode policies, when OIDC is enabled * Update broker and channel to new utils function * Fix linter issue * Return early if OIDC is disabled * Fix function naming
1 parent 241e6a7 commit 5825486

File tree

10 files changed

+454
-116
lines changed

10 files changed

+454
-116
lines changed

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
)
2323

2424
const (
25-
ConditionAddressable apis.ConditionType = "Addressable"
25+
ConditionAddressable apis.ConditionType = "Addressable"
26+
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
2627
)
2728

2829
var conditionSet apis.ConditionSet
@@ -54,3 +55,19 @@ func (ks *KafkaSinkStatus) SetAddress(addr *duckv1.Addressable) {
5455
func (kss *KafkaSinkStatus) InitializeConditions() {
5556
kss.GetConditionSet().Manage(kss).InitializeConditions()
5657
}
58+
59+
func (kss *KafkaSinkStatus) MarkEventPoliciesTrue() {
60+
kss.GetConditionSet().Manage(kss).MarkTrue(ConditionEventPoliciesReady)
61+
}
62+
63+
func (kss *KafkaSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
64+
kss.GetConditionSet().Manage(kss).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
65+
}
66+
67+
func (kss *KafkaSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
68+
kss.GetConditionSet().Manage(kss).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
69+
}
70+
71+
func (kss *KafkaSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
72+
kss.GetConditionSet().Manage(kss).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
73+
}

control-plane/pkg/core/config/utils.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424
"sort"
2525
"strings"
2626

27-
"knative.dev/eventing/pkg/apis/feature"
28-
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
27+
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
2928

3029
"github.com/rickb777/date/period"
3130
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3231
corev1listers "k8s.io/client-go/listers/core/v1"
3332
duck "knative.dev/eventing/pkg/apis/duck/v1"
33+
"knative.dev/eventing/pkg/apis/feature"
3434
"knative.dev/eventing/pkg/eventingtls"
3535
"knative.dev/pkg/resolver"
3636

@@ -55,14 +55,18 @@ func ContentModeFromString(mode string) contract.ContentMode {
5555
}
5656
}
5757

58-
// EventPoliciesFromAppliedEventPoliciesStatus resolves a AppliedEventPoliciesStatus into a list of contract.EventPolicy
59-
func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPoliciesStatus, lister v1alpha1.EventPolicyLister, namespace string, features feature.Flags) ([]*contract.EventPolicy, error) {
60-
eventPolicies := make([]*contract.EventPolicy, 0, len(status.Policies))
58+
// ContractEventPoliciesFromEventPolicies resolves a list of v1alpha1.EventPolicy into a list of contract.EventPolicy
59+
func ContractEventPoliciesFromEventPolicies(applyingEventPolicies []*eventingv1alpha1.EventPolicy, namespace string, features feature.Flags) []*contract.EventPolicy {
60+
if !features.IsOIDCAuthentication() {
61+
return nil
62+
}
6163

62-
for _, appliedPolicy := range status.Policies {
63-
policy, err := lister.EventPolicies(namespace).Get(appliedPolicy.Name)
64-
if err != nil {
65-
return nil, fmt.Errorf("failed to get eventPolicy %s: %w", appliedPolicy.Name, err)
64+
eventPolicies := make([]*contract.EventPolicy, 0, len(applyingEventPolicies))
65+
66+
for _, policy := range applyingEventPolicies {
67+
if !policy.Status.IsReady() {
68+
// only add ready eventpolicies to the contract
69+
continue
6670
}
6771

6872
contractPolicy := &contract.EventPolicy{}
@@ -132,7 +136,7 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
132136
// else: deny all -> add no additional policy
133137
}
134138

135-
return eventPolicies, nil
139+
return eventPolicies
136140
}
137141

138142
func EgressConfigFromDelivery(

control-plane/pkg/core/config/utils_test.go

+118-30
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,20 @@ import (
2323
"testing"
2424
"time"
2525

26+
corev1 "k8s.io/api/core/v1"
2627
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
2728

28-
"google.golang.org/protobuf/encoding/protojson"
29-
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
30-
"knative.dev/eventing/pkg/apis/feature"
31-
reconcilertesting "knative.dev/pkg/reconciler/testing"
32-
3329
"github.com/google/go-cmp/cmp"
3430
"github.com/google/go-cmp/cmp/cmpopts"
31+
"google.golang.org/protobuf/encoding/protojson"
3532
"google.golang.org/protobuf/runtime/protoimpl"
3633
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3734
"k8s.io/apimachinery/pkg/runtime"
3835
"k8s.io/apimachinery/pkg/types"
3936
"k8s.io/utils/pointer"
4037
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
38+
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
39+
"knative.dev/eventing/pkg/apis/feature"
4140
"knative.dev/pkg/apis"
4241
duckv1 "knative.dev/pkg/apis/duck/v1"
4342
"knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
@@ -48,7 +47,6 @@ import (
4847
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
4948

5049
eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
51-
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
5250
)
5351

5452
func TestContentModeFromString(t *testing.T) {
@@ -513,7 +511,7 @@ func TestMergeEgressConfig(t *testing.T) {
513511
}
514512
}
515513

516-
func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
514+
func TestContractEventPoliciesEventPolicies(t *testing.T) {
517515

518516
tests := []struct {
519517
name string
@@ -522,7 +520,7 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
522520
namespace string
523521
defaultAuthorizationMode feature.Flag
524522
expected []*contract.EventPolicy
525-
wantErr bool
523+
oidcDisabled bool
526524
}{
527525
{
528526
name: "Exact match",
@@ -539,6 +537,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
539537
From: []string{
540538
"from-1",
541539
},
540+
Status: duckv1.Status{
541+
Conditions: duckv1.Conditions{
542+
{
543+
Type: eventingv1alpha1.EventPolicyConditionReady,
544+
Status: corev1.ConditionTrue,
545+
},
546+
},
547+
},
542548
},
543549
},
544550
},
@@ -566,6 +572,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
566572
From: []string{
567573
"from-*",
568574
},
575+
Status: duckv1.Status{
576+
Conditions: duckv1.Conditions{
577+
{
578+
Type: eventingv1alpha1.EventPolicyConditionReady,
579+
Status: corev1.ConditionTrue,
580+
},
581+
},
582+
},
569583
},
570584
},
571585
},
@@ -594,6 +608,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
594608
From: []string{
595609
"from-1",
596610
},
611+
Status: duckv1.Status{
612+
Conditions: duckv1.Conditions{
613+
{
614+
Type: eventingv1alpha1.EventPolicyConditionReady,
615+
Status: corev1.ConditionTrue,
616+
},
617+
},
618+
},
597619
},
598620
}, {
599621
ObjectMeta: metav1.ObjectMeta{
@@ -604,6 +626,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
604626
From: []string{
605627
"from-2-*",
606628
},
629+
Status: duckv1.Status{
630+
Conditions: duckv1.Conditions{
631+
{
632+
Type: eventingv1alpha1.EventPolicyConditionReady,
633+
Status: corev1.ConditionTrue,
634+
},
635+
},
636+
},
607637
},
608638
},
609639
},
@@ -643,6 +673,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
643673
From: []string{
644674
"from-1",
645675
},
676+
Status: duckv1.Status{
677+
Conditions: duckv1.Conditions{
678+
{
679+
Type: eventingv1alpha1.EventPolicyConditionReady,
680+
Status: corev1.ConditionTrue,
681+
},
682+
},
683+
},
646684
},
647685
}, {
648686
ObjectMeta: metav1.ObjectMeta{
@@ -660,6 +698,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
660698
From: []string{
661699
"from-2-*",
662700
},
701+
Status: duckv1.Status{
702+
Conditions: duckv1.Conditions{
703+
{
704+
Type: eventingv1alpha1.EventPolicyConditionReady,
705+
Status: corev1.ConditionTrue,
706+
},
707+
},
708+
},
663709
},
664710
},
665711
},
@@ -728,46 +774,88 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
728774
defaultAuthorizationMode: feature.AuthorizationDenyAll,
729775
expected: []*contract.EventPolicy{},
730776
}, {
731-
name: "Applying policy does not exist",
777+
name: "Applying policy not ready",
732778
applyingPolicies: []string{
733-
"not-found",
779+
"policy-1",
780+
},
781+
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
782+
{
783+
ObjectMeta: metav1.ObjectMeta{
784+
Name: "policy-1",
785+
Namespace: "my-ns",
786+
},
787+
Status: eventingv1alpha1.EventPolicyStatus{
788+
From: []string{
789+
"from-*",
790+
},
791+
Status: duckv1.Status{
792+
Conditions: duckv1.Conditions{
793+
{
794+
Type: eventingv1alpha1.EventPolicyConditionReady,
795+
Status: corev1.ConditionFalse,
796+
},
797+
},
798+
},
799+
},
800+
},
801+
},
802+
namespace: "my-ns",
803+
defaultAuthorizationMode: feature.AuthorizationDenyAll,
804+
expected: []*contract.EventPolicy{},
805+
}, {
806+
name: "No policy when OIDC is disabled",
807+
oidcDisabled: true,
808+
applyingPolicies: []string{
809+
"policy-1",
810+
},
811+
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
812+
{
813+
ObjectMeta: metav1.ObjectMeta{
814+
Name: "policy-1",
815+
Namespace: "my-ns",
816+
},
817+
Status: eventingv1alpha1.EventPolicyStatus{
818+
From: []string{
819+
"from-1",
820+
},
821+
Status: duckv1.Status{
822+
Conditions: duckv1.Conditions{
823+
{
824+
Type: eventingv1alpha1.EventPolicyConditionReady,
825+
Status: corev1.ConditionFalse, // is false, as OIDC is disabled
826+
},
827+
},
828+
},
829+
},
830+
},
734831
},
735-
existingEventPolicies: []*eventingv1alpha1.EventPolicy{},
736832
namespace: "my-ns",
737833
defaultAuthorizationMode: feature.AuthorizationAllowSameNamespace,
738834
expected: []*contract.EventPolicy{},
739-
wantErr: true,
740835
},
741836
}
742837
for _, tt := range tests {
743838
t.Run(tt.name, func(t *testing.T) {
744839

745-
ctx, _ := reconcilertesting.SetupFakeContext(t)
746840
features := feature.Flags{
747841
feature.AuthorizationDefaultMode: tt.defaultAuthorizationMode,
842+
feature.OIDCAuthentication: feature.Enabled,
748843
}
749844

750-
for _, ep := range tt.existingEventPolicies {
751-
err := eventpolicyinformerfake.Get(ctx).Informer().GetStore().Add(ep)
752-
if err != nil {
753-
t.Fatal(err)
754-
}
845+
if tt.oidcDisabled {
846+
features[feature.OIDCAuthentication] = feature.Disabled
755847
}
756848

757-
applyingPoliciesStatus := eventingduck.AppliedEventPoliciesStatus{}
758-
for _, ep := range tt.applyingPolicies {
759-
applyingPoliciesStatus.Policies = append(applyingPoliciesStatus.Policies, eventingduck.AppliedEventPolicyRef{
760-
Name: ep,
761-
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
762-
})
763-
}
764-
765-
got, err := EventPoliciesFromAppliedEventPoliciesStatus(applyingPoliciesStatus, eventpolicyinformerfake.Get(ctx).Lister(), tt.namespace, features)
766-
if (err != nil) != tt.wantErr {
767-
t.Errorf("EventPoliciesFromAppliedEventPoliciesStatus() error = %v, wantErr %v", err, tt.wantErr)
768-
return
849+
applyingPolicies := []*eventingv1alpha1.EventPolicy{}
850+
for _, applyingPolicyName := range tt.applyingPolicies {
851+
for _, existingPolicy := range tt.existingEventPolicies {
852+
if applyingPolicyName == existingPolicy.Name {
853+
applyingPolicies = append(applyingPolicies, existingPolicy)
854+
}
855+
}
769856
}
770857

858+
got := ContractEventPoliciesFromEventPolicies(applyingPolicies, tt.namespace, features)
771859
expectedJSON, err := protojson.Marshal(&contract.Ingress{
772860
EventPolicies: tt.expected,
773861
})

0 commit comments

Comments
 (0)