Skip to content

Commit ad89a1a

Browse files
committed
Provision contract with OIDC information (knative-extensions#3646)
* Provision contract with OIDC information * Add DLS audience in KafkaChannel CRD * Update KafkaSource to expose its sinks audience in status * Update Trigger test to include OIDC SA in contract * Propagate KafkaSources OIDC serviceAccountName to consumer and consumergroup * Propagate triggerv2s serviceAccountName to consumergroup * Fix unit test
1 parent 34a8ad9 commit ad89a1a

File tree

19 files changed

+149
-16
lines changed

19 files changed

+149
-16
lines changed

control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ spec:
8686
type: string
8787
CACerts:
8888
type: string
89+
audience:
90+
description: Audience is the OIDC audience for the deadLetterSink.
91+
type: string
8992
retry:
9093
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
9194
type: integer
@@ -132,6 +135,9 @@ spec:
132135
type: string
133136
CACerts:
134137
type: string
138+
audience:
139+
description: Audience is the OIDC audience for the deadLetterSink.
140+
type: string
135141
retry:
136142
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
137143
type: integer

control-plane/config/eventing-kafka-broker/100-source/100-kafka-source.yaml

+22-6
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,6 @@ spec:
8787
description: DeadLetterSink is the sink receiving event that could not be sent to a destination.
8888
type: object
8989
properties:
90-
CACerts:
91-
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
92-
type: string
9390
ref:
9491
description: Ref points to an Addressable.
9592
type: object
@@ -118,6 +115,12 @@ spec:
118115
uri:
119116
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
120117
type: string
118+
CACerts:
119+
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
120+
type: string
121+
audience:
122+
description: Audience is the OIDC audience for the deadLetterSink.
123+
type: string
121124
retry:
122125
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
123126
type: integer
@@ -271,9 +274,6 @@ spec:
271274
description: Sink is a reference to an object that will resolve to a uri to use as the sink.
272275
type: object
273276
properties:
274-
CACerts:
275-
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
276-
type: string
277277
ref:
278278
description: Ref points to an Addressable.
279279
type: object
@@ -302,6 +302,12 @@ spec:
302302
uri:
303303
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
304304
type: string
305+
CACerts:
306+
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
307+
type: string
308+
audience:
309+
description: Audience is the OIDC audience for the sink.
310+
type: string
305311
topics:
306312
description: Topic topics to consume messages from
307313
type: array
@@ -392,6 +398,16 @@ spec:
392398
sinkUri:
393399
description: SinkURI is the current active sink URI that has been configured for the Source.
394400
type: string
401+
sinkAudience:
402+
description: SinkAudience is the OIDC audience of the sink.
403+
type: string
404+
auth:
405+
description: Auth provides the relevant information for OIDC authentication.
406+
type: object
407+
properties:
408+
serviceAccountName:
409+
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
410+
type: string
395411
subresources:
396412
status: {}
397413
scale:

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go

+8
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type ConsumerGroupSpec struct {
100100
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
101101
// +optional
102102
Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"`
103+
104+
// OIDCServiceAccountName is the name of service account used for this components
105+
// OIDC authentication.
106+
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`
103107
}
104108

105109
type ConsumerGroupStatus struct {
@@ -120,6 +124,10 @@ type ConsumerGroupStatus struct {
120124
// +optional
121125
SubscriberCACerts *string `json:"subscriberCACerts,omitempty"`
122126

127+
// SubscriberAudience is the OIDC audience for the resolved URI
128+
// +optional
129+
SubscriberAudience *string `json:"subscriberAudience,omitempty"`
130+
123131
// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
124132
// resolved delivery options.
125133
eventingduckv1.DeliveryStatus `json:",inline"`

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go

+8
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ type ConsumerSpec struct {
103103

104104
// PodBind represents a reference to the pod in which the consumer should be placed.
105105
PodBind *PodBind `json:"podBind"`
106+
107+
// OIDCServiceAccountName is the name of the generated service account
108+
// used for this components OIDC authentication.
109+
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`
106110
}
107111

108112
type ReplyStrategy struct {
@@ -208,6 +212,10 @@ type ConsumerStatus struct {
208212
// +optional
209213
SubscriberCACerts *string `json:"subscriberCACerts,omitempty"`
210214

215+
// SubscriberAudience is the OIDC audience for the resolved URI
216+
// +optional
217+
SubscriberAudience *string `json:"subscriberAudience,omitempty"`
218+
211219
// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
212220
// resolved delivery options.
213221
eventingduck.DeliveryStatus `json:",inline"`

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go

+20
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (s *KafkaSourceStatus) MarkSink(addr *duckv1.Addressable) {
9595
if addr.URL != nil && !addr.URL.IsEmpty() {
9696
s.SinkURI = addr.URL
9797
s.SinkCACerts = addr.CACerts
98+
s.SinkAudience = addr.Audience
9899
KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionSinkProvided)
99100
} else {
100101
KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")

control-plane/pkg/core/config/utils.go

+3
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ func EgressConfigFromDelivery(
7777
if deadLetterSinkAddr.CACerts != nil {
7878
egressConfig.DeadLetterCACerts = *deadLetterSinkAddr.CACerts
7979
}
80+
if deadLetterSinkAddr.Audience != nil {
81+
egressConfig.DeadLetterAudience = *deadLetterSinkAddr.Audience
82+
}
8083
}
8184

8285
if delivery.Retry != nil {

control-plane/pkg/reconciler/broker/broker.go

+4
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,10 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
641641
}
642642
}
643643

644+
if broker.Status.Address != nil && broker.Status.Address.Audience != nil {
645+
resource.Ingress.Audience = *broker.Status.Address.Audience
646+
}
647+
644648
egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs)
645649
if err != nil {
646650
return nil, err

control-plane/pkg/reconciler/channel/channel.go

+13
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,12 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging
618618
if subscriber.SubscriberCACerts != nil && *subscriber.SubscriberCACerts != "" {
619619
egress.DestinationCACerts = *subscriber.SubscriberCACerts
620620
}
621+
if subscriber.SubscriberAudience != nil && *subscriber.SubscriberAudience != "" {
622+
egress.DestinationAudience = *subscriber.SubscriberAudience
623+
}
624+
if subscriber.Auth != nil && subscriber.Auth.ServiceAccountName != nil {
625+
egress.OidcServiceAccountName = *subscriber.Auth.ServiceAccountName
626+
}
621627

622628
if subscriptionName != "" {
623629
egress.Reference = &contract.Reference{
@@ -634,6 +640,9 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging
634640
if subscriber.ReplyCACerts != nil && *subscriber.ReplyCACerts != "" {
635641
egress.ReplyUrlCACerts = *subscriber.ReplyCACerts
636642
}
643+
if subscriber.ReplyAudience != nil && *subscriber.ReplyAudience != "" {
644+
egress.ReplyUrlAudience = *subscriber.ReplyAudience
645+
}
637646
}
638647

639648
subscriptionEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, subscriber.Delivery, r.DefaultBackoffDelayMs)
@@ -713,6 +722,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
713722
}
714723
}
715724

725+
if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
726+
resource.Ingress.Audience = *channel.Status.Address.Audience
727+
}
728+
716729
egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
717730
if err != nil {
718731
return nil, err

control-plane/pkg/reconciler/channel/v2/channelv2.go

+4
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
703703
}
704704
}
705705

706+
if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
707+
resource.Ingress.Audience = *channel.Status.Address.Audience
708+
}
709+
706710
egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
707711
if err != nil {
708712
return nil, err

control-plane/pkg/reconciler/consumer/consumer.go

+14
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
141141
}
142142
c.Status.SubscriberURI = destinationAddr.URL
143143
c.Status.SubscriberCACerts = destinationAddr.CACerts
144+
c.Status.SubscriberAudience = destinationAddr.Audience
144145

145146
egressConfig := &contract.EgressConfig{}
146147
if c.Spec.Delivery != nil {
@@ -154,6 +155,9 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
154155
if egressConfig.DeadLetterCACerts != "" {
155156
c.Status.DeliveryStatus.DeadLetterSinkCACerts = pointer.String(egressConfig.DeadLetterCACerts)
156157
}
158+
if egressConfig.DeadLetterAudience != "" {
159+
c.Status.DeliveryStatus.DeadLetterSinkAudience = pointer.String(egressConfig.DeadLetterAudience)
160+
}
157161
}
158162

159163
egress := &contract.Egress{
@@ -176,11 +180,18 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
176180
if destinationAddr.CACerts != nil {
177181
egress.DestinationCACerts = *destinationAddr.CACerts
178182
}
183+
if destinationAddr.Audience != nil {
184+
egress.DestinationAudience = *destinationAddr.Audience
185+
}
179186

180187
if c.Spec.Configs.KeyType != nil {
181188
egress.KeyType = coreconfig.KeyTypeFromString(*c.Spec.Configs.KeyType)
182189
}
183190

191+
if c.Spec.OIDCServiceAccountName != nil {
192+
egress.OidcServiceAccountName = *c.Spec.OIDCServiceAccountName
193+
}
194+
184195
if err := r.reconcileReplyStrategy(ctx, c, egress); err != nil {
185196
return nil, fmt.Errorf("failed to reconcile reply strategy: %w", err)
186197
}
@@ -290,6 +301,9 @@ func (r *Reconciler) reconcileReplyStrategy(ctx context.Context, c *kafkainterna
290301
if destination.CACerts != nil {
291302
egress.ReplyUrlCACerts = *destination.CACerts
292303
}
304+
if destination.Audience != nil {
305+
egress.ReplyUrlAudience = *destination.Audience
306+
}
293307
return nil
294308
}
295309
if c.Spec.Reply.TopicReply != nil && c.Spec.Reply.TopicReply.Enabled {

control-plane/pkg/reconciler/consumergroup/consumergroup.go

+7
Original file line numberDiff line numberDiff line change
@@ -506,12 +506,18 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con
506506
if c.Status.SubscriberCACerts != nil {
507507
cg.Status.SubscriberCACerts = c.Status.SubscriberCACerts
508508
}
509+
if c.Status.SubscriberAudience != nil {
510+
cg.Status.SubscriberAudience = c.Status.SubscriberAudience
511+
}
509512
if c.Status.DeliveryStatus.DeadLetterSinkURI != nil {
510513
cg.Status.DeliveryStatus.DeadLetterSinkURI = c.Status.DeadLetterSinkURI
511514
}
512515
if c.Status.DeliveryStatus.DeadLetterSinkCACerts != nil {
513516
cg.Status.DeliveryStatus.DeadLetterSinkCACerts = c.Status.DeadLetterSinkCACerts
514517
}
518+
if c.Status.DeliveryStatus.DeadLetterSinkAudience != nil {
519+
cg.Status.DeliveryStatus.DeadLetterSinkAudience = c.Status.DeadLetterSinkAudience
520+
}
515521
} else if condition == nil { // Propagate only a single false condition
516522
cond := c.GetConditionSet().Manage(c.GetStatus()).GetTopLevelCondition()
517523
if cond.IsFalse() {
@@ -530,6 +536,7 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con
530536
}
531537
cg.Status.SubscriberURI = subscriber.URL
532538
cg.Status.SubscriberCACerts = subscriber.CACerts
539+
cg.Status.SubscriberAudience = subscriber.Audience
533540
}
534541

535542
return condition, nil

control-plane/pkg/reconciler/sink/kafka_sink.go

+5
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
211211
},
212212
}
213213
}
214+
215+
if ks.Status.Address != nil && ks.Status.Address.Audience != nil {
216+
sinkConfig.Ingress.Audience = *ks.Status.Address.Audience
217+
}
218+
214219
statusConditionManager.ConfigResolved()
215220

216221
sinkIndex := coreconfig.FindResource(ct, ks.UID)

control-plane/pkg/reconciler/source/source.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, ks *sources.Kafk
201201
expectedCg.Spec.Template.Spec.Configs.KeyType = &kt
202202
}
203203

204+
if ks.Status.Auth != nil {
205+
expectedCg.Spec.Template.Spec.OIDCServiceAccountName = ks.Status.Auth.ServiceAccountName
206+
}
207+
204208
// TODO: make keda annotation values configurable and maybe unexposed
205209
expectedCg.Annotations = keda.SetAutoscalingAnnotations(ks.Annotations)
206210

@@ -260,8 +264,9 @@ func propagateConsumerGroupStatus(cg *internalscg.ConsumerGroup, ks *sources.Kaf
260264
}
261265
}
262266
ks.Status.MarkSink(&duckv1.Addressable{
263-
URL: cg.Status.SubscriberURI,
264-
CACerts: cg.Status.SubscriberCACerts,
267+
URL: cg.Status.SubscriberURI,
268+
CACerts: cg.Status.SubscriberCACerts,
269+
Audience: cg.Status.SubscriberAudience,
265270
})
266271
ks.Status.Placeable = cg.Status.Placeable
267272
if cg.Status.Replicas != nil {

control-plane/pkg/reconciler/source/source_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,7 @@ func TestReconcileKind(t *testing.T) {
15661566
),
15671567
ConsumerSubscriber(NewSourceSinkReference()),
15681568
ConsumerReply(ConsumerNoReply()),
1569+
ConsumerOIDCServiceAccountName(makeKafkaSourceOIDCServiceAccount().Name),
15691570
)),
15701571
ConsumerGroupReplicas(1),
15711572
),

control-plane/pkg/reconciler/testing/objects_consumer.go

+6
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ func ConsumerTopics(topics ...string) ConsumerSpecOption {
200200
}
201201
}
202202

203+
func ConsumerOIDCServiceAccountName(sa string) ConsumerSpecOption {
204+
return func(c *kafkainternals.ConsumerSpec) {
205+
c.OIDCServiceAccountName = &sa
206+
}
207+
}
208+
203209
func ConsumerPlacement(pb kafkainternals.PodBind) ConsumerSpecOption {
204210
return func(c *kafkainternals.ConsumerSpec) {
205211
c.PodBind = &pb

control-plane/pkg/reconciler/trigger/trigger.go

+6
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,12 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin
339339
if destination.CACerts != nil {
340340
egress.DestinationCACerts = *destination.CACerts
341341
}
342+
if destination.Audience != nil {
343+
egress.DestinationAudience = *destination.Audience
344+
}
345+
if trigger.Status.Auth != nil && trigger.Status.Auth.ServiceAccountName != nil {
346+
egress.OidcServiceAccountName = *trigger.Status.Auth.ServiceAccountName
347+
}
342348

343349
newFiltersEnabled := func() bool {
344350
r.FlagsLock.RLock()

0 commit comments

Comments
 (0)