Skip to content

Commit

Permalink
List applying policies in KafkaChannel (#4089)
Browse files Browse the repository at this point in the history
* list applying policies in kafka channel

Signed-off-by: rahulii <[email protected]>

* watch eventpolicies for kafkachannel controller

Signed-off-by: rahulii <[email protected]>

* update kafka channel status

Signed-off-by: rahulii <[email protected]>

* add testing helper funcs for eventpolicies for kafkachannel

Signed-off-by: rahulii <[email protected]>

* add unit tests

Signed-off-by: rahulii <[email protected]>

* minor fix

Signed-off-by: rahulii <[email protected]>

* fix review comments from creydr

Signed-off-by: rahulii <[email protected]>

---------

Signed-off-by: rahulii <[email protected]>
  • Loading branch information
rahulii authored Sep 2, 2024
1 parent 5825486 commit ea0f055
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
// KafkaChannelConditionChannelServiceReady has status True when the K8S Service representing the channel
// is ready. Because this uses ExternalName, there are no endpoints to check.
KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"

ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// RegisterAlternateKafkaChannelConditionSet register a different apis.ConditionSet.
Expand Down Expand Up @@ -129,3 +131,19 @@ func (kcs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat st
func (kcs *KafkaChannelStatus) MarkChannelServiceTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionChannelServiceReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(ConditionEventPoliciesReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
12 changes: 11 additions & 1 deletion control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
audience = nil
}

applyingEventPolicies, err := auth.GetEventPoliciesForResource(r.EventPolicyLister, messagingv1beta1.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)
if err != nil {
return fmt.Errorf("could not get applying eventpolicies for kafkaChannel: %v", err)
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, nil)
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, applyingEventPolicies)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -265,6 +270,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
logger.Debug("Contract config map updated")
statusConditionManager.ConfigMapUpdated()

err = auth.UpdateStatusWithProvidedEventPolicies(featureFlags, &channel.Status.AppliedEventPoliciesStatus, &channel.Status, applyingEventPolicies)
if err != nil {
return fmt.Errorf("could not update KafkaChannel status with EventPolicies: %v", err)
}

// We update receiver pods annotation regardless of our contract changed or not due to the fact
// that in a previous reconciliation we might have failed to update one of our data plane pod annotation, so we want
// to anyway update remaining annotations with the contract generation that was saved in the CM.
Expand Down
Loading

0 comments on commit ea0f055

Please sign in to comment.