Skip to content

Commit

Permalink
Merge pull request #2744 from ncdc/make-syncer-vw-global
Browse files Browse the repository at this point in the history
🌱 Make syncer vw global
  • Loading branch information
openshift-merge-robot authored Feb 13, 2023
2 parents b505b8a + c0b0f1a commit 49d1b8c
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 46 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ test-e2e-shared: require-kind build-all build-kind-images
./bin/test-server --quiet --log-file-path="$(LOG_DIR)/kcp.log" $(TEST_SERVER_ARGS) 2>&1 & PID=$$! && echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \
echo 'Starting test(s)' && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
$(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --syncer-image="$(SYNCER_IMAGE)" --kcp-test-image="$(TEST_IMAGE)" --pcluster-kubeconfig="$(abspath $(WORK_DIR)/.kcp/kind.kubeconfig)" $(SUITES_ARG) \
Expand All @@ -303,6 +304,7 @@ test-e2e-shared-minimal: build-all
./bin/test-server --quiet --log-file-path="$(LOG_DIR)/kcp.log" $(TEST_SERVER_ARGS) 2>&1 & PID=$$! && echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \
echo 'Starting test(s)' && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
$(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server $(SUITES_ARG) \
Expand All @@ -329,6 +331,7 @@ test-e2e-sharded: require-kind build-all build-kind-images
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \
echo 'Starting test(s)' && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
$(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
Expand All @@ -355,6 +358,7 @@ test-e2e-sharded-minimal: build-all
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \
echo 'Starting test(s)' && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) $(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
$(SUITES_ARGS) \
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -114,12 +116,12 @@ func NewController(
local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
},
tenancyv1alpha1.SchemeGroupVersion.WithResource("synctargets"): {
workloadv1alpha1.SchemeGroupVersion.WithResource("synctargets"): {
kind: "SyncTarget",
local: localKcpInformers.Workload().V1alpha1().SyncTargets().Informer(),
global: globalKcpInformers.Workload().V1alpha1().SyncTargets().Informer(),
},
tenancyv1alpha1.SchemeGroupVersion.WithResource("locations"): {
schedulingv1alpha1.SchemeGroupVersion.WithResource("locations"): {
kind: "Location",
local: localKcpInformers.Scheduling().V1alpha1().Locations().Informer(),
global: globalKcpInformers.Scheduling().V1alpha1().Locations().Informer(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/virtual/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (o *Options) NewVirtualWorkspaces(
wildcardKubeInformers kcpkubernetesinformers.SharedInformerFactory,
wildcardKcpInformers, cachedKcpInformers kcpinformers.SharedInformerFactory,
) ([]rootapiserver.NamedVirtualWorkspace, error) {
syncer, err := o.Syncer.NewVirtualWorkspaces(rootPathPrefix, config, wildcardKcpInformers)
syncer, err := o.Syncer.NewVirtualWorkspaces(rootPathPrefix, config, cachedKcpInformers)
if err != nil {
return nil, err
}
Expand Down
31 changes: 15 additions & 16 deletions pkg/virtual/syncer/builder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"k8s.io/client-go/tools/cache"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/virtual/framework/forwardingregistry"
"github.com/kcp-dev/kcp/pkg/virtual/framework/rootapiserver"
"github.com/kcp-dev/kcp/pkg/virtual/syncer/controllers/apireconciler"
Expand All @@ -48,31 +48,30 @@ func BuildVirtualWorkspace(
rootPathPrefix string,
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
dynamicClusterClient kcpdynamic.ClusterInterface,
kcpClusterClient kcpclientset.ClusterInterface,
wildcardKcpInformers kcpinformers.SharedInformerFactory,
cachedKCPInformers kcpinformers.SharedInformerFactory,
) []rootapiserver.NamedVirtualWorkspace {
if !strings.HasSuffix(rootPathPrefix, "/") {
rootPathPrefix += "/"
}

// Setup the APIReconciler indexes to share between both virtualworkspaces.
if err := wildcardKcpInformers.Workload().V1alpha1().SyncTargets().Informer().AddIndexers(cache.Indexers{
apireconciler.IndexSyncTargetsByExport: apireconciler.IndexSyncTargetsByExports,
}); err != nil {
return nil
}

if err := wildcardKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{
apireconciler.IndexAPIExportsByAPIResourceSchema: apireconciler.IndexAPIExportsByAPIResourceSchemas,
}); err != nil {
return nil
}
indexers.AddIfNotPresentOrDie(
cachedKCPInformers.Workload().V1alpha1().SyncTargets().Informer().GetIndexer(),
cache.Indexers{
apireconciler.IndexSyncTargetsByExport: apireconciler.IndexSyncTargetsByExports,
},
)
indexers.AddIfNotPresentOrDie(
cachedKCPInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cache.Indexers{
apireconciler.IndexAPIExportsByAPIResourceSchema: apireconciler.IndexAPIExportsByAPIResourceSchemas,
},
)

provider := templateProvider{
kubeClusterClient: kubeClusterClient,
dynamicClusterClient: dynamicClusterClient,
kcpClusterClient: kcpClusterClient,
wildcardKcpInformers: wildcardKcpInformers,
cachedKCPInformers: cachedKCPInformers,
rootPathPrefix: rootPathPrefix,
}

Expand Down
25 changes: 11 additions & 14 deletions pkg/virtual/syncer/builder/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/authorization/delegated"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
"github.com/kcp-dev/kcp/pkg/virtual/framework"
virtualworkspacesdynamic "github.com/kcp-dev/kcp/pkg/virtual/framework/dynamic"
Expand All @@ -54,8 +53,7 @@ import (
type templateProvider struct {
kubeClusterClient kcpkubernetesclientset.ClusterInterface
dynamicClusterClient kcpdynamic.ClusterInterface
kcpClusterClient kcpclientset.ClusterInterface
wildcardKcpInformers kcpinformers.SharedInformerFactory
cachedKCPInformers kcpinformers.SharedInformerFactory
rootPathPrefix string
}

Expand All @@ -70,8 +68,8 @@ type templateParameters struct {
storageWrapperBuilder func(labels.Requirements) forwardingregistry.StorageWrapper
}

func (p *templateProvider) newTemplate(parameters templateParameters) template {
return template{
func (p *templateProvider) newTemplate(parameters templateParameters) *template {
return &template{
templateProvider: *p,
templateParameters: parameters,
readyCh: make(chan struct{}),
Expand Down Expand Up @@ -120,7 +118,7 @@ func (t *template) resolveRootPath(urlPath string, requestContext context.Contex

// In order to avoid conflicts with reusing deleted synctarget names, let's make sure that the synctarget name and synctarget UID match, if not,
// that likely means that a syncer is running with a stale synctarget that got deleted.
syncTarget, err := t.wildcardKcpInformers.Workload().V1alpha1().SyncTargets().Cluster(clusterName).Lister().Get(syncTargetName)
syncTarget, err := t.cachedKCPInformers.Workload().V1alpha1().SyncTargets().Cluster(clusterName).Lister().Get(syncTargetName)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get synctarget %s|%s: %w", path, syncTargetName, err))
return
Expand Down Expand Up @@ -205,10 +203,9 @@ func (t *template) authorize(ctx context.Context, a authorizer.Attributes) (auth
func (t *template) bootstrapManagement(mainConfig genericapiserver.CompletedConfig) (apidefinition.APIDefinitionSetGetter, error) {
apiReconciler, err := apireconciler.NewAPIReconciler(
t.virtualWorkspaceName,
t.kcpClusterClient,
t.wildcardKcpInformers.Workload().V1alpha1().SyncTargets(),
t.wildcardKcpInformers.Apis().V1alpha1().APIResourceSchemas(),
t.wildcardKcpInformers.Apis().V1alpha1().APIExports(),
t.cachedKCPInformers.Workload().V1alpha1().SyncTargets(),
t.cachedKCPInformers.Apis().V1alpha1().APIResourceSchemas(),
t.cachedKCPInformers.Apis().V1alpha1().APIExports(),
func(syncTargetClusterName logicalcluster.Name, syncTargetName string, apiResourceSchema *apisv1alpha1.APIResourceSchema, version string, apiExportIdentityHash string) (apidefinition.APIDefinition, error) {
syncTargetKey := workloadv1alpha1.ToSyncTargetKey(syncTargetClusterName, syncTargetName)
requirements, selectable := labels.SelectorFromSet(map[string]string{
Expand Down Expand Up @@ -244,9 +241,9 @@ func (t *template) bootstrapManagement(mainConfig genericapiserver.CompletedConf
defer close(t.readyCh)

for name, informer := range map[string]cache.SharedIndexInformer{
"synctargets": t.wildcardKcpInformers.Workload().V1alpha1().SyncTargets().Informer(),
"apiresourceschemas": t.wildcardKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
"apiexports": t.wildcardKcpInformers.Apis().V1alpha1().APIExports().Informer(),
"synctargets": t.cachedKCPInformers.Workload().V1alpha1().SyncTargets().Informer(),
"apiresourceschemas": t.cachedKCPInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
"apiexports": t.cachedKCPInformers.Apis().V1alpha1().APIExports().Informer(),
} {
if !cache.WaitForNamedCacheSync(name, hookContext.StopCh, informer.HasSynced) {
klog.Background().Error(nil, "informer not synced")
Expand All @@ -263,7 +260,7 @@ func (t *template) bootstrapManagement(mainConfig genericapiserver.CompletedConf
return apiReconciler, nil
}

func (t template) buildVirtualWorkspace() *virtualworkspacesdynamic.DynamicVirtualWorkspace {
func (t *template) buildVirtualWorkspace() *virtualworkspacesdynamic.DynamicVirtualWorkspace {
return &virtualworkspacesdynamic.DynamicVirtualWorkspace{
RootPathResolver: framework.RootPathResolverFunc(t.resolveRootPath),
Authorizer: authorizer.AuthorizerFunc(t.authorize),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
apisv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apis/v1alpha1"
workloadv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/workload/v1alpha1"
apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1"
Expand All @@ -59,7 +58,6 @@ type AllowedAPIfilterFunc func(apiGroupResource schema.GroupResource) bool

func NewAPIReconciler(
virtualWorkspaceName string,
kcpClusterClient kcpclientset.ClusterInterface,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer,
apiExportInformer apisv1alpha1informers.APIExportClusterInformer,
Expand All @@ -71,8 +69,6 @@ func NewAPIReconciler(
c := &APIReconciler{
virtualWorkspaceName: virtualWorkspaceName,

kcpClusterClient: kcpClusterClient,

syncTargetLister: syncTargetInformer.Lister(),
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),

Expand Down Expand Up @@ -128,8 +124,6 @@ func NewAPIReconciler(
type APIReconciler struct {
virtualWorkspaceName string

kcpClusterClient kcpclientset.ClusterInterface

syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister
syncTargetIndexer cache.Indexer

Expand Down
9 changes: 2 additions & 7 deletions pkg/virtual/syncer/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"k8s.io/client-go/rest"

kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
"github.com/kcp-dev/kcp/pkg/virtual/framework/rootapiserver"
"github.com/kcp-dev/kcp/pkg/virtual/syncer/builder"
Expand Down Expand Up @@ -53,13 +52,9 @@ func (o *Syncer) Validate(flagPrefix string) []error {
func (o *Syncer) NewVirtualWorkspaces(
rootPathPrefix string,
config *rest.Config,
wildcardKcpInformers kcpinformers.SharedInformerFactory,
cachedKCPInformers kcpinformers.SharedInformerFactory,
) (workspaces []rootapiserver.NamedVirtualWorkspace, err error) {
config = rest.AddUserAgent(rest.CopyConfig(config), "syncer-virtual-workspace")
kcpClusterClient, err := kcpclientset.NewForConfig(config)
if err != nil {
return nil, err
}
kubeClusterClient, err := kcpkubernetesclientset.NewForConfig(config)
if err != nil {
return nil, err
Expand All @@ -69,5 +64,5 @@ func (o *Syncer) NewVirtualWorkspaces(
return nil, err
}

return builder.BuildVirtualWorkspace(rootPathPrefix, kubeClusterClient, dynamicClusterClient, kcpClusterClient, wildcardKcpInformers), nil
return builder.BuildVirtualWorkspace(rootPathPrefix, kubeClusterClient, dynamicClusterClient, cachedKCPInformers), nil
}

0 comments on commit 49d1b8c

Please sign in to comment.