Skip to content

Commit 720a336

Browse files
authored
Set the name value for metrics tags to the correct top-level resource (#4120)
Previously to v2, we were setting `name` to the Broker or Channel name of the resource. Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent c338c3d commit 720a336

File tree

7 files changed

+124
-5
lines changed

7 files changed

+124
-5
lines changed

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package v1alpha1
1919
import (
2020
"strings"
2121

22+
corev1 "k8s.io/api/core/v1"
2223
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2324
"k8s.io/apimachinery/pkg/runtime/schema"
2425
"k8s.io/apimachinery/pkg/types"
@@ -108,6 +109,10 @@ type ConsumerGroupSpec struct {
108109
// OIDCServiceAccountName is the name of service account used for this components
109110
// OIDC authentication.
110111
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`
112+
113+
// TopLevelResourceRef is a reference to a top level resource.
114+
// For a ConsumerGroup associated with a Trigger, a Broker reference will be set.
115+
TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"`
111116
}
112117

113118
type ConsumerGroupStatus struct {
@@ -210,6 +215,13 @@ func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference {
210215
return nil
211216
}
212217

218+
// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources
219+
// that are backed by this ConsumerGroup using the OwnerReference list.
220+
// For example, for a Trigger, it will return a Broker reference.
221+
func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference {
222+
return cg.Spec.TopLevelResourceRef
223+
}
224+
213225
func (cg *ConsumerGroup) IsNotScheduled() bool {
214226
// We want to return true when:
215227
// - the condition isn't present, or

control-plane/pkg/reconciler/channel/channel.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ import (
4242
"k8s.io/apimachinery/pkg/labels"
4343
"k8s.io/apimachinery/pkg/types"
4444
corelisters "k8s.io/client-go/listers/core/v1"
45-
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
46-
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"
4745
"knative.dev/eventing/pkg/apis/feature"
4846
"knative.dev/pkg/network"
4947
"knative.dev/pkg/resolver"
5048
"knative.dev/pkg/system"
5149

50+
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
51+
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"
52+
5253
v1 "knative.dev/eventing/pkg/apis/duck/v1"
5354
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
5455
"knative.dev/pkg/apis"
@@ -602,6 +603,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
602603
},
603604
},
604605
Spec: internalscg.ConsumerGroupSpec{
606+
TopLevelResourceRef: &corev1.ObjectReference{
607+
APIVersion: messagingv1beta1.SchemeGroupVersion.String(),
608+
Kind: "KafkaChannel",
609+
Name: channel.Name,
610+
Namespace: channel.Namespace,
611+
UID: channel.UID,
612+
},
605613
Template: internalscg.ConsumerTemplateSpec{
606614
ObjectMeta: metav1.ObjectMeta{
607615
Labels: map[string]string{

control-plane/pkg/reconciler/channel/channel_test.go

+27-2
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ import (
6262
messagingv1beta1kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
6363

6464
"github.com/rickb777/date/period"
65+
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
66+
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
67+
6568
internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
6669
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
6770
fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
68-
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
69-
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
7071
)
7172

7273
const (
@@ -478,6 +479,7 @@ func TestReconcileKind(t *testing.T) {
478479
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
479480
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
480481
)),
482+
withChannelTopLevelResourceRef(),
481483
),
482484
},
483485
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -551,6 +553,7 @@ func TestReconcileKind(t *testing.T) {
551553
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
552554
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
553555
)),
556+
withChannelTopLevelResourceRef(),
554557
),
555558
},
556559
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -625,6 +628,7 @@ func TestReconcileKind(t *testing.T) {
625628
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
626629
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
627630
)),
631+
withChannelTopLevelResourceRef(),
628632
),
629633
},
630634
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -684,6 +688,7 @@ func TestReconcileKind(t *testing.T) {
684688
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewChannel())),
685689
WithConsumerGroupMetaLabels(OwnerAsChannelLabel),
686690
ConsumerGroupReady,
691+
withChannelTopLevelResourceRef(),
687692
),
688693
},
689694
Key: testKey,
@@ -725,6 +730,7 @@ func TestReconcileKind(t *testing.T) {
725730
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
726731
)),
727732
ConsumerGroupReady,
733+
withChannelTopLevelResourceRef(),
728734
),
729735
},
730736
},
@@ -779,6 +785,7 @@ func TestReconcileKind(t *testing.T) {
779785
)),
780786
ConsumerGroupReplicas(1),
781787
WithConsumerGroupFailed("failed to reconcile consumer group,", "internal error"),
788+
withChannelTopLevelResourceRef(),
782789
),
783790
},
784791
Key: testKey,
@@ -858,6 +865,7 @@ func TestReconcileKind(t *testing.T) {
858865
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
859866
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
860867
)),
868+
withChannelTopLevelResourceRef(),
861869
),
862870
NewConsumerGroup(
863871
WithConsumerGroupName(Subscription2UUID),
@@ -875,6 +883,7 @@ func TestReconcileKind(t *testing.T) {
875883
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
876884
ConsumerReply(ConsumerNoReply()),
877885
)),
886+
withChannelTopLevelResourceRef(),
878887
),
879888
},
880889
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -946,6 +955,7 @@ func TestReconcileKind(t *testing.T) {
946955
ConsumerDelivery(NewConsumerSpecDelivery(kafkasource.Ordered)),
947956
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
948957
)),
958+
withChannelTopLevelResourceRef(),
949959
),
950960
},
951961
Key: testKey,
@@ -967,6 +977,7 @@ func TestReconcileKind(t *testing.T) {
967977
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
968978
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
969979
)),
980+
withChannelTopLevelResourceRef(),
970981
),
971982
},
972983
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -1218,6 +1229,7 @@ func TestReconcileKind(t *testing.T) {
12181229
)),
12191230
ConsumerGroupReplicas(1),
12201231
ConsumerGroupReady,
1232+
withChannelTopLevelResourceRef(),
12211233
),
12221234
},
12231235
Key: testKey,
@@ -1324,6 +1336,7 @@ func TestReconcileKind(t *testing.T) {
13241336
)),
13251337
ConsumerGroupReplicas(1),
13261338
ConsumerGroupReady,
1339+
withChannelTopLevelResourceRef(),
13271340
),
13281341
},
13291342
Key: testKey,
@@ -1429,6 +1442,7 @@ func TestReconcileKind(t *testing.T) {
14291442
)),
14301443
ConsumerGroupReplicas(1),
14311444
ConsumerGroupReady,
1445+
withChannelTopLevelResourceRef(),
14321446
),
14331447
},
14341448
Key: testKey,
@@ -1528,6 +1542,7 @@ func TestReconcileKind(t *testing.T) {
15281542
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
15291543
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
15301544
)),
1545+
withChannelTopLevelResourceRef(),
15311546
),
15321547
},
15331548
WantUpdates: []clientgotesting.UpdateActionImpl{
@@ -2430,3 +2445,13 @@ func httpsURL(name string, namespace string) *apis.URL {
24302445
Path: fmt.Sprintf("/%s/%s", namespace, name),
24312446
}
24322447
}
2448+
2449+
func withChannelTopLevelResourceRef() ConsumerGroupOption {
2450+
return WithTopLevelResourceRef(&corev1.ObjectReference{
2451+
APIVersion: messagingv1beta.SchemeGroupVersion.String(),
2452+
Kind: "KafkaChannel",
2453+
Namespace: ChannelNamespace,
2454+
Name: ChannelName,
2455+
UID: ChannelUUID,
2456+
})
2457+
}

control-plane/pkg/reconciler/consumer/consumer.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,22 @@ func (r *Reconciler) reconcileContractResource(ctx context.Context, c *kafkainte
131131
egress.VReplicas = 1
132132
}
133133

134+
topLevelUserFacingResourceRef, err := r.reconcileTopLevelUserFacingResourceRef(c)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to reconcile top-level user facing resource reference: %w", err)
137+
}
138+
if topLevelUserFacingResourceRef == nil {
139+
topLevelUserFacingResourceRef = userFacingResourceRef
140+
}
141+
134142
resource := &contract.Resource{
135143
Uid: string(c.UID),
136144
Topics: c.Spec.Topics,
137145
BootstrapServers: c.Spec.Configs.Configs["bootstrap.servers"],
138146
Egresses: []*contract.Egress{egress},
139147
Auth: nil, // Auth will be added by reconcileAuth
140148
CloudEventOverrides: reconcileCEOverrides(c),
141-
Reference: userFacingResourceRef,
149+
Reference: topLevelUserFacingResourceRef,
142150
FeatureFlags: &contract.FeatureFlags{
143151
EnableEventTypeAutocreate: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
144152
},
@@ -303,6 +311,31 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer)
303311
return ref, nil
304312
}
305313

314+
func (r *Reconciler) reconcileTopLevelUserFacingResourceRef(c *kafkainternals.Consumer) (*contract.Reference, error) {
315+
316+
cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(c.GetConsumerGroup().Name)
317+
if apierrors.IsNotFound(err) {
318+
return nil, nil
319+
}
320+
if err != nil {
321+
return nil, fmt.Errorf("failed to get %s: %w", kafkainternals.ConsumerGroupGroupVersionKind.Kind, err)
322+
}
323+
324+
userFacingResource := cg.GetTopLevelUserFacingResourceRef()
325+
if userFacingResource == nil {
326+
return nil, nil
327+
}
328+
329+
ref := &contract.Reference{
330+
Uuid: string(userFacingResource.UID),
331+
Namespace: c.GetNamespace(),
332+
Name: userFacingResource.Name,
333+
Kind: userFacingResource.Kind,
334+
GroupVersion: userFacingResource.APIVersion,
335+
}
336+
return ref, nil
337+
}
338+
306339
func reconcileDeliveryOrder(c *kafkainternals.Consumer) contract.DeliveryOrder {
307340
if c.Spec.Delivery == nil {
308341
return contract.DeliveryOrder_UNORDERED

control-plane/pkg/reconciler/testing/objects_consumergroup.go

+6
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,9 @@ func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.Co
248248
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
249249
}
250250
}
251+
252+
func WithTopLevelResourceRef(ref *corev1.ObjectReference) ConsumerGroupOption {
253+
return func(cg *kafkainternals.ConsumerGroup) {
254+
cg.Spec.TopLevelResourceRef = ref
255+
}
256+
}

control-plane/pkg/reconciler/trigger/v2/triggerv2.go

+7
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventin
209209
},
210210
},
211211
Spec: internalscg.ConsumerGroupSpec{
212+
TopLevelResourceRef: &corev1.ObjectReference{
213+
APIVersion: eventing.SchemeGroupVersion.String(),
214+
Kind: "Broker",
215+
Name: broker.Name,
216+
Namespace: broker.Namespace,
217+
UID: broker.UID,
218+
},
212219
Template: internalscg.ConsumerTemplateSpec{
213220
ObjectMeta: metav1.ObjectMeta{
214221
Labels: map[string]string{

0 commit comments

Comments
 (0)