Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/config options #37

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.16'

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
13 changes: 8 additions & 5 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ package etcd
import "fmt"

const (
etcdPrefixTpl = "kitex/registry-etcd/%v/"
etcdPrefixTpl = "kitex/registry-etcd/%v"
)

func serviceKeyPrefix(serviceName string) string {
return fmt.Sprintf(etcdPrefixTpl, serviceName)
func serviceKeyPrefix(prefix string, serviceName string) string {
if prefix == "" {
return fmt.Sprintf(etcdPrefixTpl, serviceName)
}
return prefix + "/" + serviceName
}

// serviceKey generates the key used to stored in etcd.
func serviceKey(serviceName, addr string) string {
return serviceKeyPrefix(serviceName) + addr
func serviceKey(prefix string, serviceName, addr string) string {
return serviceKeyPrefix(prefix, serviceName) + "/" + addr
}

// instanceInfo used to stored service basic info in etcd.
Expand Down
45 changes: 45 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_serviceKeyPrefix(t *testing.T) {
assert.Equal(t,
"kitex/registry-etcd/serviceName",
serviceKeyPrefix("", "serviceName"),
)

assert.Equal(t,
"tmp/serviceName",
serviceKeyPrefix("tmp", "serviceName"),
)
}

func Test_serviceKey(t *testing.T) {
assert.Equal(t,
"kitex/registry-etcd/serviceName/addr",
serviceKey("", "serviceName", "addr"),
)

assert.Equal(t,
"tmp/serviceName/addr",
serviceKey("tmp", "serviceName", "addr"),
)
}
29 changes: 17 additions & 12 deletions etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type etcdRegistry struct {
retryConfig *retry.Config
stop chan struct{}
address net.Addr
prefix string
}

type registerMeta struct {
Expand All @@ -55,13 +56,15 @@ type registerMeta struct {

// NewEtcdRegistry creates an etcd based registry.
func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdClient: &clientv3.Config{
Endpoints: endpoints,
},
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdClient)
if err != nil {
return nil, err
}
Expand All @@ -71,6 +74,7 @@ func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, err
leaseTTL: getTTL(),
retryConfig: retryConfig,
stop: make(chan struct{}, 1),
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -86,13 +90,15 @@ func SetFixedAddress(r registry.Registry, address net.Addr) {

// NewEtcdRegistryWithRetry creates an etcd based registry with given custom retry configs
func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdClient: &clientv3.Config{
Endpoints: endpoints,
},
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,14 +187,14 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
_, err = e.etcdClient.Put(ctx, serviceKey(e.prefix, info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
if err != nil {
return err
}

go func(key, val string) {
e.keepRegister(key, val, e.retryConfig)
}(serviceKey(info.ServiceName, addr), string(val))
}(serviceKey(e.prefix, info.ServiceName, addr), string(val))

return nil
}
Expand Down Expand Up @@ -262,7 +268,7 @@ func (e *etcdRegistry) deregister(info *registry.Info) error {
if err != nil {
return err
}
_, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr))
_, err = e.etcdClient.Delete(ctx, serviceKey(e.prefix, info.ServiceName, addr))
if err != nil {
return err
}
Expand Down Expand Up @@ -302,7 +308,6 @@ func (e *etcdRegistry) keepalive(meta *registerMeta) error {

// getAddressOfRegistration returns the address of the service registration.
func (e *etcdRegistry) getAddressOfRegistration(info *registry.Info) (string, error) {

host, port, err := net.SplitHostPort(info.Addr.String())
if err != nil {
return "", err
Expand Down
18 changes: 13 additions & 5 deletions etcd_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,26 @@ const (
// etcdResolver is a resolver using etcd.
type etcdResolver struct {
etcdClient *clientv3.Client
prefix string
}

// NewEtcdResolver creates a etcd based resolver.
func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdClient: &clientv3.Config{
Endpoints: endpoints,
},
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdClient)
if err != nil {
return nil, err
}
return &etcdResolver{
etcdClient: etcdClient,
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -75,7 +79,7 @@ func (e *etcdResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo)

// Resolve implements the Resolver interface.
func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
prefix := serviceKeyPrefix(desc)
prefix := serviceKeyPrefix(e.prefix, desc)
resp, err := e.etcdClient.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return discovery.Result{}, err
Expand Down Expand Up @@ -113,3 +117,7 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco
func (e *etcdResolver) Name() string {
return "etcd"
}

func (e *etcdResolver) GetPrefix() string {
return e.prefix
}
127 changes: 124 additions & 3 deletions etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io/ioutil"
"io/ioutil" //nolint
"math/big"
"net"
"net/url"
Expand Down Expand Up @@ -193,7 +193,8 @@ func TestEtcdRegistryWithAddressBlank(t *testing.T) {
Addr: utils.NewNetAddr("tcp", "127.0.0.1:9999"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
}}
},
}

// test register service
{
Expand Down Expand Up @@ -260,7 +261,8 @@ func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) {
Addr: utils.NewNetAddr("tcp", "10.122.1.108:9999"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
}}
},
}

// test register service
{
Expand Down Expand Up @@ -511,3 +513,122 @@ func teardownEmbedEtcd(s *embed.Etcd) {
s.Close()
_ = os.RemoveAll(s.Config().Dir)
}

func TestEtcdResolverWithEtcdPrefix(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
tpl := "etcd/v1"
rg, err := NewEtcdRegistry([]string{endpoint}, WithServiceKey(tpl))
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint}, WithServiceKey(tpl))
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf(tpl+"/%v", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}

func TestEtcdResolverWithEtcdPrefix2(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf("kitex/registry-etcd/%v/", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}
Loading
Loading