Skip to content

Commit 8096517

Browse files
committedNov 13, 2020
fix(indexing): pick channel heads by semver
On index add in replaces mode, pick the highest bundle version in each channel as its head; take the default channel from the highest bundle version in the entire package.
1 parent 396ca5b commit 8096517

15 files changed

+577
-98
lines changed
 

‎bundles/prometheus.0.14.0-beta/manifests/prometheusoperator.0.14.0.clusterserviceversion.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ spec:
154154
cpu: 100m
155155
memory: 50Mi
156156
maturity: alpha
157-
version: 0.14.0
157+
version: 0.14.0-beta
158158
customresourcedefinitions:
159159
owned:
160160
- name: prometheuses.monitoring.coreos.com

‎pkg/lib/bundle/validate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ func (i imageValidator) ValidateBundleContent(manifestDir string) error {
365365

366366
// Validate the bundle object
367367
if len(unstObjs) > 0 {
368-
bundle := registry.NewBundle(csvName, "", nil, unstObjs...)
368+
bundle := registry.NewBundle(csvName, &registry.Annotations{}, unstObjs...)
369369
bundleValidator := validation.BundleValidator
370370
results := bundleValidator.Validate(bundle)
371371
if len(results) > 0 {

‎pkg/lib/validation/bundle_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestValidateBundle(t *testing.T) {
6060
}
6161

6262
// Validate the bundle object
63-
bundle := registry.NewBundle("test", "", nil, unstObjs...)
63+
bundle := registry.NewBundle("test", &registry.Annotations{}, unstObjs...)
6464
results := BundleValidator.Validate(bundle)
6565

6666
if len(results) > 0 {

‎pkg/registry/bundle.go

+53-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package registry
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"strings"
67

@@ -41,33 +42,59 @@ type Bundle struct {
4142
Package string
4243
Channels []string
4344
BundleImage string
45+
version string
4446
csv *ClusterServiceVersion
4547
v1beta1crds []*apiextensionsv1beta1.CustomResourceDefinition
4648
v1crds []*apiextensionsv1.CustomResourceDefinition
4749
Dependencies []*Dependency
4850
Properties []*Property
51+
Annotations *Annotations
4952
cacheStale bool
5053
}
5154

52-
func NewBundle(name, pkgName string, channels []string, objs ...*unstructured.Unstructured) *Bundle {
53-
bundle := &Bundle{Name: name, Package: pkgName, Channels: channels, cacheStale: false}
55+
func NewBundle(name string, annotations *Annotations, objs ...*unstructured.Unstructured) *Bundle {
56+
bundle := &Bundle{
57+
Name: name,
58+
Package: annotations.PackageName,
59+
Annotations: annotations,
60+
}
5461
for _, o := range objs {
5562
bundle.Add(o)
5663
}
64+
65+
if annotations == nil {
66+
return bundle
67+
}
68+
bundle.Channels = strings.Split(annotations.Channels, ",")
69+
5770
return bundle
5871
}
5972

60-
func NewBundleFromStrings(name, pkgName string, channels []string, objs []string) (*Bundle, error) {
73+
func NewBundleFromStrings(name, version, pkg, defaultChannel, channels, objs string) (*Bundle, error) {
74+
objStrs, err := BundleStringToObjectStrings(objs)
75+
if err != nil {
76+
return nil, err
77+
}
78+
6179
unstObjs := []*unstructured.Unstructured{}
62-
for _, o := range objs {
80+
for _, o := range objStrs {
6381
dec := yaml.NewYAMLOrJSONDecoder(strings.NewReader(o), 10)
6482
unst := &unstructured.Unstructured{}
6583
if err := dec.Decode(unst); err != nil {
6684
return nil, err
6785
}
6886
unstObjs = append(unstObjs, unst)
6987
}
70-
return NewBundle(name, pkgName, channels, unstObjs...), nil
88+
89+
annotations := &Annotations{
90+
PackageName: pkg,
91+
Channels: channels,
92+
DefaultChannelName: defaultChannel,
93+
}
94+
bundle := NewBundle(name, annotations, unstObjs...)
95+
bundle.version = version
96+
97+
return bundle, nil
7198
}
7299

73100
func (b *Bundle) Size() int {
@@ -86,10 +113,20 @@ func (b *Bundle) ClusterServiceVersion() (*ClusterServiceVersion, error) {
86113
}
87114

88115
func (b *Bundle) Version() (string, error) {
89-
if err := b.cache(); err != nil {
116+
if b.version != "" {
117+
return b.version, nil
118+
}
119+
120+
var err error
121+
if err = b.cache(); err != nil {
90122
return "", err
91123
}
92-
return b.csv.GetVersion()
124+
125+
if b.csv != nil {
126+
b.version, err = b.csv.GetVersion()
127+
}
128+
129+
return b.version, err
93130
}
94131

95132
func (b *Bundle) SkipRange() (string, error) {
@@ -226,29 +263,33 @@ func (b *Bundle) AllProvidedAPIsInBundle() error {
226263
return nil
227264
}
228265

229-
func (b *Bundle) Serialize() (csvName, bundleImage string, csvBytes []byte, bundleBytes []byte, err error) {
266+
func (b *Bundle) Serialize() (csvName, bundleImage string, csvBytes []byte, bundleBytes []byte, annotationBytes []byte, err error) {
230267
csvCount := 0
231268
for _, obj := range b.Objects {
232269
objBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
233270
if err != nil {
234-
return "", "", nil, nil, err
271+
return "", "", nil, nil, nil, err
235272
}
236273
bundleBytes = append(bundleBytes, objBytes...)
237274

238275
if obj.GroupVersionKind().Kind == "ClusterServiceVersion" {
239276
csvName = obj.GetName()
240277
csvBytes, err = runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
241278
if err != nil {
242-
return "", "", nil, nil, err
279+
return "", "", nil, nil, nil, err
243280
}
244281
csvCount += 1
245282
if csvCount > 1 {
246-
return "", "", nil, nil, fmt.Errorf("two csvs found in one bundle")
283+
return "", "", nil, nil, nil, fmt.Errorf("two csvs found in one bundle")
247284
}
248285
}
249286
}
250287

251-
return csvName, b.BundleImage, csvBytes, bundleBytes, nil
288+
if b.Annotations != nil {
289+
annotationBytes, err = json.Marshal(b.Annotations)
290+
}
291+
292+
return csvName, b.BundleImage, csvBytes, bundleBytes, annotationBytes, nil
252293
}
253294

254295
func (b *Bundle) Images() (map[string]struct{}, error) {

‎pkg/registry/bundle_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package registry
22

33
import (
44
"io/ioutil"
5-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
6-
"k8s.io/apimachinery/pkg/runtime/serializer"
7-
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
85
"path/filepath"
96
"reflect"
107
"strings"
118
"testing"
129

10+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
11+
"k8s.io/apimachinery/pkg/runtime/serializer"
12+
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
13+
1314
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1415
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
1516
)
@@ -22,7 +23,10 @@ const (
2223
// The provided APIs and CRD objects in the created bundle are compared to those in a test manifest directory.
2324
func TestV1CRDsInBundle(t *testing.T) {
2425
// create bundle from manifests that include a v1 CRD
25-
bundle := NewBundle("test", "lib-bucket-provisioner", []string{"alpha"})
26+
bundle := NewBundle("test", &Annotations{
27+
PackageName: "lib-bucket-provisioner",
28+
Channels: "alpha",
29+
})
2630

2731
// Read all files in manifests directory
2832
items, err := ioutil.ReadDir(manifestDir)

‎pkg/registry/decode.go

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"io"
7+
78
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
89
"k8s.io/apimachinery/pkg/util/yaml"
910
)

‎pkg/registry/empty.go

+4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ func (EmptyQuery) GetBundlePathIfExists(ctx context.Context, csvName string) (bu
108108
return "", errors.New("empty querier: cannot get bundle path for bundle")
109109
}
110110

111+
func (EmptyQuery) ListRegistryBundles(ctx context.Context) ([]*Bundle, error) {
112+
return nil, errors.New("empty querier: cannot list registry bundles")
113+
}
114+
111115
var _ Query = &EmptyQuery{}
112116

113117
func NewEmptyQuerier() *EmptyQuery {

‎pkg/registry/imageinput.go

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ func (i *ImageInput) getBundleFromManifests() error {
115115
bundle.Dependencies = i.dependenciesFile.GetDependencies()
116116

117117
bundle.Name = csvName
118+
bundle.Annotations = &i.AnnotationsFile.Annotations
118119
bundle.Package = i.AnnotationsFile.Annotations.PackageName
119120
bundle.Channels = strings.Split(i.AnnotationsFile.Annotations.Channels, ",")
120121

‎pkg/registry/interface.go

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type Query interface {
5959
GetDependenciesForBundle(ctx context.Context, name, version, path string) (dependencies []*api.Dependency, err error)
6060
// Get the bundle path if it exists
6161
GetBundlePathIfExists(ctx context.Context, csvName string) (string, error)
62+
// ListRegistryBundles returns a set of registry bundles.
63+
ListRegistryBundles(ctx context.Context) ([]*Bundle, error)
6264
}
6365

6466
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . GraphLoader

‎pkg/registry/populator.go

+129-58
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"path/filepath"
1010
"strings"
1111

12+
"github.com/blang/semver"
1213
"github.com/sirupsen/logrus"
1314
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1415
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -150,8 +151,7 @@ func (i *DirectoryPopulator) globalSanityCheck(imagesToAdd []*ImageInput) error
150151

151152
func (i *DirectoryPopulator) loadManifests(imagesToAdd []*ImageInput, imagesToReAdd []*ImageInput, mode Mode) error {
152153
// global sanity checks before insertion
153-
err := i.globalSanityCheck(imagesToAdd)
154-
if err != nil {
154+
if err := i.globalSanityCheck(imagesToAdd); err != nil {
155155
return err
156156
}
157157

@@ -192,7 +192,7 @@ func (i *DirectoryPopulator) loadManifests(imagesToAdd []*ImageInput, imagesToRe
192192
break
193193
}
194194

195-
if err = i.loadManifestsReplaces(next.Bundle, next.AnnotationsFile); err != nil {
195+
if err = i.loadManifestsReplaces(next.Bundle); err != nil {
196196
errs = append(errs, err)
197197
break
198198
}
@@ -204,23 +204,18 @@ func (i *DirectoryPopulator) loadManifests(imagesToAdd []*ImageInput, imagesToRe
204204
}
205205
case SemVerMode:
206206
for _, image := range imagesToAdd {
207-
err := i.loadManifestsSemver(image.Bundle, image.AnnotationsFile, false)
208-
if err != nil {
207+
if err := i.loadManifestsSemver(image.Bundle, image.AnnotationsFile, false); err != nil {
209208
return err
210209
}
211210
}
212211
case SkipPatchMode:
213212
for _, image := range imagesToAdd {
214-
err := i.loadManifestsSemver(image.Bundle, image.AnnotationsFile, true)
215-
if err != nil {
213+
if err := i.loadManifestsSemver(image.Bundle, image.AnnotationsFile, true); err != nil {
216214
return err
217215
}
218216
}
219217
default:
220-
err := fmt.Errorf("Unsupported update mode")
221-
if err != nil {
222-
return err
223-
}
218+
return fmt.Errorf("Unsupported update mode")
224219
}
225220

226221
// Finally let's delete all the old bundles
@@ -231,18 +226,34 @@ func (i *DirectoryPopulator) loadManifests(imagesToAdd []*ImageInput, imagesToRe
231226
return nil
232227
}
233228

234-
func (i *DirectoryPopulator) loadManifestsReplaces(bundle *Bundle, annotationsFile *AnnotationsFile) error {
235-
bcsv, err := bundle.ClusterServiceVersion()
229+
var packageContextKey = "package"
230+
231+
// ContextWithPackage adds a package value to a context.
232+
func ContextWithPackage(ctx context.Context, pkg string) context.Context {
233+
return context.WithValue(ctx, packageContextKey, pkg)
234+
}
235+
236+
// PackageFromContext returns the package value of the context if set, returns false if unset.
237+
func PackageFromContext(ctx context.Context) (string, bool) {
238+
pkg, ok := ctx.Value(packageContextKey).(string)
239+
return pkg, ok
240+
}
241+
242+
func (i *DirectoryPopulator) loadManifestsReplaces(bundle *Bundle) error {
243+
ctx := ContextWithPackage(context.TODO(), bundle.Package)
244+
bundles, err := i.querier.ListRegistryBundles(ctx)
236245
if err != nil {
237-
return fmt.Errorf("error getting csv from bundle %s: %s", bundle.Name, err)
246+
return fmt.Errorf("failed to list registry bundles: %s", err)
238247
}
239248

240-
packageManifest, err := i.translateAnnotationsIntoPackage(annotationsFile, bcsv)
249+
// Add the new bundle and get the semver informed package manifest
250+
bundles = append(bundles, bundle)
251+
packageManifest, err := SemverPackageManifest(bundles)
241252
if err != nil {
242-
return fmt.Errorf("Could not translate annotations file into packageManifest %s", err)
253+
return fmt.Errorf("failed to generate semver informed package manifest: %s", err)
243254
}
244255

245-
if err := i.loadOperatorBundle(packageManifest, bundle); err != nil {
256+
if err := i.loadOperatorBundle(*packageManifest, bundle); err != nil {
246257
return fmt.Errorf("Error adding package %s", err)
247258
}
248259

@@ -369,58 +380,118 @@ func (i *DirectoryPopulator) loadOperatorBundle(manifest PackageManifest, bundle
369380
return nil
370381
}
371382

372-
// translateAnnotationsIntoPackage attempts to translate the channels.yaml file at the given path into a package.yaml
373-
func (i *DirectoryPopulator) translateAnnotationsIntoPackage(annotations *AnnotationsFile, csv *ClusterServiceVersion) (PackageManifest, error) {
374-
manifest := PackageManifest{}
375-
existingChannels := map[string]string{}
383+
// SemverPackageManifest generates a PackageManifest from a set of bundles, determining channel heads and the default channel using semver.
384+
// Bundles with the highest version field (according to semver) are chosen as channel heads, and the default channel is taken from the last,
385+
// highest versioned bundle in the entire set to define it.
386+
// The given bundles must all belong to the same package or an error is thrown.
387+
func SemverPackageManifest(bundles []*Bundle) (*PackageManifest, error) {
388+
type bundleVersion struct {
389+
name string
390+
version semver.Version
391+
392+
// Keep track of the number of times we visit each version so we can tell if a head is contested
393+
count int
394+
}
395+
heads := map[string]bundleVersion{}
396+
397+
var (
398+
pkgName string
399+
defaultChannel string
400+
maxVersion bundleVersion
401+
)
376402

377-
pkgm, err := i.querier.GetPackage(context.TODO(), annotations.GetName())
378-
if err == nil {
379-
for _, c := range pkgm.Channels {
380-
existingChannels[c.Name] = c.CurrentCSVName
403+
for _, bundle := range bundles {
404+
if pkgName != "" && pkgName != bundle.Package {
405+
return nil, fmt.Errorf("more than one package in input")
381406
}
382-
}
407+
pkgName = bundle.Package
383408

384-
for _, ch := range annotations.GetChannels() {
385-
existingChannels[ch] = csv.GetName()
386-
}
409+
rawVersion, err := bundle.Version()
410+
if err != nil {
411+
return nil, fmt.Errorf("error getting bundle %s version: %s", bundle.Name, err)
412+
}
413+
if rawVersion == "" {
414+
// If a version isn't provided by the bundle, give it a dummy zero version
415+
// The thought is that properly versioned bundles will always be non-zero
416+
rawVersion = "0.0.0-z"
417+
}
418+
419+
version, err := semver.Parse(rawVersion)
420+
if err != nil {
421+
return nil, fmt.Errorf("error parsing bundle %s version %s: %s", bundle.Name, rawVersion, err)
422+
}
423+
current := bundleVersion{
424+
name: bundle.Name,
425+
version: version,
426+
count: 1,
427+
}
387428

388-
channels := []PackageChannel{}
389-
for c, current := range existingChannels {
390-
channels = append(channels,
391-
PackageChannel{
392-
Name: c,
393-
CurrentCSVName: current,
394-
})
429+
for _, channel := range bundle.Channels {
430+
head, ok := heads[channel]
431+
if !ok {
432+
heads[channel] = current
433+
continue
434+
}
435+
436+
if version.LT(head.version) {
437+
continue
438+
}
439+
440+
if version.EQ(head.version) {
441+
// We have a duplicate version, add the count
442+
current.count += head.count
443+
}
444+
445+
// Current >= head
446+
heads[channel] = current
447+
}
448+
449+
// Set max if bundle is greater
450+
if version.LT(maxVersion.version) {
451+
continue
452+
}
453+
454+
if version.EQ(maxVersion.version) {
455+
current.count += maxVersion.count
456+
}
457+
458+
// Current >= maxVersion
459+
maxVersion = current
460+
if annotations := bundle.Annotations; annotations != nil && annotations.DefaultChannelName != "" {
461+
// Take it when you can get it
462+
defaultChannel = annotations.DefaultChannelName
463+
}
395464
}
396465

397-
manifest = PackageManifest{
398-
PackageName: annotations.GetName(),
399-
Channels: channels,
466+
if maxVersion.count > 1 {
467+
return nil, fmt.Errorf("more than one bundle with maximum version %s", maxVersion.version)
400468
}
401469

402-
defaultChan := annotations.GetDefaultChannelName()
403-
if defaultChan != "" {
404-
if _, found := existingChannels[defaultChan]; found {
405-
manifest.DefaultChannelName = annotations.GetDefaultChannelName()
406-
} else {
407-
return manifest, fmt.Errorf("Channel %s is set as default in annotations but not found in existing package channels", defaultChan)
408-
}
409-
} else {
410-
// No default channel is provided in annotations. Attempt to infer from package manifest
411-
if pkgm != nil {
412-
manifest.DefaultChannelName = pkgm.GetDefaultChannel()
413-
} else {
414-
// Infer default channel from channel list
415-
if annotations.SelectDefaultChannel() != "" {
416-
manifest.DefaultChannelName = annotations.SelectDefaultChannel()
417-
} else {
418-
return manifest, fmt.Errorf("Default channel is missing and can't be inferred")
419-
}
470+
pkg := &PackageManifest{
471+
PackageName: pkgName,
472+
DefaultChannelName: defaultChannel,
473+
}
474+
defaultFound := len(heads) == 1 && defaultChannel == ""
475+
for channel, head := range heads {
476+
if head.count > 1 {
477+
return nil, fmt.Errorf("more than one potential channel head for %s", channel)
478+
}
479+
if len(heads) == 1 {
480+
// Only one possible default channel
481+
pkg.DefaultChannelName = channel
420482
}
483+
defaultFound = defaultFound || channel == defaultChannel
484+
pkg.Channels = append(pkg.Channels, PackageChannel{
485+
Name: channel,
486+
CurrentCSVName: head.name,
487+
})
488+
}
489+
490+
if !defaultFound {
491+
return nil, fmt.Errorf("unable to determine default channel")
421492
}
422493

423-
return manifest, nil
494+
return pkg, nil
424495
}
425496

426497
// DecodeFile decodes the file at a path into the given interface.

‎pkg/registry/populator_test.go

+194-6
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,13 @@ import (
1414

1515
"github.com/sirupsen/logrus"
1616
"github.com/stretchr/testify/require"
17+
"k8s.io/apimachinery/pkg/util/errors"
18+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1719

1820
"github.com/operator-framework/operator-registry/pkg/api"
1921
"github.com/operator-framework/operator-registry/pkg/image"
2022
"github.com/operator-framework/operator-registry/pkg/registry"
2123
"github.com/operator-framework/operator-registry/pkg/sqlite"
22-
23-
"k8s.io/apimachinery/pkg/util/errors"
24-
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2524
)
2625

2726
func init() {
@@ -327,7 +326,7 @@ func TestImageLoading(t *testing.T) {
327326
wantPackages: []*registry.Package{
328327
{
329328
Name: "prometheus",
330-
DefaultChannel: "beta",
329+
DefaultChannel: "preview",
331330
Channels: map[string]registry.Channel{
332331
"preview": {
333332
Head: registry.BundleKey{
@@ -342,11 +341,11 @@ func TestImageLoading(t *testing.T) {
342341
"beta": {
343342
Head: registry.BundleKey{
344343
BundlePath: "quay.io/prometheus/operator:0.14.0-beta",
345-
Version: "0.14.0",
344+
Version: "0.14.0-beta",
346345
CsvName: "prometheusoperator.0.14.0-beta",
347346
},
348347
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{
349-
{BundlePath: "quay.io/prometheus/operator:0.14.0-beta", Version: "0.14.0", CsvName: "prometheusoperator.0.14.0-beta"}: {},
348+
{BundlePath: "quay.io/prometheus/operator:0.14.0-beta", Version: "0.14.0-beta", CsvName: "prometheusoperator.0.14.0-beta"}: {},
350349
},
351350
},
352351
},
@@ -856,6 +855,7 @@ func TestDeprecateBundle(t *testing.T) {
856855

857856
deprecator := sqlite.NewSQLDeprecatorForBundles(store, tt.args.bundles)
858857
err = deprecator.Deprecate()
858+
fmt.Printf("error: %s\n", err)
859859
require.Equal(t, tt.expected.err, err)
860860

861861
// Ensure remaining bundlePaths in db match
@@ -1313,3 +1313,191 @@ func TestOverwrite(t *testing.T) {
13131313
})
13141314
}
13151315
}
1316+
1317+
func TestSemverPackageManifest(t *testing.T) {
1318+
bundle := func(name, version, pkg, defaultChannel, channels string) *registry.Bundle {
1319+
b, err := registry.NewBundleFromStrings(name, version, pkg, defaultChannel, channels, "")
1320+
require.NoError(t, err)
1321+
return b
1322+
}
1323+
type args struct {
1324+
bundles []*registry.Bundle
1325+
}
1326+
type expect struct {
1327+
packageManifest *registry.PackageManifest
1328+
hasError bool
1329+
}
1330+
for _, tt := range []struct {
1331+
description string
1332+
args args
1333+
expect expect
1334+
}{
1335+
{
1336+
description: "OneUnversioned",
1337+
args: args{
1338+
bundles: []*registry.Bundle{
1339+
bundle("operator", "", "package", "stable", "stable"), // version "" is interpreted as 0.0.0-z
1340+
},
1341+
},
1342+
expect: expect{
1343+
packageManifest: &registry.PackageManifest{
1344+
PackageName: "package",
1345+
DefaultChannelName: "stable",
1346+
Channels: []registry.PackageChannel{
1347+
{
1348+
Name: "stable",
1349+
CurrentCSVName: "operator",
1350+
},
1351+
},
1352+
},
1353+
},
1354+
},
1355+
{
1356+
description: "TwoUnversioned",
1357+
args: args{
1358+
bundles: []*registry.Bundle{
1359+
bundle("operator-1", "", "package", "stable", "stable"),
1360+
bundle("operator-2", "", "package", "stable", "stable"),
1361+
},
1362+
},
1363+
expect: expect{
1364+
hasError: true,
1365+
},
1366+
},
1367+
{
1368+
description: "UnversionedAndVersioned",
1369+
args: args{
1370+
bundles: []*registry.Bundle{
1371+
bundle("operator-1", "", "package", "", "stable"),
1372+
bundle("operator-2", "", "package", "", "stable"),
1373+
bundle("operator-3", "0.0.1", "package", "", "stable"), // As long as there is one version, we should be good
1374+
},
1375+
},
1376+
expect: expect{
1377+
packageManifest: &registry.PackageManifest{
1378+
PackageName: "package",
1379+
DefaultChannelName: "stable",
1380+
Channels: []registry.PackageChannel{
1381+
{
1382+
Name: "stable",
1383+
CurrentCSVName: "operator-3",
1384+
},
1385+
},
1386+
},
1387+
},
1388+
},
1389+
{
1390+
description: "MaxVersionsAreChannelHeads",
1391+
args: args{
1392+
bundles: []*registry.Bundle{
1393+
bundle("operator-1", "1.0.0", "package", "slow", "slow"),
1394+
bundle("operator-2", "1.1.0", "package", "stable", "slow,stable"),
1395+
bundle("operator-3", "2.1.0", "package", "stable", "edge"),
1396+
},
1397+
},
1398+
expect: expect{
1399+
packageManifest: &registry.PackageManifest{
1400+
PackageName: "package",
1401+
DefaultChannelName: "stable",
1402+
Channels: []registry.PackageChannel{
1403+
{
1404+
Name: "slow",
1405+
CurrentCSVName: "operator-2",
1406+
},
1407+
{
1408+
Name: "stable",
1409+
CurrentCSVName: "operator-2",
1410+
},
1411+
{
1412+
Name: "edge",
1413+
CurrentCSVName: "operator-3",
1414+
},
1415+
},
1416+
},
1417+
},
1418+
},
1419+
{
1420+
description: "DuplicateVersionsNotTolerated",
1421+
args: args{
1422+
bundles: []*registry.Bundle{
1423+
bundle("operator-1", "1.0.0", "package", "slow", "slow"),
1424+
bundle("operator-2", "1.0.0", "package", "stable", "slow,stable"),
1425+
bundle("operator-3", "2.1.0", "package", "stable", "edge"),
1426+
},
1427+
},
1428+
expect: expect{
1429+
hasError: true,
1430+
},
1431+
},
1432+
{
1433+
description: "DuplicateVersionsInSeparateChannelsAreTolerated",
1434+
args: args{
1435+
bundles: []*registry.Bundle{
1436+
bundle("operator-1", "1.0.0", "package", "slow", "slow"),
1437+
bundle("operator-2", "1.0.0", "package", "stable", "stable"),
1438+
bundle("operator-3", "2.1.0", "package", "edge", "edge"), // Should only be tolerated if we have a global max
1439+
},
1440+
},
1441+
expect: expect{
1442+
packageManifest: &registry.PackageManifest{
1443+
PackageName: "package",
1444+
DefaultChannelName: "edge",
1445+
Channels: []registry.PackageChannel{
1446+
{
1447+
Name: "slow",
1448+
CurrentCSVName: "operator-1",
1449+
},
1450+
{
1451+
Name: "stable",
1452+
CurrentCSVName: "operator-2",
1453+
},
1454+
{
1455+
Name: "edge",
1456+
CurrentCSVName: "operator-3",
1457+
},
1458+
},
1459+
},
1460+
},
1461+
},
1462+
{
1463+
description: "DuplicateMaxVersionsAreNotTolerated",
1464+
args: args{
1465+
bundles: []*registry.Bundle{
1466+
bundle("operator-1", "1.0.0", "package", "slow", "slow"),
1467+
bundle("operator-2", "1.0.0", "package", "stable", "stable"),
1468+
},
1469+
},
1470+
expect: expect{
1471+
hasError: true,
1472+
},
1473+
},
1474+
{
1475+
description: "UnknownDefaultChannel",
1476+
args: args{
1477+
bundles: []*registry.Bundle{
1478+
bundle("operator-1", "1.0.0", "package", "stable", "stable"),
1479+
bundle("operator-2", "2.0.0", "package", "edge", "stable"),
1480+
},
1481+
},
1482+
expect: expect{
1483+
hasError: true,
1484+
},
1485+
},
1486+
} {
1487+
t.Run(tt.description, func(t *testing.T) {
1488+
packageManifest, err := registry.SemverPackageManifest(tt.args.bundles)
1489+
if tt.expect.hasError {
1490+
require.Error(t, err)
1491+
return
1492+
}
1493+
1494+
require.NoError(t, err)
1495+
require.NotNil(t, packageManifest)
1496+
1497+
expected := tt.expect.packageManifest
1498+
require.Equal(t, expected.PackageName, packageManifest.PackageName)
1499+
require.Equal(t, expected.DefaultChannelName, packageManifest.DefaultChannelName)
1500+
require.ElementsMatch(t, expected.Channels, packageManifest.Channels)
1501+
})
1502+
}
1503+
}

‎pkg/sqlite/configmap.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/ghodss/yaml"
99
"github.com/sirupsen/logrus"
10-
"k8s.io/api/core/v1"
10+
v1 "k8s.io/api/core/v1"
1111
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
1212
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1313
"k8s.io/apimachinery/pkg/runtime"
@@ -127,7 +127,7 @@ func (c *ConfigMapLoader) Populate() error {
127127
continue
128128
}
129129

130-
bundle := registry.NewBundle(csv.GetName(), "", nil, &unstructured.Unstructured{Object: csvUnst})
130+
bundle := registry.NewBundle(csv.GetName(), &registry.Annotations{}, &unstructured.Unstructured{Object: csvUnst})
131131
ownedCRDs, _, err := csv.GetCustomResourceDefintions()
132132
if err != nil {
133133
errs = append(errs, err)

‎pkg/sqlite/load.go

+73-12
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func NewSQLLiteLoader(db *sql.DB, opts ...DbOption) (MigratableLoader, error) {
3131
o(options)
3232
}
3333

34-
if _, err := db.Exec("PRAGMA foreign_keys = ON", nil); err != nil {
34+
if _, err := db.Exec("PRAGMA foreign_keys = ON"); err != nil {
3535
return nil, err
3636
}
3737

@@ -67,6 +67,7 @@ func (s *sqlLoader) AddOperatorBundle(bundle *registry.Bundle) error {
6767
}
6868

6969
func (s *sqlLoader) addOperatorBundle(tx *sql.Tx, bundle *registry.Bundle) error {
70+
// addBundle, err := tx.Prepare("insert into operatorbundle(name, csv, bundle, bundlepath, version, skiprange, replaces, skips, annotations) values(?, ?, ?, ?, ?, ?, ?, ?, ?)")
7071
addBundle, err := tx.Prepare("insert into operatorbundle(name, csv, bundle, bundlepath, version, skiprange, replaces, skips) values(?, ?, ?, ?, ?, ?, ?, ?)")
7172
if err != nil {
7273
return err
@@ -79,7 +80,8 @@ func (s *sqlLoader) addOperatorBundle(tx *sql.Tx, bundle *registry.Bundle) error
7980
}
8081
defer addImage.Close()
8182

82-
csvName, bundleImage, csvBytes, bundleBytes, err := bundle.Serialize()
83+
// csvName, bundleImage, csvBytes, bundleBytes, annotationBytes, err := bundle.Serialize()
84+
csvName, bundleImage, csvBytes, bundleBytes, _, err := bundle.Serialize()
8385
if err != nil {
8486
return err
8587
}
@@ -105,6 +107,7 @@ func (s *sqlLoader) addOperatorBundle(tx *sql.Tx, bundle *registry.Bundle) error
105107
return err
106108
}
107109

110+
// if _, err := addBundle.Exec(csvName, csvBytes, bundleBytes, bundleImage, version, skiprange, replaces, strings.Join(skips, ","), annotationBytes); err != nil {
108111
if _, err := addBundle.Exec(csvName, csvBytes, bundleBytes, bundleImage, version, skiprange, replaces, strings.Join(skips, ",")); err != nil {
109112
return err
110113
}
@@ -656,23 +659,72 @@ func (s *sqlLoader) RemovePackage(packageName string) error {
656659
return err
657660
}
658661
for _, csvName := range csvNames {
659-
err = s.rmBundle(tx, csvName)
660-
if err != nil {
662+
if err := s.rmBundle(tx, csvName); err != nil {
661663
return err
662664
}
663665
}
664666

667+
deletePackage, err := tx.Prepare("DELETE FROM package WHERE package.name=?")
668+
if err != nil {
669+
return err
670+
}
671+
defer deletePackage.Close()
672+
673+
if _, err := deletePackage.Exec(packageName); err != nil {
674+
return err
675+
}
676+
677+
deleteChannel, err := tx.Prepare("DELETE FROM channel WHERE package_name = ?")
678+
if err != nil {
679+
return err
680+
}
681+
defer deleteChannel.Close()
682+
683+
if _, err := deleteChannel.Exec(packageName); err != nil {
684+
return err
685+
}
686+
665687
return tx.Commit()
666688
}
667689

668690
func (s *sqlLoader) rmBundle(tx *sql.Tx, csvName string) error {
669-
stmt, err := tx.Prepare("DELETE FROM operatorbundle WHERE operatorbundle.name=?")
691+
deleteBundle, err := tx.Prepare("DELETE FROM operatorbundle WHERE operatorbundle.name=?")
670692
if err != nil {
671693
return err
672694
}
673-
defer stmt.Close()
695+
defer deleteBundle.Close()
674696

675-
if _, err := stmt.Exec(csvName); err != nil {
697+
if _, err := deleteBundle.Exec(csvName); err != nil {
698+
return err
699+
}
700+
701+
deleteProvider, err := tx.Prepare("DELETE FROM api_provider WHERE api_provider.operatorbundle_name=?")
702+
if err != nil {
703+
return err
704+
}
705+
defer deleteProvider.Close()
706+
707+
if _, err := deleteProvider.Exec(csvName); err != nil {
708+
return err
709+
}
710+
711+
deleteRequirer, err := tx.Prepare("DELETE FROM api_requirer WHERE api_requirer.operatorbundle_name=?")
712+
if err != nil {
713+
return err
714+
}
715+
defer deleteRequirer.Close()
716+
717+
if _, err := deleteRequirer.Exec(csvName); err != nil {
718+
return err
719+
}
720+
721+
deleteChannelEntries, err := tx.Prepare("DELETE FROM channel_entry WHERE channel_entry.operatorbundle_name=?")
722+
if err != nil {
723+
return err
724+
}
725+
defer deleteChannelEntries.Close()
726+
727+
if _, err := deleteChannelEntries.Exec(csvName); err != nil {
676728
return err
677729
}
678730

@@ -706,8 +758,8 @@ func (s *sqlLoader) AddBundlePackageChannels(manifest registry.PackageManifest,
706758
return err
707759
}
708760

709-
// Delete package and channels (entries will cascade) - they will be recalculated
710-
deletePkg, err := tx.Prepare("delete from package where name = ?")
761+
// Delete package, channel, and entries - they will be recalculated
762+
deletePkg, err := tx.Prepare("DELETE FROM package WHERE name = ?")
711763
if err != nil {
712764
return err
713765
}
@@ -716,7 +768,7 @@ func (s *sqlLoader) AddBundlePackageChannels(manifest registry.PackageManifest,
716768
if err != nil {
717769
return err
718770
}
719-
deleteChan, err := tx.Prepare("delete from channel where package_name = ?")
771+
deleteChan, err := tx.Prepare("DELETE FROM channel WHERE package_name = ?")
720772
if err != nil {
721773
return err
722774
}
@@ -725,6 +777,15 @@ func (s *sqlLoader) AddBundlePackageChannels(manifest registry.PackageManifest,
725777
if err != nil {
726778
return err
727779
}
780+
deleteChannelEntries, err := tx.Prepare("DELETE FROM channel_entry WHERE package_name = ?")
781+
if err != nil {
782+
return err
783+
}
784+
defer deleteChannelEntries.Close()
785+
_, err = deleteChannelEntries.Exec(manifest.PackageName)
786+
if err != nil {
787+
return err
788+
}
728789

729790
if err := s.addPackageChannels(tx, manifest); err != nil {
730791
return err
@@ -1019,11 +1080,11 @@ func (s *sqlLoader) DeprecateBundle(path string) error {
10191080
}
10201081

10211082
for _, bundle := range tailBundles {
1022-
err := s.rmBundle(tx, bundle)
1083+
err = s.rmChannelEntry(tx, bundle)
10231084
if err != nil {
10241085
return err
10251086
}
1026-
err = s.rmChannelEntry(tx, bundle)
1087+
err := s.rmBundle(tx, bundle)
10271088
if err != nil {
10281089
return err
10291090
}

‎pkg/sqlite/load_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"testing"
89

910
"github.com/stretchr/testify/require"
@@ -239,7 +240,10 @@ func newUnstructuredCSVwithSkips(t *testing.T, name, replaces string, skips ...s
239240
}
240241

241242
func newBundle(t *testing.T, name, pkgName string, channels []string, objs ...*unstructured.Unstructured) *registry.Bundle {
242-
bundle := registry.NewBundle(name, pkgName, channels, objs...)
243+
bundle := registry.NewBundle(name, &registry.Annotations{
244+
PackageName: pkgName,
245+
Channels: strings.Join(channels, ","),
246+
}, objs...)
243247

244248
// Bust the bundle cache to set the CSV and CRDs
245249
_, err := bundle.ClusterServiceVersion()

‎pkg/sqlite/query.go

+102
Original file line numberDiff line numberDiff line change
@@ -1233,3 +1233,105 @@ func (s *SQLQuerier) GetBundlePathIfExists(ctx context.Context, bundleName strin
12331233

12341234
return
12351235
}
1236+
1237+
// ListRegistryBundles returns a set of registry bundles.
1238+
// The set can be filtered by package by setting the given context's 'package' key to a desired package name.
1239+
// e.g.
1240+
// ctx := ContextWithPackage(context.TODO(), "etcd")
1241+
// bundles, err := querier.ListRegistryBundles(ctx)
1242+
// // ...
1243+
func (s *SQLQuerier) ListRegistryBundles(ctx context.Context) ([]*registry.Bundle, error) {
1244+
listBundlesQuery := `
1245+
SELECT DISTINCT operatorbundle.name, operatorbundle.version, operatorbundle.bundle, channel_entry.package_name
1246+
FROM operatorbundle
1247+
LEFT OUTER JOIN channel_entry ON operatorbundle.name = channel_entry.operatorbundle_name`
1248+
1249+
var (
1250+
err error
1251+
rows RowScanner
1252+
)
1253+
if pkg, ok := registry.PackageFromContext(ctx); ok {
1254+
listBundlesQuery += " WHERE channel_entry.package_name=?"
1255+
rows, err = s.db.QueryContext(ctx, listBundlesQuery, pkg)
1256+
} else {
1257+
rows, err = s.db.QueryContext(ctx, listBundlesQuery)
1258+
}
1259+
if err != nil {
1260+
return nil, err
1261+
}
1262+
defer rows.Close()
1263+
1264+
var bundles []*registry.Bundle
1265+
for rows.Next() {
1266+
var (
1267+
bundleName sql.NullString
1268+
bundleVersion sql.NullString
1269+
bundle sql.NullString
1270+
packageName sql.NullString
1271+
)
1272+
if err := rows.Scan(&bundleName, &bundleVersion, &bundle, &packageName); err != nil {
1273+
return nil, err
1274+
}
1275+
1276+
switch {
1277+
case !bundleName.Valid:
1278+
return nil, fmt.Errorf("bundle name column corrupted")
1279+
case !bundleVersion.Valid:
1280+
// Version field is currently nullable
1281+
case !bundle.Valid:
1282+
// Bundle field is currently nullable
1283+
case !packageName.Valid:
1284+
return nil, fmt.Errorf("package name column corrupted")
1285+
}
1286+
1287+
// Allow the channel_entry table to be authoritative
1288+
channels, err := s.listBundleChannels(ctx, bundleName.String)
1289+
if err != nil {
1290+
return nil, fmt.Errorf("unable to list channels for bundle %s: %s", bundleName.String, err)
1291+
}
1292+
1293+
defaultChannel, err := s.GetDefaultChannelForPackage(ctx, packageName.String)
1294+
if err != nil {
1295+
return nil, fmt.Errorf("unable to get default channel for package %s: %s", packageName.String, err)
1296+
}
1297+
1298+
b, err := registry.NewBundleFromStrings(bundleName.String, bundleVersion.String, packageName.String, defaultChannel, strings.Join(channels, ","), bundle.String)
1299+
if err != nil {
1300+
return nil, fmt.Errorf("unable to unmarshal bundle %s from database: %s", bundleName.String, err)
1301+
}
1302+
1303+
bundles = append(bundles, b)
1304+
}
1305+
1306+
return bundles, nil
1307+
}
1308+
1309+
func (s *SQLQuerier) listBundleChannels(ctx context.Context, bundleName string) ([]string, error) {
1310+
listBundleChannelsQuery := `
1311+
SELECT DISTINCT channel_entry.channel_name
1312+
FROM channel_entry
1313+
INNER JOIN operatorbundle ON channel_entry.operatorbundle_name = operatorbundle.name
1314+
WHERE operatorbundle.name = ?`
1315+
1316+
rows, err := s.db.QueryContext(ctx, listBundleChannelsQuery, bundleName)
1317+
if err != nil {
1318+
return nil, err
1319+
}
1320+
defer rows.Close()
1321+
1322+
var channels []string
1323+
for rows.Next() {
1324+
var channel sql.NullString
1325+
if err := rows.Scan(&channel); err != nil {
1326+
return nil, err
1327+
}
1328+
1329+
if !channel.Valid {
1330+
return nil, fmt.Errorf("channel name column corrupt for bundle %s", bundleName)
1331+
}
1332+
1333+
channels = append(channels, channel.String)
1334+
}
1335+
1336+
return channels, nil
1337+
}

0 commit comments

Comments
 (0)
Please sign in to comment.