diff --git a/Gopkg.lock b/Gopkg.lock index e6a25b8c63..980aebe5dd 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1816,7 +1816,6 @@ "github.com/cloudevents/sdk-go/v2/protocol/http", "github.com/cloudevents/sdk-go/v2/protocol/pubsub", "github.com/fsnotify/fsnotify", - "github.com/gogo/protobuf/proto", "github.com/golang/protobuf/jsonpb", "github.com/golang/protobuf/proto", "github.com/golang/protobuf/ptypes", diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index e0637b9577..1a312e6f2e 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -20,15 +20,11 @@ package broker import ( "context" - "encoding/base64" "fmt" - "time" "cloud.google.com/go/pubsub" - "github.com/gogo/protobuf/proto" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" "github.com/google/knative-gcp/pkg/broker/config" - "github.com/google/knative-gcp/pkg/broker/config/memory" brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker" brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" @@ -37,9 +33,6 @@ import ( "github.com/google/knative-gcp/pkg/utils" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -62,9 +55,7 @@ const ( topicDeleted = "TopicDeleted" subDeleted = "SubscriptionDeleted" - targetsCMName = "broker-targets" - targetsCMKey = "targets" - targetsCMResyncPeriod = 10 * time.Second + targetsCMName = "broker-targets" ingressServiceName = "broker-ingress" ) @@ -95,13 +86,8 @@ type Reconciler struct { // Reconciles a broker's triggers triggerReconciler controller.Reconciler - // TODO allow configuring multiples of these - targetsConfig config.Targets - - // targetsNeedsUpdate is a channel that flags the targets ConfigMap as - // needing update. This is done in a separate goroutine to avoid contention - // between multiple controller workers. - targetsNeedsUpdate chan struct{} + // Updates broker configuration + brokerConfigUpdater BrokerConfigUpdater // projectID is used as the GCP project ID when present, skipping the // metadata server check. Used by tests. @@ -122,11 +108,16 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *brokerv1beta1.Broker) // whatever info is available. or put this in a defer? } - if err := r.reconcileTriggers(ctx, b); err != nil { + triggers, err := r.reconcileTriggers(ctx, b) + if err != nil { logging.FromContext(ctx).Error("Problem reconciling triggers", zap.Error(err)) return fmt.Errorf("failed to reconcile triggers: %w", err) } + if err := r.reconcileTargets(ctx, b, triggers); err != nil { + return fmt.Errorf("failed to reconcile targets config: %w", err) + } + return pkgreconciler.NewEvent(corev1.EventTypeNormal, brokerReconciled, "Broker reconciled: \"%s/%s\"", b.Namespace, b.Name) } @@ -137,7 +128,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *brokerv1beta1.Broker) // Reconcile triggers so they update their status // TODO is this the best way to reconcile triggers when their broker is // being deleted? - if err := r.reconcileTriggers(ctx, b); err != nil { + if _, err := r.reconcileTriggers(ctx, b); err != nil { logger.Error("Problem reconciling triggers", zap.Error(err)) return fmt.Errorf("failed to reconcile triggers: %w", err) } @@ -185,24 +176,6 @@ func (r *Reconciler) reconcileBroker(ctx context.Context, b *brokerv1beta1.Broke } b.Status.PropagateIngressAvailability(ingressEndpoints) - r.targetsConfig.MutateBroker(b.Namespace, b.Name, func(m config.BrokerMutation) { - m.SetID(string(b.UID)) - m.SetAddress(b.Status.Address.URL.String()) - m.SetDecoupleQueue(&config.Queue{ - Topic: resources.GenerateDecouplingTopicName(b), - Subscription: resources.GenerateDecouplingSubscriptionName(b), - }) - if b.Status.IsReady() { - m.SetState(config.State_READY) - } else { - m.SetState(config.State_UNKNOWN) - } - }) - - // Update config map - // TODO should this happen in a defer so there's an update regardless of - // error status, or only when reconcile is successful? - r.flagTargetsForUpdate() return nil } @@ -368,20 +341,19 @@ func (r *Reconciler) deleteDecouplingTopicAndSubscription(ctx context.Context, b } // reconcileTriggers reconciles the Triggers that are pointed to this broker -func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) error { - +func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Broker) ([]*brokerv1beta1.Trigger, error) { // Filter by `eventing.knative.dev/broker: ` here // to get only the triggers for this broker. The trigger webhook will // ensure that triggers are always labeled with their broker name. - triggers, err := r.triggerLister.Triggers(b.Namespace).List( + allTriggers, err := r.triggerLister.Triggers(b.Namespace).List( labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: b.Name})) if err != nil { - return err + return nil, err } ctx = contextWithBroker(ctx, b) - ctx = contextWithTargets(ctx, r.targetsConfig) - for _, t := range triggers { + var triggers []*brokerv1beta1.Trigger + for _, t := range allTriggers { if t.Spec.Broker == b.Name { logger := logging.FromContext(ctx).With(zap.String("trigger", t.Name), zap.String("broker", b.Name)) ctx = logging.WithLogger(ctx, logger) @@ -389,114 +361,59 @@ func (r *Reconciler) reconcileTriggers(ctx context.Context, b *brokerv1beta1.Bro if tKey, err := cache.MetaNamespaceKeyFunc(t); err == nil { err = r.triggerReconciler.Reconcile(ctx, tKey) } + triggers = append(triggers, t) } } - - //TODO aggregate errors? - return err + return triggers, err } -//TODO all this stuff should be in a configmap variant of the config object - -// This function is not thread-safe and should only be executed by -// TargetsConfigUpdater -func (r *Reconciler) updateTargetsConfig(ctx context.Context) error { - //TODO resources package? - data, err := r.targetsConfig.Bytes() - if err != nil { - return fmt.Errorf("error serializing targets config: %w", err) - } - desired := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: targetsCMName, - Namespace: system.Namespace(), +// reconcileTriggers reconciles the Triggers that are pointed to this broker +func (r *Reconciler) reconcileTargets(ctx context.Context, b *brokerv1beta1.Broker, triggers []*brokerv1beta1.Trigger) error { + brokerConfig := &config.Broker{ + Id: string(b.UID), + Name: b.Name, + Namespace: b.Namespace, + Address: b.Status.Address.URL.String(), + DecoupleQueue: &config.Queue{ + Topic: resources.GenerateDecouplingTopicName(b), + Subscription: resources.GenerateDecouplingSubscriptionName(b), }, - BinaryData: map[string][]byte{targetsCMKey: data}, - // Write out the text version for debugging purposes only - Data: map[string]string{"targets.txt": r.targetsConfig.String()}, - } - - r.Logger.Debug("Current targets config", zap.Any("targetsConfig", r.targetsConfig.String())) - - existing, err := r.configMapLister.ConfigMaps(desired.Namespace).Get(desired.Name) - if errors.IsNotFound(err) { - r.Logger.Debug("Creating targets ConfigMap", zap.String("namespace", desired.Namespace), zap.String("name", desired.Name)) - _, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Create(desired) - if err != nil { - return fmt.Errorf("error creating targets ConfigMap: %w", err) - } - } else if err != nil { - return fmt.Errorf("error getting targets ConfigMap: %w", err) + Targets: make(map[string]*config.Target), } - - r.Logger.Debug("Compare targets ConfigMap", zap.Any("existing", base64.StdEncoding.EncodeToString(existing.BinaryData[targetsCMKey])), zap.String("desired", base64.StdEncoding.EncodeToString(desired.BinaryData[targetsCMKey]))) - if !equality.Semantic.DeepEqual(desired.BinaryData, existing.BinaryData) { - r.Logger.Debug("Updating targets ConfigMap") - _, err = r.KubeClientSet.CoreV1().ConfigMaps(desired.Namespace).Update(desired) - if err != nil { - return fmt.Errorf("error updating targets ConfigMap: %w", err) - } + if b.Status.IsReady() { + brokerConfig.State = config.State_READY + } else { + brokerConfig.State = config.State_UNKNOWN } - return nil -} -// LoadTargetsConfig retrieves the targets ConfigMap and -// populates the targets config struct. -func (r *Reconciler) LoadTargetsConfig(ctx context.Context) error { - r.Logger.Debug("Loading targets config from configmap") - //TODO should we use the apiserver here? - // kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(targetsCMName. metav1.GetOptions{}) - //TODO retry with wait.ExponentialBackoff - existing, err := r.configMapLister.ConfigMaps(system.Namespace()).Get(targetsCMName) - if err != nil { - if errors.IsNotFound(err) { - r.targetsConfig = memory.NewEmptyTargets() - return nil + for _, t := range triggers { + if t.DeletionTimestamp == nil { + brokerConfig.Targets[t.Name] = triggerTargetConfig(b, t) } - return fmt.Errorf("error getting targets ConfigMap: %w", err) } - targets := &config.TargetsConfig{} - data := existing.BinaryData[targetsCMKey] - if err := proto.Unmarshal(data, targets); err != nil { - return err - } - - r.targetsConfig = memory.NewTargets(targets) - r.Logger.Debug("Loaded targets config from ConfigMap", zap.String("resourceVersion", existing.ResourceVersion)) - return nil + return r.brokerConfigUpdater.UpdateBrokerConfig(ctx, brokerConfig) } -func (r *Reconciler) TargetsConfigUpdater(ctx context.Context) { - r.Logger.Debug("Starting TargetsConfigUpdater") - // check every 10 seconds even if no reconciles have occurred - ticker := time.NewTicker(targetsCMResyncPeriod) - - //TODO configmap cleanup: if any brokers are in deleted state with no triggers - // (or all triggers are in deleted state), remove that entry - - for { - select { - case <-ctx.Done(): - r.Logger.Debug("Stopping TargetsConfigUpdater") - return - case <-r.targetsNeedsUpdate: - if err := r.updateTargetsConfig(ctx); err != nil { - r.Logger.Error("Error in TargetsConfigUpdater: %w", err) - } - case <-ticker.C: - if err := r.updateTargetsConfig(ctx); err != nil { - r.Logger.Error("Error in TargetsConfigUpdater: %w", err) - } - } +func triggerTargetConfig(b *brokerv1beta1.Broker, t *brokerv1beta1.Trigger) *config.Target { + target := &config.Target{ + Id: string(t.UID), + Name: t.Name, + Namespace: t.Namespace, + Broker: b.Name, + Address: t.Status.SubscriberURI.String(), + RetryQueue: &config.Queue{ + Topic: resources.GenerateRetryTopicName(t), + Subscription: resources.GenerateRetrySubscriptionName(t), + }, } -} - -func (r *Reconciler) flagTargetsForUpdate() { - select { - case r.targetsNeedsUpdate <- struct{}{}: - r.Logger.Debug("Flagged targets for update") - default: - r.Logger.Debug("Flagged targets for update but already flagged") + if t.Spec.Filter != nil && t.Spec.Filter.Attributes != nil { + target.FilterAttributes = t.Spec.Filter.Attributes + } + if t.Status.IsReady() { + target.State = config.State_READY + } else { + target.State = config.State_UNKNOWN } + return target } diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 6cab27ee5b..f7abe9363a 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -22,12 +22,14 @@ import ( "testing" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" - "github.com/google/knative-gcp/pkg/broker/config/memory" + "github.com/google/knative-gcp/pkg/broker/config" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1alpha1/resource" brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" . "github.com/google/knative-gcp/pkg/reconciler/testing" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" @@ -56,6 +58,9 @@ const ( var ( testKey = fmt.Sprintf("%s/%s", testNS, brokerName) + decoupleTopic = fmt.Sprintf("cre-bkr_%s_%s_%s", testNS, brokerName, testUID) + decoupleSubscription = fmt.Sprintf("cre-bkr_%s_%s_%s", testNS, brokerName, testUID) + brokerFinalizerUpdatedEvent = Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-broker" finalizers`) brokerReconciledEvent = Eventf(corev1.EventTypeNormal, "BrokerReconciled", `Broker reconciled: "testnamespace/test-broker"`) brokerFinalizedEvent = Eventf(corev1.EventTypeNormal, "BrokerFinalized", `Broker finalized: "testnamespace/test-broker"`) @@ -65,6 +70,22 @@ var ( Host: fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, systemNS, utils.GetClusterDomainName()), Path: fmt.Sprintf("/%s/%s", testNS, brokerName), } + + targetsConfig = &config.TargetsConfig{ + Brokers: map[string]*config.Broker{ + testKey: &config.Broker{ + Id: testUID, + Name: brokerName, + Namespace: testNS, + Address: brokerAddress.String(), + DecoupleQueue: &config.Queue{ + Topic: decoupleTopic, + Subscription: decoupleSubscription, + }, + State: config.State_READY, + }, + }, + } ) func init() { @@ -136,6 +157,16 @@ func TestAllCases(t *testing.T) { NewEndpoints(ingressServiceName, systemNS, WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), }, + SkipNamespaceValidation: true, + WantCreates: []runtime.Object{ + NewTargetsConfigMap(targetsConfig), + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + clientgotesting.NewUpdateAction( + corev1.SchemeGroupVersion.WithResource(string(corev1.ResourceConfigMaps)), + systemNS, + NewTargetsConfigMap(targetsConfig)), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(brokerv1beta1.BrokerClass), @@ -159,19 +190,33 @@ func TestAllCases(t *testing.T) { ctx = addressable.WithDuck(ctx) ctx = resource.WithDuck(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - triggerLister: listers.GetTriggerLister(), - configMapLister: listers.GetConfigMapLister(), - endpointsLister: listers.GetEndpointsLister(), - CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), - targetsConfig: memory.NewEmptyTargets(), - targetsNeedsUpdate: make(chan struct{}), - projectID: testProject, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + triggerLister: listers.GetTriggerLister(), + configMapLister: listers.GetConfigMapLister(), + endpointsLister: listers.GetEndpointsLister(), + CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), + projectID: testProject, } + r.brokerConfigUpdater = NewTargetsReconciler(ctx, r.KubeClientSet, systemNS, targetsCMName) return brokerreconciler.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetBrokerLister(), r.Recorder, r, brokerv1beta1.BrokerClass) })) } +func NewTargetsConfigMap(c *config.TargetsConfig) *corev1.ConfigMap { + b, err := proto.Marshal(c) + if err != nil { + return nil + } + return NewConfigMap(targetsCMName, systemNS, + WithData(map[string]string{ + "targets.txt": prototext.Format(c), + }), + WithBinaryData(map[string][]uint8{ + "targets": b, + }), + ) +} + func patchFinalizers(namespace, name, finalizer string) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index a048dcc385..9f3fb4df40 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -20,7 +20,6 @@ import ( "context" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" - "github.com/google/knative-gcp/pkg/broker/config/memory" injectionclient "github.com/google/knative-gcp/pkg/client/injection/client" brokerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/broker" triggerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/trigger" @@ -28,7 +27,6 @@ import ( triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" - "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" @@ -41,6 +39,7 @@ import ( "knative.dev/pkg/controller" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" + "knative.dev/pkg/system" ) const ( @@ -56,25 +55,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl endpointsInformer := endpointsinformer.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - triggerLister: triggerInformer.Lister(), - configMapLister: configMapInformer.Lister(), - endpointsLister: endpointsInformer.Lister(), - CreateClientFn: gpubsub.NewClient, - targetsNeedsUpdate: make(chan struct{}), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + triggerLister: triggerInformer.Lister(), + configMapLister: configMapInformer.Lister(), + endpointsLister: endpointsInformer.Lister(), + CreateClientFn: gpubsub.NewClient, } - - //TODO wrap this up in a targets struct backed by a configmap - // Load targets config from the existing configmap if present - if err := r.LoadTargetsConfig(ctx); err != nil { - r.Logger.Error("error loading targets config", zap.Error(err)) - // For some reason the targets config is corrupt, proceed with an - // empty one - r.targetsConfig = memory.NewEmptyTargets() - } - - // Start the single thread updating the targets configmap - go r.TargetsConfigUpdater(ctx) + r.brokerConfigUpdater = NewTargetsReconciler(ctx, r.KubeClientSet, system.Namespace(), targetsCMName) impl := brokerreconciler.NewImpl(ctx, r, brokerv1beta1.BrokerClass) diff --git a/pkg/reconciler/broker/targets.go b/pkg/reconciler/broker/targets.go new file mode 100644 index 0000000000..2013cf5eee --- /dev/null +++ b/pkg/reconciler/broker/targets.go @@ -0,0 +1,205 @@ +package broker + +import ( + "context" + "fmt" + "time" + + "github.com/google/knative-gcp/pkg/broker/config" + "go.uber.org/zap" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/typed/core/v1" + "knative.dev/eventing/pkg/logging" +) + +const ( + targetsCMKey = "targets" + targetsFileName = "targets.txt" + targetsCMResyncPeriod = 10 * time.Second +) + +type BrokerConfigUpdater interface { + UpdateBrokerConfig(context.Context, *config.Broker) error +} + +type updateReq struct { + broker *config.Broker + replyCh chan error +} + +type targetsReconciler struct { + targetsName, targetsNS string + targets *config.TargetsConfig + updateCh chan updateReq + configMaps v1.ConfigMapInterface + logger *zap.Logger +} + +func NewTargetsReconciler(ctx context.Context, client kubernetes.Interface, targetsNS, targetsName string) BrokerConfigUpdater { + r := &targetsReconciler{ + targetsName: targetsName, + targetsNS: targetsNS, + updateCh: make(chan updateReq), + configMaps: client.CoreV1().ConfigMaps(targetsNS), + logger: logging.FromContext(ctx), + } + if err := r.loadTargetsConfig(ctx); err != nil { + r.logger.Error("error loading targets config", zap.Error(err)) + r.targets = new(config.TargetsConfig) + } + go r.reconcile(ctx) + return r +} + +func (r *targetsReconciler) UpdateBrokerConfig(ctx context.Context, b *config.Broker) error { + if b.Name == "" || b.Namespace == "" { + return fmt.Errorf("broker missing namespace or name: %s", prototext.MarshalOptions{}.Format(b)) + } + + req := updateReq{ + broker: b, + replyCh: make(chan error, 1), + } + select { + case r.updateCh <- req: + case <-ctx.Done(): + return ctx.Err() + } + select { + case err := <-req.replyCh: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// loadTargetsConfig retrieves the targets ConfigMap and populates r.targets. +func (r *targetsReconciler) loadTargetsConfig(ctx context.Context) error { + logger := r.logger.With(zap.String("targetsName", r.targetsName), zap.String("targetsNS", r.targetsNS)) + logger.Debug("Loading targets config from configmap") + targetsConfigMap, err := r.configMaps.Get(r.targetsName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + r.targets = &config.TargetsConfig{ + Brokers: make(map[string]*config.Broker), + } + return nil + } + return fmt.Errorf("error getting targets ConfigMap: %w", err) + } + + targets := new(config.TargetsConfig) + if err := proto.Unmarshal(targetsConfigMap.BinaryData[targetsCMKey], targets); err != nil { + return err + } + r.logger.Debug("Loaded targets config from ConfigMap", + zap.String("resourceVersion", targetsConfigMap.ResourceVersion)) + + r.targets = targets + return nil +} + +func (r *targetsReconciler) reconcile(ctx context.Context) { + var writing, waiting []chan<- error + var writeCh <-chan error + var lastErr error + for { + var resync <-chan time.Time + var resyncTimer *time.Timer + if writeCh == nil { + resyncTimer = time.NewTimer(targetsCMResyncPeriod) + resync = resyncTimer.C + } + + select { + case req := <-r.updateCh: + k := config.BrokerKey(req.broker.Namespace, req.broker.Name) + if proto.Equal(req.broker, r.targets.Brokers[k]) { + if waiting != nil { + waiting = append(waiting, req.replyCh) + break + } + if writeCh != nil { + writing = append(writing, req.replyCh) + break + } + if lastErr == nil { + req.replyCh <- nil + break + } + } + r.targets.GetBrokers()[k] = req.broker + if writeCh != nil { + waiting = append(waiting, req.replyCh) + break + } + writeCh = r.startWrite() + writing = append(writing, req.replyCh) + + case lastErr = <-writeCh: + writeCh = nil + for _, ch := range writing { + ch <- lastErr + } + writing, waiting = waiting, nil + if writing == nil { + break + } + writeCh = r.startWrite() + + case <-resync: + writeCh = r.startWrite() + + case <-ctx.Done(): + r.logger.Info("targets reconciler stopped", zap.Error(ctx.Err())) + if resyncTimer != nil { + resyncTimer.Stop() + } + return + } + + if resyncTimer != nil { + resyncTimer.Stop() + } + } +} + +func (r *targetsReconciler) startWrite() <-chan error { + writeCh := make(chan error, 1) + go r.writeTargets(writeCh, proto.Clone(r.targets).(*config.TargetsConfig)) + return writeCh +} + +func (r *targetsReconciler) writeTargets(replyCh chan<- error, targets *config.TargetsConfig) (err error) { + defer func() { + replyCh <- err + }() + + r.logger.Debug("writing targets config", zap.Any("targetsConfig", targets)) + + targetsData, err := proto.Marshal(targets) + if err != nil { + return + } + targetsText := prototext.Format(targets) + + targetsConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.targetsName, + Namespace: r.targetsNS, + }, + BinaryData: map[string][]byte{targetsCMKey: targetsData}, + // Write out the text version for debugging purposes only + Data: map[string]string{targetsFileName: targetsText}, + } + _, err = r.configMaps.Update(targetsConfigMap) + if errors.IsNotFound(err) { + _, err = r.configMaps.Create(targetsConfigMap) + } + return +} diff --git a/pkg/reconciler/broker/trigger.go b/pkg/reconciler/broker/trigger.go index 362c0a921d..7d8f17f089 100644 --- a/pkg/reconciler/broker/trigger.go +++ b/pkg/reconciler/broker/trigger.go @@ -22,7 +22,6 @@ import ( "cloud.google.com/go/pubsub" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" - "github.com/google/knative-gcp/pkg/broker/config" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" @@ -110,34 +109,6 @@ func (r *TriggerReconciler) ReconcileKind(ctx context.Context, t *brokerv1beta1. return err } - targetsConfig := targetsFromContext(ctx) - if targetsConfig == nil { - return fmt.Errorf("Couldn't fetch Targets from context") - } - - targetsConfig.MutateBroker(b.Namespace, b.Name, func(m config.BrokerMutation) { - target := &config.Target{ - Id: string(t.UID), - Name: t.Name, - Namespace: t.Namespace, - Broker: b.Name, - Address: t.Status.SubscriberURI.String(), - RetryQueue: &config.Queue{ - Topic: resources.GenerateRetryTopicName(t), - Subscription: resources.GenerateRetrySubscriptionName(t), - }, - } - if t.Spec.Filter != nil && t.Spec.Filter.Attributes != nil { - target.FilterAttributes = t.Spec.Filter.Attributes - } - if t.Status.IsReady() { - target.State = config.State_READY - } else { - target.State = config.State_UNKNOWN - } - m.UpsertTargets(target) - }) - return pkgreconciler.NewEvent(corev1.EventTypeNormal, triggerReconciled, "Trigger reconciled: \"%s/%s\"", t.Namespace, t.Name) } @@ -146,19 +117,6 @@ func (r *TriggerReconciler) FinalizeKind(ctx context.Context, t *brokerv1beta1.T return err } - targetsConfig := targetsFromContext(ctx) - if targetsConfig == nil { - return fmt.Errorf("Couldn't fetch Targets from context") - } - - // Use the trigger's namespace and broker name here so the broker isn't needed - // from context - targetsConfig.MutateBroker(t.Namespace, t.Spec.Broker, func(m config.BrokerMutation) { - m.DeleteTargets(&config.Target{ - Name: t.Name, - }) - }) - return pkgreconciler.NewEvent(corev1.EventTypeNormal, triggerFinalized, "Trigger finalized: \"%s/%s\"", t.Namespace, t.Name) } @@ -411,17 +369,3 @@ func brokerFromContext(ctx context.Context) *brokerv1beta1.Broker { func contextWithBroker(ctx context.Context, b *brokerv1beta1.Broker) context.Context { return context.WithValue(ctx, brokerKey{}, b) } - -type targetsKey struct{} - -func targetsFromContext(ctx context.Context) config.Targets { - untyped := ctx.Value(targetsKey{}) - if untyped == nil { - return nil - } - return untyped.(config.Targets) -} - -func contextWithTargets(ctx context.Context, t config.Targets) context.Context { - return context.WithValue(ctx, targetsKey{}, t) -} diff --git a/pkg/reconciler/broker/trigger_test.go b/pkg/reconciler/broker/trigger_test.go index aef22fe81e..9002f4727b 100644 --- a/pkg/reconciler/broker/trigger_test.go +++ b/pkg/reconciler/broker/trigger_test.go @@ -22,7 +22,7 @@ import ( "testing" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" - "github.com/google/knative-gcp/pkg/broker/config/memory" + "github.com/google/knative-gcp/pkg/broker/config" injectionclient "github.com/google/knative-gcp/pkg/client/injection/client" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1alpha1/resource" brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker" @@ -70,6 +70,37 @@ var ( Version: subscriberVersion, Kind: subscriberKind, } + + retryTopic = fmt.Sprintf("cre-tgr_%s_%s_%s", testNS, triggerName, testUID) + retrySubscription = fmt.Sprintf("cre-tgr_%s_%s_%s", testNS, triggerName, testUID) + + targetsConfigWithTrigger = &config.TargetsConfig{ + Brokers: map[string]*config.Broker{ + testKey: &config.Broker{ + Id: testUID, + Name: brokerName, + Namespace: testNS, + Address: brokerAddress.String(), + DecoupleQueue: &config.Queue{ + Topic: decoupleTopic, + Subscription: decoupleSubscription, + }, + State: config.State_READY, + Targets: map[string]*config.Target{ + triggerName: &config.Target{ + Id: testUID, + Name: triggerName, + Namespace: testNS, + Broker: brokerName, + RetryQueue: &config.Queue{ + Topic: retryTopic, + Subscription: retrySubscription, + }, + }, + }, + }, + }, + } ) func init() { @@ -100,6 +131,7 @@ func TestAllCasesTrigger(t *testing.T) { WithTriggerUID(testUID), WithTriggerDeletionTimestamp, WithTriggerFinalizers(triggerFinalizerName)), + NewTargetsConfigMap(targetsConfig), }, WantEvents: []string{ triggerFinalizerUpdatedEvent, @@ -143,6 +175,7 @@ func TestAllCasesTrigger(t *testing.T) { WithTriggerUID(testUID), WithTriggerDeletionTimestamp, WithTriggerFinalizers(triggerFinalizerName)), + NewTargetsConfigMap(targetsConfig), }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "TopicDeleted", `Deleted PubSub topic "cre-tgr_testnamespace_test-trigger_abc123"`), @@ -188,6 +221,13 @@ func TestAllCasesTrigger(t *testing.T) { NewTrigger(triggerName, testNS, brokerName, WithTriggerUID(testUID), WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS)), + NewTargetsConfigMap(targetsConfig), + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + clientgotesting.NewUpdateAction( + corev1.SchemeGroupVersion.WithResource(string(corev1.ResourceConfigMaps)), + systemNS, + NewTargetsConfigMap(targetsConfigWithTrigger)), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -237,15 +277,14 @@ func TestAllCasesTrigger(t *testing.T) { ctx = resource.WithDuck(ctx) ctx = conditions.WithDuck(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - triggerLister: listers.GetTriggerLister(), - configMapLister: listers.GetConfigMapLister(), - endpointsLister: listers.GetEndpointsLister(), - CreateClientFn: gpubsub.TestClientCreator(testData["b"]), - targetsConfig: memory.NewEmptyTargets(), - targetsNeedsUpdate: make(chan struct{}), - projectID: testProject, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + triggerLister: listers.GetTriggerLister(), + configMapLister: listers.GetConfigMapLister(), + endpointsLister: listers.GetEndpointsLister(), + CreateClientFn: gpubsub.TestClientCreator(testData["b"]), + projectID: testProject, } + r.brokerConfigUpdater = NewTargetsReconciler(ctx, r.KubeClientSet, systemNS, targetsCMName) tr := &TriggerReconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), diff --git a/pkg/reconciler/testing/configmap.go b/pkg/reconciler/testing/configmap.go new file mode 100644 index 0000000000..7a5bacb043 --- /dev/null +++ b/pkg/reconciler/testing/configmap.go @@ -0,0 +1,49 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ConfigMapOption func(*corev1.ConfigMap) + +func NewConfigMap(name, namespace string, opts ...ConfigMapOption) *corev1.ConfigMap { + c := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range opts { + opt(c) + } + return c +} + +func WithData(data map[string]string) ConfigMapOption { + return func(c *corev1.ConfigMap) { + c.Data = data + } +} + +func WithBinaryData(data map[string][]uint8) ConfigMapOption { + return func(c *corev1.ConfigMap) { + c.BinaryData = data + } +}