Skip to content

Commit e577212

Browse files
authored
Merge pull request #890 from RedHatInsights/psav/fix_msk_ephem
Fixed MSK ephem mode
2 parents 012b1da + 395562c commit e577212

File tree

1 file changed

+77
-1
lines changed
  • controllers/cloud.redhat.com/providers/kafka

1 file changed

+77
-1
lines changed

controllers/cloud.redhat.com/providers/kafka/msk.go

+77-1
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,24 @@ import (
66
"strings"
77

88
crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
9+
rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache"
10+
"github.com/RedHatInsights/rhc-osdk-utils/utils"
911

1012
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig"
1113
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
1214
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
1315
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
1416
core "k8s.io/api/core/v1"
1517
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
18+
"k8s.io/apimachinery/pkg/types"
1619
)
1720

21+
// KafkaManagedSecret is the resource ident for the MSK user secret object.
22+
var KafkaManagedSecret = rc.NewMultiResourceIdent(ProvName, "kafka_managed_secret", &core.Secret{})
23+
24+
// KafkaConnectSecret is the resource ident for a MSK Connect secret object.
25+
var KafkaConnectSecret = rc.NewMultiResourceIdent(ProvName, "kafka_connect_secret", &core.Secret{})
26+
1827
type mskProvider struct {
1928
providers.Provider
2029
}
@@ -28,6 +37,8 @@ func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) {
2837
CyndiConfigMap,
2938
KafkaTopic,
3039
KafkaConnect,
40+
KafkaManagedSecret,
41+
KafkaConnectSecret,
3142
)
3243
return &mskProvider{Provider: *p}, nil
3344
}
@@ -36,7 +47,72 @@ func (s *mskProvider) EnvProvide() error {
3647
s.Config = &config.AppConfig{
3748
Kafka: &config.KafkaConfig{},
3849
}
39-
return s.configureBrokers()
50+
51+
if err := s.configureBrokers(); err != nil {
52+
return err
53+
}
54+
55+
namespaceList, err := s.Env.GetNamespacesInEnv(s.Ctx, s.Client)
56+
if err != nil {
57+
return err
58+
}
59+
60+
for _, namespace := range namespaceList {
61+
if err := s.copyManagedSecret(namespace); err != nil {
62+
return err
63+
}
64+
if err := s.copyConnectSecret(namespace); err != nil {
65+
return err
66+
}
67+
}
68+
69+
return nil
70+
}
71+
72+
func (s *mskProvider) copyManagedSecret(namespace string) error {
73+
srcSecretRef := types.NamespacedName{
74+
Name: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Name,
75+
Namespace: s.Env.Status.TargetNamespace,
76+
}
77+
dstSecretRef := types.NamespacedName{
78+
Name: srcSecretRef.Name,
79+
Namespace: namespace,
80+
}
81+
sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef)
82+
83+
if err != nil {
84+
return err
85+
}
86+
87+
if err = s.Cache.Create(KafkaManagedSecret, dstSecretRef, sec); err != nil {
88+
s.Log.Error(err, "Failed to add managed secret to cache")
89+
return err
90+
}
91+
return nil
92+
}
93+
94+
func (s *mskProvider) copyConnectSecret(namespace string) error {
95+
secName := s.getConnectClusterUserName()
96+
97+
srcSecretRef := types.NamespacedName{
98+
Name: secName,
99+
Namespace: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace,
100+
}
101+
dstSecretRef := types.NamespacedName{
102+
Name: secName,
103+
Namespace: namespace,
104+
}
105+
106+
sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef)
107+
if err != nil {
108+
return err
109+
}
110+
111+
if err = s.Cache.Create(KafkaConnectSecret, dstSecretRef, sec); err != nil {
112+
s.Log.Error(err, "Failed to add managed secret to cache")
113+
return err
114+
}
115+
return nil
40116
}
41117

42118
func (s *mskProvider) Provide(app *crd.ClowdApp) error {

0 commit comments

Comments
 (0)