Skip to content

Commit ce2958d

Browse files
authored
Merge pull request #1705 from MikeEdgar/in-memory-storage
fix: create or update in-memory storage for both Lock and Store ops
2 parents 0cfd412 + 4b4a720 commit ce2958d

File tree

4 files changed

+70
-36
lines changed

4 files changed

+70
-36
lines changed

internal/kafka/internal/services/kafkatlscertmgmt/kafka_tls_certificate_management_service.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package kafkatlscertmgmt
33
import (
44
"context"
55
"fmt"
6-
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
76
"time"
87

8+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
9+
910
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/config"
1011
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"
1112
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"
@@ -188,7 +189,7 @@ func NewKafkaTLSCertificateManagementService(
188189
Path: "secrets/tls/",
189190
}
190191
case config.InMemoryTLSCertStorageType:
191-
storage = newInMemoryStorage()
192+
storage = newInMemoryStorage(connectionFactory)
192193
case config.SecureTLSCertStorageType:
193194
storage, err = newSecureStorage(connectionFactory, awsConfig, kafkaTLSCertificateManagementConfig.AutomaticCertificateManagementConfig)
194195
}

internal/kafka/internal/services/kafkatlscertmgmt/kafka_tls_certificate_management_service_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88

99
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/config"
10+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
1011
"github.com/caddyserver/certmagic"
1112
"github.com/onsi/gomega"
1213
)
@@ -20,7 +21,7 @@ func Test_kafkaTLSCertificateManagementService_GetCertificate(t *testing.T) {
2021
request GetCertificateRequest
2122
}
2223

23-
storageWithCerts := newInMemoryStorage()
24+
storageWithCerts := newInMemoryStorage(db.NewMockConnectionFactory(nil))
2425
crtRef := "some-crt-ref"
2526
keyRef := "some-key-ref"
2627

@@ -102,7 +103,7 @@ func Test_kafkaTLSCertificateManagementService_GetCertificate(t *testing.T) {
102103
{
103104
name: "should return an error when loading from the storage returns an error",
104105
fields: fields{
105-
storage: newInMemoryStorage(),
106+
storage: newInMemoryStorage(db.NewMockConnectionFactory(nil)),
106107
config: &config.KafkaTLSCertificateManagementConfig{
107108
CertificateManagementStrategy: config.AutomaticCertificateManagement,
108109
},
@@ -144,7 +145,7 @@ func Test_kafkaTLSCertificateManagementService_RevokeCertificate(t *testing.T) {
144145
reason CertificateRevocationReason
145146
}
146147

147-
inMemoryStorage := newInMemoryStorage()
148+
inMemoryStorage := newInMemoryStorage(db.NewMockConnectionFactory(nil))
148149
certKey := "cert-key"
149150
privateKey := "private-key"
150151
_ = inMemoryStorage.Store(context.Background(), certKey, []byte{})

internal/kafka/internal/services/kafkatlscertmgmt/memory_storage.go

+18-31
Original file line numberDiff line numberDiff line change
@@ -4,65 +4,52 @@ import (
44
"context"
55
"io/fs"
66
"strings"
7-
"sync"
87
"time"
98

9+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
10+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"
11+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/sync"
1012
"github.com/caddyserver/certmagic"
1113
)
1214

1315
var _ certmagic.Storage = &inMemoryStorage{}
1416

1517
type inMemoryStorageItem struct {
16-
value []byte
17-
mu *sync.Mutex
18-
sync.Locker
18+
value []byte
1919
lastModified time.Time
2020
}
2121

22-
func (item inMemoryStorageItem) Lock() {
23-
item.mu.Lock()
24-
}
25-
26-
func (item inMemoryStorageItem) Unlock() {
27-
item.mu.Unlock()
28-
}
29-
3022
type inMemoryStorage struct {
3123
store map[string]inMemoryStorageItem
24+
lock sync.DistributedLockMgr
3225
}
3326

34-
func newInMemoryStorage() *inMemoryStorage {
27+
func newInMemoryStorage(connectionFactory *db.ConnectionFactory) *inMemoryStorage {
3528
return &inMemoryStorage{
3629
store: map[string]inMemoryStorageItem{},
30+
lock: sync.NewDistributedLockMgr(connectionFactory.New()),
3731
}
3832
}
3933

4034
func (storage *inMemoryStorage) Lock(ctx context.Context, key string) error {
41-
mu, ok := storage.store[key]
42-
if !ok {
43-
return fs.ErrNotExist
44-
}
45-
46-
mu.Lock()
47-
return nil
35+
return storage.lock.Lock(key)
4836
}
4937

5038
func (storage *inMemoryStorage) Unlock(ctx context.Context, key string) error {
51-
mu, ok := storage.store[key]
52-
if !ok {
53-
return fs.ErrNotExist
54-
}
55-
56-
mu.Unlock()
57-
return nil
39+
return storage.lock.Unlock(key)
5840
}
5941

6042
func (storage *inMemoryStorage) Store(ctx context.Context, key string, value []byte) error {
61-
storage.store[key] = inMemoryStorageItem{
62-
value: value,
63-
lastModified: time.Now(),
64-
mu: &sync.Mutex{},
43+
mu, ok := storage.store[key]
44+
if !ok {
45+
mu = inMemoryStorageItem{}
46+
}
47+
mu.value = value
48+
mu.lastModified = time.Now()
49+
if strings.HasPrefix(key, "acme/") {
50+
logger.Logger.Infof("storing key '%s' with value %v", key, string(value))
6551
}
52+
storage.store[key] = mu
6653
return nil
6754
}
6855

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package kafkatlscertmgmt
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
8+
"github.com/onsi/gomega"
9+
)
10+
11+
func Test_memoryStorage_Load(t *testing.T) {
12+
type args struct {
13+
key string
14+
value []byte
15+
}
16+
tests := []struct {
17+
name string
18+
args args
19+
wantErr bool
20+
}{
21+
{
22+
name: "successfully loads the same value stored",
23+
args: args{
24+
key: "some-key",
25+
value: []byte("some byte"),
26+
},
27+
wantErr: false,
28+
},
29+
}
30+
for _, tt := range tests {
31+
testcase := tt
32+
t.Run(testcase.name, func(t *testing.T) {
33+
t.Parallel()
34+
g := gomega.NewWithT(t)
35+
storage := newInMemoryStorage(db.NewMockConnectionFactory(nil))
36+
37+
storeErr := storage.Store(context.Background(), testcase.args.key, testcase.args.value)
38+
g.Expect(storeErr != nil).To(gomega.Equal(testcase.wantErr))
39+
40+
outputValue, loadErr := storage.Load(context.Background(), testcase.args.key)
41+
g.Expect(loadErr == nil)
42+
g.Expect(outputValue).To(gomega.Equal(testcase.args.value))
43+
})
44+
}
45+
}

0 commit comments

Comments
 (0)