Skip to content

Commit 5e0e5c5

Browse files
authoredSep 20, 2024··
✨ Enable the agent to send event to inventory server (#1104)
* ✨ Install the agent by helm chart (#1090) * add standalone mode Signed-off-by: myan <myan@redhat.com> d Signed-off-by: myan <myan@redhat.com> agent support multiple transports Signed-off-by: myan <myan@redhat.com> fix the transport type Signed-off-by: myan <myan@redhat.com> improve ut Signed-off-by: myan <myan@redhat.com> fix the error Signed-off-by: myan <myan@redhat.com> remove the useless Signed-off-by: myan <myan@redhat.com> add e2e test Signed-off-by: myan <myan@redhat.com> format Signed-off-by: myan <myan@redhat.com> remove the Signed-off-by: myan <myan@redhat.com> fix the grafana Signed-off-by: myan <myan@redhat.com> fix the e2e Signed-off-by: myan <myan@redhat.com> pg and kafka Signed-off-by: myan <myan@redhat.com> fix the ns Signed-off-by: myan <myan@redhat.com> renaming Signed-off-by: myan <myan@redhat.com> rerun Signed-off-by: myan <myan@redhat.com> add standalone mode Signed-off-by: myan <myan@redhat.com> d Signed-off-by: myan <myan@redhat.com> agent support multiple transports Signed-off-by: myan <myan@redhat.com> fix the transport type Signed-off-by: myan <myan@redhat.com> improve ut Signed-off-by: myan <myan@redhat.com> fix the error Signed-off-by: myan <myan@redhat.com> remove the useless Signed-off-by: myan <myan@redhat.com> add e2e test Signed-off-by: myan <myan@redhat.com> format Signed-off-by: myan <myan@redhat.com> remove the Signed-off-by: myan <myan@redhat.com> fix the grafana Signed-off-by: myan <myan@redhat.com> fix the e2e Signed-off-by: myan <myan@redhat.com> pg and kafka Signed-off-by: myan <myan@redhat.com> fix the ns Signed-off-by: myan <myan@redhat.com> install agent by helm chart Signed-off-by: myan <myan@redhat.com> fix helm Signed-off-by: myan <myan@redhat.com> update doc Signed-off-by: myan <myan@redhat.com> fix the secret error Signed-off-by: myan <myan@redhat.com> doc Signed-off-by: myan <myan@redhat.com> test Signed-off-by: myan <myan@redhat.com> * add the review Signed-off-by: myan <myan@redhat.com> * add clusterId as default leaf_hub_name for standalone Signed-off-by: myan <myan@redhat.com> * add clusterId as default leaf_hub_name for standalone Signed-off-by: myan <myan@redhat.com> * improve ut Signed-off-by: myan <myan@redhat.com> * ut Signed-off-by: myan <myan@redhat.com> --------- Signed-off-by: myan <myan@redhat.com> :bug: Fix the issue of event loss caused by time filtering. (#1101) * skipp the expired event Signed-off-by: myan <myan@redhat.com> * fix the ut Signed-off-by: myan <myan@redhat.com> * integration Signed-off-by: myan <myan@redhat.com> --------- Signed-off-by: myan <myan@redhat.com> kind_cluster (#1098) Signed-off-by: myan <myan@redhat.com> MGMT-18597: Poll Stackrox Central for violation counts and push it to kafka (#1091) This patch changes the agent so that it discovers Stackrox _central_ instances in the hub, polls them to extract the summary of security violations and sends them to the manager via the Kafka broker. Related: https://issues.redhat.com/browse/MGMT-18597 Signed-off-by: danmanor <dmanor@redhat.com> Co-authored-by: danmanor <dmanor@redhat.com> ACM-14143: Fix section levels in StackRox integration doc (#1106) The levels of the sections are incorrect, this patch fixes them. Related: https://issues.redhat.com/browse/ACM-14143 Related: https://issues.redhat.com/browse/MGMT-18591 Signed-off-by: Juan Hernandez <juan.hernandez@redhat.com> :sparkles: Introduce managedclustermigration api (#1102) * support migration Signed-off-by: clyang82 <chuyang@redhat.com> * rename to managedclustermigration Signed-off-by: clyang82 <chuyang@redhat.com> --------- Signed-off-by: clyang82 <chuyang@redhat.com> MGMT-18903: Add 'source' column to violations by severity count SQL table to support multiple central instances in one hub (#1105) Signed-off-by: danmanor <dmanor@redhat.com> fix install hosted in other ns (#1107) Signed-off-by: DangPeng Liu <daliu@redhat.com> support sending to the inventory server Signed-off-by: myan <myan@redhat.com> generate the secret Signed-off-by: myan <myan@redhat.com> fix the e2e Signed-off-by: myan <myan@redhat.com> shell Signed-off-by: myan <myan@redhat.com> local test finished Signed-off-by: myan <myan@redhat.com> rebase Signed-off-by: myan <myan@redhat.com> fix the ut Signed-off-by: myan <myan@redhat.com> add inventory unit test Signed-off-by: myan <myan@redhat.com> add ut Signed-off-by: myan <myan@redhat.com> ignore leak Signed-off-by: myan <myan@redhat.com> watch the clusterinfo Signed-off-by: myan <myan@redhat.com> upgrade addon Signed-off-by: myan <myan@redhat.com> add ut Signed-off-by: myan <myan@redhat.com> format Signed-off-by: myan <myan@redhat.com> fix the e2e Signed-off-by: myan <myan@redhat.com> fix the consumer error Signed-off-by: myan <myan@redhat.com> * fix sum Signed-off-by: myan <myan@redhat.com> * reply review Signed-off-by: myan <myan@redhat.com> * remove the namespace Signed-off-by: myan <myan@redhat.com> --------- Signed-off-by: myan <myan@redhat.com>
1 parent 604bed7 commit 5e0e5c5

36 files changed

+2684
-289
lines changed
 

‎agent/cmd/agent/main.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
configv1 "github.com/openshift/api/config/v1"
1111
routev1 "github.com/openshift/api/route/v1"
1212
"github.com/spf13/pflag"
13+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
1314
coordinationv1 "k8s.io/api/coordination/v1"
1415
corev1 "k8s.io/api/core/v1"
1516
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -58,7 +59,7 @@ func main() {
5859
restConfig.QPS = agentConfig.QPS
5960
restConfig.Burst = agentConfig.Burst
6061

61-
c, err := client.New(restConfig, client.Options{})
62+
c, err := client.New(restConfig, client.Options{Scheme: config.GetRuntimeScheme()})
6263
if err != nil {
6364
setupLog.Error(err, "failed to int controller runtime client")
6465
os.Exit(1)
@@ -125,8 +126,6 @@ func parseFlags() *config.AgentConfig {
125126
pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.")
126127
pflag.StringVar(&agentConfig.PodNamespace, "pod-namespace", constants.GHAgentNamespace,
127128
"The agent running namespace, also used as leader election namespace")
128-
pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka",
129-
"The transport type, 'kafka'")
130129
pflag.IntVar(&agentConfig.SpecWorkPoolSize, "consumer-worker-pool-size", 10,
131130
"The goroutine number to propagate the bundles on managed cluster.")
132131
pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false,
@@ -175,11 +174,11 @@ func completeConfig(ctx context.Context, c client.Client, agentConfig *config.Ag
175174
return fmt.Errorf("failed to get the ClusterVersion(version): %w", err)
176175
}
177176

178-
clusterId := string(clusterVersion.Spec.ClusterID)
179-
if clusterId == "" {
177+
clusterID := string(clusterVersion.Spec.ClusterID)
178+
if clusterID == "" {
180179
return fmt.Errorf("the clusterId from ClusterVersion must not be empty")
181180
}
182-
agentConfig.LeafHubName = clusterId
181+
agentConfig.LeafHubName = clusterID
183182
}
184183

185184
if agentConfig.MetricsAddress == "" {
@@ -275,6 +274,7 @@ func initCache(restConfig *rest.Config, cacheOpts cache.Options) (cache.Cache, e
275274
&apiextensionsv1.CustomResourceDefinition{}: {},
276275
&policyv1.Policy{}: {},
277276
&clusterv1.ManagedCluster{}: {},
277+
&clusterinfov1beta1.ManagedClusterInfo{}: {},
278278
&clustersv1alpha1.ClusterClaim{}: {},
279279
&routev1.Route{}: {},
280280
&placementrulev1.PlacementRule{}: {},

‎agent/cmd/agent/main_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,13 @@ func TestParseFlags(t *testing.T) {
2525
"cmd",
2626
"--leaf-hub-name=test-hub",
2727
"--pod-namespace=test-namespace",
28-
"--transport-type=kafka",
2928
"--consumer-worker-pool-size=5",
3029
}
3130

3231
agentConfig := parseFlags()
3332

3433
assert.Equal(t, "test-hub", agentConfig.LeafHubName)
3534
assert.Equal(t, "test-namespace", agentConfig.PodNamespace)
36-
assert.Equal(t, "kafka", agentConfig.TransportConfig.TransportType)
3735
assert.Equal(t, 5, agentConfig.SpecWorkPoolSize)
3836
}
3937

‎agent/pkg/config/scheme.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package config
66
import (
77
configv1 "github.com/openshift/api/config/v1"
88
routev1 "github.com/openshift/api/route/v1"
9+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
910
mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1"
1011
coordinationv1 "k8s.io/api/coordination/v1"
1112
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -44,5 +45,6 @@ func GetRuntimeScheme() *runtime.Scheme {
4445
utilruntime.Must(channelv1.AddToScheme(scheme))
4546
utilruntime.Must(appsubv1.SchemeBuilder.AddToScheme(scheme))
4647
utilruntime.Must(appv1beta1.AddToScheme(scheme))
48+
utilruntime.Must(clusterinfov1beta1.AddToScheme(scheme))
4749
return scheme
4850
}

‎agent/pkg/status/controller/controller.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, producer transport.Pr
3030
if statusCtrlStarted {
3131
return nil
3232
}
33+
// managed cluster info
34+
if err := managedclusters.LaunchManagedClusterInfoSyncer(ctx, mgr, agentConfig, producer); err != nil {
35+
return fmt.Errorf("failed to launch managedclusterinfo syncer: %w", err)
36+
}
37+
38+
// if it's rest transport, skip the following controllers
39+
if agentConfig.TransportConfig.TransportType == string(transport.Rest) {
40+
statusCtrlStarted = true
41+
return nil
42+
}
3343

3444
if err := agentstatusconfig.AddConfigController(mgr, agentConfig); err != nil {
3545
return fmt.Errorf("failed to add ConfigMap controller: %w", err)
@@ -86,6 +96,5 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, producer transport.Pr
8696
return fmt.Errorf("failed to launch time filter: %w", err)
8797
}
8898

89-
statusCtrlStarted = true
9099
return nil
91100
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package managedclusters
2+
3+
import (
4+
"context"
5+
6+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
7+
ctrl "sigs.k8s.io/controller-runtime"
8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
"sigs.k8s.io/controller-runtime/pkg/predicate"
10+
11+
"github.com/stolostron/multicluster-global-hub/agent/pkg/config"
12+
statusconfig "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/config"
13+
"github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/generic"
14+
"github.com/stolostron/multicluster-global-hub/pkg/constants"
15+
"github.com/stolostron/multicluster-global-hub/pkg/enum"
16+
"github.com/stolostron/multicluster-global-hub/pkg/transport"
17+
"github.com/stolostron/multicluster-global-hub/pkg/utils"
18+
)
19+
20+
// LaunchManagedClusterInfoSyncer only for the restful client
21+
func LaunchManagedClusterInfoSyncer(ctx context.Context, mgr ctrl.Manager, agentConfig *config.AgentConfig,
22+
producer transport.Producer,
23+
) error {
24+
if agentConfig.TransportConfig.TransportType != string(transport.Rest) {
25+
return nil
26+
}
27+
28+
// controller config
29+
instance := func() client.Object { return &clusterinfov1beta1.ManagedClusterInfo{} }
30+
predicate := predicate.NewPredicateFuncs(func(object client.Object) bool { return true })
31+
32+
// emitter config
33+
tweakFunc := func(object client.Object) {
34+
utils.MergeAnnotations(object, map[string]string{
35+
constants.ManagedClusterManagedByAnnotation: statusconfig.GetLeafHubName(),
36+
})
37+
}
38+
emitter := generic.ObjectEmitterWrapper(enum.ManagedClusterInfoType, func(obj client.Object) bool {
39+
return true
40+
}, tweakFunc, false)
41+
42+
return generic.LaunchGenericObjectSyncer(
43+
"status.managed_cluster_info",
44+
mgr,
45+
generic.NewGenericController(instance, predicate),
46+
producer,
47+
statusconfig.GetManagerClusterDuration,
48+
[]generic.ObjectEmitter{
49+
emitter,
50+
})
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package managedclusters
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"k8s.io/client-go/rest"
9+
"sigs.k8s.io/controller-runtime/pkg/manager"
10+
11+
"github.com/stolostron/multicluster-global-hub/agent/pkg/config"
12+
"github.com/stolostron/multicluster-global-hub/pkg/transport"
13+
)
14+
15+
func TestLaunchManagedClusterInfoSyncer(t *testing.T) {
16+
ctx := context.Background()
17+
agentConfig := &config.AgentConfig{
18+
TransportConfig: &transport.TransportConfig{
19+
TransportType: string(transport.Rest),
20+
},
21+
}
22+
cfg := &rest.Config{
23+
Host: "https://mock-cluster",
24+
APIPath: "/api",
25+
BearerToken: "mock-token",
26+
TLSClientConfig: rest.TLSClientConfig{Insecure: true},
27+
}
28+
mgr, err := manager.New(cfg, manager.Options{Scheme: config.GetRuntimeScheme()})
29+
assert.Nil(t, err)
30+
err = LaunchManagedClusterInfoSyncer(ctx, mgr, agentConfig, nil)
31+
assert.Nil(t, err)
32+
}

‎doc/event-exporter/templates/clusterrole.yaml

+17-1
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,20 @@ rules:
181181
verbs:
182182
- list
183183
- watch
184-
- get
184+
- get
185+
- apiGroups:
186+
- config.openshift.io
187+
resources:
188+
- clusterversions
189+
verbs:
190+
- get
191+
- list
192+
- watch
193+
- apiGroups:
194+
- internal.open-cluster-management.io
195+
resources:
196+
- managedclusterinfos
197+
verbs:
198+
- get
199+
- list
200+
- watch

‎go.mod

+60-36
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
module github.com/stolostron/multicluster-global-hub
22

3-
go 1.22.4
3+
go 1.22.5
44

55
require (
66
github.com/RedHatInsights/strimzi-client-go v0.34.2
77
github.com/Shopify/sarama v1.38.1
88
github.com/cenkalti/backoff/v4 v4.3.0
9-
github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-20240805051634-1aecb204b50d
9+
github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-20240911135016-682f3a9684e4
1010
github.com/cloudevents/sdk-go/v2 v2.15.2
1111
github.com/cloudflare/cfssl v1.6.5
12-
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
12+
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3
1313
github.com/crunchydata/postgres-operator v1.3.3-0.20230629151007-94ebcf2df74d
1414
github.com/deckarep/golang-set v1.8.0
1515
github.com/evanphx/json-patch v5.7.0+incompatible
@@ -29,18 +29,20 @@ require (
2929
github.com/openshift/library-go v0.0.0-20240723172506-8bb8fe6cc56d
3030
github.com/operator-framework/api v0.17.7-0.20230626210316-aa3e49803e7b
3131
github.com/operator-framework/operator-lifecycle-manager v0.22.0
32+
github.com/project-kessel/inventory-api v0.0.0-20240902141731-aad011c715fd
33+
github.com/project-kessel/inventory-client-go v0.0.0-20240918035700-76e5efdd0022
3234
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.63.0
3335
github.com/spf13/pflag v1.0.5
34-
github.com/stolostron/cluster-lifecycle-api v0.0.0-20230222063645-5b18b26381ff
36+
github.com/stolostron/cluster-lifecycle-api v0.0.0-20240918064238-a5e71b599118
3537
github.com/stolostron/klusterlet-addon-controller v0.0.0-20230528112800-a466a2368df4
3638
github.com/stolostron/multiclusterhub-operator v0.0.0-20230829141355-4ad378ab367f
3739
github.com/stretchr/testify v1.9.0
3840
go.uber.org/zap v1.27.0
3941
gopkg.in/ini.v1 v1.67.0
4042
gopkg.in/yaml.v2 v2.4.0
4143
gorm.io/datatypes v1.2.0
42-
gorm.io/driver/postgres v1.5.2
43-
gorm.io/gorm v1.25.4
44+
gorm.io/driver/postgres v1.5.9
45+
gorm.io/gorm v1.25.11
4446
k8s.io/api v0.31.0
4547
k8s.io/apiextensions-apiserver v0.31.0
4648
k8s.io/apimachinery v0.31.0
@@ -49,10 +51,10 @@ require (
4951
k8s.io/kube-aggregator v0.31.0
5052
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
5153
open-cluster-management.io/addon-framework v0.10.0
52-
open-cluster-management.io/api v0.13.0
54+
open-cluster-management.io/api v0.14.1-0.20240627145512-bd6f2229b53c
5355
open-cluster-management.io/governance-policy-propagator v0.11.1-0.20230815182526-b4ee1b24b1d0
54-
open-cluster-management.io/multicloud-operators-channel v0.11.0
55-
open-cluster-management.io/multicloud-operators-subscription v0.11.0
56+
open-cluster-management.io/multicloud-operators-channel v0.13.1-0.20240423040139-ad986cafc6e8
57+
open-cluster-management.io/multicloud-operators-subscription v0.14.0
5658
sigs.k8s.io/application v0.8.3
5759
sigs.k8s.io/controller-runtime v0.19.0
5860
sigs.k8s.io/kustomize/api v0.17.2
@@ -61,19 +63,42 @@ require (
6163
)
6264

6365
require (
66+
cloud.google.com/go v0.99.0 // indirect
67+
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
68+
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b // indirect
69+
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect
70+
github.com/docker/cli v26.1.4+incompatible // indirect
71+
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
6472
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
73+
github.com/go-kratos/aegis v0.2.0 // indirect
74+
github.com/go-kratos/kratos/v2 v2.8.0 // indirect
6575
github.com/go-ole/go-ole v1.3.0 // indirect
76+
github.com/go-playground/form/v4 v4.2.1 // indirect
6677
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
78+
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
6779
github.com/google/certificate-transparency-go v1.1.7 // indirect
68-
github.com/jackc/puddle/v2 v2.2.1 // indirect
80+
github.com/gorilla/mux v1.8.1 // indirect
81+
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
82+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
83+
github.com/jackc/puddle/v2 v2.2.2 // indirect
6984
github.com/jmoiron/sqlx v1.3.5 // indirect
85+
github.com/mattn/go-sqlite3 v1.14.23 // indirect
86+
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
7087
github.com/pelletier/go-toml v1.9.5 // indirect
7188
github.com/weppos/publicsuffix-go v0.30.0 // indirect
7289
github.com/x448/float16 v0.8.4 // indirect
7390
github.com/zmap/zcrypto v0.0.0-20230310154051-c8b263fd8300 // indirect
7491
github.com/zmap/zlint/v3 v3.5.0 // indirect
92+
go.opentelemetry.io/otel v1.30.0 // indirect
93+
go.opentelemetry.io/otel/exporters/prometheus v0.52.0 // indirect
94+
go.opentelemetry.io/otel/metric v1.30.0 // indirect
95+
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
96+
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
97+
go.opentelemetry.io/otel/trace v1.30.0 // indirect
98+
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
99+
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
75100
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
76-
helm.sh/helm/v3 v3.14.2 // indirect
101+
helm.sh/helm/v3 v3.14.4 // indirect
77102
)
78103

79104
require (
@@ -93,17 +118,17 @@ require (
93118
github.com/eapache/go-resiliency v1.3.0 // indirect
94119
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
95120
github.com/eapache/queue v1.1.0 // indirect
96-
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
121+
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
97122
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
98123
github.com/fatih/structs v1.1.0 // indirect
99124
github.com/fsnotify/fsnotify v1.7.0 // indirect
100125
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
101126
github.com/gin-contrib/sse v0.1.0 // indirect
102127
github.com/go-errors/errors v1.4.2 // indirect
103128
github.com/go-logr/zapr v1.3.0 // indirect
104-
github.com/go-openapi/jsonpointer v0.19.6 // indirect
105-
github.com/go-openapi/jsonreference v0.20.2 // indirect
106-
github.com/go-openapi/swag v0.22.4 // indirect
129+
github.com/go-openapi/jsonpointer v0.21.0 // indirect
130+
github.com/go-openapi/jsonreference v0.21.0 // indirect
131+
github.com/go-openapi/swag v0.23.0 // indirect
107132
github.com/go-playground/locales v0.14.1 // indirect
108133
github.com/go-playground/universal-translator v0.18.1 // indirect
109134
github.com/go-playground/validator/v10 v10.14.0 // indirect
@@ -137,9 +162,9 @@ require (
137162
github.com/jackc/pgio v1.0.0 // indirect
138163
github.com/jackc/pgpassfile v1.0.0 // indirect
139164
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
140-
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
165+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
141166
github.com/jackc/pgtype v1.14.0 // indirect
142-
github.com/jackc/pgx/v5 v5.5.4 // indirect
167+
github.com/jackc/pgx/v5 v5.7.1 // indirect
143168
github.com/jackc/puddle v1.3.0 // indirect
144169
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
145170
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
@@ -150,8 +175,8 @@ require (
150175
github.com/jinzhu/now v1.1.5 // indirect
151176
github.com/josharian/intern v1.0.0 // indirect
152177
github.com/json-iterator/go v1.1.12 // indirect
153-
github.com/klauspost/compress v1.17.8 // indirect
154-
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
178+
github.com/klauspost/compress v1.17.9 // indirect
179+
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
155180
github.com/leodido/go-urn v1.2.4 // indirect; indirec
156181
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
157182
github.com/mailru/easyjson v0.7.7 // indirect
@@ -166,20 +191,20 @@ require (
166191
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
167192
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
168193
github.com/operator-framework/operator-registry v1.17.5 // indirect
169-
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
194+
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
170195
github.com/pierrec/lz4/v4 v4.1.17 // indirect
171196
github.com/pkg/errors v0.9.1 // indirect
172197
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
173-
github.com/prometheus/client_golang v1.19.1
198+
github.com/prometheus/client_golang v1.20.3
174199
github.com/prometheus/client_model v0.6.1 // indirect
175-
github.com/prometheus/common v0.55.0 // indirect
200+
github.com/prometheus/common v0.59.1 // indirect
176201
github.com/prometheus/procfs v0.15.1 // indirect
177202
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
178203
github.com/robfig/cron/v3 v3.0.1 // indirect
179-
github.com/sergi/go-diff v1.3.1 // indirect
204+
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
180205
github.com/shopspring/decimal v1.3.1 // indirect
181206
github.com/sirupsen/logrus v1.9.3 // indirect
182-
github.com/spf13/cast v1.5.0 // indirect
207+
github.com/spf13/cast v1.7.0 // indirect
183208
github.com/texttheater/golang-levenshtein v1.0.1 // indirect
184209
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
185210
github.com/ugorji/go/codec v1.2.11 // indirect
@@ -189,30 +214,29 @@ require (
189214
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
190215
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
191216
github.com/xlab/treeprint v1.2.0 // indirect
192-
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
193217
go.uber.org/multierr v1.11.0 // indirect
194218
golang.org/x/arch v0.3.0 // indirect
195-
golang.org/x/crypto v0.26.0 // indirect
196-
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
197-
golang.org/x/net v0.28.0 // indirect
198-
golang.org/x/oauth2 v0.21.0 // indirect
219+
golang.org/x/crypto v0.27.0 // indirect
220+
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
221+
golang.org/x/net v0.29.0 // indirect
222+
golang.org/x/oauth2 v0.23.0 // indirect
199223
golang.org/x/sync v0.8.0 // indirect
200-
golang.org/x/sys v0.24.0 // indirect
201-
golang.org/x/term v0.23.0 // indirect
202-
golang.org/x/text v0.17.0 // indirect
224+
golang.org/x/sys v0.25.0 // indirect
225+
golang.org/x/term v0.24.0 // indirect
226+
golang.org/x/text v0.18.0 // indirect
203227
golang.org/x/time v0.5.0 // indirect
204-
golang.org/x/tools v0.24.0 // indirect
228+
golang.org/x/tools v0.25.0 // indirect
205229
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
206-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
207-
google.golang.org/grpc v1.65.0 // indirect
230+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
231+
google.golang.org/grpc v1.66.2 // indirect
208232
google.golang.org/protobuf v1.34.2 // indirect
209233
gopkg.in/inf.v0 v0.9.1 // indirect
210234
gopkg.in/yaml.v3 v3.0.1 // indirect
211235
gorm.io/driver/mysql v1.4.7 // indirect
212236
k8s.io/apiserver v0.31.0 // indirect
213237
k8s.io/component-base v0.31.0 // indirect
214238
k8s.io/klog/v2 v2.130.1
215-
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
239+
k8s.io/kube-openapi v0.0.0-20240411171206-dc4e619f62f3 // indirect
216240
open-cluster-management.io/sdk-go v0.13.1-0.20240416062924-20307e6fe090 // indirect
217241
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
218242
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 // indirect

‎go.sum

+360-110
Large diffs are not rendered by default.

‎manager/cmd/manager/main.go

-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ func parseFlags() *managerconfig.ManagerConfig {
106106
"The URL of database server for the process user.")
107107
pflag.StringVar(&managerConfig.DatabaseConfig.TransportBridgeDatabaseURL,
108108
"transport-bridge-database-url", "", "The URL of database server for the transport-bridge user.")
109-
pflag.StringVar(&managerConfig.TransportConfig.TransportType, "transport-type", "kafka",
110-
"The transport type, 'kafka'.")
111109
pflag.DurationVar(&managerConfig.TransportConfig.CommitterInterval, "transport-committer-interval",
112110
40*time.Second, "The committer interval for transport layer.")
113111
pflag.StringVar(&managerConfig.DatabaseConfig.CACertPath, "postgres-ca-path", "/postgres-ca/ca.crt",

‎operator/pkg/controllers/agent/addon_installer.go

+1
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func expectedManagedClusterAddon(cluster *clusterv1.ManagedCluster, cma *v1alpha
195195
Labels: map[string]string{
196196
constants.GlobalHubOwnerLabelKey: constants.GHOperatorOwnerLabelVal,
197197
},
198+
// The OwnerReferences will be added automatically by the addon-manager(OCM) in the production evnvironment.
198199
OwnerReferences: []metav1.OwnerReference{
199200
{
200201
APIVersion: "addon.open-cluster-management.io/v1alpha1",

‎operator/pkg/controllers/agent/manifests/templates/agent/multicluster-global-hub-agent-clusterrole.yaml

+16
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,20 @@ rules:
194194
- get
195195
- list
196196
- watch
197+
- apiGroups:
198+
- config.openshift.io
199+
resources:
200+
- clusterversions
201+
verbs:
202+
- get
203+
- list
204+
- watch
205+
- apiGroups:
206+
- internal.open-cluster-management.io
207+
resources:
208+
- managedclusterinfos
209+
verbs:
210+
- get
211+
- list
212+
- watch
197213
{{- end -}}

‎operator/pkg/controllers/agent/manifests/templates/hostedagent/multicluster-global-hub-hosting-agent-deployment.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ spec:
2727
- --leaf-hub-name={{ .LeafHubID }}
2828
- --kafka-consumer-id={{ .LeafHubID }}
2929
- --enforce-hoh-rbac=false
30-
- --transport-type={{ .TransportType }}
3130
- --kafka-bootstrap-server={{ .KafkaBootstrapServer }}
3231
- --kafka-ca-cert-path=/kafka-certs/ca.crt
3332
- --kafka-client-cert-path=/kafka-certs/client.crt

‎operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ spec:
3737
- --zap-log-level={{.LogLevel}}
3838
- --manager-namespace=$(POD_NAMESPACE)
3939
- --watch-namespace=$(WATCH_NAMESPACE)
40-
- --transport-type={{.TransportType}}
4140
- --postgres-ca-path=/postgres-credential/ca.crt
4241
- --process-database-url=$(DATABASE_URL)
4342
- --transport-bridge-database-url=$(DATABASE_URL)

‎pkg/enum/event_type.go

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const (
88
HubClusterInfoType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.managedhub.info"
99
HubClusterHeartbeatType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.managedhub.heartbeat"
1010
ManagedClusterType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.managedcluster"
11+
ManagedClusterInfoType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.managedclusterinfo"
1112
SubscriptionReportType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.subscription.report"
1213
SubscriptionStatusType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.subscription.status"
1314

‎pkg/transport/config/rest_config.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package config
2+
3+
import (
4+
"encoding/base64"
5+
"fmt"
6+
7+
corev1 "k8s.io/api/core/v1"
8+
"sigs.k8s.io/kustomize/kyaml/yaml"
9+
10+
"github.com/stolostron/multicluster-global-hub/pkg/transport"
11+
)
12+
13+
func GetRestfulConnBySecret(transportConfig *corev1.Secret) (*transport.RestfulConnCredentail, error) {
14+
restfulYaml, ok := transportConfig.Data["rest.yaml"]
15+
if !ok {
16+
return nil, fmt.Errorf("must set the `rest.yaml` in the transport secret(%s)", transportConfig.Name)
17+
}
18+
restfulConn := &transport.RestfulConnCredentail{}
19+
if err := yaml.Unmarshal(restfulYaml, restfulConn); err != nil {
20+
return nil, fmt.Errorf("failed to unmarshal kafka config to transport credentail: %w", err)
21+
}
22+
23+
// decode the ca and client cert
24+
if restfulConn.CACert != "" {
25+
bytes, err := base64.StdEncoding.DecodeString(restfulConn.CACert)
26+
if err != nil {
27+
return nil, err
28+
}
29+
restfulConn.CACert = string(bytes)
30+
}
31+
if restfulConn.ClientCert != "" {
32+
bytes, err := base64.StdEncoding.DecodeString(restfulConn.ClientCert)
33+
if err != nil {
34+
return nil, err
35+
}
36+
restfulConn.ClientCert = string(bytes)
37+
}
38+
if restfulConn.ClientKey != "" {
39+
bytes, err := base64.StdEncoding.DecodeString(restfulConn.ClientKey)
40+
if err != nil {
41+
return nil, err
42+
}
43+
restfulConn.ClientKey = string(bytes)
44+
}
45+
return restfulConn, nil
46+
}

‎pkg/transport/controller/controller.go

+100-80
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package controller
22

33
import (
44
"context"
5-
"encoding/base64"
65
"fmt"
76
"reflect"
87
"sync"
@@ -15,7 +14,6 @@ import (
1514
"sigs.k8s.io/controller-runtime/pkg/client"
1615
"sigs.k8s.io/controller-runtime/pkg/event"
1716
"sigs.k8s.io/controller-runtime/pkg/predicate"
18-
"sigs.k8s.io/kustomize/kyaml/yaml"
1917

2018
"github.com/stolostron/multicluster-global-hub/pkg/transport"
2119
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
@@ -68,75 +66,134 @@ func (c *TransportCtrl) Reconcile(ctx context.Context, request ctrl.Request) (ct
6866
return ctrl.Result{}, err
6967
}
7068

71-
// load the kafka connection credentail based on the transport type. kafka, multiple
72-
kafkaConn, err := config.GetTransportCredentailBySecret(secret, c.runtimeClient)
73-
if err != nil {
74-
return ctrl.Result{}, err
69+
_, isKafka := secret.Data["kafka.yaml"]
70+
if isKafka {
71+
c.transportConfig.TransportType = string(transport.Kafka)
7572
}
7673

77-
// update the kafka credential secret colletions for the predicates
78-
if kafkaConn.CASecretName != "" || !utils.ContainsString(c.extraSecretNames, kafkaConn.CASecretName) {
79-
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.CASecretName)
74+
_, isRestful := secret.Data["rest.yaml"]
75+
if isRestful {
76+
c.transportConfig.TransportType = string(transport.Rest)
8077
}
81-
if kafkaConn.ClientSecretName != "" || utils.ContainsString(c.extraSecretNames, kafkaConn.ClientSecretName) {
82-
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.ClientSecretName)
78+
79+
var updated bool
80+
var err error
81+
switch c.transportConfig.TransportType {
82+
case string(transport.Kafka):
83+
updated, err = c.ReconcileKafkaCredentail(ctx, secret)
84+
if err != nil {
85+
return ctrl.Result{}, err
86+
}
87+
case string(transport.Rest):
88+
updated, err = c.ReconcileRestfulCredentail(ctx, secret)
89+
if err != nil {
90+
return ctrl.Result{}, err
91+
}
92+
default:
93+
return ctrl.Result{}, fmt.Errorf("unsupported transport type: %s", c.transportConfig.TransportType)
8394
}
8495

85-
restfulConn, err := c.GetRestfulConnBySecret(secret)
96+
if !updated {
97+
return ctrl.Result{}, nil
98+
}
99+
100+
// secret is changed, then create/update the producer
101+
err = c.ReconcileProducer()
86102
if err != nil {
87103
return ctrl.Result{}, err
88104
}
105+
klog.Info("the transport producer is created/updated")
89106

90-
// if credentials aren't updated, then return
91-
if reflect.DeepEqual(c.transportConfig.KafkaCredential, kafkaConn) &&
92-
reflect.DeepEqual(c.transportConfig.RestfulCredentail, restfulConn) {
93-
return ctrl.Result{}, nil
107+
if c.producer != nil && c.callback != nil {
108+
if err := c.callback(c.producer, c.consumer); err != nil {
109+
return ctrl.Result{}, fmt.Errorf("failed to invoke the callback function: %w", err)
110+
}
94111
}
95-
c.transportConfig.KafkaCredential = kafkaConn
96-
c.transportConfig.RestfulCredentail = restfulConn
97112

98-
// transport config is changed, then create/update the consumer/producer
113+
return ctrl.Result{}, nil
114+
}
115+
116+
// ReconcileProducer, transport config is changed, then create/update the producer
117+
func (c *TransportCtrl) ReconcileProducer() error {
99118
if c.producer == nil {
100119
sender, err := producer.NewGenericProducer(c.transportConfig)
101120
if err != nil {
102-
return ctrl.Result{}, fmt.Errorf("failed to create/update the producer: %w", err)
121+
return fmt.Errorf("failed to create/update the producer: %w", err)
103122
}
104123
c.producer = sender
105124
} else {
106125
if err := c.producer.Reconnect(c.transportConfig); err != nil {
107-
return ctrl.Result{}, fmt.Errorf("failed to reconnect the producer: %w", err)
126+
return fmt.Errorf("failed to reconnect the producer: %w", err)
108127
}
109128
}
129+
return nil
130+
}
110131

111-
// don't create the consumer when the consumer groupId is empty, that means the agent maybe in the standalone mode
112-
if c.transportConfig.ConsumerGroupId != "" {
113-
if c.consumer == nil {
114-
receiver, err := consumer.NewGenericConsumer(c.transportConfig)
115-
if err != nil {
116-
return ctrl.Result{}, fmt.Errorf("failed to create the consumer: %w", err)
117-
}
118-
c.consumer = receiver
119-
go func() {
120-
if err = c.consumer.Start(ctx); err != nil {
121-
klog.Errorf("failed to start the consumser: %v", err)
122-
}
123-
}()
124-
} else {
125-
if err := c.consumer.Reconnect(ctx, c.transportConfig); err != nil {
126-
return ctrl.Result{}, fmt.Errorf("failed to reconnect the consumer: %w", err)
132+
// ReconcileKafkaCredentail update the kafka connection credentail based on the secret, return true if the kafka
133+
// credentail is updated, It also create/update the consumer if not in the standalone mode
134+
func (c *TransportCtrl) ReconcileKafkaCredentail(ctx context.Context, secret *corev1.Secret) (updated bool, err error) {
135+
// load the kafka connection credentail based on the transport type. kafka, multiple
136+
kafkaConn, err := config.GetTransportCredentailBySecret(secret, c.runtimeClient)
137+
if err != nil {
138+
return updated, err
139+
}
140+
141+
// update the wathing secret lits
142+
if kafkaConn.CASecretName != "" || !utils.ContainsString(c.extraSecretNames, kafkaConn.CASecretName) {
143+
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.CASecretName)
144+
}
145+
if kafkaConn.ClientSecretName != "" || utils.ContainsString(c.extraSecretNames, kafkaConn.ClientSecretName) {
146+
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.ClientSecretName)
147+
}
148+
149+
// if credentials aren't updated, then return
150+
if reflect.DeepEqual(c.transportConfig.KafkaCredential, kafkaConn) {
151+
return
152+
}
153+
c.transportConfig.KafkaCredential = kafkaConn
154+
updated = true
155+
156+
// if the consumer groupId is empty, then it's means the agent is in the standalone mode, don't create the consumer
157+
if c.transportConfig.ConsumerGroupId == "" {
158+
return
159+
}
160+
161+
// create/update the consumer with the kafka transport
162+
if c.consumer == nil {
163+
receiver, err := consumer.NewGenericConsumer(c.transportConfig)
164+
if err != nil {
165+
return updated, fmt.Errorf("failed to create the consumer: %w", err)
166+
}
167+
c.consumer = receiver
168+
go func() {
169+
if err = c.consumer.Start(ctx); err != nil {
170+
klog.Errorf("failed to start the consumser: %v", err)
127171
}
172+
}()
173+
} else {
174+
if err := c.consumer.Reconnect(ctx, c.transportConfig); err != nil {
175+
return updated, fmt.Errorf("failed to reconnect the consumer: %w", err)
128176
}
129177
}
178+
klog.Info("the transport(kafka) onsumer is created/updated")
130179

131-
klog.Info("the transport secret(producer, consumer) is created/updated")
180+
return updated, nil
181+
}
132182

133-
if c.callback != nil {
134-
if err := c.callback(c.producer, c.consumer); err != nil {
135-
return ctrl.Result{}, fmt.Errorf("failed to invoke the callback function: %w", err)
136-
}
183+
func (c *TransportCtrl) ReconcileRestfulCredentail(ctx context.Context, secret *corev1.Secret) (
184+
updated bool, err error,
185+
) {
186+
restfulConn, err := config.GetRestfulConnBySecret(secret)
187+
if err != nil {
188+
return updated, err
137189
}
138190

139-
return ctrl.Result{}, nil
191+
if reflect.DeepEqual(c.transportConfig.RestfulCredentail, restfulConn) {
192+
return
193+
}
194+
updated = true
195+
c.transportConfig.RestfulCredentail = restfulConn
196+
return
140197
}
141198

142199
// SetupWithManager sets up the controller with the Manager.
@@ -178,40 +235,3 @@ func (c *TransportCtrl) credentialSecret(name string) bool {
178235
}
179236
return false
180237
}
181-
182-
func (c *TransportCtrl) GetRestfulConnBySecret(transportConfig *corev1.Secret) (
183-
*transport.RestfulConnCredentail, error,
184-
) {
185-
restfulYaml, ok := transportConfig.Data["rest.yaml"]
186-
if !ok {
187-
return nil, nil
188-
}
189-
restfulConn := &transport.RestfulConnCredentail{}
190-
if err := yaml.Unmarshal(restfulYaml, restfulConn); err != nil {
191-
return nil, fmt.Errorf("failed to unmarshal kafka config to transport credentail: %w", err)
192-
}
193-
194-
// decode the ca and client cert
195-
if restfulConn.CACert != "" {
196-
bytes, err := base64.StdEncoding.DecodeString(restfulConn.CACert)
197-
if err != nil {
198-
return nil, err
199-
}
200-
restfulConn.CACert = string(bytes)
201-
}
202-
if restfulConn.ClientCert != "" {
203-
bytes, err := base64.StdEncoding.DecodeString(restfulConn.ClientCert)
204-
if err != nil {
205-
return nil, err
206-
}
207-
restfulConn.ClientCert = string(bytes)
208-
}
209-
if restfulConn.ClientKey != "" {
210-
bytes, err := base64.StdEncoding.DecodeString(restfulConn.ClientKey)
211-
if err != nil {
212-
return nil, err
213-
}
214-
restfulConn.ClientKey = string(bytes)
215-
}
216-
return restfulConn, nil
217-
}

‎pkg/transport/controller/controller_test.go

+103-6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ func TestSecretCtrlReconcile(t *testing.T) {
3232
transportConfig: &transport.TransportConfig{
3333
TransportType: string(transport.Chan),
3434
ConsumerGroupId: "test",
35+
KafkaCredential: &transport.KafkaConnCredential{
36+
SpecTopic: "spec",
37+
StatusTopic: "status",
38+
},
3539
},
3640
callback: func(p transport.Producer, c transport.Consumer) error {
3741
callbackInvoked = true
@@ -56,12 +60,66 @@ func TestSecretCtrlReconcile(t *testing.T) {
5660
kafkaConnYaml, err := kafkaConn.YamlMarshal(false)
5761
assert.NoError(t, err)
5862

63+
secret := &corev1.Secret{
64+
ObjectMeta: metav1.ObjectMeta{
65+
Namespace: "default",
66+
Name: "test-secret",
67+
},
68+
Data: map[string][]byte{
69+
"kafka.yaml": kafkaConnYaml,
70+
},
71+
}
72+
_ = fakeClient.Create(ctx, secret)
73+
74+
// Reconcile
75+
req := reconcile.Request{
76+
NamespacedName: types.NamespacedName{
77+
Namespace: "default",
78+
Name: "test-secret",
79+
},
80+
}
81+
result, err := secretController.Reconcile(ctx, req)
82+
assert.NoError(t, err)
83+
assert.False(t, result.Requeue)
84+
assert.NotNil(t, secretController.producer)
85+
assert.NotNil(t, secretController.consumer)
86+
assert.True(t, callbackInvoked)
87+
88+
// Test when transport config changes
89+
result, err = secretController.Reconcile(ctx, req)
90+
assert.NoError(t, err)
91+
assert.False(t, result.Requeue)
92+
utils.PrettyPrint(secretController.transportConfig.RestfulCredentail)
93+
}
94+
95+
func TestInventorySecretCtrlReconcile(t *testing.T) {
96+
// Set up a fake Kubernetes client
97+
scheme := runtime.NewScheme()
98+
_ = corev1.AddToScheme(scheme)
99+
100+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
101+
102+
callbackInvoked := false
103+
104+
secretController := &TransportCtrl{
105+
secretNamespace: "default",
106+
secretName: "test-secret",
107+
transportConfig: &transport.TransportConfig{},
108+
callback: func(p transport.Producer, c transport.Consumer) error {
109+
callbackInvoked = true
110+
return nil
111+
},
112+
runtimeClient: fakeClient,
113+
}
114+
115+
ctx := context.TODO()
116+
59117
restfulConn := &transport.RestfulConnCredentail{
60118
Host: "localhost:123",
61119
// the following fields are only for the manager, and the agent of byo/standalone kafka
62-
CACert: base64.StdEncoding.EncodeToString([]byte("11")),
63-
ClientCert: base64.StdEncoding.EncodeToString([]byte("12")),
64-
ClientKey: base64.StdEncoding.EncodeToString([]byte("13")),
120+
CACert: base64.StdEncoding.EncodeToString(rootPEM),
121+
ClientCert: base64.StdEncoding.EncodeToString(certPem),
122+
ClientKey: base64.StdEncoding.EncodeToString(keyPem),
65123
}
66124

67125
restfulConnYaml, err := restfulConn.YamlMarshal()
@@ -73,8 +131,7 @@ func TestSecretCtrlReconcile(t *testing.T) {
73131
Name: "test-secret",
74132
},
75133
Data: map[string][]byte{
76-
"kafka.yaml": kafkaConnYaml,
77-
"rest.yaml": restfulConnYaml,
134+
"rest.yaml": restfulConnYaml,
78135
},
79136
}
80137
_ = fakeClient.Create(ctx, secret)
@@ -90,12 +147,52 @@ func TestSecretCtrlReconcile(t *testing.T) {
90147
assert.NoError(t, err)
91148
assert.False(t, result.Requeue)
92149
assert.NotNil(t, secretController.producer)
93-
assert.NotNil(t, secretController.consumer)
150+
assert.Nil(t, secretController.consumer)
94151
assert.True(t, callbackInvoked)
152+
assert.Equal(t, string(transport.Rest), secretController.transportConfig.TransportType)
95153

96154
// Test when transport config changes
97155
result, err = secretController.Reconcile(ctx, req)
98156
assert.NoError(t, err)
99157
assert.False(t, result.Requeue)
100158
utils.PrettyPrint(secretController.transportConfig.RestfulCredentail)
101159
}
160+
161+
var rootPEM = []byte(`
162+
-- GlobalSign Root R2, valid until Dec 15, 2021
163+
-----BEGIN CERTIFICATE-----
164+
MIIDujCCAqKgAwIBAgILBAAAAAABD4Ym5g0wDQYJKoZIhvcNAQEFBQAwTDEgMB4G
165+
A1UECxMXR2xvYmFsU2lnbiBSb290IENBIC0gUjIxEzARBgNVBAoTCkdsb2JhbFNp
166+
Z24xEzARBgNVBAMTCkdsb2JhbFNpZ24wHhcNMDYxMjE1MDgwMDAwWhcNMjExMjE1
167+
MDgwMDAwWjBMMSAwHgYDVQQLExdHbG9iYWxTaWduIFJvb3QgQ0EgLSBSMjETMBEG
168+
A1UEChMKR2xvYmFsU2lnbjETMBEGA1UEAxMKR2xvYmFsU2lnbjCCASIwDQYJKoZI
169+
hvcNAQEBBQADggEPADCCAQoCggEBAKbPJA6+Lm8omUVCxKs+IVSbC9N/hHD6ErPL
170+
v4dfxn+G07IwXNb9rfF73OX4YJYJkhD10FPe+3t+c4isUoh7SqbKSaZeqKeMWhG8
171+
eoLrvozps6yWJQeXSpkqBy+0Hne/ig+1AnwblrjFuTosvNYSuetZfeLQBoZfXklq
172+
tTleiDTsvHgMCJiEbKjNS7SgfQx5TfC4LcshytVsW33hoCmEofnTlEnLJGKRILzd
173+
C9XZzPnqJworc5HGnRusyMvo4KD0L5CLTfuwNhv2GXqF4G3yYROIXJ/gkwpRl4pa
174+
zq+r1feqCapgvdzZX99yqWATXgAByUr6P6TqBwMhAo6CygPCm48CAwEAAaOBnDCB
175+
mTAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUm+IH
176+
V2ccHsBqBt5ZtJot39wZhi4wNgYDVR0fBC8wLTAroCmgJ4YlaHR0cDovL2NybC5n
177+
bG9iYWxzaWduLm5ldC9yb290LXIyLmNybDAfBgNVHSMEGDAWgBSb4gdXZxwewGoG
178+
3lm0mi3f3BmGLjANBgkqhkiG9w0BAQUFAAOCAQEAmYFThxxol4aR7OBKuEQLq4Gs
179+
J0/WwbgcQ3izDJr86iw8bmEbTUsp9Z8FHSbBuOmDAGJFtqkIk7mpM0sYmsL4h4hO
180+
291xNBrBVNpGP+DTKqttVCL1OmLNIG+6KYnX3ZHu01yiPqFbQfXf5WRDLenVOavS
181+
ot+3i9DAgBkcRcAtjOj4LaR0VknFBbVPFd5uRHg5h6h+u/N5GJG79G+dwfCMNYxd
182+
AfvDbbnvRG15RjF+Cv6pgsH/76tuIMRQyV+dTZsXjAzlAcmgQWpzU/qlULRuJQ/7
183+
TBj0/VLZjmmx6BEP3ojY+x1J96relc8geMJgEtslQIxq/H5COEBkEveegeGTLg==
184+
-----END CERTIFICATE-----`)
185+
186+
var certPem = []byte(`-----BEGIN CERTIFICATE-----
187+
MIIBhTCCASugAwIBAgIQIRi6zePL6mKjOipn+dNuaTAKBggqhkjOPQQDAjASMRAw
188+
DgYDVQQKEwdBY21lIENvMB4XDTE3MTAyMDE5NDMwNloXDTE4MTAyMDE5NDMwNlow
189+
EjEQMA4GA1UEChMHQWNtZSBDbzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABD0d
190+
7VNhbWvZLWPuj/RtHFjvtJBEwOkhbN/BnnE8rnZR8+sbwnc/KhCk3FhnpHZnQz7B
191+
5aETbbIgmuvewdjvSBSjYzBhMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggr
192+
BgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MCkGA1UdEQQiMCCCDmxvY2FsaG9zdDo1
193+
NDUzgg4xMjcuMC4wLjE6NTQ1MzAKBggqhkjOPQQDAgNIADBFAiEA2zpJEPQyz6/l
194+
Wf86aX6PepsntZv2GYlA5UpabfT2EZICICpJ5h/iI+i341gBmLiAFQOyTDT+/wQc
195+
6MF9+Yw1Yy0t
196+
-----END CERTIFICATE-----`)
197+
198+
var keyPem = []byte("-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIIrYSSNQFaA2Hwf1duRSxKtLYX5CB04fSeQ6tF1aY/PuoAoGCCqGSM49\nAwEHoUQDQgAEPR3tU2Fta9ktY+6P9G0cWO+0kETA6SFs38GecTyudlHz6xvCdz8q\nEKTcWGekdmdDPsHloRNtsiCa697B2O9IFA==\n-----END EC PRIVATE KEY-----") // notsecret
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"fmt"
8+
9+
cloudevents "github.com/cloudevents/sdk-go/v2"
10+
"github.com/project-kessel/inventory-client-go/v1beta1"
11+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
12+
13+
"github.com/stolostron/multicluster-global-hub/pkg/enum"
14+
"github.com/stolostron/multicluster-global-hub/pkg/transport"
15+
"github.com/stolostron/multicluster-global-hub/pkg/transport/inventory/transfer"
16+
)
17+
18+
type InventoryClient struct {
19+
httpUrl string
20+
tlsConfig *tls.Config
21+
}
22+
23+
func NewInventoryClient(ctx context.Context, restfulConn *transport.RestfulConnCredentail) (*InventoryClient, error) {
24+
client := &InventoryClient{}
25+
err := client.RefreshCredential(ctx, restfulConn)
26+
if err != nil {
27+
return nil, err
28+
}
29+
return client, nil
30+
}
31+
32+
func (c *InventoryClient) RefreshCredential(ctx context.Context, restfulConn *transport.RestfulConnCredentail) error {
33+
clientCert, err := tls.X509KeyPair([]byte(restfulConn.ClientCert), []byte(restfulConn.ClientKey))
34+
if err != nil {
35+
return fmt.Errorf("failed the load client cert from raw data: %w", err)
36+
}
37+
38+
caCertPool := x509.NewCertPool()
39+
if !caCertPool.AppendCertsFromPEM([]byte(restfulConn.CACert)) {
40+
return fmt.Errorf("failed to append CA certificate to pool")
41+
}
42+
43+
// #nosec G402
44+
tlsConfig := tls.Config{
45+
Certificates: []tls.Certificate{clientCert},
46+
RootCAs: caCertPool,
47+
}
48+
c.httpUrl = restfulConn.Host
49+
c.tlsConfig = &tlsConfig
50+
51+
return nil
52+
}
53+
54+
func (c *InventoryClient) Request(ctx context.Context, evt cloudevents.Event) error {
55+
// Extend the other type in the future
56+
if evt.Type() != string(enum.ManagedClusterInfoType) {
57+
return nil
58+
}
59+
60+
client, err := v1beta1.NewHttpClient(ctx, v1beta1.NewConfig(v1beta1.WithHTTPUrl(c.httpUrl),
61+
v1beta1.WithHTTPTLSConfig(c.tlsConfig)))
62+
if err != nil {
63+
return fmt.Errorf("failed to init the inventory client: %w", err)
64+
}
65+
66+
var data []clusterinfov1beta1.ManagedClusterInfo
67+
if err := evt.DataAs(&data); err != nil {
68+
return err
69+
}
70+
71+
for _, clusterInfo := range data {
72+
clusterRequest := transfer.GetK8SCluster(&clusterInfo)
73+
if clusterRequest != nil {
74+
if _, err := client.K8sClusterService.CreateK8SCluster(ctx, clusterRequest); err != nil {
75+
return err
76+
}
77+
}
78+
}
79+
return nil
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
cloudevents "github.com/cloudevents/sdk-go/v2"
8+
"github.com/stretchr/testify/assert"
9+
10+
"github.com/stolostron/multicluster-global-hub/pkg/enum"
11+
)
12+
13+
func TestReqeust(t *testing.T) {
14+
inventoryClient := &InventoryClient{}
15+
16+
evt := cloudevents.NewEvent()
17+
evt.SetType(string(enum.ManagedClusterInfoType))
18+
err := inventoryClient.Request(context.Background(), evt)
19+
assert.Nil(t, err)
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package transfer
2+
3+
import (
4+
kessel "github.com/project-kessel/inventory-api/api/kessel/inventory/v1beta1/resources"
5+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
clusterv1 "open-cluster-management.io/api/cluster/v1"
8+
)
9+
10+
func GetK8SCluster(clusterInfo *clusterinfov1beta1.ManagedClusterInfo) *kessel.CreateK8SClusterRequest {
11+
clusterRequest := &kessel.CreateK8SClusterRequest{
12+
K8SCluster: &kessel.K8SCluster{
13+
Metadata: &kessel.Metadata{
14+
ResourceType: "k8s-cluster",
15+
},
16+
ReporterData: &kessel.ReporterData{
17+
ReporterType: kessel.ReporterData_ACM,
18+
ReporterInstanceId: "guest",
19+
ReporterVersion: "0.1",
20+
LocalResourceId: "1",
21+
ApiHref: clusterInfo.Spec.MasterEndpoint,
22+
ConsoleHref: clusterInfo.Status.ConsoleURL,
23+
},
24+
ResourceData: &kessel.K8SClusterDetail{
25+
ExternalClusterId: clusterInfo.Status.ClusterID,
26+
KubeVersion: clusterInfo.Status.Version,
27+
Nodes: []*kessel.K8SClusterDetailNodesInner{},
28+
},
29+
},
30+
}
31+
32+
// platform
33+
switch clusterInfo.Status.CloudVendor {
34+
case clusterinfov1beta1.CloudVendorAWS:
35+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_AWS_UPI
36+
case clusterinfov1beta1.CloudVendorGoogle:
37+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_GCP_UPI
38+
case clusterinfov1beta1.CloudVendorBareMetal:
39+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_BAREMETAL_UPI
40+
case clusterinfov1beta1.CloudVendorIBM:
41+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_IBMCLOUD_UPI
42+
case clusterinfov1beta1.CloudVendorAzure:
43+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_AWS_UPI
44+
default:
45+
clusterRequest.K8SCluster.ResourceData.CloudPlatform = kessel.K8SClusterDetail_CLOUD_PLATFORM_OTHER
46+
}
47+
48+
// kubevendor, ony have the openshift version
49+
switch clusterInfo.Status.KubeVendor {
50+
case clusterinfov1beta1.KubeVendorOpenShift:
51+
clusterRequest.K8SCluster.ResourceData.KubeVendor = kessel.K8SClusterDetail_OPENSHIFT
52+
clusterRequest.K8SCluster.ResourceData.VendorVersion = clusterInfo.Status.DistributionInfo.OCP.Version
53+
case clusterinfov1beta1.KubeVendorEKS:
54+
clusterRequest.K8SCluster.ResourceData.KubeVendor = kessel.K8SClusterDetail_EKS
55+
case clusterinfov1beta1.KubeVendorGKE:
56+
clusterRequest.K8SCluster.ResourceData.KubeVendor = kessel.K8SClusterDetail_GKE
57+
default:
58+
clusterRequest.K8SCluster.ResourceData.KubeVendor = kessel.K8SClusterDetail_KUBE_VENDOR_OTHER
59+
}
60+
61+
// cluster status
62+
for _, cond := range clusterInfo.Status.Conditions {
63+
if cond.Type == clusterv1.ManagedClusterConditionAvailable {
64+
if cond.Status == metav1.ConditionTrue {
65+
clusterRequest.K8SCluster.ResourceData.ClusterStatus = kessel.K8SClusterDetail_READY
66+
} else {
67+
clusterRequest.K8SCluster.ResourceData.ClusterStatus = kessel.K8SClusterDetail_FAILED
68+
}
69+
}
70+
}
71+
72+
// nodes
73+
for _, node := range clusterInfo.Status.NodeList {
74+
kesselNode := &kessel.K8SClusterDetailNodesInner{
75+
Name: node.Name,
76+
}
77+
cpu, ok := node.Capacity[clusterv1.ResourceCPU]
78+
if ok {
79+
kesselNode.Cpu = cpu.String()
80+
}
81+
memory, ok := node.Capacity[clusterv1.ResourceMemory]
82+
if ok {
83+
kesselNode.Memory = memory.String()
84+
}
85+
86+
labels := []*kessel.ResourceLabel{}
87+
for key, val := range node.Labels {
88+
if key != "" && val != "" {
89+
labels = append(labels, &kessel.ResourceLabel{Key: key, Value: val})
90+
}
91+
}
92+
kesselNode.Labels = labels
93+
94+
clusterRequest.K8SCluster.ResourceData.Nodes = append(clusterRequest.K8SCluster.ResourceData.Nodes, kesselNode)
95+
}
96+
return clusterRequest
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package transfer
2+
3+
import (
4+
"testing"
5+
6+
kessel "github.com/project-kessel/inventory-api/api/kessel/inventory/v1beta1/resources"
7+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
8+
"github.com/stretchr/testify/assert"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
clusterv1 "open-cluster-management.io/api/cluster/v1"
11+
)
12+
13+
func TestGetK8SCluster(t *testing.T) {
14+
clusterInfo := createMockClusterInfo("test-cluster", clusterinfov1beta1.KubeVendorOpenShift, "4.10.0",
15+
clusterinfov1beta1.CloudVendorAWS)
16+
17+
// Call the function
18+
result := GetK8SCluster(clusterInfo)
19+
20+
// Assert the results
21+
assert.NotNil(t, result)
22+
assert.Equal(t, "k8s-cluster", result.K8SCluster.Metadata.ResourceType)
23+
assert.Equal(t, kessel.ReporterData_ACM, result.K8SCluster.ReporterData.ReporterType)
24+
assert.Equal(t, "https://api.test-cluster.example.com", result.K8SCluster.ReporterData.ApiHref)
25+
assert.Equal(t, "https://console.test-cluster.example.com", result.K8SCluster.ReporterData.ConsoleHref)
26+
assert.Equal(t, "test-cluster-id", result.K8SCluster.ResourceData.ExternalClusterId)
27+
assert.Equal(t, "1.23.0", result.K8SCluster.ResourceData.KubeVersion)
28+
assert.Equal(t, kessel.K8SClusterDetail_READY, result.K8SCluster.ResourceData.ClusterStatus)
29+
assert.Equal(t, kessel.K8SClusterDetail_AWS_UPI, result.K8SCluster.ResourceData.CloudPlatform)
30+
assert.Equal(t, kessel.K8SClusterDetail_OPENSHIFT, result.K8SCluster.ResourceData.KubeVendor)
31+
assert.Equal(t, "4.10.0", result.K8SCluster.ResourceData.VendorVersion)
32+
}
33+
34+
func TestKubeVendorK8SCluster(t *testing.T) {
35+
testCases := []struct {
36+
name string
37+
clusterInfo *clusterinfov1beta1.ManagedClusterInfo
38+
expectedVendor kessel.K8SClusterDetail_KubeVendor
39+
expectedVersion string
40+
}{
41+
{
42+
name: "OpenShift Cluster",
43+
clusterInfo: createMockClusterInfo("openshift-cluster", clusterinfov1beta1.KubeVendorOpenShift, "4.10.0",
44+
clusterinfov1beta1.CloudVendorAWS),
45+
expectedVendor: kessel.K8SClusterDetail_OPENSHIFT,
46+
expectedVersion: "4.10.0",
47+
},
48+
{
49+
name: "EKS Cluster",
50+
clusterInfo: createMockClusterInfo("eks-cluster", clusterinfov1beta1.KubeVendorEKS, "",
51+
clusterinfov1beta1.CloudVendorAzure),
52+
expectedVendor: kessel.K8SClusterDetail_EKS,
53+
expectedVersion: "",
54+
},
55+
{
56+
name: "GKE Cluster",
57+
clusterInfo: createMockClusterInfo("gke-cluster", clusterinfov1beta1.KubeVendorGKE, "",
58+
clusterinfov1beta1.CloudVendorGoogle),
59+
expectedVendor: kessel.K8SClusterDetail_GKE,
60+
expectedVersion: "",
61+
},
62+
{
63+
name: "Other Kubernetes Vendor",
64+
clusterInfo: createMockClusterInfo("other-cluster", "SomeOtherVendor", "",
65+
clusterinfov1beta1.CloudVendorBareMetal),
66+
expectedVendor: kessel.K8SClusterDetail_KUBE_VENDOR_OTHER,
67+
expectedVersion: "",
68+
},
69+
}
70+
71+
for _, tc := range testCases {
72+
t.Run(tc.name, func(t *testing.T) {
73+
result := GetK8SCluster(tc.clusterInfo)
74+
75+
assert.NotNil(t, result)
76+
assert.Equal(t, tc.expectedVendor, result.K8SCluster.ResourceData.KubeVendor)
77+
assert.Equal(t, tc.expectedVersion, result.K8SCluster.ResourceData.VendorVersion)
78+
// Add more assertions for common fields
79+
assert.Equal(t, "k8s-cluster", result.K8SCluster.Metadata.ResourceType)
80+
assert.Equal(t, kessel.ReporterData_ACM, result.K8SCluster.ReporterData.ReporterType)
81+
assert.Equal(t, "https://api.test-cluster.example.com", result.K8SCluster.ReporterData.ApiHref)
82+
assert.Equal(t, "https://console.test-cluster.example.com", result.K8SCluster.ReporterData.ConsoleHref)
83+
assert.Equal(t, "test-cluster-id", result.K8SCluster.ResourceData.ExternalClusterId)
84+
assert.Equal(t, "1.23.0", result.K8SCluster.ResourceData.KubeVersion)
85+
assert.Equal(t, kessel.K8SClusterDetail_READY, result.K8SCluster.ResourceData.ClusterStatus)
86+
})
87+
}
88+
}
89+
90+
func createMockClusterInfo(name string, kubeVendor clusterinfov1beta1.KubeVendorType,
91+
vendorVersion string, platform clusterinfov1beta1.CloudVendorType,
92+
) *clusterinfov1beta1.ManagedClusterInfo {
93+
clusterInfo := &clusterinfov1beta1.ManagedClusterInfo{
94+
ObjectMeta: metav1.ObjectMeta{
95+
Name: name,
96+
},
97+
Spec: clusterinfov1beta1.ClusterInfoSpec{
98+
MasterEndpoint: "https://api.test-cluster.example.com",
99+
},
100+
Status: clusterinfov1beta1.ClusterInfoStatus{
101+
ClusterID: "test-cluster-id",
102+
Version: "1.23.0",
103+
ConsoleURL: "https://console.test-cluster.example.com",
104+
CloudVendor: platform,
105+
KubeVendor: kubeVendor,
106+
Conditions: []metav1.Condition{
107+
{
108+
Type: clusterv1.ManagedClusterConditionAvailable,
109+
Status: metav1.ConditionTrue,
110+
},
111+
},
112+
},
113+
}
114+
115+
if kubeVendor == clusterinfov1beta1.KubeVendorOpenShift {
116+
clusterInfo.Status.DistributionInfo = clusterinfov1beta1.DistributionInfo{
117+
OCP: clusterinfov1beta1.OCPDistributionInfo{
118+
Version: vendorVersion,
119+
},
120+
}
121+
}
122+
123+
return clusterInfo
124+
}

‎pkg/transport/producer/generic_producer.go

+37-15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/stolostron/multicluster-global-hub/pkg/transport"
2020
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
21+
"github.com/stolostron/multicluster-global-hub/pkg/transport/inventory/client"
2122
)
2223

2324
const (
@@ -27,8 +28,9 @@ const (
2728

2829
type GenericProducer struct {
2930
log logr.Logger
30-
clientProtocol interface{}
31-
client cloudevents.Client
31+
ceProtocol interface{}
32+
ceClient cloudevents.Client
33+
inventoryClient *client.InventoryClient
3234
messageSizeLimit int
3335
}
3436

@@ -46,17 +48,21 @@ func NewGenericProducer(transportConfig *transport.TransportConfig) (*GenericPro
4648
}
4749

4850
func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error {
51+
// inventory client
52+
if p.inventoryClient != nil {
53+
return p.inventoryClient.Request(ctx, evt)
54+
}
55+
// cloudevent kafka/gochan client
4956
// message key
5057
evtCtx := ctx
5158
if kafka_confluent.MessageKeyFrom(ctx) == "" {
5259
evtCtx = kafka_confluent.WithMessageKey(ctx, evt.Type())
5360
}
54-
5561
// data
5662
payloadBytes := evt.Data()
5763
chunks := p.splitPayloadIntoChunks(payloadBytes)
5864
if len(chunks) == 1 {
59-
if ret := p.client.Send(evtCtx, evt); cloudevents.IsUndelivered(ret) {
65+
if ret := p.ceClient.Send(evtCtx, evt); cloudevents.IsUndelivered(ret) {
6066
return fmt.Errorf("failed to send event to transport: %v", ret)
6167
}
6268
return nil
@@ -70,7 +76,7 @@ func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event)
7076
if err := evt.SetData(cloudevents.ApplicationJSON, chunk); err != nil {
7177
return fmt.Errorf("failed to set cloudevents data: %v", evt)
7278
}
73-
if result := p.client.Send(evtCtx, evt); cloudevents.IsUndelivered(result) {
79+
if result := p.ceClient.Send(evtCtx, evt); cloudevents.IsUndelivered(result) {
7480
return fmt.Errorf("failed to send events to transport: %v", result)
7581
}
7682
}
@@ -79,7 +85,12 @@ func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event)
7985

8086
// Reconnect close the previous producer state and init a new producer
8187
func (p *GenericProducer) Reconnect(config *transport.TransportConfig) error {
82-
closer, ok := p.clientProtocol.(protocol.Closer)
88+
// invenory client
89+
if config.TransportType == string(transport.Rest) {
90+
return p.inventoryClient.RefreshCredential(context.Background(), config.RestfulCredentail)
91+
}
92+
// cloudevent kafka/gochan client
93+
closer, ok := p.ceProtocol.(protocol.Closer)
8394
if ok {
8495
if err := closer.Close(context.Background()); err != nil {
8596
return fmt.Errorf("failed to close the previous producer: %w", err)
@@ -90,9 +101,13 @@ func (p *GenericProducer) Reconnect(config *transport.TransportConfig) error {
90101

91102
// initClient will init/update the client, clientProtocol and messageLimitSize based on the transportConfig
92103
func (p *GenericProducer) initClient(transportConfig *transport.TransportConfig) error {
93-
topic := transportConfig.KafkaCredential.SpecTopic
94-
if !transportConfig.IsManager {
95-
topic = transportConfig.KafkaCredential.StatusTopic
104+
topic := ""
105+
if transportConfig.TransportType == string(transport.Kafka) ||
106+
transportConfig.TransportType == string(transport.Chan) {
107+
topic = transportConfig.KafkaCredential.SpecTopic
108+
if !transportConfig.IsManager {
109+
topic = transportConfig.KafkaCredential.StatusTopic
110+
}
96111
}
97112

98113
switch transportConfig.TransportType {
@@ -107,29 +122,36 @@ func (p *GenericProducer) initClient(transportConfig *transport.TransportConfig)
107122
return err
108123
}
109124
handleProducerEvents(p.log, eventChan)
110-
p.clientProtocol = kafkaProtocol
125+
p.ceProtocol = kafkaProtocol
111126
case string(transport.Chan):
112127
if transportConfig.Extends == nil {
113128
transportConfig.Extends = make(map[string]interface{})
114129
}
115130
if _, found := transportConfig.Extends[topic]; !found {
116131
transportConfig.Extends[topic] = gochan.New()
117132
}
118-
p.clientProtocol = transportConfig.Extends[topic]
133+
p.ceProtocol = transportConfig.Extends[topic]
119134
case string(transport.Rest):
120135
if transportConfig.RestfulCredentail == nil {
121136
return fmt.Errorf("the restful credentail must not be nil")
122137
}
138+
inventoryClient, err := client.NewInventoryClient(context.Background(), transportConfig.RestfulCredentail)
139+
if err != nil {
140+
return fmt.Errorf("initial the inventory client error %w", err)
141+
}
142+
p.inventoryClient = inventoryClient
123143
default:
124144
return fmt.Errorf("transport-type - %s is not a valid option", transportConfig.TransportType)
125145
}
126146

127147
// kafka or gochan protocol
128-
client, err := cloudevents.NewClient(p.clientProtocol, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
129-
if err != nil {
130-
return err
148+
if p.ceProtocol != nil {
149+
client, err := cloudevents.NewClient(p.ceProtocol, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
150+
if err != nil {
151+
return err
152+
}
153+
p.ceClient = client
131154
}
132-
p.client = client
133155
return nil
134156
}
135157

‎samples/config/confluent_config.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func GetConfluentConfigMapBySecret(isProducer bool) (*kafka.ConfigMap, error) {
7474
func GetConfluentConfigMap(producer bool) (*kafka.ConfigMap, error) {
7575
bootstrapSever := os.Getenv("BOOTSTRAP_SEVER")
7676
if bootstrapSever == "" {
77-
return GetConfluentConfigMapFromGlobalHub(producer)
77+
return GetConfluentConfigMapFromGlobalHub(KAFKA_USER, producer)
7878
}
7979
return GetConfluentConfigMapFromManagedHub(producer)
8080
}
@@ -164,7 +164,7 @@ func GetConfluentConfigMapFromManagedHub(producer bool) (*kafka.ConfigMap, error
164164
return configMap, nil
165165
}
166166

167-
func GetConfluentConfigMapFromGlobalHub(producer bool) (*kafka.ConfigMap, error) {
167+
func GetConfluentConfigMapFromGlobalHub(kafkaUser string, producer bool) (*kafka.ConfigMap, error) {
168168
kubeconfig, err := DefaultKubeConfig()
169169
if err != nil {
170170
return nil, fmt.Errorf("failed to get kubeconfig")
@@ -174,17 +174,12 @@ func GetConfluentConfigMapFromGlobalHub(producer bool) (*kafka.ConfigMap, error)
174174
return nil, fmt.Errorf("failed to get runtime client")
175175
}
176176

177-
kafkaUserName := KAFKA_USER
178-
if user := os.Getenv("KAFKA_USER"); user != "" {
179-
kafkaUserName = user
180-
}
181-
182-
kafkaConfigMap, err := GetConfluentConfigMapByUser(c, KAFKA_NAMESPACE, KAFKA_CLUSTER, kafkaUserName)
177+
kafkaConfigMap, err := GetConfluentConfigMapByUser(c, KAFKA_NAMESPACE, KAFKA_CLUSTER, kafkaUser)
183178
if err != nil {
184179
return nil, err
185180
}
186181

187-
consumerGroupId := "test-group-id" + kafkaUserName
182+
consumerGroupId := "test-group-id" + kafkaUser
188183
fmt.Println(">> consumer group id:", consumerGroupId)
189184

190185
if producer {

‎samples/config/transport_secret.go

+23
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ import (
55
"fmt"
66
"os"
77

8+
operatorconfig "github.com/stolostron/multicluster-global-hub/operator/pkg/config"
9+
corev1 "k8s.io/api/core/v1"
810
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
913

1014
v1 "k8s.io/api/core/v1"
1115
"k8s.io/client-go/kubernetes"
@@ -40,3 +44,22 @@ func GetTransportSecret() (*v1.Secret, error) {
4044
}
4145
return secret, nil
4246
}
47+
48+
func GetTransportConfigSecret(namespace, name string) (*corev1.Secret, error) {
49+
kubeconfig, err := DefaultKubeConfig()
50+
if err != nil {
51+
return nil, err
52+
}
53+
c, err := client.New(kubeconfig, client.Options{Scheme: operatorconfig.GetRuntimeScheme()})
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
transportConfig := &corev1.Secret{}
59+
60+
err = c.Get(context.Background(), types.NamespacedName{Name: name, Namespace: namespace}, transportConfig)
61+
if err != nil {
62+
return nil, err
63+
}
64+
return transportConfig, nil
65+
}

‎samples/inventory/consumer/main.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
"time"
11+
12+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
13+
"github.com/stolostron/multicluster-global-hub/samples/config"
14+
)
15+
16+
var topic = "kessel-inventory"
17+
18+
func main() {
19+
signals := make(chan os.Signal, 1)
20+
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
21+
22+
// kafkaConfigMap, err := config.GetConfluentConfigMap()
23+
kafkaConfigMap, err := config.GetConfluentConfigMapFromGlobalHub("global-hub-kafka-user", false)
24+
if err != nil {
25+
log.Fatalf("failed to get kafka config map: %v", err)
26+
}
27+
consumer, err := kafka.NewConsumer(kafkaConfigMap)
28+
if err != nil {
29+
log.Fatalf("failed to create kafka consumer: %v", err)
30+
}
31+
32+
log.Printf(">> subscribe topic %s", topic)
33+
if err := consumer.SubscribeTopics([]string{topic}, rebalanceCallback); err != nil {
34+
log.Fatalf("failed to subscribe topic: %v", err)
35+
}
36+
ctx, cancel := context.WithCancel(context.Background())
37+
go func() {
38+
for {
39+
select {
40+
case <-ctx.Done():
41+
_ = consumer.Unsubscribe()
42+
log.Printf("unsubscribed topic: %s", topic)
43+
return
44+
default:
45+
ev := consumer.Poll(100)
46+
if ev == nil {
47+
continue
48+
}
49+
if err := processEvent(ev); err != nil {
50+
log.Printf("failed to process event: %s \n", ev)
51+
}
52+
53+
}
54+
}
55+
}()
56+
57+
sig := <-signals
58+
log.Printf("got signal: %s\n", sig.String())
59+
cancel()
60+
log.Println("context is done")
61+
62+
// graceful shutdown
63+
time.Sleep(1 * time.Second)
64+
log.Printf("exit main")
65+
os.Exit(0)
66+
}
67+
68+
func processEvent(ev kafka.Event) error {
69+
switch e := ev.(type) {
70+
71+
case *kafka.Message:
72+
if e.TopicPartition.Error != nil {
73+
log.Printf("failed message on %s [%d %v]: %v\n", *e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset, e.TopicPartition.Error)
74+
} else {
75+
log.Printf("received message on %s [%d %v]: %s\n", *e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset, e.Value)
76+
}
77+
78+
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_rebalance_example/consumer_rebalance_example.go
79+
// // Handle manual commit since enable.auto.commit is unset.
80+
// if err := maybeCommit(c, e.TopicPartition); err != nil {
81+
// return err
82+
// }
83+
84+
case kafka.Error:
85+
// Errors should generally be considered informational, the client
86+
// will try to automatically recover.
87+
return fmt.Errorf("kafka error with %v", e)
88+
default:
89+
// log.Printf("ignored event %v\n", e)
90+
}
91+
92+
return nil
93+
}
94+
95+
// rebalanceCallback is called on each group rebalance to assign additional
96+
// partitions, or remove existing partitions, from the consumer's current
97+
// assignment.
98+
//
99+
// A rebalance occurs when a consumer joins or leaves a consumer group, if it
100+
// changes the topic(s) it's subscribed to, or if there's a change in one of
101+
// the topics it's subscribed to, for example, the total number of partitions
102+
// increases.
103+
//
104+
// The application may use this optional callback to inspect the assignment,
105+
// alter the initial start offset (the .Offset field of each assigned partition),
106+
// and read/write offsets to commit to an alternative store outside of Kafka.
107+
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
108+
switch ev := event.(type) {
109+
case kafka.AssignedPartitions:
110+
// log.Printf("%% %s rebalance: %d new partition(s) assigned: %v\n", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)
111+
112+
log.Printf("%% %s rebalance: %d new partition(s) assigned\n", c.GetRebalanceProtocol(), len(ev.Partitions))
113+
114+
// The application may update the start .Offset of each assigned
115+
// partition and then call Assign(). It is optional to call Assign
116+
// in case the application is not modifying any start .Offsets. In
117+
// that case we don't, the library takes care of it.
118+
// It is called here despite not modifying any .Offsets for illustrative
119+
// purposes.
120+
err := c.Assign(ev.Partitions)
121+
if err != nil {
122+
return err
123+
}
124+
125+
case kafka.RevokedPartitions:
126+
// log.Printf("%% %s rebalance: %d partition(s) revoked: %v\n", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)
127+
128+
log.Printf("%% %s rebalance: %d partition(s) revoked\n", c.GetRebalanceProtocol(), len(ev.Partitions))
129+
130+
// Usually, the rebalance callback for `RevokedPartitions` is called
131+
// just before the partitions are revoked. We can be certain that a
132+
// partition being revoked is not yet owned by any other consumer.
133+
// This way, logic like storing any pending offsets or committing
134+
// offsets can be handled.
135+
// However, there can be cases where the assignment is lost
136+
// involuntarily. In this case, the partition might already be owned
137+
// by another consumer, and operations including committing
138+
// offsets may not work.
139+
if c.AssignmentLost() {
140+
// Our consumer has been kicked out of the group and the
141+
// entire assignment is thus lost.
142+
fmt.Fprintln(os.Stderr, "Assignment lost involuntarily, commit may fail")
143+
}
144+
145+
// Since enable.auto.commit is unset, we need to commit offsets manually
146+
// before the partition is revoked.
147+
commitedOffsets, err := c.Commit()
148+
149+
if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
150+
fmt.Fprintf(os.Stderr, "Failed to commit offsets: %s\n", err)
151+
return err
152+
}
153+
fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets)
154+
155+
// Similar to Assign, client automatically calls Unassign() unless the
156+
// callback has already called that method. Here, we don't call it.
157+
158+
default:
159+
fmt.Fprintf(os.Stderr, "Unxpected event type: %v\n", event)
160+
}
161+
162+
return nil
163+
}

‎samples/inventory/producer/main.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
cloudevents "github.com/cloudevents/sdk-go/v2"
8+
clusterinfov1beta1 "github.com/stolostron/cluster-lifecycle-api/clusterinfo/v1beta1"
9+
agentconfig "github.com/stolostron/multicluster-global-hub/agent/pkg/config"
10+
"github.com/stolostron/multicluster-global-hub/pkg/enum"
11+
transportconfig "github.com/stolostron/multicluster-global-hub/pkg/transport/config"
12+
"github.com/stolostron/multicluster-global-hub/pkg/transport/inventory/client"
13+
"github.com/stolostron/multicluster-global-hub/samples/config"
14+
corev1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/types"
16+
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
17+
)
18+
19+
func main() {
20+
clusterInfoList, err := listClusterInfo()
21+
if err != nil {
22+
log.Fatalf("failed to get cluster info list: %v", err)
23+
}
24+
25+
transportConfigSecret, err := config.GetTransportConfigSecret("open-cluster-management", "transport-config")
26+
if err != nil {
27+
log.Fatalf("failed to get transport config secret: %v", err)
28+
}
29+
restfulConn, err := transportconfig.GetRestfulConnBySecret(transportConfigSecret)
30+
if err != nil {
31+
log.Fatalf("failed to extract rest credentail: %v", err)
32+
}
33+
// utils.PrettyPrint(restfulConn)
34+
35+
inventoryClient, err := client.NewInventoryClient(context.Background(), restfulConn)
36+
if err != nil {
37+
log.Fatalf("failed to init the inventory client: %v", err)
38+
}
39+
40+
evt := cloudevents.NewEvent()
41+
evt.SetType(string(enum.ManagedClusterInfoType))
42+
evt.SetSource("test")
43+
evt.SetData(cloudevents.ApplicationJSON, clusterInfoList)
44+
45+
err = inventoryClient.Request(context.Background(), evt)
46+
if err != nil {
47+
log.Fatalf("failed to send the request: %v", err)
48+
}
49+
}
50+
51+
func listClusterInfo() ([]clusterinfov1beta1.ManagedClusterInfo, error) {
52+
c, err := getRuntimeClient()
53+
if err != nil {
54+
return nil, err
55+
}
56+
clusterInfoList := clusterinfov1beta1.ManagedClusterInfoList{}
57+
err = c.List(context.Background(), &clusterInfoList)
58+
if err != nil {
59+
return nil, err
60+
}
61+
return clusterInfoList.Items, nil
62+
}
63+
64+
func getTransportConfigSecret(namespace, name string) (*corev1.Secret, error) {
65+
c, err := getRuntimeClient()
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
transportConfig := &corev1.Secret{}
71+
err = c.Get(context.Background(), types.NamespacedName{Name: name, Namespace: namespace}, transportConfig)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return transportConfig, nil
76+
}
77+
78+
func getRuntimeClient() (runtimeclient.Client, error) {
79+
kubeconfig, err := config.DefaultKubeConfig()
80+
if err != nil {
81+
return nil, err
82+
}
83+
c, err := runtimeclient.New(kubeconfig, runtimeclient.Options{Scheme: agentconfig.GetRuntimeScheme()})
84+
if err != nil {
85+
return nil, err
86+
}
87+
return c, nil
88+
}

‎test/integration/operator/agent/addon_registry_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
2121
)
2222

23-
// go test ./test/integration/operator/addon -ginkgo.focus "addon registry" -v
23+
// go test ./test/integration/operator/agent -ginkgo.focus "addon registry" -v
2424
var _ = Describe("addon registry", Ordered, func() {
2525
BeforeAll(func() {
2626
})

‎test/manifest/crd/0000_06_internal.open-cluster-management.io_managedclusterinfos.crd.yaml

+463
Large diffs are not rendered by default.

‎test/manifest/crd/clusterversion.crd.yaml

+730
Large diffs are not rendered by default.

‎test/script/e2e_clean_globalhub.sh

100644100755
File mode changed.

‎test/script/e2e_cleanup.sh

+9-2
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,30 @@ DELETE=${DELETE:-true}
1010
# setup kubeconfig
1111
KUBECONFIG=${KUBECONFIG:-${CONFIG_DIR}/clusters}
1212

13+
echo "kill process"
1314
while read -r line; do
1415
if [[ $line != "" ]]; then
1516
kill -9 "${line}" >/dev/null 2>&1
1617
fi
1718
done <"$CONFIG_DIR/PID"
1819

20+
ps -ef | grep "ocm.sh" | grep -v grep |awk '{print $2}' | xargs kill -9 >/dev/null 2>&1
21+
ps -ef | grep -E 'e2e-setup|e2e_setup' | grep -v grep |awk '{print $2}' | xargs kill -9 >/dev/null 2>&1
22+
1923
[ "$DELETE" = false ] && exit 0
2024

25+
echo "delete kind clusters"
2126
kind delete cluster --name "${GH_NAME}" > /dev/null 2>&1
2227
for i in $(seq 1 "${MH_NUM}"); do
28+
echo "delete hub${i}"
2329
kind delete cluster --name "hub${i}" > /dev/null 2>&1
2430
rm "$CONFIG_DIR/hub${i}" > /dev/null 2>&1
2531
for j in $(seq 1 "${MC_NUM}"); do
32+
echo "delete hub${i}-cluster${j}"
2633
kind delete cluster --name "hub${i}-cluster${j}" > /dev/null 2>&1
2734
rm "$CONFIG_DIR/hub${i}-cluster${j}" > /dev/null 2>&1
2835
done
2936
done
30-
rm "$KUBECONFIG" > /dev/null 2>&1
3137

32-
# ps -ef | grep "e2e" | grep -v grep |awk '{print $2}' | xargs kill -9 >/dev/null 2>&1
38+
echo "delete config $CONFIG_DIR"
39+
rm -rf "$CONFIG_DIR/*"

‎test/script/e2e_kafka.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ echo "Kafka cluster is ready"
4747
# generate resource for standalone agent
4848
export KAFKA_NAMESPACE=kafka
4949
export SECRET_NAMESPACE=open-cluster-management
50-
bash "$CURRENT_DIR/standalone_agent_secret.sh" "$KAFKA_KUBECONFIG" "$SECRET_KUBECONFIG"
50+
bash "$CURRENT_DIR/event_exporter_kafka.sh" "$KAFKA_KUBECONFIG" "$SECRET_KUBECONFIG"
5151
echo "Kafka standalone secret is ready! KUBECONFIG=$SECRET_KUBECONFIG"
5252

5353
echo -e "\r${BOLD_GREEN}[ END - $(date +"%T") ] Install Kafka ${NC} $(($(date +%s) - start_time)) seconds"
+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#!/bin/bash
2+
3+
CURRENT_DIR=$(
4+
cd "$(dirname "$0")" || exit
5+
pwd
6+
)
7+
8+
# shellcheck source=/dev/null
9+
source "$CURRENT_DIR/util.sh"
10+
11+
KUBECONFIG=${1:-$KUBECONFIG} # the kubeconfig for running the inventory server
12+
SECRET_KUBECONFIG=${2:-$KUBECONFIG} # the kubeconfig for generating the inventory connection secret
13+
14+
inventory_namespace=${INVENTORY_NAMESPACE:-"multicluster-global-hub"}
15+
secret_namespace=${SECRET_NAMESPACE:-"open-cluster-management"}
16+
17+
# kubectl get secret inventory-api-server-ca-certs -n "$inventory_namespace" -ojsonpath='{.data.ca\.crt}' | base64 -d > /tmp/ca.crt
18+
# kubectl get secret inventory-api-guest-certs -n "$inventory_namespace" -ojsonpath='{.data.tls\.crt}' | base64 -d > /tmp/client.crt
19+
# kubectl get secret inventory-api-guest-certs -n "$inventory_namespace" -ojsonpath='{.data.tls\.key}' | base64 -d > /tmp/client.key
20+
21+
host=$(kubectl get route inventory-api -n "$inventory_namespace" -ojsonpath='{.spec.host}')
22+
cat <<EOF >"$CURRENT_DIR/rest.yaml"
23+
host: https://${host}:443
24+
ca.crt: $(kubectl get secret inventory-api-server-ca-certs -n "$inventory_namespace" -ojsonpath='{.data.ca\.crt}')
25+
client.crt: $(kubectl get secret inventory-api-guest-certs -n "$inventory_namespace" -ojsonpath='{.data.tls\.crt}')
26+
client.key: $(kubectl get secret inventory-api-guest-certs -n "$inventory_namespace" -ojsonpath='{.data.tls\.key}')
27+
EOF
28+
29+
kubectl create secret generic transport-config -n "$secret_namespace" --kubeconfig "$SECRET_KUBECONFIG" \
30+
--from-file=rest.yaml="$CURRENT_DIR/rest.yaml"
31+
rm "$CURRENT_DIR/rest.yaml"
32+
echo "restful configuration is ready!"

‎test/script/standalone_agent_secret.sh ‎test/script/event_exporter_kafka.sh

+1-14
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CURRENT_DIR=$(
99
source "$CURRENT_DIR/util.sh"
1010

1111
KUBECONFIG=${1:-$KUBECONFIG} # the kubeconfig for running the kafka
12-
SECRET_KUBECONFIG=${2:-$KUBECONFIG} # generate the crenditial secret
12+
SECRET_KUBECONFIG=${2:-$KUBECONFIG} # the kubeconfig for generating the kafka connection secret
1313

1414
kafka_namespace=${KAFKA_NAMESPACE:-"kafka"}
1515
secret_namespace=${SECRET_NAMESPACE:-"open-cluster-management"}
@@ -48,16 +48,3 @@ kubectl create secret generic transport-config -n "$secret_namespace" --kubeconf
4848
--from-file=kafka.yaml="$CURRENT_DIR/kafka.yaml"
4949
rm "$CURRENT_DIR/kafka.yaml"
5050
echo "kafka configuration is ready!"
51-
52-
# host=$(kubectl get route inventory-api -n "$kafka_namespace" -ojsonpath='{.spec.host}')
53-
# cat <<EOF >"$CURRENT_DIR/inventory.yaml"
54-
# host: https://$host
55-
# ca.crt: $(kubectl get secret inventory-api-server-ca-certs -n "$kafka_namespace" -ojsonpath='{.data.ca\.crt}')
56-
# client.crt: $(kubectl get secret inventory-api-guest-certs -n "$kafka_namespace" -ojsonpath='{.data.tls\.crt}')
57-
# client.key: $(kubectl get secret inventory-api-guest-certs -n "$kafka_namespace" -ojsonpath='{.data.tls\.key}')
58-
# EOF
59-
60-
# kubectl patch secret transport-config -n "$secret_namespace" --kubeconfig "$SECRET_KUBECONFIG" \
61-
# --type='json' \
62-
# -p='[{"op": "add", "path": "/data/inventory.yaml", "value":"'"$(base64 -w 0 "$CURRENT_DIR/inventory.yaml")"'"}]'
63-
# rm "$CURRENT_DIR/inventory.yaml"

‎test/script/util.sh

+6-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ kind_cluster() {
118118
ensure_cluster() {
119119
local cluster_name="$1"
120120
local kubeconfig="$2"
121-
if [ -f "$kubeconfig" ]; then
121+
if [ -f "$kubeconfig" ] && kubectl config get-contexts -o name | grep -wq "$cluster_name"; then
122122
return 0
123123
fi
124124

@@ -361,6 +361,11 @@ install_crds() {
361361

362362
# clusterclaim: agent
363363
kubectl --context "$ctx" apply -f ${CURRENT_DIR}/../manifest/crd/0000_02_clusters.open-cluster-management.io_clusterclaims.crd.yaml
364+
365+
# clusterinfo for rest
366+
kubectl --context "$ctx" apply -f ${CURRENT_DIR}/../manifest/crd/0000_06_internal.open-cluster-management.io_managedclusterinfos.crd.yaml
367+
# clusterID from clusterversion
368+
kubectl --context "$ctx" apply -f ${CURRENT_DIR}/../manifest/crd/clusterversion.crd.yaml
364369
}
365370

366371
enable_service_ca() {

0 commit comments

Comments
 (0)
Please sign in to comment.