diff --git a/admin/admin.go b/admin/admin.go index 487c8b44..45ccf8e3 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -36,7 +36,7 @@ type Admin interface { GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error) FetchAllTopicList(ctx context.Context) (*TopicList, error) - //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) + GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) Close() error } @@ -266,7 +266,9 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error { func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) { return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic)) } - +func (a *admin) GetBrokerClusterInfo(ctx context.Context) (*internal.ClusterInfo, error) { + return a.cli.GetBrokerClusterInfo(ctx) +} func (a *admin) Close() error { a.closeOnce.Do(func() { a.cli.Shutdown() diff --git a/admin/admin_test.go b/admin/admin_test.go new file mode 100644 index 00000000..7836beeb --- /dev/null +++ b/admin/admin_test.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admin + +import ( + "context" + "encoding/json" + "fmt" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/stretchr/testify/assert" + "testing" +) + +var nameSrvAddr = []string{"127.0.0.1:9876"} + +func TestFetchAllTopicList(t *testing.T) { + adminTool, err := NewAdmin(WithResolver(primitive.NewPassthroughResolver(nameSrvAddr))) + assert.True(t, err == nil) + if err != nil { + fmt.Println(err) + } + + topicNames, error := adminTool.FetchAllTopicList(context.Background()) + assert.True(t, error == nil) + assert.True(t, len(topicNames.TopicList) > 0) + + prettyJson, _ := json.MarshalIndent(topicNames, "", " ") + fmt.Println(string(prettyJson)) +} + +func TestGetBrokerClusterInfo(t *testing.T) { + adminTool, err := NewAdmin(WithResolver(primitive.NewPassthroughResolver(nameSrvAddr))) + assert.True(t, err == nil) + if err != nil { + fmt.Println(err) + } + + cluster, err := adminTool.GetBrokerClusterInfo(context.Background()) + assert.True(t, err == nil) + assert.True(t, cluster != nil) + + prettyJson, _ := json.MarshalIndent(cluster, "", " ") + fmt.Println(string(prettyJson)) +} diff --git a/go.mod b/go.mod index 1a11b28b..e9d50614 100644 --- a/go.mod +++ b/go.mod @@ -19,5 +19,6 @@ require ( golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 + gopkg.in/yaml.v2 v2.4.0 stathat.com/c/consistent v1.0.0 ) diff --git a/internal/client.go b/internal/client.go index 6f27f5e8..a7534e1a 100644 --- a/internal/client.go +++ b/internal/client.go @@ -20,6 +20,7 @@ package internal import ( "context" "fmt" + "github.com/pkg/errors" "net" "os" "sort" @@ -163,6 +164,8 @@ type RMQClient interface { GetNameSrv() Namesrvs RegisterACL() + + GetBrokerClusterInfo(ctx context.Context) (*ClusterInfo, error) } var _ RMQClient = new(rmqClient) @@ -932,6 +935,24 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool { return result } +func (c *rmqClient) GetBrokerClusterInfo(ctx context.Context) (*ClusterInfo, error) { + request := remote.NewRemotingCommand(ReqGetBrokerClusterInfo, nil, nil) + c.SendHeartbeatToAllBrokerWithLock() + + responseCommand, _ := c.remoteClient.InvokeSync(ctx, c.GetNameSrv().AddrList()[0], request) + + switch responseCommand.Code { + case ResSuccess: + if responseCommand.Body != nil { + var cluster, err = ParseClusterInfo(string(responseCommand.Body)) + return cluster, err + } + default: + rlog.Warning("no any topic list", nil) + } + return nil, errors.New("no any response from broker") + +} func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[primitive.MessageQueue]int64) { consumer, exist := c.consumerMap.Load(group) if !exist { diff --git a/internal/cluster.go b/internal/cluster.go new file mode 100644 index 00000000..747b01d0 --- /dev/null +++ b/internal/cluster.go @@ -0,0 +1,136 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "encoding/json" + "github.com/apache/rocketmq-client-go/v2/rlog" + "github.com/pkg/errors" + "github.com/tidwall/gjson" + "gopkg.in/yaml.v2" + "strings" +) + +type ClusterInfo struct { + BrokerAddrTable map[string]BrokerData `json:"brokerAddrTable"` + ClusterAddrTable map[string][]string `json:"clusterAddrTable"` +} + +type ClusterInfoMap struct { + BrokerAddrTable map[string]string `json:"brokerAddrTable"` + ClusterAddrTable map[string]string `json:"clusterAddrTable"` +} + +func ParseClusterInfo(jsonString string) (cluster *ClusterInfo, errResult error) { + var brokerAddrTableResult = gjson.Get(jsonString, "brokerAddrTable") + if !brokerAddrTableResult.Exists() { + return nil, errors.New("json key for brokerAddrTable not exist") + } + var clusterAddrTableResult = gjson.Get(jsonString, "clusterAddrTable") + if !clusterAddrTableResult.Exists() { + return nil, errors.New("json key for clusterAddrTable not exist") + } + + var mapAndArray, err = ParseClusterAddrTable(clusterAddrTableResult.String()) + if err != nil { + return nil, err + } + + if len(mapAndArray) <= 0 { + return nil, nil + } + + var clusterInfo = &ClusterInfo{} + clusterInfo.ClusterAddrTable, errResult = ParseClusterAddrTable(clusterAddrTableResult.String()) + if errResult != nil { + return nil, errResult + } + clusterInfo.BrokerAddrTable, errResult = ParseBrokerAddrTable(brokerAddrTableResult.String(), clusterInfo.ClusterAddrTable) + if errResult != nil { + return nil, errResult + } + return clusterInfo, nil +} + +// parse broker addr table +// input like : {"broker-a":{"brokerAddrs":{0:"127.0.0.1:10911"},"brokerName":"broker-a","cluster":"DefaultCluster"}} +// result.key = broker name, result.value=BrokerData +func ParseBrokerAddrTable(jsonString string, clusterAndBrokerNamesMap map[string][]string) (map[string]BrokerData, error) { + var brokerMap = make(map[string]BrokerData) + for _, brokerNames := range clusterAndBrokerNamesMap { + for _, brokerName := range brokerNames { + var brokerInfo = gjson.Get(jsonString, brokerName).String() + var brokerAddrsString = gjson.Get(brokerInfo, "brokerAddrs").String() // {0:"x.x.x.x:10911"} + var brokerName1 = gjson.Get(brokerInfo, "brokerName").String() + var cluster = gjson.Get(brokerInfo, "cluster").String() + + bd := BrokerData{ + Cluster: cluster, + BrokerName: brokerName1, + BrokerAddresses: ParseBrokerAddrs(brokerAddrsString), + } + + brokerMap[brokerName1] = bd + } + } + + return brokerMap, nil +} + +// input like : "{0:"127.0.0.1:10911"}" to map directly,can't parse, so get one by one +// result.key = broker id , result.value = broker address and port +func ParseBrokerAddrs1(jsonString string) map[int64]string { + var resultMap = make(map[int64]string) + var broker0Addr = gjson.Get(jsonString, "0") + if broker0Addr.Exists() { + resultMap[0] = broker0Addr.String() + } + var broker1Addr = gjson.Get(jsonString, "1") + if broker1Addr.Exists() { + resultMap[1] = broker1Addr.String() + } + return resultMap +} + +// input like : "{0:"127.0.0.1:10911"}" to map directly,can't parse, so get one by one +// result.key = broker id , result.value = broker address and port +func ParseBrokerAddrs(jsonString string) map[int64]string { + // for yaml parse, do replace + jsonString = strings.ReplaceAll(jsonString, "0:", "0: ") + jsonString = strings.ReplaceAll(jsonString, "1:", "1: ") + + var yamlmap map[int64]string + err := yaml.Unmarshal([]byte(jsonString), &yamlmap) + if err != nil { + rlog.Error("parse addr error "+jsonString, nil) + return nil + } + + return yamlmap +} + +// input like : {"DefaultCluster":["broker-a"]} +// result.key = cluster name, result.value = broker names +func ParseClusterAddrTable(jsonString string) (map[string][]string, error) { + var result = make(map[string][]string) + err := json.Unmarshal([]byte(jsonString), &result) + if err != nil { + return nil, err + } + return result, nil +} diff --git a/internal/cluster_test.go b/internal/cluster_test.go new file mode 100644 index 00000000..a2e58121 --- /dev/null +++ b/internal/cluster_test.go @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" + "testing" +) + +func TestParseBrokerAddrs(t *testing.T) { + str := `{0: "127.0.0.1:1111",1: "127.0.0.1:1112"}` + blob := []byte(str) + var yamlmap map[int]string + yaml.Unmarshal(blob, &yamlmap) + for k, v := range yamlmap { + fmt.Println(fmt.Sprintf("k=%d, v=%s", k, v)) + } + +} + +func TestJsonToMap(t *testing.T) { + var jsonString = []byte(`{"brokerAddrTable":{"broker-a":{"brokerAddrs":{0:"127.0.0.1:10911"},"brokerName":"broker-a","cluster":"DefaultCluster"}},"clusterAddrTable":{"DefaultCluster":["broker-a"]}}`) + cluster, err := ParseClusterInfo(string(jsonString)) + + var cl = &ClusterInfo{} + yaml.Unmarshal(jsonString, cl) + + assert.True(t, err == nil) + fmt.Println(cluster) +}