Skip to content

feat: Implement Server-Side Diffs #522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
k8s.io/kubectl v0.26.4
k8s.io/kubernetes v1.26.4
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
sigs.k8s.io/structured-merge-diff/v4 v4.4.1
sigs.k8s.io/yaml v1.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ sigs.k8s.io/kustomize/api v0.12.1 h1:7YM7gW3kYBwtKvoY216ZzY+8hM+lV53LUayghNRJ0vM
sigs.k8s.io/kustomize/api v0.12.1/go.mod h1:y3JUhimkZkR6sbLNwfJHxvo1TCLwuwm14sCYnkH6S1s=
sigs.k8s.io/kustomize/kyaml v0.13.9 h1:Qz53EAaFFANyNgyOEJbT/yoIHygK40/ZcvU3rgry2Tk=
sigs.k8s.io/kustomize/kyaml v0.13.9/go.mod h1:QsRbD0/KcU+wdk0/L0fIp2KLnohkVzs6fQ85/nOXac4=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
168 changes: 168 additions & 0 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package diff

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -83,6 +84,14 @@ func Diff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult,
Normalize(live, opts...)
}

if o.serverSideDiff {
r, err := ServerSideDiff(config, live, opts...)
if err != nil {
return nil, fmt.Errorf("error calculating server side diff: %w", err)
}
return r, nil
}

// TODO The two variables bellow are necessary because there is a cyclic
// dependency with the kube package that blocks the usage of constants
// from common package. common package needs to be refactored and exclude
Expand Down Expand Up @@ -120,6 +129,165 @@ func Diff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult,
return TwoWayDiff(config, live)
}

// ServerSideDiff will execute a k8s server-side apply in dry-run mode with the
// given config. The result will be compared with given live resource to determine
// diff. If config or live are nil it means resource creation or deletion. In this
// no call will be made to kube-api and a simple diff will be returned.
func ServerSideDiff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult, error) {
if live != nil && config != nil {
result, err := serverSideDiff(config, live, opts...)
if err != nil {
return nil, fmt.Errorf("serverSideDiff error: %w", err)
}
return result, nil
}
// Currently, during resource creation a shallow diff (non ServerSide apply
// based) will be returned. The reasons are:
// - Saves 1 additional call to KubeAPI
// - Much lighter/faster diff
// - This is the existing behaviour users are already used to
// - No direct benefit to the user
result, err := handleResourceCreateOrDeleteDiff(config, live)
if err != nil {
return nil, fmt.Errorf("error handling resource creation or deletion: %w", err)
}
return result, nil
}

// ServerSideDiff will execute a k8s server-side apply in dry-run mode with the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ServerSideDiff will execute a k8s server-side apply in dry-run mode with the
// serverSideDiff will execute a k8s server-side apply in dry-run mode with the

// given config. The result will be compared with given live resource to determine
// diff. Modifications done by mutation webhooks are removed from the diff by default.
// This behaviour can be customized with Option.WithIgnoreMutationWebhook.
func serverSideDiff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult, error) {
o := applyOptions(opts)
if o.serverSideDryRunner == nil {
return nil, fmt.Errorf("serverSideDryRunner is null")
}
predictedLiveStr, err := o.serverSideDryRunner.Run(context.Background(), config, o.manager)
if err != nil {
return nil, fmt.Errorf("error running server side apply in dryrun mode: %w", err)
}
predictedLive, err := jsonStrToUnstructured(predictedLiveStr)
if err != nil {
return nil, fmt.Errorf("error converting json string to unstructured: %w", err)
}

if o.ignoreMutationWebhook {
predictedLive, err = removeWebhookMutation(predictedLive, live, o.gvkParser, o.manager)
if err != nil {
return nil, fmt.Errorf("error removing non config mutations: %w", err)
}
}

Normalize(predictedLive, opts...)
unstructured.RemoveNestedField(predictedLive.Object, "metadata", "managedFields")

predictedLiveBytes, err := json.Marshal(predictedLive)
if err != nil {
return nil, fmt.Errorf("error marshaling predicted live resource: %w", err)
}

unstructured.RemoveNestedField(live.Object, "metadata", "managedFields")
liveBytes, err := json.Marshal(live)
if err != nil {
return nil, fmt.Errorf("error marshaling live resource: %w", err)
}
return buildDiffResult(predictedLiveBytes, liveBytes), nil
}

// removeWebhookMutation will compare the predictedLive with live to identify
// changes done by mutation webhooks. Webhook mutations are identified by finding
// changes in predictedLive fields not associated with any manager in the
// managedFields. All fields under this condition will be reverted with their state
// from live. If the given predictedLive does not have the managedFields, an error
// will be returned.
func removeWebhookMutation(predictedLive, live *unstructured.Unstructured, gvkParser *managedfields.GvkParser, manager string) (*unstructured.Unstructured, error) {
plManagedFields := predictedLive.GetManagedFields()
if len(plManagedFields) == 0 {
return nil, fmt.Errorf("predictedLive for resource %s/%s must have the managedFields", predictedLive.GetKind(), predictedLive.GetName())
}
gvk := predictedLive.GetObjectKind().GroupVersionKind()
pt := gvkParser.Type(gvk)
typedPredictedLive, err := pt.FromUnstructured(predictedLive.Object)
if err != nil {
return nil, fmt.Errorf("error converting predicted live state from unstructured to %s: %w", gvk, err)
}

typedLive, err := pt.FromUnstructured(live.Object)
if err != nil {
return nil, fmt.Errorf("error converting live state from unstructured to %s: %w", gvk, err)
}

// Compare the predicted live with the live resource
comparison, err := typedLive.Compare(typedPredictedLive)
if err != nil {
return nil, fmt.Errorf("error comparing predicted resource to live resource: %w", err)
}

// Loop over all existing managers in predicted live resource to identify
// fields mutated (in predicted live) not owned by any manager.
for _, mfEntry := range plManagedFields {
mfs := &fieldpath.Set{}
err := mfs.FromJSON(bytes.NewReader(mfEntry.FieldsV1.Raw))
if err != nil {
return nil, fmt.Errorf("error building managedFields set: %s", err)
}
if comparison.Added != nil && !comparison.Added.Empty() {
// exclude the added fields owned by this manager from the comparison
comparison.Added = comparison.Added.Difference(mfs)
}
if comparison.Modified != nil && !comparison.Modified.Empty() {
// exclude the modified fields owned by this manager from the comparison
comparison.Modified = comparison.Modified.Difference(mfs)
}
if comparison.Removed != nil && !comparison.Removed.Empty() {
// exclude the removed fields owned by this manager from the comparison
comparison.Removed = comparison.Removed.Difference(mfs)
}
}
// At this point, comparison holds all mutations that aren't owned by any
// of the existing managers.

if comparison.Added != nil && !comparison.Added.Empty() {
// remove added fields that aren't owned by any manager
typedPredictedLive = typedPredictedLive.RemoveItems(comparison.Added)
}

if comparison.Modified != nil && !comparison.Modified.Empty() {
liveModValues := typedLive.ExtractItems(comparison.Modified)
// revert modified fields not owned by any manager
typedPredictedLive, err = typedPredictedLive.Merge(liveModValues)
if err != nil {
return nil, fmt.Errorf("error reverting webhook modified fields in predicted live resource: %s", err)
}
}

if comparison.Removed != nil && !comparison.Removed.Empty() {
liveRmValues := typedLive.ExtractItems(comparison.Removed)
// revert removed fields not owned by any manager
typedPredictedLive, err = typedPredictedLive.Merge(liveRmValues)
if err != nil {
return nil, fmt.Errorf("error reverting webhook removed fields in predicted live resource: %s", err)
}
}

plu := typedPredictedLive.AsValue().Unstructured()
pl, ok := plu.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("error converting live typedValue: expected map got %T", plu)
}
return &unstructured.Unstructured{Object: pl}, nil
}

func jsonStrToUnstructured(jsonString string) (*unstructured.Unstructured, error) {
res := make(map[string]interface{})
err := json.Unmarshal([]byte(jsonString), &res)
if err != nil {
return nil, fmt.Errorf("unmarshal error: %s", err)
}
return &unstructured.Unstructured{Object: res}, nil
}

// StructuredMergeDiff will calculate the diff using the structured-merge-diff
// k8s library (https://github.com/kubernetes-sigs/structured-merge-diff).
func StructuredMergeDiff(config, live *unstructured.Unstructured, gvkParser *managedfields.GvkParser, manager string) (*DiffResult, error) {
Expand Down
56 changes: 56 additions & 0 deletions pkg/diff/diff_options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package diff

import (
"context"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/klog/v2/klogr"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

type Option func(*options)
Expand All @@ -17,11 +21,15 @@ type options struct {
structuredMergeDiff bool
gvkParser *managedfields.GvkParser
manager string
serverSideDiff bool
serverSideDryRunner ServerSideDryRunner
ignoreMutationWebhook bool
}

func applyOptions(opts []Option) options {
o := options{
ignoreAggregatedRoles: false,
ignoreMutationWebhook: true,
normalizer: GetNoopNormalizer(),
log: klogr.New(),
}
Expand All @@ -31,6 +39,36 @@ func applyOptions(opts []Option) options {
return o
}

type KubeApplier interface {
ApplyResource(ctx context.Context, obj *unstructured.Unstructured, dryRunStrategy cmdutil.DryRunStrategy, force, validate, serverSideApply bool, manager string, serverSideDiff bool) (string, error)
}

// ServerSideDryRunner defines the contract to run a server-side apply in
// dryrun mode.
type ServerSideDryRunner interface {
Run(ctx context.Context, obj *unstructured.Unstructured, manager string) (string, error)
}

// K8sServerSideDryRunner is the Kubernetes implementation of ServerSideDryRunner.
type K8sServerSideDryRunner struct {
dryrunApplier KubeApplier
}

// NewK8sServerSideDryRunner will instantiate a new K8sServerSideDryRunner with
// the given kubeApplier.
func NewK8sServerSideDryRunner(kubeApplier KubeApplier) *K8sServerSideDryRunner {
return &K8sServerSideDryRunner{
dryrunApplier: kubeApplier,
}
}

// ServerSideApplyDryRun will invoke a kubernetes server-side apply with the given
// obj and the given manager in dryrun mode. Will return the predicted live state
// json as string.
func (kdr *K8sServerSideDryRunner) Run(ctx context.Context, obj *unstructured.Unstructured, manager string) (string, error) {
return kdr.dryrunApplier.ApplyResource(ctx, obj, cmdutil.DryRunServer, false, false, true, manager, true)
}

func IgnoreAggregatedRoles(ignore bool) Option {
return func(o *options) {
o.ignoreAggregatedRoles = ignore
Expand Down Expand Up @@ -66,3 +104,21 @@ func WithManager(manager string) Option {
o.manager = manager
}
}

func WithServerSideDiff(ssd bool) Option {
return func(o *options) {
o.serverSideDiff = ssd
}
}

func WithIgnoreMutationWebhook(mw bool) Option {
return func(o *options) {
o.ignoreMutationWebhook = mw
}
}

func WithServerSideDryRunner(ssadr ServerSideDryRunner) Option {
return func(o *options) {
o.serverSideDryRunner = ssadr
}
}
73 changes: 73 additions & 0 deletions pkg/diff/diff_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package diff

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -9,9 +10,11 @@ import (
"strings"
"testing"

"github.com/argoproj/gitops-engine/pkg/diff/mocks"
"github.com/argoproj/gitops-engine/pkg/diff/testdata"
openapi_v2 "github.com/google/gnostic/openapiv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -887,6 +890,76 @@ func TestStructuredMergeDiff(t *testing.T) {
})
}

func TestServerSideDiff(t *testing.T) {
buildOpts := func(predictedLive string) []Option {
gvkParser := buildGVKParser(t)
manager := "argocd-controller"
dryRunner := mocks.NewServerSideDryRunner(t)

dryRunner.On("Run", mock.Anything, mock.AnythingOfType("*unstructured.Unstructured"), manager).
Return(func(ctx context.Context, obj *unstructured.Unstructured, manager string) (string, error) {
return predictedLive, nil
})
opts := []Option{
WithGVKParser(gvkParser),
WithManager(manager),
WithServerSideDryRunner(dryRunner),
}

return opts
}

t.Run("will ignore modifications done by mutation webhook by default", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAMLSSD)
desiredState := StrToUnstructured(testdata.ServiceConfigYAMLSSD)
opts := buildOpts(testdata.ServicePredictedLiveJSONSSD)

// when
result, err := serverSideDiff(desiredState, liveState, opts...)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.True(t, result.Modified)
predictedSVC := YamlToSvc(t, result.PredictedLive)
liveSVC := YamlToSvc(t, result.NormalizedLive)
require.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
assert.Equal(t, "Cluster", string(*predictedSVC.Spec.InternalTrafficPolicy))
assert.Equal(t, "Cluster", string(*liveSVC.Spec.InternalTrafficPolicy))
assert.Empty(t, predictedSVC.Annotations[AnnotationLastAppliedConfig])
assert.Empty(t, liveSVC.Annotations[AnnotationLastAppliedConfig])
assert.Empty(t, predictedSVC.Labels["event"])
})
t.Run("will include mutation webhook modifications", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAMLSSD)
desiredState := StrToUnstructured(testdata.ServiceConfigYAMLSSD)
opts := buildOpts(testdata.ServicePredictedLiveJSONSSD)
opts = append(opts, WithIgnoreMutationWebhook(false))

// when
result, err := serverSideDiff(desiredState, liveState, opts...)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.True(t, result.Modified)
predictedSVC := YamlToSvc(t, result.PredictedLive)
liveSVC := YamlToSvc(t, result.NormalizedLive)
require.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
assert.Equal(t, "Cluster", string(*predictedSVC.Spec.InternalTrafficPolicy))
assert.Equal(t, "Cluster", string(*liveSVC.Spec.InternalTrafficPolicy))
assert.Empty(t, predictedSVC.Annotations[AnnotationLastAppliedConfig])
assert.Empty(t, liveSVC.Annotations[AnnotationLastAppliedConfig])
assert.NotEmpty(t, predictedSVC.Labels["event"])
})
}

func createSecret(data map[string]string) *unstructured.Unstructured {
secret := corev1.Secret{TypeMeta: metav1.TypeMeta{Kind: "Secret"}}
if data != nil {
Expand Down
Loading