Skip to content

Add catalogsource entitysource #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
vendor

# Binaries for programs and plugins
*.exe
Expand Down
15 changes: 15 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ metadata:
creationTimestamp: null
name: manager-role
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- core.rukpak.io
resources:
Expand All @@ -16,6 +23,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- operators.coreos.com
resources:
- catalogsources
verbs:
- get
- list
- watch
- apiGroups:
- operators.operatorframework.io
resources:
Expand Down
203 changes: 203 additions & 0 deletions controllers/catalogsource_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package controllers

import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/deppy/pkg/deppy"
"github.com/operator-framework/deppy/pkg/deppy/input"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource"
)

// Defaults and event constants used by the CatalogSourceReconciler.
const (
// defaultCatalogSourceSyncInterval is the unmanagedCatalogSourceSyncInterval
// that applyDefaults sets when no interval option was supplied.
defaultCatalogSourceSyncInterval = 5 * time.Minute
// defaultRegistryGRPCConnectionTimeout is the timeout passed to
// catalogsource.NewRegistryGRPCClient when no registry client was supplied.
defaultRegistryGRPCConnectionTimeout = 10 * time.Second

// Event types passed to the event recorder.
eventTypeNormal = "Normal"
eventTypeWarning = "Warning"

// Event reasons recorded when the entity cache is refreshed from a catalog
// source (successfully or not).
eventReasonCacheUpdated = "BundleCacheUpdated"
eventReasonCacheUpdateFailed = "BundleCacheUpdateFailed"
)

// CatalogSourceReconcilerOption is a functional option that mutates a
// CatalogSourceReconciler. NewCatalogSourceReconciler applies the supplied
// options in order before filling in defaults.
type CatalogSourceReconcilerOption func(reconciler *CatalogSourceReconciler)

// WithRegistryClient returns an option that sets the registry client the
// reconciler uses to list entities from a catalog source.
func WithRegistryClient(registry catalogsource.RegistryClient) CatalogSourceReconcilerOption {
	setRegistry := func(r *CatalogSourceReconciler) {
		r.registry = registry
	}
	return setRegistry
}

// WithUnmanagedCatalogSourceSyncInterval returns an option that sets the
// interval after which an unmanaged catalog source is requeued for a re-sync.
func WithUnmanagedCatalogSourceSyncInterval(interval time.Duration) CatalogSourceReconcilerOption {
	setInterval := func(r *CatalogSourceReconciler) {
		r.unmanagedCatalogSourceSyncInterval = interval
	}
	return setInterval
}

// applyDefaults returns an option that fills in every CatalogSourceReconciler
// field still at its zero value. It is meant to run _after_ the user-supplied
// options so that explicit settings are never overwritten.
func applyDefaults() CatalogSourceReconcilerOption {
	return func(r *CatalogSourceReconciler) {
		// No sync interval configured: fall back to the package default.
		if r.unmanagedCatalogSourceSyncInterval == 0 {
			r.unmanagedCatalogSourceSyncInterval = defaultCatalogSourceSyncInterval
		}
		// No registry client configured: use a gRPC client with the default timeout.
		if r.registry == nil {
			r.registry = catalogsource.NewRegistryGRPCClient(defaultRegistryGRPCConnectionTimeout)
		}
	}
}

type CatalogSourceReconciler struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is not exactly a CatalogSource controller is it? I.e it's not introducing and reconciling a CRD named CatalogSource.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely a bit pedantic but I'm thinking in terms of communicating information to someone who's new to the code base/concepts: "* scratches head* which CRD is this controller reconciling again?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it doesn't reconcile the CatalogSource CRD, it just watches it. But then it doesn't actually reconcile any CRD on cluster. I could just name this EntityReconciler, but that'd be confusing if we add another reconciler to generate Entities for the v1 CatalogSource. Maybe V0CatalogSourceEntityReconciler?

sync.RWMutex
client.Client
scheme *runtime.Scheme
registry catalogsource.RegistryClient
recorder record.EventRecorder
unmanagedCatalogSourceSyncInterval time.Duration
cache map[string]map[deppy.Identifier]*input.Entity
}

// NewCatalogSourceReconciler builds a CatalogSourceReconciler with an empty
// entity cache, applies the given options in order, and then fills in defaults
// for any field the options left unset.
//
// The defaults are applied directly rather than by appending to options:
// appending to a variadic slice can write into the caller's backing array
// when the caller passes a slice with spare capacity.
func NewCatalogSourceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, options ...CatalogSourceReconcilerOption) *CatalogSourceReconciler {
	reconciler := &CatalogSourceReconciler{
		Client:   client,
		scheme:   scheme,
		recorder: recorder,
		cache:    map[string]map[deppy.Identifier]*input.Entity{},
	}

	// apply user-supplied options first
	for _, option := range options {
		option(reconciler)
	}
	// then default whatever is still unset
	applyDefaults()(reconciler)

	return reconciler
}

// +kubebuilder:rbac:groups=operators.coreos.com,resources=catalogsources,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch

func (r *CatalogSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx).WithName("catalogsource-controller")
l.V(1).Info("starting")
defer l.V(1).Info("ending")

var catalogSource = &v1alpha1.CatalogSource{}
if err := r.Client.Get(ctx, req.NamespacedName, catalogSource); err != nil {
if errors.IsNotFound(err) {
r.dropSource(req.String())
Copy link
Member

@varshaprasad96 varshaprasad96 Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are deleting the catsrc immediately if we are not able to fetch it from cluster. I'm wondering given this use case, would it make sense to use the uncached client for this purpose so that we don't run into additional reconciles for stale c-r caches.

@joelanford would that make more sense, instead of using a cached client and redoing the work of converting to entities if there is a stale cache. Would we hit any performance issues without caching for this specific case of watching catsrc (in the sense do catalog sources in a cluster get modified often enough that caching is important)?

Copy link
Member

@joelanford joelanford Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the controller is currently watching only catalog sources, I don't think the cache will ever be stale. I think the cached client is fine.

You bring up a good point about redoing work of listing entities though. It's a bummer that there's no GRPC endpoint that can tell us if we need to relist. This is something we should probably handle in the new OLMv1 catalog source spec (cc @anik120).

Copy link
Member

@varshaprasad96 varshaprasad96 Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another naive question: As an OLM user would I in future be allowed to specifically select the catsrc I want to restrict for resolution? Probably through annotations/labels?

For now we are watching all the catalog sources in the namespace. Is that necessary if the user wants to use only a specific set of catsrc for resolution? And how do we provide the user that option? If it's through the Operator object, then we would need to dynamically change the watches after the catsrc controller has started.

Or do we make this controller to watch all the catsrc objects, and do that filtering in the operator controller when it takes in the entities for resolution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varshaprasad96 the way we do this today is by allowing admins to set the priority of the CatalogSources. I don't think we'll ever go down the route of "when you as a tenant want to install an operator, you can specify which CatalogSources you want to install the dependencies from". Instead, what we have today is an admin can set the priorities for CatalogSources on cluster, and the resolver tries to find installable dependencies in the following order:

Priority 0 -> CatalogSource that the top level installable is being installed from (I.e own catalogSource)
Priority 1 -> CatalogSource with priority set to 1
Priority 2 -> CatalogSource with priority set to 2
.
.
That being said, it does mean that the admin also has the privilege to change the priorities of the CatalogSource at any point in time too.

Copy link
Member

@varshaprasad96 varshaprasad96 Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anik120 Follow up question on that. If not the user, does the admin have the privilege of not selecting a particular catalogsrc to install dependencies from - for an operator?

The reason for asking that was from the perspective of whether we need to watch all catalogsources in the cluster necessarily.

But now my other question is - is there a particular design reason why we would not allow someone (whether its admin or any other persona) to not select the catsrc for resolution (Say blacklist a catsrc from being used for resolution)?

Also, since you mentioned that we would not want to go in that path (this wouldn't affect the watches part of it, but just asking to understand if there is a particular reason):

"when you as a tenant want to install an operator, you can specify which CatalogSources you want to install the dependencies from".

why not? Say I don't trust the publisher of this catsrc, I don't want to install the contents it provides.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varshaprasad96 @anik120 I'm not sure the admin can disable a catalogsource so it can be ignored when installing dependencies, apart from deleting the catalogsource. Since the admin is the one that creates and manages what catalogsources are available on cluster, it makes sense to me that they can also decide their priority when resolving something to be installed on cluster.

CatalogSource priorities are currently declared for each catalogSource, so this priority doesn't change each time we perform resolution. If we had tenants declaring their priorities, it could get potentially very messy, especially since two tenants could declare different priorities for operators being resolved at the same time.

}
return ctrl.Result{}, client.IgnoreNotFound(err)
}

entities, err := r.registry.ListEntities(ctx, catalogSource)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we attempt to list entities, should we check if the catalog source says it's ready?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had this check at one point - it should reduce some of the noisiness of transient errors at startup. That was removed for supporting unmanaged catalogSources, where the only way to tell if the catalogSource is ready is to try connecting to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catalog operator doesn't maintain that readiness information in the catsrc status if the catsrc spec directly specifies an address? If that's true, that seems like it could be considered a bug.

My memory is fuzzy, but I thought there was something in the catsrc status about this regardless of managed vs. unmanaged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right about unmanaged catalogSources - the catsrc status for grpc backed catsrc does get populated by the OLMv0 catalog operator if it is present on the cluster regardless of whether it is managed/unmanaged. We're skipping the health check under the assumption that the catalog operator need not exist for the catalogSource to work. If we can have a guarantee that the OLMv0 catalog operator will be present on any cluster that has the OLMv0 catalogSources, then adding a health check here makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the catalog-operator isn't present, there's nothing reconciling v0 CatalogSources, right? I.e., the API server will just throw back a "kind CatalogSource is not recognized"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have the CatalogSource CRD present on cluster without having the catalog-operator running. Whether we want this to be the case is another question entirely.

// TODO: invalidate stale cache for failed updates
if err != nil {
r.recorder.Event(catalogSource, eventTypeWarning, eventReasonCacheUpdateFailed, fmt.Sprintf("Failed to update bundle cache from %s/%s: %v", catalogSource.GetNamespace(), catalogSource.GetName(), err))
return ctrl.Result{Requeue: !isManagedCatalogSource(*catalogSource)}, err
}
if updated := r.updateCache(req.String(), entities); updated {
r.recorder.Event(catalogSource, eventTypeNormal, eventReasonCacheUpdated, fmt.Sprintf("Successfully updated bundle cache from %s/%s", catalogSource.GetNamespace(), catalogSource.GetName()))
}

if isManagedCatalogSource(*catalogSource) {
return ctrl.Result{}, nil
}
return ctrl.Result{RequeueAfter: r.unmanagedCatalogSourceSyncInterval}, nil
}

// SetupWithManager registers this reconciler with mgr as a controller for
// v1alpha1.CatalogSource objects.
func (r *CatalogSourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	controllerBuilder := ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.CatalogSource{})
	return controllerBuilder.Complete(r)
}

// isManagedCatalogSource reports whether the catalog source has no explicit
// address in its spec; such sources are treated as managed.
// TODO: find better way to identify catalogSources unmanaged by olm
func isManagedCatalogSource(catalogSource v1alpha1.CatalogSource) bool {
	hasNoAddress := catalogSource.Spec.Address == ""
	return hasNoAddress
}

// updateCache replaces the cached entities for sourceID with the given
// entities, indexed by their identifier. It returns true when the cache was
// changed and false when the source already held an identical entity set.
//
// The new index is built before the lock is taken. The comparison against the
// current entry and the write both happen under the write lock, so the cache
// map is never read without synchronization.
func (r *CatalogSourceReconciler) updateCache(sourceID string, entities []*input.Entity) bool {
	newSourceCache := make(map[deppy.Identifier]*input.Entity, len(entities))
	for _, entity := range entities {
		newSourceCache[entity.Identifier()] = entity
	}

	r.RWMutex.Lock()
	defer r.RWMutex.Unlock()

	// Skip the write when the source is already cached with identical contents.
	if current, ok := r.cache[sourceID]; ok && reflect.DeepEqual(current, newSourceCache) {
		return false
	}
	r.cache[sourceID] = newSourceCache
	// return whether cache had updates
	return true
}

// dropSource removes every cached entity belonging to sourceID. Removing an
// unknown source is a no-op.
func (r *CatalogSourceReconciler) dropSource(sourceID string) {
	r.Lock()
	delete(r.cache, sourceID)
	r.Unlock()
}

// Get returns the cached entity with the given identifier, or nil when no
// cached catalog source contains it.
func (r *CatalogSourceReconciler) Get(ctx context.Context, id deppy.Identifier) *input.Entity {
	r.RLock()
	defer r.RUnlock()

	// The deppy ID is not assumed to encode its catalog source, so every
	// per-source index is probed.
	for _, entitiesByID := range r.cache {
		entity, found := entitiesByID[id]
		if !found {
			continue
		}
		return entity
	}
	return nil
}

// Filter returns every cached entity for which filter returns true. The
// result is an empty, non-nil list when nothing matches.
func (r *CatalogSourceReconciler) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) {
	matched := input.EntityList{}
	collect := func(entity *input.Entity) error {
		if !filter(entity) {
			return nil
		}
		matched = append(matched, *entity)
		return nil
	}

	err := r.Iterate(ctx, collect)
	if err != nil {
		return nil, err
	}
	return matched, nil
}

// GroupBy buckets every cached entity under each key that fn returns for it.
// An entity for which fn returns several keys appears in several buckets.
func (r *CatalogSourceReconciler) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) {
	groups := input.EntityListMap{}
	assign := func(entity *input.Entity) error {
		for _, key := range fn(entity) {
			groups[key] = append(groups[key], *entity)
		}
		return nil
	}

	err := r.Iterate(ctx, assign)
	if err != nil {
		return nil, err
	}
	return groups, nil
}

// Iterate calls fn once for every cached entity across all catalog sources
// while holding the read lock. It stops at, and returns, the first error fn
// reports.
func (r *CatalogSourceReconciler) Iterate(ctx context.Context, fn input.IteratorFunction) error {
	r.RLock()
	defer r.RUnlock()

	for _, entitiesByID := range r.cache {
		for _, cached := range entitiesByID {
			err := fn(cached)
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Loading