Skip to content
This repository was archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Add targets reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-mi committed Apr 22, 2020
1 parent 331165b commit 29e3d34
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 233 deletions.
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 55 additions & 138 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ package broker

import (
"context"
"encoding/base64"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"github.com/gogo/protobuf/proto"
brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/config/memory"
brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker"
brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
Expand All @@ -37,9 +33,6 @@ import (
"github.com/google/knative-gcp/pkg/utils"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -62,9 +55,7 @@ const (
topicDeleted = "TopicDeleted"
subDeleted = "SubscriptionDeleted"

targetsCMName = "broker-targets"
targetsCMKey = "targets"
targetsCMResyncPeriod = 10 * time.Second
targetsCMName = "broker-targets"

ingressServiceName = "broker-ingress"
)
Expand Down Expand Up @@ -95,13 +86,8 @@ type Reconciler struct {
// Reconciles a broker's triggers
triggerReconciler controller.Reconciler

// TODO allow configuring multiples of these
targetsConfig config.Targets

// targetsNeedsUpdate is a channel that flags the targets ConfigMap as
// needing update. This is done in a separate goroutine to avoid contention
// between multiple controller workers.
targetsNeedsUpdate chan struct{}
// Updates broker configuration
brokerConfigUpdater BrokerConfigUpdater

// projectID is used as the GCP project ID when present, skipping the
// metadata server check. Used by tests.
Expand All @@ -122,11 +108,16 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *brokerv1beta1.Broker)
// whatever info is available. or put this in a defer?
}

if err := r.reconcileTriggers(ctx, b); err != nil {
triggers, err := r.reconcileTriggers(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling triggers", zap.Error(err))
return fmt.Errorf("failed to reconcile triggers: %w", err)
}

if err := r.reconcileTargets(ctx, b, triggers); err != nil {
return fmt.Errorf("failed to reconcile targets config: %w", err)
}

return pkgreconciler.NewEvent(corev1.EventTypeNormal, brokerReconciled, "Broker reconciled: \"%s/%s\"", b.Namespace, b.Name)
}

Expand All @@ -137,7 +128,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *brokerv1beta1.Broker)
// Reconcile triggers so they update their status
// TODO is this the best way to reconcile triggers when their broker is
// being deleted?
if err := r.reconcileTriggers(ctx, b); err != nil {
if _, err := r.reconcileTriggers(ctx, b); err != nil {
logger.Error("Problem reconciling triggers", zap.Error(err))
return fmt.Errorf("failed to reconcile triggers: %w", err)
}
Expand Down Expand Up @@ -185,24 +176,6 @@ func (r *Reconciler) reconcileBroker(ctx context.Context, b *brokerv1beta1.Broke
}
b.Status.PropagateIngressAvailability(ingressEndpoints)

r.targetsConfig.MutateBroker(b.Namespace, b.Name, func(m config.BrokerMutation) {
m.SetID(string(b.UID))
m.SetAddress(b.Status.Address.URL.String())
m.SetDecoupleQueue(&config.Queue{
Topic: resources.GenerateDecouplingTopicName(b),
Subscription: resources.GenerateDecouplingSubscriptionName(b),
})
if b.Status.IsReady() {
m.SetState(config.State_READY)
} else {
m.SetState(config.State_UNKNOWN)
}
})

// Update config map
// TODO should this happen in a defer so there's an update regardless of
// error status, or only when reconcile is successful?
r.flagTargetsForUpdate()
return nil
}

Expand Down Expand Up @@ -368,135 +341,79 @@ func (r *Reconciler) deleteDecouplingTopicAndSubscription(ctx context.Context, b
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) error {

func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) ([]*brokerv1beta1.Trigger, error) {
// Filter by `eventing.knative.dev/broker: <name>` here
// to get only the triggers for this broker. The trigger webhook will
// ensure that triggers are always labeled with their broker name.
triggers, err := r.triggerLister.Triggers(b.Namespace).List(
allTriggers, err := r.triggerLister.Triggers(b.Namespace).List(
labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: b.Name}))
if err != nil {
return err
return nil, err
}

ctx = contextWithBroker(ctx, b)
ctx = contextWithTargets(ctx, r.targetsConfig)
for _, t := range triggers {
var triggers []*brokerv1beta1.Trigger
for _, t := range allTriggers {
if t.Spec.Broker == b.Name {
logger := logging.FromContext(ctx).With(zap.String("trigger", t.Name), zap.String("broker", b.Name))
ctx = logging.WithLogger(ctx, logger)

if tKey, err := cache.MetaNamespaceKeyFunc(t); err == nil {
err = r.triggerReconciler.Reconcile(ctx, tKey)
}
triggers = append(triggers, t)
}
}

//TODO aggregate errors?
return err
return triggers, err
}

//TODO all this stuff should be in a configmap variant of the config object

// This function is not thread-safe and should only be executed by
// TargetsConfigUpdater
func (r *Reconciler) updateTargetsConfig(ctx context.Context) error {
//TODO resources package?
data, err := r.targetsConfig.Bytes()
if err != nil {
return fmt.Errorf("error serializing targets config: %w", err)
}
desired := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: targetsCMName,
Namespace: system.Namespace(),
// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTargets(ctx context.Context, b *brokerv1beta1.Broker, triggers []*brokerv1beta1.Trigger) error {
brokerConfig := &config.Broker{
Id: string(b.UID),
Name: b.Name,
Namespace: b.Namespace,
Address: b.Status.Address.URL.String(),
DecoupleQueue: &config.Queue{
Topic: resources.GenerateDecouplingTopicName(b),
Subscription: resources.GenerateDecouplingSubscriptionName(b),
},
BinaryData: map[string][]byte{targetsCMKey: data},
// Write out the text version for debugging purposes only
Data: map[string]string{"targets.txt": r.targetsConfig.String()},
}

r.Logger.Debug("Current targets config", zap.Any("targetsConfig", r.targetsConfig.String()))

existing, err := r.configMapLister.ConfigMaps(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
r.Logger.Debug("Creating targets ConfigMap", zap.String("namespace", desired.Namespace), zap.String("name", desired.Name))
_, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Create(desired)
if err != nil {
return fmt.Errorf("error creating targets ConfigMap: %w", err)
}
} else if err != nil {
return fmt.Errorf("error getting targets ConfigMap: %w", err)
Targets: make(map[string]*config.Target),
}

r.Logger.Debug("Compare targets ConfigMap", zap.Any("existing", base64.StdEncoding.EncodeToString(existing.BinaryData[targetsCMKey])), zap.String("desired", base64.StdEncoding.EncodeToString(desired.BinaryData[targetsCMKey])))
if !equality.Semantic.DeepEqual(desired.BinaryData, existing.BinaryData) {
r.Logger.Debug("Updating targets ConfigMap")
_, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Update(desired)
if err != nil {
return fmt.Errorf("error updating targets ConfigMap: %w", err)
}
if b.Status.IsReady() {
brokerConfig.State = config.State_READY
} else {
brokerConfig.State = config.State_UNKNOWN
}
return nil
}

// LoadTargetsConfig retrieves the targets ConfigMap and
// populates the targets config struct.
func (r *Reconciler) LoadTargetsConfig(ctx context.Context) error {
r.Logger.Debug("Loading targets config from configmap")
//TODO should we use the apiserver here?
// kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(targetsCMName. metav1.GetOptions{})
//TODO retry with wait.ExponentialBackoff
existing, err := r.configMapLister.ConfigMaps(system.Namespace()).Get(targetsCMName)
if err != nil {
if errors.IsNotFound(err) {
r.targetsConfig = memory.NewEmptyTargets()
return nil
for _, t := range triggers {
if t.DeletionTimestamp == nil {
brokerConfig.Targets[t.Name] = triggerTargetConfig(b, t)
}
return fmt.Errorf("error getting targets ConfigMap: %w", err)
}

targets := &config.TargetsConfig{}
data := existing.BinaryData[targetsCMKey]
if err := proto.Unmarshal(data, targets); err != nil {
return err
}

r.targetsConfig = memory.NewTargets(targets)
r.Logger.Debug("Loaded targets config from ConfigMap", zap.String("resourceVersion", existing.ResourceVersion))
return nil
return r.brokerConfigUpdater.UpdateBrokerConfig(ctx, brokerConfig)
}

func (r *Reconciler) TargetsConfigUpdater(ctx context.Context) {
r.Logger.Debug("Starting TargetsConfigUpdater")
// check every 10 seconds even if no reconciles have occurred
ticker := time.NewTicker(targetsCMResyncPeriod)

//TODO configmap cleanup: if any brokers are in deleted state with no triggers
// (or all triggers are in deleted state), remove that entry

for {
select {
case <-ctx.Done():
r.Logger.Debug("Stopping TargetsConfigUpdater")
return
case <-r.targetsNeedsUpdate:
if err := r.updateTargetsConfig(ctx); err != nil {
r.Logger.Error("Error in TargetsConfigUpdater: %w", err)
}
case <-ticker.C:
if err := r.updateTargetsConfig(ctx); err != nil {
r.Logger.Error("Error in TargetsConfigUpdater: %w", err)
}
}
func triggerTargetConfig(b *brokerv1beta1.Broker, t *brokerv1beta1.Trigger) *config.Target {
target := &config.Target{
Id: string(t.UID),
Name: t.Name,
Namespace: t.Namespace,
Broker: b.Name,
Address: t.Status.SubscriberURI.String(),
RetryQueue: &config.Queue{
Topic: resources.GenerateRetryTopicName(t),
Subscription: resources.GenerateRetrySubscriptionName(t),
},
}
}

func (r *Reconciler) flagTargetsForUpdate() {
select {
case r.targetsNeedsUpdate <- struct{}{}:
r.Logger.Debug("Flagged targets for update")
default:
r.Logger.Debug("Flagged targets for update but already flagged")
if t.Spec.Filter != nil && t.Spec.Filter.Attributes != nil {
target.FilterAttributes = t.Spec.Filter.Attributes
}
if t.Status.IsReady() {
target.State = config.State_READY
} else {
target.State = config.State_UNKNOWN
}
return target
}
Loading

0 comments on commit 29e3d34

Please sign in to comment.