Skip to content

Commit 71151f3

Browse files
committedMar 29, 2020
refactor(db): remove denormalization that associated provided and
required apis with channel entries, rather than bundles. this trades an additional join at query time for a much simpler load interface
1 parent ec4a87d commit 71151f3

13 files changed

+324
-116
lines changed
 

‎pkg/registry/interface.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
type Load interface {
1010
AddOperatorBundle(bundle *Bundle) error
11-
AddBundlePackageChannels(manifest PackageManifest, bundle Bundle) error
11+
AddBundlePackageChannels(manifest PackageManifest, bundle *Bundle) error
1212
AddPackageChannels(manifest PackageManifest) error
1313
RmPackageName(packageName string) error
1414
ClearNonDefaultBundles(packageName string) error

‎pkg/sqlite/configmap.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ func (c *ConfigMapLoader) Populate() error {
152152
}
153153

154154
if err := c.store.AddOperatorBundle(bundle); err != nil {
155-
errs = append(errs, fmt.Errorf("error adding operator bundle %s: %s", bundle.Name, err))
155+
version, _ := bundle.Version()
156+
errs = append(errs, fmt.Errorf("error adding operator bundle %s/%s/%s: %s", csv.GetName(), version, bundle.BundleImage, err))
156157
}
157158
}
158159

‎pkg/sqlite/configmap_test.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ func TestQuerierForConfigmap(t *testing.T) {
138138
{Group: "etcd.database.coreos.com", Version: "v1beta2", Kind: "EtcdCluster", Plural: "etcdclusters"},
139139
{Group: "etcd.database.coreos.com", Version: "v1beta2", Kind: "EtcdBackup", Plural: "etcdbackups"},
140140
{Group: "etcd.database.coreos.com", Version: "v1beta2", Kind: "EtcdRestore", Plural: "etcdrestores"},
141-
// {Group: "etcd.database.coreos.com", Version:"v1beta2", Kind:"FakeEtcdObject", Plural:"fakeetcds"},
142141
},
143142
RequiredApis: []*api.GroupVersionKind{},
144143
Version: "0.9.2",
@@ -151,19 +150,19 @@ func TestQuerierForConfigmap(t *testing.T) {
151150
"{\"apiVersion\":\"apiextensions.k8s.io/v1beta1\",\"kind\":\"CustomResourceDefinition\",\"metadata\":{\"creationTimestamp\":null,\"name\":\"etcdrestores.etcd.database.coreos.com\"},\"spec\":{\"group\":\"etcd.database.coreos.com\",\"names\":{\"kind\":\"EtcdRestore\",\"listKind\":\"EtcdRestoreList\",\"plural\":\"etcdrestores\",\"singular\":\"etcdrestore\"},\"scope\":\"Namespaced\",\"version\":\"v1beta2\",\"versions\":[{\"name\":\"v1beta2\",\"served\":true,\"storage\":true}]},\"status\":{\"acceptedNames\":{\"kind\":\"\",\"plural\":\"\"},\"conditions\":null,\"storedVersions\":null}}",
152151
},
153152
}
154-
require.EqualValues(t, expectedBundle, etcdBundleByChannel)
153+
EqualBundles(t, *expectedBundle, *etcdBundleByChannel)
155154

156155
etcdBundle, err := store.GetBundle(context.TODO(), "etcd", "alpha", "etcdoperator.v0.9.2")
157156
require.NoError(t, err)
158-
require.Equal(t, expectedBundle, etcdBundle)
157+
EqualBundles(t, *expectedBundle, *etcdBundle)
159158

160159
etcdChannelEntries, err := store.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0")
161160
require.NoError(t, err)
162161
require.ElementsMatch(t, []*registry.ChannelEntry{{"etcd", "alpha", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}}, etcdChannelEntries)
163162

164163
etcdBundleByReplaces, err := store.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "alpha")
165164
require.NoError(t, err)
166-
require.EqualValues(t, expectedBundle, etcdBundleByReplaces)
165+
EqualBundles(t, *expectedBundle, *etcdBundleByReplaces)
167166

168167
etcdChannelEntriesThatProvide, err := store.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
169168
require.ElementsMatch(t, []*registry.ChannelEntry{
@@ -178,10 +177,9 @@ func TestQuerierForConfigmap(t *testing.T) {
178177
require.NoError(t, err)
179178
require.ElementsMatch(t, []*registry.ChannelEntry{{"etcd", "alpha", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}}, etcdLatestChannelEntriesThatProvide)
180179

181-
// etcdBundleByProvides, entry, err := store.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
182-
// require.NoError(t, err)
183-
// require.Equal(t, expectedBundle, etcdBundleByProvides)
184-
// require.Equal(t, &registry.ChannelEntry{"etcd", "alpha", "etcdoperator.v0.9.2", ""}, entry)
180+
etcdBundleByProvides, err := store.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
181+
require.NoError(t, err)
182+
EqualBundles(t, *expectedBundle, *etcdBundleByProvides)
185183

186184
expectedEtcdImages := []string{
187185
"quay.io/coreos/etcd-operator@sha256:bd944a211eaf8f31da5e6d69e8541e7cada8f16a9f7a5a570b22478997819943",

‎pkg/sqlite/directory.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ func (d *DirectoryLoader) LoadBundleWalkFunc(path string, f os.FileInfo, err err
122122
}
123123

124124
if err := d.store.AddOperatorBundle(bundle); err != nil {
125-
errs = append(errs, fmt.Errorf("error adding operator bundle %s: %s", bundle.Name, err))
125+
version, _ := bundle.Version()
126+
errs = append(errs, fmt.Errorf("error adding operator bundle %s/%s/%s: %s", csv.GetName(), version, bundle.BundleImage, err))
126127
}
127128

128129
return utilerrors.NewAggregate(errs)
@@ -177,14 +178,16 @@ func (d *DirectoryLoader) LoadPackagesWalkFunc(path string, f os.FileInfo, err e
177178
// loadBundle takes the directory that a CSV is in and assumes the rest of the objects in that directory
178179
// are part of the bundle.
179180
func loadBundle(csvName string, dir string) (*registry.Bundle, error) {
180-
log := logrus.WithFields(logrus.Fields{"dir": dir, "load": "bundle"})
181+
log := logrus.WithFields(logrus.Fields{"dir": dir, "load": "bundle", "name": csvName})
181182
files, err := ioutil.ReadDir(dir)
182183
if err != nil {
183184
return nil, err
184185
}
185186

186187
var errs []error
187-
bundle := &registry.Bundle{}
188+
bundle := &registry.Bundle{
189+
Name: csvName,
190+
}
188191
for _, f := range files {
189192
log = log.WithField("file", f.Name())
190193
if f.IsDir() {

‎pkg/sqlite/directory_test.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestDirectoryLoader(t *testing.T) {
2424
require.NoError(t, err)
2525
require.NoError(t, store.Migrate(context.TODO()))
2626

27-
loader := NewSQLLoaderForDirectory(store, "../../manifests")
27+
loader := NewSQLLoaderForDirectory(store, "./testdata/loader_data")
2828
require.NoError(t, loader.Populate())
2929
}
3030

@@ -45,7 +45,7 @@ func TestDirectoryLoaderWithBadPackageData(t *testing.T) {
4545
t.Fatal(err)
4646
}
4747
}()
48-
require.NoError(t, copy.Copy("../../manifests", dir))
48+
require.NoError(t, copy.Copy("./testdata/loader_data", dir))
4949

5050
// Point the first channel at a CSV that doesn't exist
5151
path := filepath.Join(dir, "etcd/etcd.package.yaml")
@@ -82,6 +82,7 @@ func TestDirectoryLoaderWithBadBundleData(t *testing.T) {
8282
loader := NewSQLLoaderForDirectory(store, "pkg/sqlite/testdata/incorrectbundle")
8383
require.Error(t, loader.Populate(), "error loading manifests from directory: [error adding operator bundle : json: cannot unmarshal number into Go struct field EnvVar.Install.spec.Deployments.Spec.template.spec.containers.env.value of type string, error loading package into db: [FOREIGN KEY constraint failed, no bundle found for csv 3scale-community-operator.v0.3.0]]")
8484
}
85+
8586
func TestQuerierForDirectory(t *testing.T) {
8687
db, cleanup := CreateTestDb(t)
8788
defer cleanup()
@@ -143,19 +144,19 @@ func TestQuerierForDirectory(t *testing.T) {
143144
{Group: "etcd.database.coreos.com", Version: "v1beta2", Kind: "EtcdCluster", Plural: "etcdclusters"},
144145
},
145146
}
146-
require.Equal(t, expectedBundle, etcdBundleByChannel)
147+
EqualBundles(t, *expectedBundle, *etcdBundleByChannel)
147148

148149
etcdBundle, err := store.GetBundle(context.TODO(), "etcd", "alpha", "etcdoperator.v0.9.2")
149150
require.NoError(t, err)
150-
require.Equal(t, expectedBundle, etcdBundle)
151+
EqualBundles(t, *expectedBundle, *etcdBundle)
151152

152153
etcdChannelEntries, err := store.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0")
153154
require.NoError(t, err)
154155
require.ElementsMatch(t, []*registry.ChannelEntry{{"etcd", "alpha", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}, {"etcd", "stable", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}}, etcdChannelEntries)
155156

156157
etcdBundleByReplaces, err := store.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "alpha")
157158
require.NoError(t, err)
158-
require.EqualValues(t, expectedBundle, etcdBundleByReplaces)
159+
EqualBundles(t, *expectedBundle, *etcdBundleByReplaces)
159160

160161
etcdChannelEntriesThatProvide, err := store.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
161162
require.ElementsMatch(t, []*registry.ChannelEntry{
@@ -178,7 +179,7 @@ func TestQuerierForDirectory(t *testing.T) {
178179

179180
etcdBundleByProvides, err := store.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
180181
require.NoError(t, err)
181-
require.Equal(t, expectedBundle, etcdBundleByProvides)
182+
EqualBundles(t, *expectedBundle, *etcdBundleByProvides)
182183

183184
kafkaPackage, err := store.GetPackage(context.TODO(), "strimzi-kafka-operator")
184185
require.NoError(t, err)
@@ -231,3 +232,10 @@ func TestQuerierForDirectory(t *testing.T) {
231232
require.NoError(t, err)
232233
require.ElementsMatch(t, expectedDatabaseImages, dbImages)
233234
}
235+
236+
func EqualBundles(t *testing.T, expected, actual api.Bundle) {
237+
require.ElementsMatch(t, expected.ProvidedApis, actual.ProvidedApis)
238+
require.ElementsMatch(t, expected.RequiredApis, actual.RequiredApis)
239+
expected.RequiredApis, expected.ProvidedApis, actual.RequiredApis, actual.ProvidedApis = nil, nil, nil, nil
240+
require.EqualValues(t, expected, actual)
241+
}

‎pkg/sqlite/graphloader_test.go

+6-25
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,22 @@ package sqlite
33
import (
44
"context"
55
"database/sql"
6-
"fmt"
76
"github.com/operator-framework/operator-registry/pkg/registry"
8-
"math/rand"
9-
"os"
107
"testing"
118

129
"github.com/stretchr/testify/require"
1310
)
1411

1512
func createLoadedTestDb(t *testing.T) (*sql.DB, func()) {
16-
dbName := fmt.Sprintf("test-%d.db", rand.Int())
17-
18-
db, err := sql.Open("sqlite3", dbName)
19-
require.NoError(t, err)
20-
21-
dbLoader, err := NewSQLLiteLoader(db)
13+
db, cleanup := CreateTestDb(t)
14+
store, err := NewSQLLiteLoader(db)
2215
require.NoError(t, err)
16+
require.NoError(t, store.Migrate(context.TODO()))
2317

24-
err = dbLoader.Migrate(context.TODO())
25-
require.NoError(t, err)
18+
loader := NewSQLLoaderForDirectory(store, "./testdata/loader_data")
19+
require.NoError(t, loader.Populate())
2620

27-
loader := NewSQLLoaderForDirectory(dbLoader, "./testdata/loader_data")
28-
err = loader.Populate()
29-
require.NoError(t, err)
30-
31-
return db, func() {
32-
defer func() {
33-
if err := os.Remove(dbName); err != nil {
34-
t.Fatal(err)
35-
}
36-
}()
37-
if err := db.Close(); err != nil {
38-
t.Fatal(err)
39-
}
40-
}
21+
return db, cleanup
4122
}
4223

4324
func TestLoadPackageGraph_Etcd(t *testing.T) {

‎pkg/sqlite/image.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (i *ImageLoader) loadManifests(manifests string, annotationsFile *registry.
146146
return fmt.Errorf("Could not translate annotations file into packageManifest %s", err)
147147
}
148148

149-
if err := i.loadOperatorBundle(packageManifest, *bundle); err != nil {
149+
if err := i.loadOperatorBundle(packageManifest, bundle); err != nil {
150150
return fmt.Errorf("Error adding package %s", err)
151151
}
152152

@@ -206,7 +206,7 @@ func (i *ImageLoader) findCSV(manifests string) (*unstructured.Unstructured, err
206206
}
207207

208208
// loadOperatorBundle adds the package information to the loader's store
209-
func (i *ImageLoader) loadOperatorBundle(manifest registry.PackageManifest, bundle registry.Bundle) error {
209+
func (i *ImageLoader) loadOperatorBundle(manifest registry.PackageManifest, bundle *registry.Bundle) error {
210210
if manifest.PackageName == "" {
211211
return nil
212212
}

‎pkg/sqlite/image_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,19 @@ func TestQuerierForImage(t *testing.T) {
131131
{Group: "etcd.database.coreos.com", Version: "v1beta2", Kind: "EtcdCluster", Plural: "etcdclusters"},
132132
},
133133
}
134-
require.Equal(t, expectedBundle, etcdBundleByChannel)
134+
EqualBundles(t, *expectedBundle, *etcdBundleByChannel)
135135

136136
etcdBundle, err := store.GetBundle(context.TODO(), "etcd", "alpha", "etcdoperator.v0.9.2")
137137
require.NoError(t, err)
138-
require.Equal(t, expectedBundle, etcdBundle)
138+
EqualBundles(t, *expectedBundle, *etcdBundle)
139139

140140
etcdChannelEntries, err := store.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0")
141141
require.NoError(t, err)
142142
require.ElementsMatch(t, []*registry.ChannelEntry{{"etcd", "alpha", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}, {"etcd", "stable", "etcdoperator.v0.9.2", "etcdoperator.v0.9.0"}}, etcdChannelEntries)
143143

144144
etcdBundleByReplaces, err := store.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "alpha")
145145
require.NoError(t, err)
146-
require.EqualValues(t, expectedBundle, etcdBundleByReplaces)
146+
EqualBundles(t, *expectedBundle, *etcdBundleByReplaces)
147147

148148
etcdChannelEntriesThatProvide, err := store.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
149149
require.ElementsMatch(t, []*registry.ChannelEntry{
@@ -161,7 +161,7 @@ func TestQuerierForImage(t *testing.T) {
161161

162162
etcdBundleByProvides, err := store.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdCluster")
163163
require.NoError(t, err)
164-
require.Equal(t, expectedBundle, etcdBundleByProvides)
164+
EqualBundles(t, *expectedBundle, *etcdBundleByProvides)
165165

166166
expectedEtcdImages := []string{
167167
"quay.io/coreos/etcd-operator@sha256:c0301e4686c3ed4206e370b42de5a3bd2229b9fb4906cf85f3f30650424abec2",

‎pkg/sqlite/load.go

+30-57
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ func (s *SQLLoader) AddOperatorBundle(bundle *registry.Bundle) error {
101101
}
102102
}
103103

104+
if err := s.addAPIs(tx, bundle); err != nil {
105+
return err
106+
}
107+
104108
return tx.Commit()
105109
}
106110

@@ -201,10 +205,6 @@ func (s *SQLLoader) AddPackageChannels(manifest registry.PackageManifest) error
201205
break
202206
}
203207

204-
if err := s.addAPIs(tx, channelEntryCSV, currentID); err != nil {
205-
errs = append(errs, err)
206-
}
207-
208208
skips, err := channelEntryCSV.GetSkips()
209209
if err != nil {
210210
errs = append(errs, err)
@@ -242,11 +242,6 @@ func (s *SQLLoader) AddPackageChannels(manifest registry.PackageManifest) error
242242
continue
243243
}
244244

245-
if err := s.addAPIs(tx, channelEntryCSV, synthesizedID); err != nil {
246-
errs = append(errs, err)
247-
continue
248-
}
249-
250245
depth++
251246
}
252247

@@ -454,74 +449,59 @@ func (s *SQLLoader) getCSV(tx *sql.Tx, csvName string) (*registry.ClusterService
454449
return csv, nil
455450
}
456451

457-
func (s *SQLLoader) addAPIs(tx *sql.Tx, csv *registry.ClusterServiceVersion, channelEntryId int64) error {
452+
func (s *SQLLoader) addAPIs(tx *sql.Tx, bundle *registry.Bundle) error {
453+
if bundle.Name == "" {
454+
return fmt.Errorf("cannot add apis for bundle with no name: %#v", bundle)
455+
}
458456
addAPI, err := tx.Prepare("insert or replace into api(group_name, version, kind, plural) values(?, ?, ?, ?)")
459457
if err != nil {
460458
return err
461459
}
462460
defer addAPI.Close()
463461

464-
addAPIProvider, err := tx.Prepare("insert into api_provider(group_name, version, kind, channel_entry_id) values(?, ?, ?, ?)")
462+
addApiProvider, err := tx.Prepare("insert into api_provider(group_name, version, kind, operatorbundle_name, operatorbundle_version, operatorbundle_path) values(?, ?, ?, ?, ?, ?)")
465463
if err != nil {
466464
return err
467465
}
468-
defer addAPIProvider.Close()
466+
defer addApiProvider.Close()
469467

470-
addApiRequirer, err := tx.Prepare("insert into api_requirer(group_name, version, kind, channel_entry_id) values(?, ?, ?, ?)")
468+
addApiRequirer, err := tx.Prepare("insert into api_requirer(group_name, version, kind, operatorbundle_name, operatorbundle_version, operatorbundle_path) values(?, ?, ?, ?, ?, ?)")
471469
if err != nil {
472470
return err
473471
}
474472
defer addApiRequirer.Close()
475473

476-
ownedCRDs, requiredCRDs, err := csv.GetCustomResourceDefintions()
474+
providedApis, err := bundle.ProvidedAPIs()
477475
if err != nil {
478476
return err
479477
}
480-
for _, crd := range ownedCRDs {
481-
plural, group, err := SplitCRDName(crd.Name)
482-
if err != nil {
483-
return err
484-
}
485-
if _, err := addAPI.Exec(group, crd.Version, crd.Kind, plural); err != nil {
486-
return err
487-
}
488-
if _, err := addAPIProvider.Exec(group, crd.Version, crd.Kind, channelEntryId); err != nil {
489-
return err
490-
}
491-
}
492-
for _, crd := range requiredCRDs {
493-
plural, group, err := SplitCRDName(crd.Name)
494-
if err != nil {
495-
return err
496-
}
497-
if _, err := addAPI.Exec(group, crd.Version, crd.Kind, plural); err != nil {
498-
return err
499-
}
500-
if _, err := addApiRequirer.Exec(group, crd.Version, crd.Kind, channelEntryId); err != nil {
501-
return err
502-
}
478+
requiredApis, err := bundle.RequiredAPIs()
479+
if err != nil {
480+
return err
503481
}
504-
505-
ownedAPIs, requiredAPIs, err := csv.GetApiServiceDefinitions()
482+
bundleVersion, err := bundle.Version()
506483
if err != nil {
507484
return err
508485
}
509-
for _, api := range ownedAPIs {
510-
if _, err := addAPI.Exec(api.Group, api.Version, api.Kind, api.Name); err != nil {
486+
for api := range providedApis {
487+
if _, err := addAPI.Exec(api.Group, api.Version, api.Kind, api.Plural); err != nil {
511488
return err
512489
}
513-
if _, err := addAPIProvider.Exec(api.Group, api.Version, api.Kind, channelEntryId); err != nil {
490+
491+
if _, err := addApiProvider.Exec(api.Group, api.Version, api.Kind, bundle.Name, bundleVersion, bundle.BundleImage); err != nil {
514492
return err
515493
}
516494
}
517-
for _, api := range requiredAPIs {
518-
if _, err := addAPI.Exec(api.Group, api.Version, api.Kind, api.Name); err != nil {
495+
for api := range requiredApis {
496+
if _, err := addAPI.Exec(api.Group, api.Version, api.Kind, api.Plural); err != nil {
519497
return err
520498
}
521-
if _, err := addApiRequirer.Exec(api.Group, api.Version, api.Kind, channelEntryId); err != nil {
499+
500+
if _, err := addApiRequirer.Exec(api.Group, api.Version, api.Kind, bundle.Name, bundleVersion, bundle.BundleImage); err != nil {
522501
return err
523502
}
524503
}
504+
525505
return nil
526506
}
527507
func (s *SQLLoader) getCSVNames(tx *sql.Tx, packageName string) ([]string, error) {
@@ -627,7 +607,7 @@ func (s *SQLLoader) rmBundle(tx *sql.Tx, csvName string) error {
627607
return nil
628608
}
629609

630-
func (s *SQLLoader) AddBundlePackageChannels(manifest registry.PackageManifest, bundle registry.Bundle) error {
610+
func (s *SQLLoader) AddBundlePackageChannels(manifest registry.PackageManifest, bundle *registry.Bundle) error {
631611
var errs []error
632612
tx, err := s.db.Begin()
633613
if err != nil {
@@ -682,6 +662,10 @@ func (s *SQLLoader) AddBundlePackageChannels(manifest registry.PackageManifest,
682662
}
683663
}
684664

665+
if err := s.addAPIs(tx, bundle); err != nil {
666+
return err
667+
}
668+
685669
if err := tx.Commit(); err != nil {
686670
return err
687671
}
@@ -944,12 +928,6 @@ func (s *SQLLoader) updatePackageChannels(tx *sql.Tx, manifest registry.PackageM
944928
}
945929
}
946930

947-
// add APIs
948-
if err := s.addAPIs(tx, channelEntryCSV, currentID); err != nil {
949-
errs = append(errs, err)
950-
continue
951-
}
952-
953931
// update depth to depth + 1 for replaced entry
954932
_, err = updateDepth.Exec(c.Name, manifest.PackageName, replaces)
955933
if err != nil {
@@ -991,11 +969,6 @@ func (s *SQLLoader) updatePackageChannels(tx *sql.Tx, manifest registry.PackageM
991969
continue
992970
}
993971

994-
if err := s.addAPIs(tx, channelEntryCSV, synthesizedID); err != nil {
995-
errs = append(errs, err)
996-
continue
997-
}
998-
999972
depth++
1000973
}
1001974
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package migrations
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"github.com/operator-framework/operator-registry/pkg/registry"
7+
)
8+
9+
const AssociateApisWithBundleMigrationKey = 6
10+
11+
// Register this migration
12+
func init() {
13+
registerMigration(AssociateApisWithBundleMigrationKey, bundleApiMigration)
14+
}
15+
16+
17+
// This migration moves the link between the provided and required apis table from the channel_entry to the
18+
// bundle itself. This simplifies loading and minimizes changes that need to happen when a new bundle is
19+
// inserted into an existing database.
20+
// Before:
21+
// api_provider: FOREIGN KEY(channel_entry_id) REFERENCES channel_entry(entry_id),
22+
// api_requirer: FOREIGN KEY(channel_entry_id) REFERENCES channel_entry(entry_id),
23+
// After:
24+
// api_provider: FOREIGN KEY(operatorbundle_name, operatorbundle_version, operatorbundle_path) REFERENCES operatorbundle(name, version, bundlepath),
25+
// api_requirer: FOREIGN KEY(operatorbundle_name, operatorbundle_version, operatorbundle_path) REFERENCES operatorbundle(name, version, bundlepath),
26+
27+
var bundleApiMigration = &Migration{
28+
Id: AssociateApisWithBundleMigrationKey,
29+
Up: func(ctx context.Context, tx *sql.Tx) error {
30+
createNew := `
31+
CREATE TABLE api_provider_new (
32+
group_name TEXT,
33+
version TEXT,
34+
kind TEXT,
35+
operatorbundle_name TEXT,
36+
operatorbundle_version TEXT,
37+
operatorbundle_path TEXT,
38+
FOREIGN KEY (operatorbundle_name) REFERENCES operatorbundle(name) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
39+
FOREIGN KEY(group_name, version, kind) REFERENCES api(group_name, version, kind)
40+
);
41+
CREATE TABLE api_requirer_new (
42+
group_name TEXT,
43+
version TEXT,
44+
kind TEXT,
45+
operatorbundle_name TEXT,
46+
operatorbundle_version TEXT,
47+
operatorbundle_path TEXT,
48+
FOREIGN KEY (operatorbundle_name) REFERENCES operatorbundle(name) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
49+
FOREIGN KEY(group_name, version, kind) REFERENCES api(group_name, version, kind)
50+
);
51+
`
52+
_, err := tx.ExecContext(ctx, createNew)
53+
if err != nil {
54+
return err
55+
}
56+
57+
insertProvided := `INSERT INTO api_provider_new(group_name, version, kind, operatorbundle_name, operatorbundle_version, operatorbundle_path) VALUES (?, ?, ?, ?, ?, ?)`
58+
insertRequired := `INSERT INTO api_requirer_new(group_name, version, kind, operatorbundle_name, operatorbundle_version, operatorbundle_path) VALUES (?, ?, ?, ?, ?, ?)`
59+
60+
bundleApis, err := getApisForBundles(ctx, tx)
61+
if err != nil {
62+
return err
63+
}
64+
for bundle, apis := range bundleApis {
65+
for provided := range apis.provided {
66+
_, err := tx.ExecContext(ctx, insertProvided, provided.Group, provided.Version, provided.Kind, bundle.CsvName, bundle.Version, bundle.BundlePath)
67+
if err != nil {
68+
return err
69+
}
70+
}
71+
for required := range apis.required {
72+
_, err := tx.ExecContext(ctx, insertRequired, required.Group, required.Version, required.Kind, bundle.CsvName, bundle.Version, bundle.BundlePath)
73+
if err != nil {
74+
return err
75+
}
76+
}
77+
}
78+
79+
renameNewAndDropOld := `
80+
DROP TABLE api_provider;
81+
DROP TABLE api_requirer;
82+
ALTER TABLE api_provider_new RENAME TO api_provider;
83+
ALTER TABLE api_requirer_new RENAME TO api_requirer;
84+
`
85+
_, err = tx.ExecContext(ctx, renameNewAndDropOld)
86+
if err != nil {
87+
return err
88+
}
89+
90+
return err
91+
},
92+
Down: func(ctx context.Context, tx *sql.Tx) error {
93+
// TODO
94+
return nil
95+
},
96+
}
97+
98+
type apis struct {
99+
provided map[registry.APIKey]struct{}
100+
required map[registry.APIKey]struct{}
101+
}
102+
103+
func getApisForBundles(ctx context.Context, tx *sql.Tx) (map[registry.BundleKey]apis, error) {
104+
bundles := map[registry.BundleKey]apis{}
105+
106+
providedQuery := `SELECT api_provider.group_name, api_provider.version, api_provider.kind, operatorbundle.name, operatorbundle.version, operatorbundle.bundlepath
107+
FROM api_provider
108+
INNER JOIN channel_entry ON channel_entry.entry_id = api_provider.channel_entry_id
109+
INNER JOIN operatorbundle ON operatorbundle.name = channel_entry.operatorbundle_name`
110+
111+
requiredQuery := `SELECT api_requirer.group_name, api_requirer.version, api_requirer.kind, operatorbundle.name, operatorbundle.version, operatorbundle.bundlepath
112+
FROM api_requirer
113+
INNER JOIN channel_entry ON channel_entry.entry_id = api_requirer.channel_entry_id
114+
INNER JOIN operatorbundle ON operatorbundle.name = channel_entry.operatorbundle_name`
115+
116+
providedRows, err := tx.QueryContext(ctx, providedQuery)
117+
if err != nil {
118+
return nil, err
119+
}
120+
for providedRows.Next() {
121+
var group sql.NullString
122+
var apiVersion sql.NullString
123+
var kind sql.NullString
124+
var name sql.NullString
125+
var bundleVersion sql.NullString
126+
var path sql.NullString
127+
if err = providedRows.Scan(&group, &apiVersion, &kind, &name, &bundleVersion, &path); err != nil {
128+
return nil, err
129+
}
130+
if !group.Valid || !apiVersion.Valid || !kind.Valid || !name.Valid || !bundleVersion.Valid || !path.Valid {
131+
continue
132+
}
133+
key := registry.BundleKey{
134+
BundlePath: path.String,
135+
Version: bundleVersion.String,
136+
CsvName: name.String,
137+
}
138+
bundleApis, ok := bundles[key]
139+
if !ok {
140+
bundleApis = apis{
141+
provided: map[registry.APIKey]struct{}{},
142+
required: map[registry.APIKey]struct{}{},
143+
}
144+
}
145+
146+
bundleApis.provided[registry.APIKey{
147+
Group: group.String,
148+
Version: apiVersion.String,
149+
Kind: kind.String,
150+
}] = struct {}{}
151+
152+
bundles[key] = bundleApis
153+
}
154+
155+
requiredRows, err := tx.QueryContext(ctx, requiredQuery)
156+
if err != nil {
157+
return nil, err
158+
}
159+
for requiredRows.Next() {
160+
var group sql.NullString
161+
var apiVersion sql.NullString
162+
var kind sql.NullString
163+
var name sql.NullString
164+
var bundleVersion sql.NullString
165+
var path sql.NullString
166+
if err = requiredRows.Scan(&group, &apiVersion, &kind, &name, &bundleVersion, &path); err != nil {
167+
return nil, err
168+
}
169+
if !group.Valid || !apiVersion.Valid || !kind.Valid || !name.Valid || !bundleVersion.Valid || !path.Valid {
170+
continue
171+
}
172+
key := registry.BundleKey{
173+
BundlePath: path.String,
174+
Version: bundleVersion.String,
175+
CsvName: name.String,
176+
}
177+
bundleApis, ok := bundles[key]
178+
if !ok {
179+
bundleApis = apis{
180+
provided: map[registry.APIKey]struct{}{},
181+
required: map[registry.APIKey]struct{}{},
182+
}
183+
}
184+
185+
bundleApis.required[registry.APIKey{
186+
Group: group.String,
187+
Version: apiVersion.String,
188+
Kind: kind.String,
189+
}] = struct {}{}
190+
191+
bundles[key] = bundleApis
192+
}
193+
194+
return bundles, nil
195+
}
196+
197+
198+
// OLD queries
199+
//
200+
//func GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) (entries []*registry.ChannelEntry, err error) {
201+
// query := `SELECT DISTINCT channel_entry.package_name, channel_entry.channel_name, channel_entry.operatorbundle_name, replaces.operatorbundle_name
202+
// FROM channel_entry
203+
// INNER JOIN api_provider ON channel_entry.entry_id = api_provider.channel_entry_id
204+
// LEFT OUTER JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id
205+
// WHERE api_provider.group_name = ? AND api_provider.version = ? AND api_provider.kind = ?`
206+
//
207+
// rows, err := s.db.QueryContext(ctx, query, group, version, kind)
208+
// if err != nil {
209+
// return
210+
// }
211+
// defer rows.Close()
212+
//
213+
// entries = []*registry.ChannelEntry{}
214+
//
215+
// for rows.Next() {
216+
// var pkgNameSQL sql.NullString
217+
// var channelNameSQL sql.NullString
218+
// var bundleNameSQL sql.NullString
219+
// var replacesSQL sql.NullString
220+
// if err = rows.Scan(&pkgNameSQL, &channelNameSQL, &bundleNameSQL, &replacesSQL); err != nil {
221+
// return
222+
// }
223+
//
224+
// entries = append(entries, &registry.ChannelEntry{
225+
// PackageName: pkgNameSQL.String,
226+
// ChannelName: channelNameSQL.String,
227+
// BundleName: bundleNameSQL.String,
228+
// Replaces: replacesSQL.String,
229+
// })
230+
// }
231+
// if len(entries) == 0 {
232+
// err = fmt.Errorf("no channel entries found that provide %s %s %s", group, version, kind)
233+
// return
234+
// }
235+
// return
236+
//}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package migrations
2+
3+
// TODO

‎pkg/sqlite/migrator.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"strings"
78

89
_ "github.com/golang-migrate/migrate/v4/source/file" // indirect import required by golang-migrate package
910
"github.com/sirupsen/logrus"
@@ -47,8 +48,8 @@ func (m *SQLLiteMigrator) Migrate(ctx context.Context) error {
4748
return err
4849
}
4950
defer func() {
50-
if err := tx.Rollback(); err != nil {
51-
logrus.WithError(err).Debugf("couldn't rollback - this is expected if the transaction committed")
51+
if err := tx.Rollback(); err != nil && !strings.Contains(err.Error(), "transaction has already been committed") {
52+
logrus.WithError(err).Warnf("couldn't rollback")
5253
}
5354
}()
5455

‎pkg/sqlite/query.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ func (s *SQLQuerier) GetBundleThatReplaces(ctx context.Context, name, pkgName, c
365365
func (s *SQLQuerier) GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) (entries []*registry.ChannelEntry, err error) {
366366
query := `SELECT DISTINCT channel_entry.package_name, channel_entry.channel_name, channel_entry.operatorbundle_name, replaces.operatorbundle_name
367367
FROM channel_entry
368-
INNER JOIN api_provider ON channel_entry.entry_id = api_provider.channel_entry_id
368+
INNER JOIN api_provider ON channel_entry.operatorbundle_name = api_provider.operatorbundle_name
369369
LEFT OUTER JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id
370370
WHERE api_provider.group_name = ? AND api_provider.version = ? AND api_provider.kind = ?`
371371

@@ -404,7 +404,7 @@ func (s *SQLQuerier) GetChannelEntriesThatProvide(ctx context.Context, group, ve
404404
func (s *SQLQuerier) GetLatestChannelEntriesThatProvide(ctx context.Context, group, version, kind string) (entries []*registry.ChannelEntry, err error) {
405405
query := `SELECT DISTINCT channel_entry.package_name, channel_entry.channel_name, channel_entry.operatorbundle_name, replaces.operatorbundle_name, MIN(channel_entry.depth)
406406
FROM channel_entry
407-
INNER JOIN api_provider ON channel_entry.entry_id = api_provider.channel_entry_id
407+
INNER JOIN api_provider ON channel_entry.operatorbundle_name = api_provider.operatorbundle_name
408408
LEFT OUTER JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id
409409
WHERE api_provider.group_name = ? AND api_provider.version = ? AND api_provider.kind = ?
410410
GROUP BY channel_entry.package_name, channel_entry.channel_name`
@@ -444,8 +444,8 @@ func (s *SQLQuerier) GetLatestChannelEntriesThatProvide(ctx context.Context, gro
444444
func (s *SQLQuerier) GetBundleThatProvides(ctx context.Context, group, apiVersion, kind string) (*api.Bundle, error) {
445445
query := `SELECT DISTINCT channel_entry.entry_id, operatorbundle.bundle, operatorbundle.bundlepath, MIN(channel_entry.depth), channel_entry.operatorbundle_name, channel_entry.package_name, channel_entry.channel_name, channel_entry.replaces, operatorbundle.version, operatorbundle.skiprange
446446
FROM channel_entry
447-
INNER JOIN api_provider ON channel_entry.entry_id = api_provider.channel_entry_id
448447
INNER JOIN operatorbundle ON operatorbundle.name = channel_entry.operatorbundle_name
448+
INNER JOIN api_provider ON channel_entry.operatorbundle_name = api_provider.operatorbundle_name
449449
INNER JOIN package ON package.name = channel_entry.package_name
450450
WHERE api_provider.group_name = ? AND api_provider.version = ? AND api_provider.kind = ? AND package.default_channel = channel_entry.channel_name
451451
GROUP BY channel_entry.package_name, channel_entry.channel_name`
@@ -544,8 +544,10 @@ func (s *SQLQuerier) GetImagesForBundle(ctx context.Context, csvName string) ([]
544544

545545
func (s *SQLQuerier) GetApisForEntry(ctx context.Context, entryID int64) (provided []*api.GroupVersionKind, required []*api.GroupVersionKind, err error) {
546546
providedQuery := `SELECT DISTINCT api.group_name, api.version, api.kind, api.plural FROM api
547-
INNER JOIN api_provider ON (api.group_name=api_provider.group_name AND api.version=api_provider.version AND api.kind=api_provider.kind)
548-
WHERE api_provider.channel_entry_id=?`
547+
INNER JOIN channel_entry ON channel_entry.operatorbundle_name = api_provider.operatorbundle_name
548+
INNER JOIN operatorbundle ON operatorbundle.name=channel_entry.operatorbundle_name
549+
INNER JOIN api_provider ON (api.group_name=api_provider.group_name AND api.version=api_provider.version AND api.kind=api_provider.kind AND operatorbundle.name=api_provider.operatorbundle_name)
550+
WHERE channel_entry.entry_id=?`
549551

550552
providedRows, err := s.db.QueryContext(ctx, providedQuery, entryID)
551553
if err != nil {
@@ -577,8 +579,10 @@ func (s *SQLQuerier) GetApisForEntry(ctx context.Context, entryID int64) (provid
577579
}
578580

579581
requiredQuery := `SELECT DISTINCT api.group_name, api.version, api.kind, api.plural FROM api
580-
INNER JOIN api_requirer ON (api.group_name=api_requirer.group_name AND api.version=api_requirer.version AND api.kind=api_requirer.kind)
581-
WHERE api_requirer.channel_entry_id=?`
582+
INNER JOIN channel_entry ON channel_entry.operatorbundle_name = api_requirer.operatorbundle_name
583+
INNER JOIN operatorbundle ON operatorbundle.name=channel_entry.operatorbundle_name
584+
INNER JOIN api_requirer ON (api.group_name=api_requirer.group_name AND api.version=api_requirer.version AND api.kind=api_requirer.kind AND operatorbundle.name=api_requirer.operatorbundle_name)
585+
WHERE channel_entry.entry_id=?`
582586

583587
requiredRows, err := s.db.QueryContext(ctx, requiredQuery, entryID)
584588
if err != nil {

0 commit comments

Comments
 (0)
Please sign in to comment.