Skip to content
This repository was archived by the owner on Jun 19, 2022. It is now read-only.

[WIP] Add Targets Config Map Reconciler #914

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 55 additions & 138 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ package broker

import (
"context"
"encoding/base64"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"github.com/gogo/protobuf/proto"
brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/config/memory"
brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker"
brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
Expand All @@ -37,9 +33,6 @@ import (
"github.com/google/knative-gcp/pkg/utils"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -62,9 +55,7 @@ const (
topicDeleted = "TopicDeleted"
subDeleted = "SubscriptionDeleted"

targetsCMName = "broker-targets"
targetsCMKey = "targets"
targetsCMResyncPeriod = 10 * time.Second
targetsCMName = "broker-targets"

ingressServiceName = "broker-ingress"
)
Expand Down Expand Up @@ -95,13 +86,8 @@ type Reconciler struct {
// Reconciles a broker's triggers
triggerReconciler controller.Reconciler

// TODO allow configuring multiples of these
targetsConfig config.Targets

// targetsNeedsUpdate is a channel that flags the targets ConfigMap as
// needing update. This is done in a separate goroutine to avoid contention
// between multiple controller workers.
targetsNeedsUpdate chan struct{}
// Updates broker configuration
brokerConfigUpdater BrokerConfigUpdater

// projectID is used as the GCP project ID when present, skipping the
// metadata server check. Used by tests.
Expand All @@ -122,11 +108,16 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *brokerv1beta1.Broker)
// whatever info is available. or put this in a defer?
}

if err := r.reconcileTriggers(ctx, b); err != nil {
triggers, err := r.reconcileTriggers(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling triggers", zap.Error(err))
return fmt.Errorf("failed to reconcile triggers: %w", err)
}

if err := r.reconcileTargets(ctx, b, triggers); err != nil {
return fmt.Errorf("failed to reconcile targets config: %w", err)
}

return pkgreconciler.NewEvent(corev1.EventTypeNormal, brokerReconciled, "Broker reconciled: \"%s/%s\"", b.Namespace, b.Name)
}

Expand All @@ -137,7 +128,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *brokerv1beta1.Broker)
// Reconcile triggers so they update their status
// TODO is this the best way to reconcile triggers when their broker is
// being deleted?
if err := r.reconcileTriggers(ctx, b); err != nil {
if _, err := r.reconcileTriggers(ctx, b); err != nil {
logger.Error("Problem reconciling triggers", zap.Error(err))
return fmt.Errorf("failed to reconcile triggers: %w", err)
}
Expand Down Expand Up @@ -185,24 +176,6 @@ func (r *Reconciler) reconcileBroker(ctx context.Context, b *brokerv1beta1.Broke
}
b.Status.PropagateIngressAvailability(ingressEndpoints)

r.targetsConfig.MutateBroker(b.Namespace, b.Name, func(m config.BrokerMutation) {
m.SetID(string(b.UID))
m.SetAddress(b.Status.Address.URL.String())
m.SetDecoupleQueue(&config.Queue{
Topic: resources.GenerateDecouplingTopicName(b),
Subscription: resources.GenerateDecouplingSubscriptionName(b),
})
if b.Status.IsReady() {
m.SetState(config.State_READY)
} else {
m.SetState(config.State_UNKNOWN)
}
})

// Update config map
// TODO should this happen in a defer so there's an update regardless of
// error status, or only when reconcile is successful?
r.flagTargetsForUpdate()
return nil
}

Expand Down Expand Up @@ -368,135 +341,79 @@ func (r *Reconciler) deleteDecouplingTopicAndSubscription(ctx context.Context, b
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) error {

func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) ([]*brokerv1beta1.Trigger, error) {
// Filter by `eventing.knative.dev/broker: <name>` here
// to get only the triggers for this broker. The trigger webhook will
// ensure that triggers are always labeled with their broker name.
triggers, err := r.triggerLister.Triggers(b.Namespace).List(
allTriggers, err := r.triggerLister.Triggers(b.Namespace).List(
labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: b.Name}))
if err != nil {
return err
return nil, err
}

ctx = contextWithBroker(ctx, b)
ctx = contextWithTargets(ctx, r.targetsConfig)
for _, t := range triggers {
var triggers []*brokerv1beta1.Trigger
for _, t := range allTriggers {
if t.Spec.Broker == b.Name {
logger := logging.FromContext(ctx).With(zap.String("trigger", t.Name), zap.String("broker", b.Name))
ctx = logging.WithLogger(ctx, logger)

if tKey, err := cache.MetaNamespaceKeyFunc(t); err == nil {
err = r.triggerReconciler.Reconcile(ctx, tKey)
}
triggers = append(triggers, t)
}
}

//TODO aggregate errors?
return err
return triggers, err
}

//TODO all this stuff should be in a configmap variant of the config object

// This function is not thread-safe and should only be executed by
// TargetsConfigUpdater
func (r *Reconciler) updateTargetsConfig(ctx context.Context) error {
//TODO resources package?
data, err := r.targetsConfig.Bytes()
if err != nil {
return fmt.Errorf("error serializing targets config: %w", err)
}
desired := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: targetsCMName,
Namespace: system.Namespace(),
// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTargets(ctx context.Context, b *brokerv1beta1.Broker, triggers []*brokerv1beta1.Trigger) error {
brokerConfig := &config.Broker{
Id: string(b.UID),
Name: b.Name,
Namespace: b.Namespace,
Address: b.Status.Address.URL.String(),
DecoupleQueue: &config.Queue{
Topic: resources.GenerateDecouplingTopicName(b),
Subscription: resources.GenerateDecouplingSubscriptionName(b),
},
BinaryData: map[string][]byte{targetsCMKey: data},
// Write out the text version for debugging purposes only
Data: map[string]string{"targets.txt": r.targetsConfig.String()},
}

r.Logger.Debug("Current targets config", zap.Any("targetsConfig", r.targetsConfig.String()))

existing, err := r.configMapLister.ConfigMaps(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
r.Logger.Debug("Creating targets ConfigMap", zap.String("namespace", desired.Namespace), zap.String("name", desired.Name))
_, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Create(desired)
if err != nil {
return fmt.Errorf("error creating targets ConfigMap: %w", err)
}
} else if err != nil {
return fmt.Errorf("error getting targets ConfigMap: %w", err)
Targets: make(map[string]*config.Target),
}

r.Logger.Debug("Compare targets ConfigMap", zap.Any("existing", base64.StdEncoding.EncodeToString(existing.BinaryData[targetsCMKey])), zap.String("desired", base64.StdEncoding.EncodeToString(desired.BinaryData[targetsCMKey])))
if !equality.Semantic.DeepEqual(desired.BinaryData, existing.BinaryData) {
r.Logger.Debug("Updating targets ConfigMap")
_, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Update(desired)
if err != nil {
return fmt.Errorf("error updating targets ConfigMap: %w", err)
}
if b.Status.IsReady() {
brokerConfig.State = config.State_READY
} else {
brokerConfig.State = config.State_UNKNOWN
}
return nil
}

// LoadTargetsConfig retrieves the targets ConfigMap and
// populates the targets config struct.
func (r *Reconciler) LoadTargetsConfig(ctx context.Context) error {
r.Logger.Debug("Loading targets config from configmap")
//TODO should we use the apiserver here?
// kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(targetsCMName. metav1.GetOptions{})
//TODO retry with wait.ExponentialBackoff
existing, err := r.configMapLister.ConfigMaps(system.Namespace()).Get(targetsCMName)
if err != nil {
if errors.IsNotFound(err) {
r.targetsConfig = memory.NewEmptyTargets()
return nil
for _, t := range triggers {
if t.DeletionTimestamp == nil {
brokerConfig.Targets[t.Name] = triggerTargetConfig(b, t)
}
return fmt.Errorf("error getting targets ConfigMap: %w", err)
}

targets := &config.TargetsConfig{}
data := existing.BinaryData[targetsCMKey]
if err := proto.Unmarshal(data, targets); err != nil {
return err
}

r.targetsConfig = memory.NewTargets(targets)
r.Logger.Debug("Loaded targets config from ConfigMap", zap.String("resourceVersion", existing.ResourceVersion))
return nil
return r.brokerConfigUpdater.UpdateBrokerConfig(ctx, brokerConfig)
}

func (r *Reconciler) TargetsConfigUpdater(ctx context.Context) {
r.Logger.Debug("Starting TargetsConfigUpdater")
// check every 10 seconds even if no reconciles have occurred
ticker := time.NewTicker(targetsCMResyncPeriod)

//TODO configmap cleanup: if any brokers are in deleted state with no triggers
// (or all triggers are in deleted state), remove that entry

for {
select {
case <-ctx.Done():
r.Logger.Debug("Stopping TargetsConfigUpdater")
return
case <-r.targetsNeedsUpdate:
if err := r.updateTargetsConfig(ctx); err != nil {
r.Logger.Error("Error in TargetsConfigUpdater: %w", err)
}
case <-ticker.C:
if err := r.updateTargetsConfig(ctx); err != nil {
r.Logger.Error("Error in TargetsConfigUpdater: %w", err)
}
}
func triggerTargetConfig(b *brokerv1beta1.Broker, t *brokerv1beta1.Trigger) *config.Target {
target := &config.Target{
Id: string(t.UID),
Name: t.Name,
Namespace: t.Namespace,
Broker: b.Name,
Address: t.Status.SubscriberURI.String(),
RetryQueue: &config.Queue{
Topic: resources.GenerateRetryTopicName(t),
Subscription: resources.GenerateRetrySubscriptionName(t),
},
}
}

func (r *Reconciler) flagTargetsForUpdate() {
select {
case r.targetsNeedsUpdate <- struct{}{}:
r.Logger.Debug("Flagged targets for update")
default:
r.Logger.Debug("Flagged targets for update but already flagged")
if t.Spec.Filter != nil && t.Spec.Filter.Attributes != nil {
target.FilterAttributes = t.Spec.Filter.Attributes
}
if t.Status.IsReady() {
target.State = config.State_READY
} else {
target.State = config.State_UNKNOWN
}
return target
}
Loading