-
Notifications
You must be signed in to change notification settings - Fork 65
Add catalogsource entitysource #129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f808375
8780dc8
b7bdee9
45e947e
5474b02
a1dd764
d7c0933
d25f283
0412ae2
b56ee83
d9b0cea
afc89b8
b2c103a
0d54d08
ba77eb7
ed90fc8
dc03e59
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
vendor | ||
|
||
# Binaries for programs and plugins | ||
*.exe | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
package controllers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"reflect" | ||
"sync" | ||
"time" | ||
|
||
"github.com/operator-framework/api/pkg/operators/v1alpha1" | ||
"github.com/operator-framework/deppy/pkg/deppy" | ||
"github.com/operator-framework/deppy/pkg/deppy/input" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/tools/record" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
|
||
"github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource" | ||
) | ||
|
||
const (
	// defaultCatalogSourceSyncInterval is how often unmanaged CatalogSources
	// are re-polled, since they expose no status to watch for changes.
	defaultCatalogSourceSyncInterval = 5 * time.Minute
	// defaultRegistryGRPCConnectionTimeout bounds dialing a source's registry.
	defaultRegistryGRPCConnectionTimeout = 10 * time.Second

	// Kubernetes event types emitted on cache sync.
	eventTypeNormal  = "Normal"
	eventTypeWarning = "Warning"

	// Event reasons for bundle-cache updates.
	eventReasonCacheUpdated      = "BundleCacheUpdated"
	eventReasonCacheUpdateFailed = "BundleCacheUpdateFailed"
)
|
||
type CatalogSourceReconcilerOption func(reconciler *CatalogSourceReconciler) | ||
|
||
func WithRegistryClient(registry catalogsource.RegistryClient) CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
reconciler.registry = registry | ||
} | ||
} | ||
|
||
func WithUnmanagedCatalogSourceSyncInterval(interval time.Duration) CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
reconciler.unmanagedCatalogSourceSyncInterval = interval | ||
} | ||
} | ||
|
||
// applyDefaults applies default values to empty CatalogSourceReconciler fields _after_ options have been applied | ||
func applyDefaults() CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
if reconciler.registry == nil { | ||
reconciler.registry = catalogsource.NewRegistryGRPCClient(defaultRegistryGRPCConnectionTimeout) | ||
} | ||
if reconciler.unmanagedCatalogSourceSyncInterval == 0 { | ||
reconciler.unmanagedCatalogSourceSyncInterval = defaultCatalogSourceSyncInterval | ||
} | ||
} | ||
} | ||
|
||
type CatalogSourceReconciler struct { | ||
sync.RWMutex | ||
client.Client | ||
scheme *runtime.Scheme | ||
registry catalogsource.RegistryClient | ||
recorder record.EventRecorder | ||
unmanagedCatalogSourceSyncInterval time.Duration | ||
cache map[string]map[deppy.Identifier]*input.Entity | ||
} | ||
|
||
func NewCatalogSourceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, options ...CatalogSourceReconcilerOption) *CatalogSourceReconciler { | ||
reconciler := &CatalogSourceReconciler{ | ||
RWMutex: sync.RWMutex{}, | ||
Client: client, | ||
scheme: scheme, | ||
recorder: recorder, | ||
unmanagedCatalogSourceSyncInterval: 0, | ||
cache: map[string]map[deppy.Identifier]*input.Entity{}, | ||
} | ||
// apply options | ||
options = append(options, applyDefaults()) | ||
for _, option := range options { | ||
option(reconciler) | ||
} | ||
|
||
return reconciler | ||
} | ||
|
||
// +kubebuilder:rbac:groups=operators.coreos.com,resources=catalogsources,verbs=get;list;watch | ||
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch | ||
|
||
func (r *CatalogSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
l := log.FromContext(ctx).WithName("catalogsource-controller") | ||
l.V(1).Info("starting") | ||
defer l.V(1).Info("ending") | ||
|
||
var catalogSource = &v1alpha1.CatalogSource{} | ||
if err := r.Client.Get(ctx, req.NamespacedName, catalogSource); err != nil { | ||
if errors.IsNotFound(err) { | ||
r.dropSource(req.String()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are deleting the catsrc immediately if we are not able to fetch it from cluster. I'm wondering given this use case, would it make sense to use the uncached client for this purpose so that we don't run into additional reconciles for stale c-r caches. @joelanford would that make more sense, instead of using a cached client and redoing the work of converting to entities if there is a stale cache. Would we hit any performance issues without caching for this specific case of watching catsrc (in the sense do catalog sources in a cluster get modified often enough that caching is important)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the controller is currently watching only catalog sources, I don't think the cache will ever be stale. I think the cached client is fine. You bring up a good point about redoing work of listing entities though. It's a bummer that there's no GRPC endpoint that can tell us if we need to relist. This is something we should probably handle in the new OLMv1 catalog source spec (cc @anik120). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another naive question: As an OLM user would I in future be allowed to specifically select the catsrc I want to restrict for resolution? Probably through annotations/labels? For now we are watching all the catalog sources in the namespace. Is it necessary, if the user wants to use only a specific set of catsrc for resolution. And how do we provide the user that option. If its through Operator object, then we would need to dynamically change the watches after catsrc controller has started. 
Or do we make this controller to watch all the catsrc objects, and do that filtering in the operator controller when it takes in the entities for resolution? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @varshaprasad96 the way we do this today is by allowing admins to set the priority of the CatalogSources. I don't think we'll ever go down the route of "when you as a tenant want to install an operator, you can specify which CatalogSources you want to install the dependencies from". Instead, what we have today is an admin can set the priorities for CatalogSources on cluster, and the resolver tires to find installable dependencies in the following oder: Priority 0 -> CatalogSource that the top level installable is being installed from (I.e own catalogSource) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @anik120 Follow up question on that. If not the user, does the admin have the privilege of not selecting a particular catalogsrc to install dependencies from - for an operator? The reason for asking that was from the perspective of whether we need to watch all catalogsources in the cluster necessarily. But now my other question is - is there a particular design reason why we would not allow someone (whether its admin or any other persona) to not select the catsrc for resolution (Say blacklist a catsrc from being used for resolution)? Also, since you mentioned that we would not want to go in that path (this wouldn't affect the watches part of it, but just asking to understand if there is a particular reason):
why not? Say I don't trust the publisher of this catsrc, I don't want to install the contents it provides. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @varshaprasad96 @anik120 I'm not sure the admin can disable a catalogsource so it can be ignored when installing dependencies, apart from deleting the catalogsource. Since the admin is the one that creates and manages what catalogsources are available on cluster, it makes sense to me that they can also decide their priority when resolving something to be installed on cluster. CatalogSource priorities are currently declared for each catalogSource, so this priority doesn't change each time we perform resolution. If we had tenants declaring their priorities, it could get potentially very messy, especially since two tenants could declare different priorities for operators being resolved at the same time. |
||
} | ||
return ctrl.Result{}, client.IgnoreNotFound(err) | ||
} | ||
|
||
entities, err := r.registry.ListEntities(ctx, catalogSource) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before we attempt to list entities, should we check if the catalog source says it's ready? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We had this check at one point - it should reduce some of the noisiness of transient errors at startup. That was removed for supporting unmanaged catalogSources, where the only way to tell if the catalogSource is ready is to try connecting to it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The catalog operator doesn't maintain that readiness information in the catsrc status if the catsrc spec directly specifies an address? If that's true, that seems like it could be considered a bug. My memory is fuzzy, but I thought there was something in the catsrc status about this regardless of managed vs. unmanaged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right about unmanaged catalogSources - the catsrc status for grpc backed catsrc does get populated by the OLMv0 catalog operator if it is present on the cluster regardless of whether it is managed/unmanaged. We're skipping the health check under the assumption that the catalog operator need not exist for the catalogSource to work. If we can have a guarantee that the OLMv0 catalog operator will be present on any cluster that has the OLMv0 catalogSources, then adding a health check here makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
If the catalog-operator ins't present, there's nothing reconciling v0 CatalogSources right? I.e the api server will just throw back a "kind CatalogSource is not recognized"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can have the CatalogSource CRD present on cluster without having the catalog-operator running. Whether we want this to be the case is another question entirely. |
||
// TODO: invalidate stale cache for failed updates | ||
if err != nil { | ||
r.recorder.Event(catalogSource, eventTypeWarning, eventReasonCacheUpdateFailed, fmt.Sprintf("Failed to update bundle cache from %s/%s: %v", catalogSource.GetNamespace(), catalogSource.GetName(), err)) | ||
return ctrl.Result{Requeue: !isManagedCatalogSource(*catalogSource)}, err | ||
} | ||
if updated := r.updateCache(req.String(), entities); updated { | ||
r.recorder.Event(catalogSource, eventTypeNormal, eventReasonCacheUpdated, fmt.Sprintf("Successfully updated bundle cache from %s/%s", catalogSource.GetNamespace(), catalogSource.GetName())) | ||
} | ||
|
||
if isManagedCatalogSource(*catalogSource) { | ||
return ctrl.Result{}, nil | ||
} | ||
return ctrl.Result{RequeueAfter: r.unmanagedCatalogSourceSyncInterval}, nil | ||
} | ||
|
||
// SetupWithManager sets up the controller with the Manager. | ||
func (r *CatalogSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
return ctrl.NewControllerManagedBy(mgr). | ||
For(&v1alpha1.CatalogSource{}). | ||
Complete(r) | ||
} | ||
|
||
// TODO: find better way to identify catalogSources unmanaged by olm | ||
func isManagedCatalogSource(catalogSource v1alpha1.CatalogSource) bool { | ||
return len(catalogSource.Spec.Address) == 0 | ||
ankitathomas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (r *CatalogSourceReconciler) updateCache(sourceID string, entities []*input.Entity) bool { | ||
newSourceCache := make(map[deppy.Identifier]*input.Entity) | ||
for _, entity := range entities { | ||
newSourceCache[entity.Identifier()] = entity | ||
} | ||
if _, ok := r.cache[sourceID]; ok && reflect.DeepEqual(r.cache[sourceID], newSourceCache) { | ||
return false | ||
} | ||
r.RWMutex.Lock() | ||
defer r.RWMutex.Unlock() | ||
r.cache[sourceID] = newSourceCache | ||
// return whether cache had updates | ||
return true | ||
} | ||
|
||
func (r *CatalogSourceReconciler) dropSource(sourceID string) { | ||
r.RWMutex.Lock() | ||
defer r.RWMutex.Unlock() | ||
delete(r.cache, sourceID) | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Get(ctx context.Context, id deppy.Identifier) *input.Entity { | ||
r.RWMutex.RLock() | ||
defer r.RWMutex.RUnlock() | ||
// don't count on deppy ID to reflect its catalogsource | ||
for _, source := range r.cache { | ||
if entity, ok := source[id]; ok { | ||
return entity | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { | ||
resultSet := input.EntityList{} | ||
if err := r.Iterate(ctx, func(entity *input.Entity) error { | ||
if filter(entity) { | ||
resultSet = append(resultSet, *entity) | ||
} | ||
return nil | ||
}); err != nil { | ||
return nil, err | ||
} | ||
return resultSet, nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { | ||
resultSet := input.EntityListMap{} | ||
if err := r.Iterate(ctx, func(entity *input.Entity) error { | ||
keys := fn(entity) | ||
for _, key := range keys { | ||
resultSet[key] = append(resultSet[key], *entity) | ||
} | ||
return nil | ||
}); err != nil { | ||
return nil, err | ||
} | ||
return resultSet, nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Iterate(ctx context.Context, fn input.IteratorFunction) error { | ||
r.RWMutex.RLock() | ||
defer r.RWMutex.RUnlock() | ||
for _, source := range r.cache { | ||
for _, entity := range source { | ||
if err := fn(entity); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This is not exactly a
CatalogSource
controller is it? I.e. it's not introducing and reconciling a CRD named `CatalogSource`
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely a bit pedantic but I'm thinking in terms of communicating information to someone who's new to the code base/concepts: "* scratches head* which CRD is this controller reconciling again?"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose it doesn't reconcile the CatalogSource CRD, it just watches it. But then it doesn't actually reconcile any CRD on cluster. I could just name this EntityReconciler, but that'd be confusing if we add another reconciler to generate Entities for the v1 CatalogSource. Maybe V0CatalogSourceEntityReconciler?