From 0fda22f435834ba61e9a636c3869e666958592ea Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:14:53 +0900 Subject: [PATCH 1/9] Replace old main function Signed-off-by: Yoshiki Fujikane --- pkg/app/pipedv1/plugin/kubernetes/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/main.go b/pkg/app/pipedv1/plugin/kubernetes/main.go index a1ed3dce79..07c40166c2 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/main.go +++ b/pkg/app/pipedv1/plugin/kubernetes/main.go @@ -23,7 +23,8 @@ import ( "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) -func main() { +// TODO: remote it +func _main() { app := cli.NewApp( "pipecd-plugin-kubernetes", "Plugin component to deploy Kubernetes Application.", @@ -36,8 +37,7 @@ func main() { } } -// TODO: use this after rewriting the plugin with the sdk -func _main() { +func main() { sdk.RegisterDeploymentPlugin(&deployment.Plugin{}) sdk.RegisterLivestatePlugin(livestate.Plugin{}) if err := sdk.Run(); err != nil { From 498bdefe1a5ae94f559c84829dd2ac74545fffc9 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 13:54:32 +0900 Subject: [PATCH 2/9] Replace old applymanifests Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/apply.go | 19 +------------------ .../kubernetes/deployment/apply_test.go | 12 ++++-------- .../plugin/kubernetes/deployment/plugin.go | 4 ++-- 3 files changed, 7 insertions(+), 28 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/apply.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/apply.go index bdd099d971..d30570e8d9 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/apply.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/apply.go @@ -17,10 +17,8 @@ package deployment import ( "context" "errors" - "time" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) @@ -35,8 +33,7 @@ type applier interface { ForceReplaceManifest(ctx context.Context, manifest provider.Manifest) error } -// TODO: rewrite applyManifests with sdk.StageLogPersister -func applyManifests(ctx context.Context, applier applier, manifests []provider.Manifest, namespace string, lp logpersister.StageLogPersister) error { +func applyManifests(ctx context.Context, applier applier, manifests []provider.Manifest, namespace string, lp sdk.StageLogPersister) error { if namespace == "" { lp.Infof("Start applying %d manifests", len(manifests)) } else { @@ -91,17 +88,3 @@ func applyManifests(ctx context.Context, applier applier, manifests []provider.M lp.Successf("Successfully applied %d manifests", len(manifests)) return nil } - -// TODO: remove applyManifestsSDK and originalLogPersister after rewriting applyManifests -func applyManifestsSDK(ctx context.Context, applier applier, manifests []provider.Manifest, namespace string, lp sdk.StageLogPersister) error { - originalLogPersister := &originalLogPersister{lp} - return applyManifests(ctx, applier, manifests, namespace, originalLogPersister) -} - -type originalLogPersister struct { - sdk.StageLogPersister -} - -func (lp *originalLogPersister) Complete(timeout time.Duration) error { - return nil -} diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/apply_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/apply_test.go index a3bfd440b0..1e9867a72b 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/apply_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/apply_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "testing" - "time" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" ) @@ -58,11 +57,6 @@ func (m *mockStageLogPersister) Errorf(format string, a ...interface{}) { m.logs = append(m.logs, fmt.Sprintf(format, a...)) } -func (m *mockStageLogPersister) Complete(timeout time.Duration) error { - m.completed = true - return nil -} - type mockApplier struct { applyErr error forceReplaceErr error @@ -86,7 +80,7 @@ func (m *mockApplier) CreateManifest(ctx context.Context, manifest provider.Mani return m.createErr } -func TestApplyManifests(t *testing.T) { +func Test_applyManifests(t *testing.T) { tests := []struct { name string manifests []provider.Manifest @@ -280,8 +274,10 @@ data: for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + t.Parallel() + lp := new(mockStageLogPersister) - err := applyManifests(context.Background(), tt.applier, tt.manifests, tt.namespace, lp) + err := applyManifests(t.Context(), tt.applier, tt.manifests, tt.namespace, lp) if (err != nil) != tt.wantErr { t.Errorf("applyManifests() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go index 01e60e86f0..0e423948ee 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go @@ -167,7 +167,7 @@ func (p *Plugin) executeK8sSyncStage(ctx context.Context, input *sdk.ExecuteStag // Start applying all manifests to add or update running resources. // TODO: use applyManifests instead of applyManifestsSDK - if err := applyManifestsSDK(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { + if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { lp.Errorf("Failed while applying manifests (%v)", err) return sdk.StageStatusSuccess } @@ -329,7 +329,7 @@ func (p *Plugin) executeK8sRollbackStage(ctx context.Context, input *sdk.Execute applier := provider.NewApplier(provider.NewKubectl(kubectlPath), cfg.Spec.Input, deployTargetConfig, input.Logger) // Start applying all manifests to add or update running resources. - if err := applyManifestsSDK(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { + if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { lp.Errorf("Failed while applying manifests (%v)", err) return sdk.StageStatusFailure } From a5e920406b12c59d31990b8fcc76224be0c6eafa Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:13:21 +0900 Subject: [PATCH 3/9] Replace old determineVersions Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/determine.go | 29 +++------------- .../kubernetes/deployment/determine_test.go | 33 ++++++++++--------- .../plugin/kubernetes/deployment/plugin.go | 2 +- .../plugin/kubernetes/deployment/server.go | 15 +-------- 4 files changed, 24 insertions(+), 55 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go index 8b18eee84c..5dc04bc827 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go @@ -48,7 +48,7 @@ func parseContainerImage(image string) (img containerImage) { // determineVersions decides artifact versions of an application. // It finds all container images that are being specified in the workload manifests then returns their names and tags. -func determineVersions(manifests []provider.Manifest) []*model.ArtifactVersion { +func determineVersions(manifests []provider.Manifest) []sdk.ArtifactVersion { imageMap := map[string]struct{}{} for _, m := range manifests { for _, c := range provider.FindContainerImages(m) { @@ -56,33 +56,14 @@ func determineVersions(manifests []provider.Manifest) []*model.ArtifactVersion { } } - versions := make([]*model.ArtifactVersion, 0, len(imageMap)) + versions := make([]sdk.ArtifactVersion, 0, len(imageMap)) for i := range imageMap { image := parseContainerImage(i) - versions = append(versions, &model.ArtifactVersion{ - Kind: model.ArtifactVersion_CONTAINER_IMAGE, - Version: image.tag, - Name: image.name, - Url: i, - }) - } - - return versions -} - -// determineVersionsSDK decides artifact versions of an application. -// It finds all container images that are being specified in the workload manifests then returns their names and tags. -// TODO: rewrite this function to determineVersions after the current determineVersions is removed. -func determineVersionsSDK(manifests []provider.Manifest) []sdk.ArtifactVersion { - values := determineVersions(manifests) - - versions := make([]sdk.ArtifactVersion, 0, len(values)) - for _, v := range values { versions = append(versions, sdk.ArtifactVersion{ Kind: sdk.ArtifactKindContainerImage, - Version: v.Version, - Name: v.Name, - URL: v.Url, + Version: image.tag, + Name: image.name, + URL: i, }) } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go index b086789a3e..6e455740ab 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go @@ -27,6 +27,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" "github.com/pipe-cd/pipecd/pkg/model" + "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) func mustUnmarshalYAML[T any](t *testing.T, data []byte) T { @@ -99,13 +100,13 @@ func TestParseContainerImage(t *testing.T) { }) } } -func TestDetermineVersions(t *testing.T) { +func Test_determineVersions(t *testing.T) { t.Parallel() tests := []struct { name string manifests []string - want []*model.ArtifactVersion + want []sdk.ArtifactVersion }{ { name: "single manifest with one container", @@ -123,12 +124,12 @@ spec: image: nginx:1.19.3 `, }, - want: []*model.ArtifactVersion{ + want: []sdk.ArtifactVersion{ { - Kind: model.ArtifactVersion_CONTAINER_IMAGE, + Kind: sdk.ArtifactKindContainerImage, Version: "1.19.3", Name: "nginx", - Url: "nginx:1.19.3", + URL: "nginx:1.19.3", }, }, }, @@ -160,18 +161,18 @@ spec: image: redis:6.0.9 `, }, - want: []*model.ArtifactVersion{ + want: []sdk.ArtifactVersion{ { - Kind: model.ArtifactVersion_CONTAINER_IMAGE, + Kind: sdk.ArtifactKindContainerImage, Version: "1.19.3", Name: "nginx", - Url: "nginx:1.19.3", + URL: "nginx:1.19.3", }, { - Kind: model.ArtifactVersion_CONTAINER_IMAGE, + Kind: sdk.ArtifactKindContainerImage, Version: "6.0.9", Name: "redis", - Url: "redis:6.0.9", + URL: "redis:6.0.9", }, }, }, @@ -193,12 +194,12 @@ spec: image: nginx:1.19.3 `, }, - want: []*model.ArtifactVersion{ + want: []sdk.ArtifactVersion{ { - Kind: model.ArtifactVersion_CONTAINER_IMAGE, + Kind: sdk.ArtifactKindContainerImage, Version: "1.19.3", Name: "nginx", - Url: "nginx:1.19.3", + URL: "nginx:1.19.3", }, }, }, @@ -216,7 +217,7 @@ spec: containers: [] `, }, - want: []*model.ArtifactVersion{}, + want: []sdk.ArtifactVersion{}, }, { name: "manifest with missing image field", @@ -233,7 +234,7 @@ spec: - name: nginx `, }, - want: []*model.ArtifactVersion{}, + want: []sdk.ArtifactVersion{}, }, { name: "manifest with no containers field", @@ -248,7 +249,7 @@ spec: spec: {} `, }, - want: []*model.ArtifactVersion{}, + want: []sdk.ArtifactVersion{}, }, { name: "manifest with invalid containers field -- skipped", diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go index 0e423948ee..c2a492c358 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go @@ -358,7 +358,7 @@ func (p *Plugin) DetermineVersions(ctx context.Context, _ *sdk.ConfigNone, input } return &sdk.DetermineVersionsResponse{ - Versions: determineVersionsSDK(manifests), + Versions: determineVersions(manifests), }, nil } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index c85135236b..25aadeb278 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -108,20 +108,7 @@ func (a *DeploymentService) DetermineStrategy(ctx context.Context, request *depl // DetermineVersions implements deployment.DeploymentServiceServer. func (a *DeploymentService) DetermineVersions(ctx context.Context, request *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) { - cfg, err := config.DecodeYAML[*kubeconfig.KubernetesApplicationSpec](request.GetInput().GetTargetDeploymentSource().GetApplicationConfig()) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - - manifests, err := a.loadManifests(ctx, request.GetInput().GetDeployment(), cfg.Spec, request.GetInput().GetTargetDeploymentSource()) - - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - return &deployment.DetermineVersionsResponse{ - Versions: determineVersions(manifests), - }, nil + return &deployment.DetermineVersionsResponse{}, nil } // BuildPipelineSyncStages implements deployment.DeploymentServiceServer. From 927accdf18b4d96344c748b8f01cb138f3410eda Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:21:04 +0900 Subject: [PATCH 4/9] Replace old buildPipelineStages Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/pipeline.go | 52 +------- .../kubernetes/deployment/pipeline_test.go | 116 +----------------- .../plugin/kubernetes/deployment/plugin.go | 2 +- .../plugin/kubernetes/deployment/server.go | 6 +- 4 files changed, 6 insertions(+), 170 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go index ba4fe7eb24..a4a1fc21c6 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go @@ -15,12 +15,10 @@ package deployment import ( - "fmt" "slices" "time" "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) @@ -154,54 +152,8 @@ func BuildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage { return out } -// TODO: Rename this function -func buildPipelineStages(stages []*deployment.BuildPipelineSyncStagesRequest_StageConfig, autoRollback bool, now time.Time) []*model.PipelineStage { - out := make([]*model.PipelineStage, 0, len(stages)+1) - - for _, s := range stages { - id := s.GetId() - if id == "" { - id = fmt.Sprintf("stage-%d", s.GetIndex()) - } - stage := &model.PipelineStage{ - Id: id, - Name: s.GetName(), - Desc: s.GetDesc(), - Index: s.GetIndex(), - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - } - out = append(out, stage) - } - - if autoRollback { - // we set the index of the rollback stage to the minimum index of all stages. - minIndex := slices.MinFunc(stages, func(a, b *deployment.BuildPipelineSyncStagesRequest_StageConfig) int { - return int(a.GetIndex() - b.GetIndex()) - }).GetIndex() - - s, _ := GetPredefinedStage(PredefinedStageRollback) - // we copy the predefined stage to avoid modifying the original one. - out = append(out, &model.PipelineStage{ - Id: s.GetId(), - Name: s.GetName(), - Desc: s.GetDesc(), - Index: minIndex, - Rollback: s.GetRollback(), - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }) - } - - return out -} - -// buildPipelineStagesWithSDK builds the pipeline stages with the given SDK stages. -// TODO: Rename this function to buildPipelineStages after removing the old one. -func buildPipelineStagesWithSDK(stages []sdk.StageConfig, autoRollback bool) []sdk.PipelineStage { +// buildPipelineStages builds the pipeline stages with the given SDK stages. +func buildPipelineStages(stages []sdk.StageConfig, autoRollback bool) []sdk.PipelineStage { out := make([]sdk.PipelineStage, 0, len(stages)+1) for _, s := range stages { diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go index c9864bb7d7..ef3af3f7dc 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) @@ -88,117 +87,6 @@ func Test_buildQuickSyncPipeline(t *testing.T) { } } -func TestBuildPipelineStages(t *testing.T) { - t.Parallel() - - now := time.Now() - - tests := []struct { - name string - stages []*deployment.BuildPipelineSyncStagesRequest_StageConfig - autoRollback bool - expected []*model.PipelineStage - }{ - { - name: "without auto rollback", - stages: []*deployment.BuildPipelineSyncStagesRequest_StageConfig{ - { - Id: "stage-1", - Name: "Stage 1", - Desc: "Description 1", - Index: 0, - }, - { - Id: "stage-2", - Name: "Stage 2", - Desc: "Description 2", - Index: 1, - }, - }, - autoRollback: false, - expected: []*model.PipelineStage{ - { - Id: "stage-1", - Name: "Stage 1", - Desc: "Description 1", - Index: 0, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - { - Id: "stage-2", - Name: "Stage 2", - Desc: "Description 2", - Index: 1, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - }, - }, - { - name: "with auto rollback", - stages: []*deployment.BuildPipelineSyncStagesRequest_StageConfig{ - { - Id: "stage-1", - Name: "Stage 1", - Desc: "Description 1", - Index: 0, - }, - { - Id: "stage-2", - Name: "Stage 2", - Desc: "Description 2", - Index: 1, - }, - }, - autoRollback: true, - expected: []*model.PipelineStage{ - { - Id: "stage-1", - Name: "Stage 1", - Desc: "Description 1", - Index: 0, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - { - Id: "stage-2", - Name: "Stage 2", - Desc: "Description 2", - Index: 1, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - { - Id: PredefinedStageRollback, - Name: StageK8sRollback.String(), - Desc: "Rollback the deployment", - Index: 0, - Rollback: true, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actual := buildPipelineStages(tt.stages, tt.autoRollback, now) - assert.Equal(t, tt.expected, actual) - }) - } -} - func TestBuildQuickSyncPipeline(t *testing.T) { t.Parallel() @@ -252,7 +140,7 @@ func TestBuildQuickSyncPipeline(t *testing.T) { } } -func Test_buildPipelineStagesWithSDK(t *testing.T) { +func Test_buildPipelineStages(t *testing.T) { t.Parallel() tests := []struct { @@ -334,7 +222,7 @@ func Test_buildPipelineStagesWithSDK(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - actual := buildPipelineStagesWithSDK(tt.stages, tt.autoRollback) + actual := buildPipelineStages(tt.stages, tt.autoRollback) assert.Equal(t, tt.expected, actual) }) } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go index c2a492c358..941b68c83e 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go @@ -65,7 +65,7 @@ func (p *Plugin) FetchDefinedStages() []string { func (p *Plugin) BuildPipelineSyncStages(ctx context.Context, _ *sdk.ConfigNone, input *sdk.BuildPipelineSyncStagesInput) (*sdk.BuildPipelineSyncStagesResponse, error) { return &sdk.BuildPipelineSyncStagesResponse{ - Stages: buildPipelineStagesWithSDK(input.Request.Stages, input.Request.Rollback), + Stages: buildPipelineStages(input.Request.Stages, input.Request.Rollback), }, nil } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index 25aadeb278..8ffc621812 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -113,11 +113,7 @@ func (a *DeploymentService) DetermineVersions(ctx context.Context, request *depl // BuildPipelineSyncStages implements deployment.DeploymentServiceServer. func (a *DeploymentService) BuildPipelineSyncStages(ctx context.Context, request *deployment.BuildPipelineSyncStagesRequest) (*deployment.BuildPipelineSyncStagesResponse, error) { - now := time.Now() - stages := buildPipelineStages(request.GetStages(), request.GetRollback(), now) - return &deployment.BuildPipelineSyncStagesResponse{ - Stages: stages, - }, nil + return &deployment.BuildPipelineSyncStagesResponse{}, nil } // BuildQuickSyncStages implements deployment.DeploymentServiceServer. From 5cba9caf2300453d1d2e0e038a9e0f520c21ffd7 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:23:58 +0900 Subject: [PATCH 5/9] Replace old buildQuickSyncPipeline Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/pipeline.go | 37 +--------- .../kubernetes/deployment/pipeline_test.go | 67 +------------------ .../plugin/kubernetes/deployment/plugin.go | 2 +- .../plugin/kubernetes/deployment/server.go | 6 +- 4 files changed, 4 insertions(+), 108 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go index a4a1fc21c6..37eceaba6a 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline.go @@ -16,7 +16,6 @@ package deployment import ( "slices" - "time" "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" @@ -91,41 +90,7 @@ func GetPredefinedStage(id string) (*model.PipelineStage, bool) { return stage, ok } -func buildQuickSyncPipeline(autoRollback bool, now time.Time) []*model.PipelineStage { - out := make([]*model.PipelineStage, 0, 2) - - stage, _ := GetPredefinedStage(PredefinedStageK8sSync) - // we copy the predefined stage to avoid modifying the original one. - out = append(out, &model.PipelineStage{ - Id: stage.GetId(), - Name: stage.GetName(), - Desc: stage.GetDesc(), - Rollback: stage.GetRollback(), - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - Metadata: nil, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - ) - - if autoRollback { - s, _ := GetPredefinedStage(PredefinedStageRollback) - // we copy the predefined stage to avoid modifying the original one. - out = append(out, &model.PipelineStage{ - Id: s.GetId(), - Name: s.GetName(), - Desc: s.GetDesc(), - Rollback: s.GetRollback(), - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }) - } - - return out -} - -func BuildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage { +func buildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage { out := make([]sdk.QuickSyncStage, 0, 2) stage, _ := GetPredefinedStage(PredefinedStageK8sSync) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go index ef3af3f7dc..148f6558e7 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/pipeline_test.go @@ -16,80 +16,15 @@ package deployment import ( "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) func Test_buildQuickSyncPipeline(t *testing.T) { t.Parallel() - now := time.Now() - - tests := []struct { - name string - autoRollback bool - expected []*model.PipelineStage - }{ - { - name: "without auto rollback", - autoRollback: false, - expected: []*model.PipelineStage{ - { - Id: PredefinedStageK8sSync, - Name: StageK8sSync.String(), - Desc: "Sync by applying all manifests", - Index: 0, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - Metadata: nil, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - }, - }, - { - name: "with auto rollback", - autoRollback: true, - expected: []*model.PipelineStage{ - { - Id: PredefinedStageK8sSync, - Name: StageK8sSync.String(), - Desc: "Sync by applying all manifests", - Index: 0, - Rollback: false, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - Metadata: nil, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - { - Id: PredefinedStageRollback, - Name: StageK8sRollback.String(), - Desc: "Rollback the deployment", - Rollback: true, - Status: model.StageStatus_STAGE_NOT_STARTED_YET, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actual := buildQuickSyncPipeline(tt.autoRollback, now) - assert.Equal(t, tt.expected, actual) - }) - } -} - -func TestBuildQuickSyncPipeline(t *testing.T) { - t.Parallel() - tests := []struct { name string rollback bool @@ -134,7 +69,7 @@ func TestBuildQuickSyncPipeline(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - actual := BuildQuickSyncPipeline(tt.rollback) + actual := buildQuickSyncPipeline(tt.rollback) assert.Equal(t, tt.expected, actual) }) } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go index 941b68c83e..5d8515467b 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go @@ -396,6 +396,6 @@ func (p *Plugin) DetermineStrategy(ctx context.Context, _ *sdk.ConfigNone, input func (p *Plugin) BuildQuickSyncStages(ctx context.Context, _ *sdk.ConfigNone, input *sdk.BuildQuickSyncStagesInput) (*sdk.BuildQuickSyncStagesResponse, error) { return &sdk.BuildQuickSyncStagesResponse{ - Stages: BuildQuickSyncPipeline(input.Request.Rollback), + Stages: buildQuickSyncPipeline(input.Request.Rollback), }, nil } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index 8ffc621812..4438620a75 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -118,11 +118,7 @@ func (a *DeploymentService) BuildPipelineSyncStages(ctx context.Context, request // BuildQuickSyncStages implements deployment.DeploymentServiceServer. func (a *DeploymentService) BuildQuickSyncStages(ctx context.Context, request *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) { - now := time.Now() - stages := buildQuickSyncPipeline(request.GetRollback(), now) - return &deployment.BuildQuickSyncStagesResponse{ - Stages: stages, - }, nil + return &deployment.BuildQuickSyncStagesResponse{}, nil } // FetchDefinedStages implements deployment.DeploymentServiceServer. From c21c84253248c1711f2df837329a55aba0e0c910 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:27:18 +0900 Subject: [PATCH 6/9] Remove sync.go and rollback.go Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/rollback.go | 119 ---- .../plugin/kubernetes/deployment/server.go | 25 +- .../plugin/kubernetes/deployment/sync.go | 170 ----- .../plugin/kubernetes/deployment/sync_test.go | 599 ------------------ 4 files changed, 1 insertion(+), 912 deletions(-) delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/deployment/rollback.go delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/deployment/sync.go delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/deployment/sync_test.go diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/rollback.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/rollback.go deleted file mode 100644 index b115ed0099..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/rollback.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package deployment - -import ( - "cmp" - "context" - - kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - config "github.com/pipe-cd/pipecd/pkg/configv1" - "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" -) - -func (a *DeploymentService) executeK8sRollbackStage(ctx context.Context, lp logpersister.StageLogPersister, input *deployment.ExecutePluginInput) model.StageStatus { - if input.GetDeployment().GetRunningCommitHash() == "" { - lp.Errorf("Unable to determine the last deployed commit to rollback. It seems this is the first deployment.") - return model.StageStatus_STAGE_FAILURE - } - - lp.Info("Start rolling back the deployment") - - cfg, err := config.DecodeYAML[*kubeconfig.KubernetesApplicationSpec](input.GetRunningDeploymentSource().GetApplicationConfig()) - if err != nil { - lp.Errorf("Failed while decoding application config (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - lp.Infof("Loading manifests at commit %s for handling", input.GetDeployment().GetRunningCommitHash()) - manifests, err := a.loadManifests(ctx, input.GetDeployment(), cfg.Spec, input.GetRunningDeploymentSource()) - if err != nil { - lp.Errorf("Failed while loading manifests (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - lp.Successf("Successfully loaded %d manifests", len(manifests)) - - // Because the loaded manifests are read-only - // we duplicate them to avoid updating the shared manifests data in cache. - // TODO: implement duplicateManifests function - - // When addVariantLabelToSelector is true, ensure that all workloads - // have the variant label in their selector. - var ( - variantLabel = cfg.Spec.VariantLabel.Key - primaryVariant = cfg.Spec.VariantLabel.PrimaryValue - ) - // TODO: Consider other fields to configure whether to add a variant label to the selector - // because the rollback stage is executed in both quick sync and pipeline sync strategies. - if cfg.Spec.QuickSync.AddVariantLabelToSelector { - workloads := findWorkloadManifests(manifests, cfg.Spec.Workloads) - for _, m := range workloads { - if err := ensureVariantSelectorInWorkload(m, variantLabel, primaryVariant); err != nil { - lp.Errorf("Unable to check/set %q in selector of workload %s (%v)", variantLabel+": "+primaryVariant, m.Key().ReadableString(), err) - return model.StageStatus_STAGE_FAILURE - } - } - } - - // Add variant annotations to all manifests. - for i := range manifests { - manifests[i].AddAnnotations(map[string]string{ - variantLabel: primaryVariant, - }) - } - - if err := annotateConfigHash(manifests); err != nil { - lp.Errorf("Unable to set %q annotation into the workload manifest (%v)", provider.AnnotationConfigHash, err) - return model.StageStatus_STAGE_FAILURE - } - - // Get the deploy target config. - targets := input.GetDeployment().GetDeployTargets(a.pluginConfig.Name) - if len(targets) == 0 { - lp.Errorf("No deploy target was found for the plugin %s", a.pluginConfig.Name) - return model.StageStatus_STAGE_FAILURE - } - - deployTargetConfig, err := kubeconfig.FindDeployTarget(a.pluginConfig, targets[0]) // TODO: consider multiple targets - if err != nil { - lp.Errorf("Failed while unmarshalling deploy target config (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // Get the kubectl tool path. - kubectlPath, err := a.toolRegistry.Kubectl(ctx, cmp.Or(cfg.Spec.Input.KubectlVersion, deployTargetConfig.KubectlVersion)) - if err != nil { - lp.Errorf("Failed while getting kubectl tool (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // Create the applier for the target cluster. - applier := provider.NewApplier(provider.NewKubectl(kubectlPath), cfg.Spec.Input, deployTargetConfig, a.logger) - - // Start applying all manifests to add or update running resources. - if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { - lp.Errorf("Failed while applying manifests (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // TODO: implement prune resources - // TODO: delete all resources of CANARY variant - // TODO: delete all resources of BASELINE variant - - return model.StageStatus_STAGE_SUCCESS -} diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index 4438620a75..e1a374c49a 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -16,7 +16,6 @@ package deployment import ( "context" - "time" kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" @@ -26,7 +25,6 @@ import ( "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/common" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" - "github.com/pipe-cd/pipecd/pkg/plugin/signalhandler" "go.uber.org/zap" "google.golang.org/grpc" @@ -159,26 +157,5 @@ func (a *DeploymentService) loadManifests(ctx context.Context, deploy *model.Dep // It returns stage status after execution without error. // Error only be raised if the given stage is not supported. func (a *DeploymentService) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, _ error) { - lp := a.logPersister.StageLogPersister(request.GetInput().GetDeployment().GetId(), request.GetInput().GetStage().GetId()) - defer func() { - // When termination signal received and the stage is not completed yet, we should not mark the log persister as completed. - // This can occur when the piped is shutting down while the stage is still running. - if !response.GetStatus().IsCompleted() && signalhandler.Terminated() { - return - } - lp.Complete(time.Minute) - }() - - switch request.GetInput().GetStage().GetName() { - case StageK8sSync.String(): - return &deployment.ExecuteStageResponse{ - Status: a.executeK8sSyncStage(ctx, lp, request.GetInput()), - }, nil - case StageK8sRollback.String(): - return &deployment.ExecuteStageResponse{ - Status: a.executeK8sRollbackStage(ctx, lp, request.GetInput()), - }, nil - default: - return nil, status.Error(codes.InvalidArgument, "unimplemented or unsupported stage") - } + return nil, status.Error(codes.Unimplemented, "not implemented") } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/sync.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/sync.go deleted file mode 100644 index c286b64a4f..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/sync.go +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package deployment - -import ( - "cmp" - "context" - "errors" - "time" - - kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - config "github.com/pipe-cd/pipecd/pkg/configv1" - "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" -) - -func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersister.StageLogPersister, input *deployment.ExecutePluginInput) model.StageStatus { - lp.Infof("Start syncing the deployment") - - cfg, err := config.DecodeYAML[*kubeconfig.KubernetesApplicationSpec](input.GetTargetDeploymentSource().GetApplicationConfig()) - if err != nil { - lp.Errorf("Failed while decoding application config (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - lp.Infof("Loading manifests at commit %s for handling", input.GetDeployment().GetTrigger().GetCommit().GetHash()) - manifests, err := a.loadManifests(ctx, input.GetDeployment(), cfg.Spec, input.GetTargetDeploymentSource()) - if err != nil { - lp.Errorf("Failed while loading manifests (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - lp.Successf("Successfully loaded %d manifests", len(manifests)) - - // Because the loaded manifests are read-only - // we duplicate them to avoid updating the shared manifests data in cache. - // TODO: implement duplicateManifests function - - // When addVariantLabelToSelector is true, ensure that all workloads - // have the variant label in their selector. - var ( - variantLabel = cfg.Spec.VariantLabel.Key - primaryVariant = cfg.Spec.VariantLabel.PrimaryValue - ) - // TODO: treat the stage options specified under "with" - if cfg.Spec.QuickSync.AddVariantLabelToSelector { - workloads := findWorkloadManifests(manifests, cfg.Spec.Workloads) - for _, m := range workloads { - if err := ensureVariantSelectorInWorkload(m, variantLabel, primaryVariant); err != nil { - lp.Errorf("Unable to check/set %q in selector of workload %s (%v)", variantLabel+": "+primaryVariant, m.Key().ReadableString(), err) - return model.StageStatus_STAGE_FAILURE - } - } - } - - // Add variant annotations to all manifests. - for i := range manifests { - manifests[i].AddLabels(map[string]string{ - variantLabel: primaryVariant, - }) - manifests[i].AddAnnotations(map[string]string{ - variantLabel: primaryVariant, - }) - } - - if err := annotateConfigHash(manifests); err != nil { - lp.Errorf("Unable to set %q annotation into the workload manifest (%v)", provider.AnnotationConfigHash, err) - return model.StageStatus_STAGE_FAILURE - } - - // Get the deploy target config. - targets := input.GetDeployment().GetDeployTargets(a.pluginConfig.Name) - if len(targets) == 0 { - lp.Errorf("No deploy target was found for the plugin %s", a.pluginConfig.Name) - return model.StageStatus_STAGE_FAILURE - } - deployTargetConfig, err := kubeconfig.FindDeployTarget(a.pluginConfig, targets[0]) // TODO: consider multiple targets - if err != nil { - lp.Errorf("Failed while unmarshalling deploy target config (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // Get the kubectl tool path. - kubectlPath, err := a.toolRegistry.Kubectl(ctx, cmp.Or(cfg.Spec.Input.KubectlVersion, deployTargetConfig.KubectlVersion)) - if err != nil { - lp.Errorf("Failed while getting kubectl tool (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // Create the kubectl wrapper for the target cluster. - kubectl := provider.NewKubectl(kubectlPath) - - // Create the applier for the target cluster. - applier := provider.NewApplier(kubectl, cfg.Spec.Input, deployTargetConfig, a.logger) - - // Start applying all manifests to add or update running resources. - if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { - lp.Errorf("Failed while applying manifests (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - // TODO: treat the stage options specified under "with" - if !cfg.Spec.QuickSync.Prune { - lp.Info("Resource GC was skipped because sync.prune was not configured") - return model.StageStatus_STAGE_SUCCESS - } - - // Wait for all applied manifests to be stable. - // In theory, we don't need to wait for them to be stable before going to the next step - // but waiting for a while reduces the number of Kubernetes changes in a short time. - lp.Info("Waiting for the applied manifests to be stable") - select { - case <-time.After(15 * time.Second): - break - case <-ctx.Done(): - break - } - - lp.Info("Start finding all running resources but no longer defined in Git") - - namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, deployTargetConfig.KubeConfigPath, input.GetDeployment().GetApplicationId()) - if err != nil { - lp.Errorf("Failed while getting live resources (%v)", err) - return model.StageStatus_STAGE_FAILURE - } - - if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 { - lp.Info("There is no data about live resource so no resource will be removed") - return model.StageStatus_STAGE_SUCCESS - } - - lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources)) - - removeKeys := provider.FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources) - if len(removeKeys) == 0 { - lp.Info("There are no live resources should be removed") - return model.StageStatus_STAGE_SUCCESS - } - - lp.Infof("Start pruning %d resources", len(removeKeys)) - var deletedCount int - for _, key := range removeKeys { - if err := kubectl.Delete(ctx, deployTargetConfig.KubeConfigPath, key.Namespace(), key); err != nil { - if errors.Is(err, provider.ErrNotFound) { - lp.Infof("Specified resource does not exist, so skip deleting the resource: %s (%v)", key.ReadableString(), err) - continue - } - lp.Errorf("Failed while deleting resource %s (%v)", key.ReadableString(), err) - continue // continue to delete other resources - } - deletedCount++ - lp.Successf("- deleted resource: %s", key.ReadableString()) - } - - lp.Successf("Successfully deleted %d resources", deletedCount) - return model.StageStatus_STAGE_SUCCESS -} diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/sync_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/sync_test.go deleted file mode 100644 index 25dc8c7aad..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/sync_test.go +++ /dev/null @@ -1,599 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package deployment - -import ( - "context" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/yaml" - - kubeConfigPkg "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" - config "github.com/pipe-cd/pipecd/pkg/configv1" - "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/common" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister/logpersistertest" - "github.com/pipe-cd/pipecd/pkg/plugin/toolregistry/toolregistrytest" -) - -func TestDeploymentService_executeK8sSyncStage(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // read the application config from the example file - cfg, err := os.ReadFile(filepath.Join(examplesDir(), "kubernetes", "simple", "app.pipecd.yaml")) - require.NoError(t, err) - - // prepare the request - req := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: filepath.Join(examplesDir(), "kubernetes", "simple"), - CommitHash: "0123456789", - ApplicationConfig: cfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - // initialize tool registry - testRegistry := toolregistrytest.NewTestToolRegistry(t) - - // initialize plugin config and dynamic client for assertions with envtest - pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) - - svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) - resp, err := svc.ExecuteStage(ctx, req) - - require.NoError(t, err) - assert.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{}) - require.NoError(t, err) - - assert.Equal(t, "simple", deployment.GetName()) - assert.Equal(t, "simple", deployment.GetLabels()["app"]) - - assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"]) - assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) - assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) - assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"]) - - assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"]) - assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"]) - assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"]) - assert.Equal(t, "apps/v1", deployment.GetAnnotations()["pipecd.dev/original-api-version"]) - assert.Equal(t, "apps:Deployment::simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. - assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"]) - -} - -func TestDeploymentService_executeK8sSyncStage_withInputNamespace(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // read the application config from the example file - cfg, err := os.ReadFile(filepath.Join(examplesDir(), "kubernetes", "simple", "app.pipecd.yaml")) - require.NoError(t, err) - - // decode and override the autoCreateNamespace and namespace - spec, err := config.DecodeYAML[*kubeConfigPkg.KubernetesApplicationSpec](cfg) - require.NoError(t, err) - spec.Spec.Input.AutoCreateNamespace = true - spec.Spec.Input.Namespace = "test-namespace" - cfg, err = yaml.Marshal(spec) - require.NoError(t, err) - - // prepare the request - req := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: filepath.Join(examplesDir(), "kubernetes", "simple"), - CommitHash: "0123456789", - ApplicationConfig: cfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - // initialize tool registry - testRegistry := toolregistrytest.NewTestToolRegistry(t) - - // initialize plugin config and dynamic client for assertions with envtest - pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) - - svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) - resp, err := svc.ExecuteStage(ctx, req) - - require.NoError(t, err) - assert.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("test-namespace").Get(context.Background(), "simple", metav1.GetOptions{}) - require.NoError(t, err) - - assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"]) - assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) - assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) - assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"]) - - assert.Equal(t, "simple", deployment.GetName()) - assert.Equal(t, "simple", deployment.GetLabels()["app"]) - assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"]) - assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"]) - assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"]) - assert.Equal(t, "apps/v1", deployment.GetAnnotations()["pipecd.dev/original-api-version"]) - assert.Equal(t, "apps:Deployment:test-namespace:simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. - assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"]) -} - -func TestDeploymentService_executeK8sSyncStage_withPrune(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // initialize tool registry - testRegistry := toolregistrytest.NewTestToolRegistry(t) - - // initialize plugin config and dynamic client for assertions with envtest - pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) - - svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) - - running := filepath.Join("./", "testdata", "prune", "running") - - // read the running application config from the testdata file - runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) - require.NoError(t, err) - - ok := t.Run("prepare", func(t *testing.T) { - runningRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, runningRequest) - - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - service, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{}) - require.NoError(t, err) - - require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) - require.Equal(t, "0123456789", service.GetLabels()["pipecd.dev/commit-hash"]) - - require.Equal(t, "simple", service.GetName()) - require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) - require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) - require.Equal(t, ":Service::simple", service.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. - require.Equal(t, "0123456789", service.GetAnnotations()["pipecd.dev/commit-hash"]) - }) - require.Truef(t, ok, "expected prepare to succeed") - - t.Run("run with prune", func(t *testing.T) { - - // prepare the request to ensure the running deployment exists - - target := filepath.Join("./", "testdata", "prune", "target") - - // read the running application config from the testdata file - targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) - require.NoError(t, err) - - // prepare the request to ensure the running deployment exists - targetRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: target, - CommitHash: "0012345678", - ApplicationConfig: targetCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, targetRequest) - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{}) - require.Error(t, err) - require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) - }) -} - -func TestDeploymentService_executeK8sSyncStage_withPrune_changesNamespace(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // initialize tool registry - testRegistry := toolregistrytest.NewTestToolRegistry(t) - - // initialize plugin config and dynamic client for assertions with envtest - pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) - - svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) - - running := filepath.Join("./", "testdata", "prune_with_change_namespace", "running") - - // read the running application config from the example file - runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) - require.NoError(t, err) - - ok := t.Run("prepare", func(t *testing.T) { - // prepare the request to ensure the running deployment exists - runningRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, runningRequest) - - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - service, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("test-1").Get(context.Background(), "simple", metav1.GetOptions{}) - require.NoError(t, err) - - require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) - require.Equal(t, "0123456789", service.GetLabels()["pipecd.dev/commit-hash"]) - - require.Equal(t, "simple", service.GetName()) - require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) - require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) - require.Equal(t, "0123456789", service.GetAnnotations()["pipecd.dev/commit-hash"]) - require.Equal(t, ":Service:test-1:simple", service.GetAnnotations()["pipecd.dev/resource-key"]) - }) - require.Truef(t, ok, "expected prepare to succeed") - - t.Run("run with prune", func(t *testing.T) { - target := filepath.Join("./", "testdata", "prune_with_change_namespace", "target") - - // read the running application config from the example file - targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) - require.NoError(t, err) - - // prepare the request to ensure the running deployment exists - targetRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: target, - CommitHash: "0012345678", - ApplicationConfig: targetCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, targetRequest) - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - // The service should be removed from the previous namespace - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("test-1").Get(context.Background(), "simple", metav1.GetOptions{}) - require.Error(t, err) - require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) - - // The service should be created in the new namespace - service, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("test-2").Get(context.Background(), "simple", metav1.GetOptions{}) - require.NoError(t, err) - - require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) - require.Equal(t, "0012345678", service.GetLabels()["pipecd.dev/commit-hash"]) - - require.Equal(t, "simple", service.GetName()) - require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) - require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) - require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) - require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) - require.Equal(t, "0012345678", service.GetAnnotations()["pipecd.dev/commit-hash"]) - require.Equal(t, ":Service:test-2:simple", service.GetAnnotations()["pipecd.dev/resource-key"]) - }) -} - -func TestDeploymentService_executeK8sSyncStage_withPrune_clusterScoped(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // initialize tool registry - testRegistry := toolregistrytest.NewTestToolRegistry(t) - - // initialize plugin config and dynamic client for assertions with envtest - pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) - - svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) - - // prepare the custom resource definition - prepare := filepath.Join("./", "testdata", "prune_cluster_scoped_resource", "prepare") - - prepareCfg, err := os.ReadFile(filepath.Join(prepare, "app.pipecd.yaml")) - require.NoError(t, err) - - ok := t.Run("prepare crd", func(t *testing.T) { - prepareRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "prepare-app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: prepare, - CommitHash: "0123456789", - ApplicationConfig: prepareCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, prepareRequest) - - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - }) - require.Truef(t, ok, "expected prepare to succeed") - - // prepare the running resources - running := filepath.Join("./", "testdata", "prune_cluster_scoped_resource", "running") - - // read the running application config from the example file - runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) - require.NoError(t, err) - - ok = t.Run("prepare running", func(t *testing.T) { - // prepare the request to ensure the running deployment exists - runningRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: nil, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, runningRequest) - - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - // The my-new-cron-object/my-new-cron-object-2/my-new-cron-object-v1beta1 should be created - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object", metav1.GetOptions{}) - require.NoError(t, err) - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-2", metav1.GetOptions{}) - require.NoError(t, err) - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-v1beta1", metav1.GetOptions{}) - require.NoError(t, err) - }) - require.Truef(t, ok, "expected prepare to succeed") - - t.Run("sync", func(t *testing.T) { - // sync the target resources and assert the prune behavior - target := filepath.Join("./", "testdata", "prune_cluster_scoped_resource", "target") - - // read the running application config from the example file - targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) - require.NoError(t, err) - - // prepare the request to ensure the running deployment exists - targetRequest := &deployment.ExecuteStageRequest{ - Input: &deployment.ExecutePluginInput{ - Deployment: &model.Deployment{ - PipedId: "piped-id", - ApplicationId: "app-id", - DeployTargetsByPlugin: map[string]*model.DeployTargets{ - "kubernetes": { - DeployTargets: []string{"default"}, - }, - }, - }, - Stage: &model.PipelineStage{ - Id: "stage-id", - Name: "K8S_SYNC", - }, - StageConfig: []byte(``), - RunningDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: running, - CommitHash: "0123456789", - ApplicationConfig: runningCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - TargetDeploymentSource: &common.DeploymentSource{ - ApplicationDirectory: target, - CommitHash: "0012345678", - ApplicationConfig: targetCfg, - ApplicationConfigFilename: "app.pipecd.yaml", - }, - }, - } - - resp, err := svc.ExecuteStage(ctx, targetRequest) - require.NoError(t, err) - require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) - - // The my-new-cron-object should not be removed - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object", metav1.GetOptions{}) - require.NoError(t, err) - // The my-new-cron-object-2 should be removed - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-2", metav1.GetOptions{}) - require.Error(t, err) - require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) - // The my-new-cron-object-v1beta1 should be removed - _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-v1beta1", metav1.GetOptions{}) - require.Error(t, err) - require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) - }) -} From bd9b56603cdcbc64ce3e8bd78d6bad71852fa0d6 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:30:18 +0900 Subject: [PATCH 7/9] Remove server_test.go Signed-off-by: Yoshiki Fujikane --- .../kubernetes/deployment/plugin_test.go | 57 ++++++++ .../kubernetes/deployment/server_test.go | 126 ------------------ 2 files changed, 57 insertions(+), 126 deletions(-) delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin_test.go index 27972ea99d..5665d444d7 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin_test.go @@ -29,6 +29,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/yaml" kubeConfigPkg "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" @@ -38,6 +41,60 @@ import ( "github.com/pipe-cd/pipecd/pkg/plugin/toolregistry/toolregistrytest" ) +// TODO: move to a common package +func examplesDir() string { + d, _ := os.Getwd() + for { + if _, err := os.Stat(filepath.Join(d, "examples")); err == nil { + return filepath.Join(d, "examples") + } + d = filepath.Dir(d) + } +} + +func kubeconfigFromRestConfig(restConfig *rest.Config) (string, error) { + clusters := make(map[string]*clientcmdapi.Cluster) + clusters["default-cluster"] = &clientcmdapi.Cluster{ + Server: restConfig.Host, + CertificateAuthorityData: restConfig.CAData, + } + contexts := make(map[string]*clientcmdapi.Context) + contexts["default-context"] = &clientcmdapi.Context{ + Cluster: "default-cluster", + AuthInfo: "default-user", + } + authinfos := make(map[string]*clientcmdapi.AuthInfo) + authinfos["default-user"] = &clientcmdapi.AuthInfo{ + ClientCertificateData: restConfig.CertData, + ClientKeyData: restConfig.KeyData, + } + clientConfig := clientcmdapi.Config{ + Kind: "Config", + APIVersion: "v1", + Clusters: clusters, + Contexts: contexts, + CurrentContext: "default-context", + AuthInfos: authinfos, + } + b, err := clientcmd.Write(clientConfig) + if err != nil { + return "", err + } + + return string(b), nil +} + +func setupEnvTest(t *testing.T) *rest.Config { + t.Helper() + + tEnv := new(envtest.Environment) + kubeCfg, err := tEnv.Start() + require.NoError(t, err) + t.Cleanup(func() { tEnv.Stop() }) + + return kubeCfg +} + func setupTestDeployTargetConfig(t *testing.T, kubeCfg *rest.Config) *kubeConfigPkg.KubernetesDeployTargetConfig { t.Helper() diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go deleted file mode 100644 index 514a4c8cb4..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package deployment - -import ( - "encoding/json" - "os" - "path" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "sigs.k8s.io/controller-runtime/pkg/envtest" - - kubeConfigPkg "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" - config "github.com/pipe-cd/pipecd/pkg/configv1" -) - -// TODO: move to a common package -func examplesDir() string { - d, _ := os.Getwd() - for { - if _, err := os.Stat(filepath.Join(d, "examples")); err == nil { - return filepath.Join(d, "examples") - } - d = filepath.Dir(d) - } -} - -func kubeconfigFromRestConfig(restConfig *rest.Config) (string, error) { - clusters := make(map[string]*clientcmdapi.Cluster) - clusters["default-cluster"] = &clientcmdapi.Cluster{ - Server: restConfig.Host, - CertificateAuthorityData: restConfig.CAData, - } - contexts := make(map[string]*clientcmdapi.Context) - contexts["default-context"] = &clientcmdapi.Context{ - Cluster: "default-cluster", - AuthInfo: "default-user", - } - authinfos := make(map[string]*clientcmdapi.AuthInfo) - authinfos["default-user"] = &clientcmdapi.AuthInfo{ - ClientCertificateData: restConfig.CertData, - ClientKeyData: restConfig.KeyData, - } - clientConfig := clientcmdapi.Config{ - Kind: "Config", - APIVersion: "v1", - Clusters: clusters, - Contexts: contexts, - CurrentContext: "default-context", - AuthInfos: authinfos, - } - b, err := clientcmd.Write(clientConfig) - if err != nil { - return "", err - } - - return string(b), nil -} - -func setupEnvTest(t *testing.T) *rest.Config { - t.Helper() - - tEnv := new(envtest.Environment) - kubeCfg, err := tEnv.Start() - require.NoError(t, err) - t.Cleanup(func() { tEnv.Stop() }) - - return kubeCfg -} - -func setupTestPluginConfig(t *testing.T, kubeCfg *rest.Config) *config.PipedPlugin { - t.Helper() - - kubeconfig, err := kubeconfigFromRestConfig(kubeCfg) - require.NoError(t, err) - - workDir := t.TempDir() - kubeconfigPath := path.Join(workDir, "kubeconfig") - err = os.WriteFile(kubeconfigPath, []byte(kubeconfig), 0755) - require.NoError(t, err) - - deployTarget, err := json.Marshal(kubeConfigPkg.KubernetesDeployTargetConfig{KubeConfigPath: kubeconfigPath}) - require.NoError(t, err) - - // prepare the piped plugin config - return &config.PipedPlugin{ - Name: "kubernetes", - URL: "file:///path/to/kubernetes/plugin", // dummy for testing - Port: 0, // dummy for testing - DeployTargets: []config.PipedDeployTarget{{ - Name: "default", - Labels: map[string]string{}, - Config: json.RawMessage(deployTarget), - }}, - } -} - -func setupTestPluginConfigAndDynamicClient(t *testing.T) (*config.PipedPlugin, dynamic.Interface) { - t.Helper() - - kubeCfg := setupEnvTest(t) - pluginCfg := setupTestPluginConfig(t, kubeCfg) - - dynamicClient, err := dynamic.NewForConfig(kubeCfg) - require.NoError(t, err) - - return pluginCfg, dynamicClient -} From a15757f003159afb2964cb654a2cb20ccde58f63 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 14:40:15 +0900 Subject: [PATCH 8/9] Replace old determineStrategy Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/determine.go | 44 ++++++----------- .../kubernetes/deployment/determine_test.go | 47 +++++++++---------- .../plugin/kubernetes/deployment/plugin.go | 2 +- .../plugin/kubernetes/deployment/server.go | 24 +--------- 4 files changed, 39 insertions(+), 78 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go index 5dc04bc827..d9198a8116 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go @@ -23,7 +23,6 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/plugin/diff" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) @@ -157,16 +156,17 @@ func checkReplicasChange(ns diff.Nodes) (before, after string, changed bool) { return node.StringX(), node.StringY(), true } +// determineStrategy decides the sync strategy and summary message based on the given manifests. // First up, checks to see if the workload's `spec.template` has been changed, // and then checks if the configmap/secret's data. -func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8sResourceReference, logger *zap.Logger) (strategy model.SyncStrategy, summary string) { +func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8sResourceReference, logger *zap.Logger) (strategy sdk.SyncStrategy, summary string) { oldWorkloads := findWorkloadManifests(olds, workloadRefs) if len(oldWorkloads) == 0 { - return model.SyncStrategy_QUICK_SYNC, "Quick sync by applying all manifests because it was unable to find the currently running workloads" + return sdk.SyncStrategyQuickSync, "Quick sync by applying all manifests because it was unable to find the currently running workloads" } newWorkloads := findWorkloadManifests(news, workloadRefs) if len(newWorkloads) == 0 { - return model.SyncStrategy_QUICK_SYNC, "Quick sync by applying all manifests because it was unable to find workloads in the new manifests" + return sdk.SyncStrategyQuickSync, "Quick sync by applying all manifests because it was unable to find workloads in the new manifests" } workloads := provider.FindSameManifests(oldWorkloads, newWorkloads) @@ -177,7 +177,7 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s // do progressive deployment with the specified pipeline. diffResult, err := provider.Diff(w.Old, w.New, logger) if err != nil { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err) } diffNodes := diffResult.Nodes() diffs[w.New.Key()] = diffNodes @@ -185,9 +185,9 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s templateDiffs := diffNodes.FindByPrefix("spec.template") if len(templateDiffs) > 0 { if msg, changed := checkImageChange(templateDiffs); changed { - return model.SyncStrategy_PIPELINE, msg + return sdk.SyncStrategyPipelineSync, msg } - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Name()) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Name()) } } @@ -196,22 +196,22 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s oldConfigs := provider.FindConfigsAndSecrets(olds) newConfigs := provider.FindConfigsAndSecrets(news) if len(oldConfigs) > len(newConfigs) { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %d configmap/secret deleted", len(oldConfigs)-len(newConfigs)) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively because %d configmap/secret deleted", len(oldConfigs)-len(newConfigs)) } if len(oldConfigs) < len(newConfigs) { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because new %d configmap/secret added", len(newConfigs)-len(oldConfigs)) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively because new %d configmap/secret added", len(newConfigs)-len(oldConfigs)) } for k, oc := range oldConfigs { nc, ok := newConfigs[k] if !ok { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Kind(), oc.Name()) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Kind(), oc.Name()) } result, err := provider.Diff(oc, nc, logger) if err != nil { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err) } if result.HasDiff() { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Kind(), oc.Name()) + return sdk.SyncStrategyPipelineSync, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Kind(), oc.Name()) } } @@ -225,24 +225,8 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s } if len(scales) > 0 { slices.Sort(scales) - return model.SyncStrategy_QUICK_SYNC, fmt.Sprintf("Quick sync to scale %s", strings.Join(scales, ", ")) + return sdk.SyncStrategyQuickSync, fmt.Sprintf("Quick sync to scale %s", strings.Join(scales, ", ")) } - return model.SyncStrategy_QUICK_SYNC, "Quick sync by applying all manifests" -} - -// determineStrategySDK decides the sync strategy and summary message based on the given manifests. -// TODO: rewrite this function to determineStrategy after the current determineStrategy is removed. -func determineStrategySDK(olds, news []provider.Manifest, workloadRefs []config.K8sResourceReference, logger *zap.Logger) (strategy sdk.SyncStrategy, summary string) { - mStrategy, summary := determineStrategy(olds, news, workloadRefs, logger) - - var s sdk.SyncStrategy - switch mStrategy { - case model.SyncStrategy_QUICK_SYNC: - s = sdk.SyncStrategyQuickSync - case model.SyncStrategy_PIPELINE: - s = sdk.SyncStrategyQuickSync - } - - return s, summary + return sdk.SyncStrategyQuickSync, "Quick sync by applying all manifests" } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go index 6e455740ab..24ce404407 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine_test.go @@ -26,7 +26,6 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) @@ -914,13 +913,13 @@ spec: } } -func TestDetermineStrategy(t *testing.T) { +func Test_determineStrategy(t *testing.T) { tests := []struct { name string olds []string news []string workloadRefs []config.K8sResourceReference - wantStrategy model.SyncStrategy + wantStrategy sdk.SyncStrategy wantSummary string }{ { @@ -940,7 +939,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync by applying all manifests because it was unable to find the currently running workloads", }, { @@ -960,7 +959,7 @@ spec: `, }, news: []string{}, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync by applying all manifests because it was unable to find workloads in the new manifests", }, { @@ -993,7 +992,7 @@ spec: image: nginx:1.19.4 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because of updating image nginx from 1.19.3 to 1.19.4", }, { @@ -1042,7 +1041,7 @@ data: key: new-value `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because ConfigMap my-config was updated", }, { @@ -1077,7 +1076,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync to scale Deployment/nginx-deployment from 3 to 5", }, { @@ -1111,7 +1110,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync to scale Deployment/nginx-deployment from to 1", }, { @@ -1145,7 +1144,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync to scale Deployment/nginx-deployment from 1 to ", }, { @@ -1206,7 +1205,7 @@ spec: image: redis:6.0.9 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync to scale Deployment/nginx-deployment from 3 to 5, Deployment/redis-deployment from 2 to 4", }, { @@ -1243,7 +1242,7 @@ spec: image: redis:6.0.10 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because of updating image nginx from 1.19.3 to 1.19.4, image redis from 6.0.9 to 6.0.10", }, { @@ -1280,7 +1279,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because of updating image nginx:1.19.3 to redis:6.0.9, image redis:6.0.9 to nginx:1.19.3", }, { @@ -1343,7 +1342,7 @@ spec: Name: "nginx-deployment", }, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because of updating image nginx from 1.19.3 to 1.19.4", }, { @@ -1376,7 +1375,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because pod template of workload nginx-deployment was changed", }, { @@ -1417,7 +1416,7 @@ spec: memory: "1Gi" `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because pod template of workload nginx-deployment was changed", }, { @@ -1458,7 +1457,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because 1 configmap/secret deleted", }, { @@ -1499,7 +1498,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because 1 configmap/secret deleted", }, { @@ -1540,7 +1539,7 @@ data: key: value `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because new 1 configmap/secret added", }, { @@ -1581,7 +1580,7 @@ data: key: dmFsdWU= `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because new 1 configmap/secret added", }, { @@ -1630,7 +1629,7 @@ data: key: new-value `, }, - wantStrategy: model.SyncStrategy_PIPELINE, + wantStrategy: sdk.SyncStrategyPipelineSync, wantSummary: "Sync progressively because ConfigMap old-config was deleted", }, { @@ -1664,7 +1663,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync by applying all manifests", }, { @@ -1697,7 +1696,7 @@ spec: image: nginx:1.19.3 `, }, - wantStrategy: model.SyncStrategy_QUICK_SYNC, + wantStrategy: sdk.SyncStrategyQuickSync, wantSummary: "Quick sync by applying all manifests", }, } @@ -1713,7 +1712,7 @@ spec: } logger := zap.NewNop() gotStrategy, gotSummary := determineStrategy(oldManifests, newManifests, tt.workloadRefs, logger) - assert.Equal(t, tt.wantStrategy.String(), gotStrategy.String()) + assert.Equal(t, tt.wantStrategy, gotStrategy) assert.Equal(t, tt.wantSummary, gotSummary) }) } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go index 5d8515467b..f89622e97e 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/plugin.go @@ -386,7 +386,7 @@ func (p *Plugin) DetermineStrategy(ctx context.Context, _ *sdk.ConfigNone, input return nil, err } - strategy, summary := determineStrategySDK(runnings, targets, cfg.Spec.Workloads, logger) + strategy, summary := determineStrategy(runnings, targets, cfg.Spec.Workloads, logger) return &sdk.DetermineStrategyResponse{ Strategy: strategy, diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index e1a374c49a..945b41b6e3 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -78,29 +78,7 @@ func (a *DeploymentService) Register(server *grpc.Server) { // DetermineStrategy implements deployment.DeploymentServiceServer. func (a *DeploymentService) DetermineStrategy(ctx context.Context, request *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) { - cfg, err := config.DecodeYAML[*kubeconfig.KubernetesApplicationSpec](request.GetInput().GetTargetDeploymentSource().GetApplicationConfig()) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - - runnings, err := a.loadManifests(ctx, request.GetInput().GetDeployment(), cfg.Spec, request.GetInput().GetRunningDeploymentSource()) - - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - targets, err := a.loadManifests(ctx, request.GetInput().GetDeployment(), cfg.Spec, request.GetInput().GetTargetDeploymentSource()) - - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - strategy, summary := determineStrategy(runnings, targets, cfg.Spec.Workloads, a.logger) - - return &deployment.DetermineStrategyResponse{ - SyncStrategy: strategy, - Summary: summary, - }, nil + return &deployment.DetermineStrategyResponse{}, nil } From fde65bb8b66bacbfae533285134d5a2b73f181be Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Mon, 17 Mar 2025 16:21:27 +0900 Subject: [PATCH 9/9] Remove DeploymentService Signed-off-by: Yoshiki Fujikane --- .../plugin/kubernetes/deployment/server.go | 139 --------------- pkg/app/pipedv1/plugin/kubernetes/main.go | 15 -- pkg/app/pipedv1/plugin/kubernetes/plugin.go | 164 ------------------ 3 files changed, 318 deletions(-) delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/deployment/server.go delete mode 100644 pkg/app/pipedv1/plugin/kubernetes/plugin.go diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go deleted file mode 100644 index 945b41b6e3..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package deployment - -import ( - "context" - - kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/provider" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/toolregistry" - config "github.com/pipe-cd/pipecd/pkg/configv1" - "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/common" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" - - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type toolClient interface { - InstallTool(ctx context.Context, name, version, script string) (string, error) -} - -type logPersister interface { - StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister -} - -type DeploymentService struct { - deployment.UnimplementedDeploymentServiceServer - - // this field is set with the plugin configuration - // the plugin configuration is sent from piped while initializing the plugin - pluginConfig *config.PipedPlugin - - logger *zap.Logger - toolRegistry toolRegistry - loader loader - logPersister logPersister -} - -// NewDeploymentService creates a new planService. -func NewDeploymentService( - config *config.PipedPlugin, - logger *zap.Logger, - toolClient toolClient, - logPersister logPersister, -) *DeploymentService { - toolRegistry := toolregistry.NewRegistry(toolClient) - - return &DeploymentService{ - pluginConfig: config, - logger: logger.Named("planner"), - toolRegistry: toolRegistry, - loader: provider.NewLoader(toolRegistry), - logPersister: logPersister, - } -} - -// Register registers all handling of this service into the specified gRPC server. -func (a *DeploymentService) Register(server *grpc.Server) { - deployment.RegisterDeploymentServiceServer(server, a) -} - -// DetermineStrategy implements deployment.DeploymentServiceServer. -func (a *DeploymentService) DetermineStrategy(ctx context.Context, request *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) { - return &deployment.DetermineStrategyResponse{}, nil - -} - -// DetermineVersions implements deployment.DeploymentServiceServer. -func (a *DeploymentService) DetermineVersions(ctx context.Context, request *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) { - return &deployment.DetermineVersionsResponse{}, nil -} - -// BuildPipelineSyncStages implements deployment.DeploymentServiceServer. -func (a *DeploymentService) BuildPipelineSyncStages(ctx context.Context, request *deployment.BuildPipelineSyncStagesRequest) (*deployment.BuildPipelineSyncStagesResponse, error) { - return &deployment.BuildPipelineSyncStagesResponse{}, nil -} - -// BuildQuickSyncStages implements deployment.DeploymentServiceServer. -func (a *DeploymentService) BuildQuickSyncStages(ctx context.Context, request *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) { - return &deployment.BuildQuickSyncStagesResponse{}, nil -} - -// FetchDefinedStages implements deployment.DeploymentServiceServer. -func (a *DeploymentService) FetchDefinedStages(context.Context, *deployment.FetchDefinedStagesRequest) (*deployment.FetchDefinedStagesResponse, error) { - stages := make([]string, 0, len(AllStages)) - for _, s := range AllStages { - stages = append(stages, string(s)) - } - - return &deployment.FetchDefinedStagesResponse{ - Stages: stages, - }, nil -} - -func (a *DeploymentService) loadManifests(ctx context.Context, deploy *model.Deployment, spec *kubeconfig.KubernetesApplicationSpec, deploymentSource *common.DeploymentSource) ([]provider.Manifest, error) { - manifests, err := a.loader.LoadManifests(ctx, provider.LoaderInput{ - PipedID: deploy.GetPipedId(), - AppID: deploy.GetApplicationId(), - CommitHash: deploymentSource.GetCommitHash(), - AppName: deploy.GetApplicationName(), - AppDir: deploymentSource.GetApplicationDirectory(), - ConfigFilename: deploymentSource.GetApplicationConfigFilename(), - Manifests: spec.Input.Manifests, - Namespace: spec.Input.Namespace, - TemplatingMethod: provider.TemplatingMethodNone, // TODO: Implement detection of templating method or add it to the config spec. - - // TODO: Define other fields for LoaderInput - }) - - if err != nil { - return nil, err - } - - return manifests, nil -} - -// ExecuteStage performs stage-defined tasks. -// It returns stage status after execution without error. -// Error only be raised if the given stage is not supported. -func (a *DeploymentService) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, _ error) { - return nil, status.Error(codes.Unimplemented, "not implemented") -} diff --git a/pkg/app/pipedv1/plugin/kubernetes/main.go b/pkg/app/pipedv1/plugin/kubernetes/main.go index 07c40166c2..0c0bc7e5bd 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/main.go +++ b/pkg/app/pipedv1/plugin/kubernetes/main.go @@ -19,24 +19,9 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/deployment" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/livestate" - "github.com/pipe-cd/pipecd/pkg/cli" "github.com/pipe-cd/pipecd/pkg/plugin/sdk" ) -// TODO: remote it -func _main() { - app := cli.NewApp( - "pipecd-plugin-kubernetes", - "Plugin component to deploy Kubernetes Application.", - ) - app.AddCommands( - NewPluginCommand(), - ) - if err := app.Run(); err != nil { - log.Fatal(err) - } -} - func main() { sdk.RegisterDeploymentPlugin(&deployment.Plugin{}) sdk.RegisterLivestatePlugin(livestate.Plugin{}) diff --git a/pkg/app/pipedv1/plugin/kubernetes/plugin.go b/pkg/app/pipedv1/plugin/kubernetes/plugin.go deleted file mode 100644 index fdd5935446..0000000000 --- a/pkg/app/pipedv1/plugin/kubernetes/plugin.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "net/http" - "net/http/pprof" - "time" - - "github.com/spf13/cobra" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - - "github.com/pipe-cd/pipecd/pkg/admin" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/deployment" - "github.com/pipe-cd/pipecd/pkg/cli" - config "github.com/pipe-cd/pipecd/pkg/configv1" - "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" - "github.com/pipe-cd/pipecd/pkg/plugin/pipedapi" - "github.com/pipe-cd/pipecd/pkg/plugin/toolregistry" - "github.com/pipe-cd/pipecd/pkg/rpc" - "github.com/pipe-cd/pipecd/pkg/version" -) - -type plugin struct { - pipedPluginService string - gracePeriod time.Duration - tls bool - certFile string - keyFile string - config string - - enableGRPCReflection bool -} - -// NewPluginCommand creates a new cobra command for executing api server. -func NewPluginCommand() *cobra.Command { - s := &plugin{ - gracePeriod: 30 * time.Second, - } - cmd := &cobra.Command{ - Use: "start", - Short: "Start running the kubernetes-plugin.", - RunE: cli.WithContext(s.run), - } - - cmd.Flags().StringVar(&s.pipedPluginService, "piped-plugin-service", s.pipedPluginService, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead. - cmd.Flags().StringVar(&s.config, "config", s.config, "The configuration for the plugin.") - cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") - - cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.") - cmd.Flags().StringVar(&s.certFile, "cert-file", s.certFile, "The path to the TLS certificate file.") - cmd.Flags().StringVar(&s.keyFile, "key-file", s.keyFile, "The path to the TLS key file.") - - // For debugging early in development - cmd.Flags().BoolVar(&s.enableGRPCReflection, "enable-grpc-reflection", s.enableGRPCReflection, "Whether to enable the reflection service or not.") - - cmd.MarkFlagRequired("piped-plugin-service") - cmd.MarkFlagRequired("config") - - return cmd -} - -func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) { - // Make a cancellable context. - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - group, ctx := errgroup.WithContext(ctx) - - pipedapiClient, err := pipedapi.NewClient(ctx, s.pipedPluginService) - if err != nil { - input.Logger.Error("failed to create piped plugin service client", zap.Error(err)) - return err - } - - // Load the configuration. - cfg, err := config.ParsePluginConfig(s.config) - if err != nil { - input.Logger.Error("failed to parse the configuration", zap.Error(err)) - return err - } - - // Start running admin server. - { - var ( - ver = []byte(version.Get().Version) // TODO: get the plugin's version - admin = admin.NewAdmin(0, s.gracePeriod, input.Logger) // TODO: add config for admin port - ) - - admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { - w.Write(ver) - }) - admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("ok")) - }) - admin.HandleFunc("/debug/pprof/", pprof.Index) - admin.HandleFunc("/debug/pprof/profile", pprof.Profile) - admin.HandleFunc("/debug/pprof/trace", pprof.Trace) - - group.Go(func() error { - return admin.Run(ctx) - }) - } - - // Start log persister - persister := logpersister.NewPersister(pipedapiClient, input.Logger) - group.Go(func() error { - return persister.Run(ctx) - }) - - // Start a gRPC server for handling external API requests. - { - var ( - service = deployment.NewDeploymentService( - cfg, - input.Logger, - toolregistry.NewToolRegistry(pipedapiClient), - persister, - ) - opts = []rpc.Option{ - rpc.WithPort(cfg.Port), - rpc.WithGracePeriod(s.gracePeriod), - rpc.WithLogger(input.Logger), - rpc.WithLogUnaryInterceptor(input.Logger), - rpc.WithRequestValidationUnaryInterceptor(), - rpc.WithSignalHandlingUnaryInterceptor(), - } - ) - if s.tls { - opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile)) - } - if s.enableGRPCReflection { - opts = append(opts, rpc.WithGRPCReflection()) - } - if input.Flags.Metrics { - opts = append(opts, rpc.WithPrometheusUnaryInterceptor()) - } - - server := rpc.NewServer(service, opts...) - group.Go(func() error { - return server.Run(ctx) - }) - } - - if err := group.Wait(); err != nil { - input.Logger.Error("failed while running", zap.Error(err)) - return err - } - return nil -}