Skip to content

Commit 17a5007

Browse files
authored
Pass secret decrypter to pipedv1 schedulers and planners (#5433)
Signed-off-by: khanhtc1202 <[email protected]>
1 parent 7cf95df commit 17a5007

File tree

4 files changed

+67
-37
lines changed

4 files changed

+67
-37
lines changed

pkg/app/pipedv1/cmd/piped/piped.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
371371
// TODO: Implement the drift detector controller.
372372
}
373373

374+
// Initialize secret decrypter.
375+
decrypter, err := p.initializeSecretDecrypter(cfg)
376+
if err != nil {
377+
input.Logger.Error("failed to initialize secret decrypter", zap.Error(err))
378+
return err
379+
}
380+
374381
// Start running deployment controller.
375382
{
376383
c := controller.NewController(
@@ -380,6 +387,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
380387
deploymentLister,
381388
commandLister,
382389
notifier,
390+
decrypter,
383391
p.gracePeriod,
384392
input.Logger,
385393
tracerProvider,
@@ -667,7 +675,6 @@ func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin,
667675
return plugins, nil
668676
}
669677

670-
// TODO: Remove this once the decryption task by plugin call to the plugin service is implemented.
671678
func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
672679
sm := cfg.SecretManagement
673680
if sm == nil {

pkg/app/pipedv1/controller/controller.go

+9
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ type notifier interface {
7575
Notify(event model.NotificationEvent)
7676
}
7777

78+
type secretDecrypter interface {
79+
Decrypt(string) (string, error)
80+
}
81+
7882
type DeploymentController interface {
7983
Run(ctx context.Context) error
8084
}
@@ -90,6 +94,7 @@ type controller struct {
9094
deploymentLister deploymentLister
9195
commandLister commandLister
9296
notifier notifier
97+
secretDecrypter secretDecrypter
9398

9499
// gRPC clients to communicate with plugins.
95100
pluginClients []pluginapi.PluginClient
@@ -130,6 +135,7 @@ func NewController(
130135
deploymentLister deploymentLister,
131136
commandLister commandLister,
132137
notifier notifier,
138+
secretDecrypter secretDecrypter,
133139
gracePeriod time.Duration,
134140
logger *zap.Logger,
135141
tracerProvider trace.TracerProvider,
@@ -142,6 +148,7 @@ func NewController(
142148
deploymentLister: deploymentLister,
143149
commandLister: commandLister,
144150
notifier: notifier,
151+
secretDecrypter: secretDecrypter,
145152

146153
planners: make(map[string]*planner),
147154
donePlanners: make(map[string]time.Time),
@@ -446,6 +453,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
446453
c.apiClient,
447454
c.gitClient,
448455
c.notifier,
456+
c.secretDecrypter,
449457
c.logger,
450458
c.tracerProvider,
451459
)
@@ -584,6 +592,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
584592
c.gitClient,
585593
c.stageBasedPluginsMap,
586594
c.notifier,
595+
c.secretDecrypter,
587596
c.logger,
588597
c.tracerProvider,
589598
)

pkg/app/pipedv1/controller/planner.go

+26-18
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ type planner struct {
7575
notifier notifier
7676
metadataStore metadatastore.MetadataStore
7777

78-
// TODO: Find a way to show log from pluggin's planner
78+
// The secretDecrypter is used to decrypt secrets
79+
// which encrypted using PipeCD built-in secret management.
80+
secretDecrypter secretDecrypter
81+
7982
logger *zap.Logger
8083
tracer trace.Tracer
8184

@@ -98,6 +101,7 @@ func newPlanner(
98101
apiClient apiClient,
99102
gitClient gitClient,
100103
notifier notifier,
104+
secretDecrypter secretDecrypter,
101105
logger *zap.Logger,
102106
tracerProvider trace.TracerProvider,
103107
) *planner {
@@ -121,6 +125,7 @@ func newPlanner(
121125
gitClient: gitClient,
122126
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
123127
notifier: notifier,
128+
secretDecrypter: secretDecrypter,
124129
doneDeploymentStatus: d.Status,
125130
cancelledCh: make(chan *model.ReportableCommand, 1),
126131
nowFunc: time.Now,
@@ -193,31 +198,34 @@ func (p *planner) Run(ctx context.Context) error {
193198
Branch: p.deployment.GitPath.Repo.Branch,
194199
}
195200

196-
runningDSP := deploysource.NewProvider(
197-
filepath.Join(p.workingDir, "running-deploysource"),
198-
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
199-
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
200-
)
201-
rds, err := runningDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
202-
if err != nil {
203-
// TODO: log error
204-
return fmt.Errorf("error while preparing deploy source data (%v)", err)
205-
}
206-
runningDS = rds.ToPluginDeploySource()
207-
208201
targetDSP := deploysource.NewProvider(
209202
filepath.Join(p.workingDir, "target-deploysource"),
210203
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
211-
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
204+
p.deployment.GetGitPath(),
205+
p.secretDecrypter,
212206
)
213-
tds, err := targetDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
207+
tds, err := targetDSP.Get(ctx, io.Discard)
214208
if err != nil {
215-
// TODO: log error
216-
return fmt.Errorf("error while preparing deploy source data (%v)", err)
209+
p.logger.Error("error while preparing target deploy source data", zap.Error(err))
210+
return err
217211
}
218212
targetDS = tds.ToPluginDeploySource()
219213

220-
// TODO: Pass running DS as well if need?
214+
if p.lastSuccessfulCommitHash != "" {
215+
runningDSP := deploysource.NewProvider(
216+
filepath.Join(p.workingDir, "running-deploysource"),
217+
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
218+
p.deployment.GetGitPath(),
219+
p.secretDecrypter,
220+
)
221+
rds, err := runningDSP.Get(ctx, io.Discard)
222+
if err != nil {
223+
p.logger.Error("error while preparing running deploy source data", zap.Error(err))
224+
return err
225+
}
226+
runningDS = rds.ToPluginDeploySource()
227+
}
228+
221229
out, err := p.buildPlan(ctx, runningDS, targetDS)
222230

223231
// If the deployment was already cancelled, we ignore the plan result.

pkg/app/pipedv1/controller/scheduler.go

+24-18
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ type scheduler struct {
4545

4646
stageBasedPluginsMap map[string]pluginapi.PluginClient
4747

48-
apiClient apiClient
49-
gitClient gitClient
50-
metadataStore metadatastore.MetadataStore
51-
notifier notifier
48+
apiClient apiClient
49+
gitClient gitClient
50+
metadataStore metadatastore.MetadataStore
51+
notifier notifier
52+
secretDecrypter secretDecrypter
5253

5354
targetDSP deploysource.Provider
5455
runningDSP deploysource.Provider
@@ -80,6 +81,7 @@ func newScheduler(
8081
gitClient gitClient,
8182
stageBasedPluginsMap map[string]pluginapi.PluginClient,
8283
notifier notifier,
84+
secretsDecrypter secretDecrypter,
8385
logger *zap.Logger,
8486
tracerProvider trace.TracerProvider,
8587
) *scheduler {
@@ -99,6 +101,7 @@ func newScheduler(
99101
gitClient: gitClient,
100102
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
101103
notifier: notifier,
104+
secretDecrypter: secretsDecrypter,
102105
doneDeploymentStatus: d.Status,
103106
cancelledCh: make(chan *model.ReportableCommand, 1),
104107
logger: logger,
@@ -165,7 +168,7 @@ func (s *scheduler) Cancel(cmd model.ReportableCommand) {
165168
}
166169

167170
// Run starts running the scheduler.
168-
// It determines what stage should be executed next by which executor.
171+
// It determines what stage should be executed next by which plugin.
169172
// The returning error does not mean that the pipeline was failed,
170173
// but it means that the scheduler could not finish its job normally.
171174
func (s *scheduler) Run(ctx context.Context) error {
@@ -193,7 +196,7 @@ func (s *scheduler) Run(ctx context.Context) error {
193196
}
194197
controllermetrics.UpdateDeploymentStatus(s.deployment, model.DeploymentStatus_DEPLOYMENT_RUNNING)
195198

196-
// notify the deployment started event
199+
// Notify the deployment started event
197200
users, groups, err := s.getApplicationNotificationMentions(model.NotificationEventType_EVENT_DEPLOYMENT_STARTED)
198201
if err != nil {
199202
s.logger.Error("failed to get the list of users or groups", zap.Error(err))
@@ -223,16 +226,20 @@ func (s *scheduler) Run(ctx context.Context) error {
223226
Branch: s.deployment.GitPath.Repo.Branch,
224227
}
225228

226-
s.runningDSP = deploysource.NewProvider(
227-
filepath.Join(s.workingDir, "running-deploysource"),
228-
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash),
229-
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
230-
)
229+
if s.deployment.RunningCommitHash != "" {
230+
s.runningDSP = deploysource.NewProvider(
231+
filepath.Join(s.workingDir, "running-deploysource"),
232+
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash),
233+
s.deployment.GetGitPath(),
234+
s.secretDecrypter,
235+
)
236+
}
231237

232238
s.targetDSP = deploysource.NewProvider(
233239
filepath.Join(s.workingDir, "target-deploysource"),
234240
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "target", s.deployment.Trigger.Commit.Hash),
235-
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
241+
s.deployment.GetGitPath(),
242+
s.secretDecrypter,
236243
)
237244

238245
ds, err := s.targetDSP.Get(ctx, io.Discard)
@@ -469,13 +476,13 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
469476

470477
rds, err := s.runningDSP.Get(ctx, io.Discard)
471478
if err != nil {
472-
s.logger.Error("failed to get running deployment source", zap.Error(err))
479+
s.logger.Error("failed to get running deployment source", zap.String("stage-name", ps.Name), zap.Error(err))
473480
return model.StageStatus_STAGE_FAILURE
474481
}
475482

476483
tds, err := s.targetDSP.Get(ctx, io.Discard)
477484
if err != nil {
478-
s.logger.Error("failed to get target deployment source", zap.Error(err))
485+
s.logger.Error("failed to get target deployment source", zap.String("stage-name", ps.Name), zap.Error(err))
479486
return model.StageStatus_STAGE_FAILURE
480487
}
481488

@@ -508,16 +515,15 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
508515
// Find the executor plugin for this stage.
509516
plugin, ok := s.stageBasedPluginsMap[ps.Name]
510517
if !ok {
511-
err := fmt.Errorf("no registered plugin that can perform for stage %s", ps.Name)
512-
s.logger.Error(err.Error())
518+
s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name))
513519
s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
514520
return model.StageStatus_STAGE_FAILURE
515521
}
516522

517523
// Load the stage configuration.
518524
stageConfig, stageConfigFound := s.genericApplicationConfig.GetStageByte(ps.Index)
519525
if !stageConfigFound {
520-
s.logger.Error("Unable to find the stage configuration")
526+
s.logger.Error("Unable to find the stage configuration", zap.String("stage-name", ps.Name))
521527
if err := s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires); err != nil {
522528
s.logger.Error("failed to report stage status", zap.Error(err))
523529
}
@@ -535,7 +541,7 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
535541
},
536542
})
537543
if err != nil {
538-
s.logger.Error("failed to execute stage", zap.Error(err))
544+
s.logger.Error("failed to execute stage", zap.String("stage-name", ps.Name), zap.Error(err))
539545
s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
540546
return model.StageStatus_STAGE_FAILURE
541547
}

0 commit comments

Comments
 (0)