Skip to content

Commit

Permalink
Support modification of prefix (#34)
Browse files Browse the repository at this point in the history
* Support modification of prefix

* Support modification of prefix

* Support modification of prefix

* Support modification of prefix

* Support modification of prefix

* Support modification of prefix

* Modify naming, add comments
  • Loading branch information
logicwu0 authored Jun 26, 2024
1 parent 8f24094 commit da79456
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 36 deletions.
13 changes: 5 additions & 8 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ package etcd

import "fmt"

const (
etcdPrefixTpl = "kitex/registry-etcd/%v/"
)

func serviceKeyPrefix(serviceName string) string {
return fmt.Sprintf(etcdPrefixTpl, serviceName)
func serviceKeyPrefix(prefix string, serviceName string) string {
prefix = prefix + "/%v/"
return fmt.Sprintf(prefix, serviceName)
}

// serviceKey generates the key used to stored in etcd.
func serviceKey(serviceName, addr string) string {
return serviceKeyPrefix(serviceName) + addr
func serviceKey(prefix string, serviceName, addr string) string {
return serviceKeyPrefix(prefix, serviceName) + addr
}

// instanceInfo used to stored service basic info in etcd.
Expand Down
30 changes: 19 additions & 11 deletions etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type etcdRegistry struct {
retryConfig *retry.Config
stop chan struct{}
address net.Addr
prefix string
}

type registerMeta struct {
Expand All @@ -55,13 +56,16 @@ type registerMeta struct {

// NewEtcdRegistry creates an etcd based registry.
func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
Expand All @@ -71,6 +75,7 @@ func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, err
leaseTTL: getTTL(),
retryConfig: retryConfig,
stop: make(chan struct{}, 1),
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -86,13 +91,16 @@ func SetFixedAddress(r registry.Registry, address net.Addr) {

// NewEtcdRegistryWithRetry creates an etcd based registry with given custom retry configs
func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,14 +189,14 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
_, err = e.etcdClient.Put(ctx, serviceKey(e.prefix, info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
if err != nil {
return err
}

go func(key, val string) {
e.keepRegister(key, val, e.retryConfig)
}(serviceKey(info.ServiceName, addr), string(val))
}(serviceKey(e.prefix, info.ServiceName, addr), string(val))

return nil
}
Expand Down Expand Up @@ -262,7 +270,7 @@ func (e *etcdRegistry) deregister(info *registry.Info) error {
if err != nil {
return err
}
_, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr))
_, err = e.etcdClient.Delete(ctx, serviceKey(e.prefix, info.ServiceName, addr))
if err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions etcd_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,27 @@ const (
// etcdResolver is a resolver using etcd.
type etcdResolver struct {
etcdClient *clientv3.Client
prefix string
}

// NewEtcdResolver creates a etcd based resolver.
func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
return &etcdResolver{
etcdClient: etcdClient,
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -75,7 +80,7 @@ func (e *etcdResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo)

// Resolve implements the Resolver interface.
func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
prefix := serviceKeyPrefix(desc)
prefix := serviceKeyPrefix(e.prefix, desc)
resp, err := e.etcdClient.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return discovery.Result{}, err
Expand Down Expand Up @@ -113,3 +118,6 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco
func (e *etcdResolver) Name() string {
return "etcd"
}
func (e *etcdResolver) GetPrefix() string {
return e.prefix
}
120 changes: 119 additions & 1 deletion etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io/ioutil"
"io/ioutil" //nolint
"math/big"
"net"
"net/url"
Expand Down Expand Up @@ -511,3 +511,121 @@ func teardownEmbedEtcd(s *embed.Etcd) {
s.Close()
_ = os.RemoveAll(s.Config().Dir)
}
func TestEtcdResolverWithEtcdPrefix(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
tpl := "etcd/v1"
rg, err := NewEtcdRegistry([]string{endpoint}, WithEtcdConfigAndPrefix(tpl))
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint}, WithEtcdConfigAndPrefix(tpl))
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf(tpl+"/%v/", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}

func TestEtcdResolverWithEtcdPrefix2(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf("kitex/registry-etcd/%v/", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}
33 changes: 22 additions & 11 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,43 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"time"

"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
"io/ioutil" //nolint
"time"
)

// Option sets options such as username, tls etc.
type Option func(cfg *clientv3.Config)
type Option func(cfg *Config)

type Config struct {
EtcdConfig *clientv3.Config
Prefix string
}

// WithTLSOpt returns a option that authentication by tls/ssl.
func WithTLSOpt(certFile, keyFile, caFile string) Option {
return func(cfg *clientv3.Config) {
return func(cfg *Config) {
tlsCfg, err := newTLSConfig(certFile, keyFile, caFile, "")
if err != nil {
klog.Errorf("tls failed with err: %v , skipping tls.", err)
}
cfg.TLS = tlsCfg
cfg.EtcdConfig.TLS = tlsCfg
}
}

// WithAuthOpt returns a option that authentication by usernane and password.
func WithAuthOpt(username, password string) Option {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
return func(cfg *Config) {
cfg.EtcdConfig.Username = username
cfg.EtcdConfig.Password = password
}
}

// WithDialTimeoutOpt returns a option set dialTimeout
func WithDialTimeoutOpt(dialTimeout time.Duration) Option {
return func(cfg *clientv3.Config) {
cfg.DialTimeout = dialTimeout
return func(cfg *Config) {
cfg.EtcdConfig.DialTimeout = dialTimeout
}
}

Expand All @@ -74,3 +78,10 @@ func newTLSConfig(certFile, keyFile, caFile, serverName string) (*tls.Config, er
}
return cfg, nil
}

// WithEtcdConfigAndPrefix returns an option that sets the Prefix field in the Config struct
func WithEtcdConfigAndPrefix(prefix string) Option {
return func(c *Config) {
c.Prefix = prefix
}
}

0 comments on commit da79456

Please sign in to comment.