From c77c8d07d4f1a8d3669f7e08d81b09f500745a20 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 21 Dec 2020 13:39:26 -0600 Subject: [PATCH] Peer info provided to etcd and memberlist pools is now consistent --- CHANGELOG | 4 ++ config.go | 16 ++++---- etcd.go | 30 ++++++--------- memberlist.go | 100 +++++++++++++++++++------------------------------- version | 2 +- 5 files changed, 63 insertions(+), 89 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 2808b439..c9ed2053 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0-rc.5] - 2020-12-21 +### Change +* Peer info provided to etcd and memberlist pools is now consistent. + ## [1.0.0-rc.4] - 2020-12-18 ### Change * Fix leaky bucket algorithm diff --git a/config.go b/config.go index db70099b..63aed505 100644 --- a/config.go +++ b/config.go @@ -134,13 +134,13 @@ func (c *Config) SetDefaults() error { type PeerInfo struct { // (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support. - DataCenter string + DataCenter string `json:"data-center"` // (Optional) The http address:port of the peer - HTTPAddress string + HTTPAddress string `json:"http-address"` // (Required) The grpc address:port of the peer - GRPCAddress string + GRPCAddress string `json:"grpc-address"` // (Optional) Is true if PeerInfo is for this instance of gubernator - IsOwner bool + IsOwner bool `json:"is-owner,omitempty"` } // Returns the hash key used to identify this peer in the Picker. @@ -303,13 +303,13 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig, setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration(log, "GUBER_ETCD_DIAL_TIMEOUT"), clock.Second*5) setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER")) setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD")) - setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) - setter.SetDefault(&conf.EtcdPoolConf.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter) + setter.SetDefault(&conf.EtcdPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) + setter.SetDefault(&conf.EtcdPoolConf.Advertise.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter) - setter.SetDefault(&conf.MemberListPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) + setter.SetDefault(&conf.MemberListPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", advAddr)) setter.SetDefault(&conf.MemberListPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{}) - setter.SetDefault(&conf.MemberListPoolConf.DataCenter, conf.DataCenter) + setter.SetDefault(&conf.MemberListPoolConf.Advertise.DataCenter, conf.DataCenter) // Kubernetes Config setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default") diff --git a/etcd.go b/etcd.go index b15632ba..ec873a41 100644 --- a/etcd.go +++ b/etcd.go @@ -51,8 +51,8 @@ type EtcdPool struct { } type EtcdPoolConfig struct { - // (Required) The address etcd will advertise to other gubernator instances - AdvertiseAddress string + // (Required) This is the peer information that will be advertised to other gubernator instances + Advertise PeerInfo // (Required) An etcd client currently connected to an etcd cluster Client *etcd.Client @@ -68,17 +68,14 @@ type EtcdPoolConfig struct { // (Optional) An interface through which logging will occur (Usually *logrus.Entry) Logger logrus.FieldLogger - - // (Optional) The datacenter this instance belongs too - DataCenter string } func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { setter.SetDefault(&conf.KeyPrefix, defaultBaseKey) setter.SetDefault(&conf.Logger, logrus.WithField("category", "gubernator")) - if conf.AdvertiseAddress == "" { - return nil, errors.New("AdvertiseAddress is required") + if conf.Advertise.GRPCAddress == "" { + return nil, errors.New("Advertise.GRPCAddress is required") } if conf.Client == nil { @@ -93,13 +90,13 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { conf: conf, ctx: ctx, } - return pool, pool.run(conf.AdvertiseAddress) + return pool, pool.run(conf.Advertise) } -func (e *EtcdPool) run(addr string) error { +func (e *EtcdPool) run(peer PeerInfo) error { // Register our instance with etcd - if err := e.register(addr); err != nil { + if err := e.register(peer); err != nil { return err } @@ -233,14 +230,11 @@ func (e *EtcdPool) watch() error { return nil } -func (e *EtcdPool) register(name string) error { - instanceKey := e.conf.KeyPrefix + name - e.log.Infof("Registering peer '%s' with etcd", name) +func (e *EtcdPool) register(peer PeerInfo) error { + instanceKey := e.conf.KeyPrefix + peer.GRPCAddress + e.log.Infof("Registering peer '%s' with etcd", peer) - b, err := json.Marshal(PeerInfo{ - GRPCAddress: e.conf.AdvertiseAddress, - DataCenter: e.conf.DataCenter, - }) + b, err := json.Marshal(peer) if err != nil { return errors.Wrap(err, "while marshalling PeerInfo") } @@ -341,7 +335,7 @@ func (e *EtcdPool) callOnUpdate() { var peers []PeerInfo for _, p := range e.peers { - if p.GRPCAddress == e.conf.AdvertiseAddress { + if p.GRPCAddress == e.conf.Advertise.GRPCAddress { p.IsOwner = true } peers = append(peers, p) diff --git a/memberlist.go b/memberlist.go index 03b5b649..4e906b62 100644 --- a/memberlist.go +++ b/memberlist.go @@ -43,6 +43,9 @@ type MemberListPool struct { } type MemberListPoolConfig struct { + // (Required) This is the peer information that will be advertised to other members + Advertise PeerInfo + // (Required) This is the address:port the member list protocol listen for other members on MemberListAddress string @@ -60,9 +63,6 @@ type MemberListPoolConfig struct { // (Optional) An interface through which logging will occur (Usually *logrus.Entry) Logger logrus.FieldLogger - - // (Optional) The datacenter this instance belongs too - DataCenter string } func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberListPool, error) { @@ -89,11 +89,6 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL host = addrs[0] } - _, advPort, err := splitAddress(conf.AdvertiseAddress) - if err != nil { - return nil, errors.Wrap(err, "AdvertiseAddress=`%s` is invalid;") - } - // Configure member list event handler m.events = newMemberListEventHandler(m.log, conf) @@ -118,14 +113,9 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL // Prep metadata gob.Register(memberListMetadata{}) - metadata := memberListMetadata{ - DataCenter: conf.DataCenter, - AdvertiseAddress: conf.AdvertiseAddress, - GubernatorPort: advPort, - } // Join member list pool - err = m.joinPool(ctx, conf.KnownNodes, metadata) + err = m.joinPool(ctx, conf) if err != nil { return nil, errors.Wrap(err, "while attempting to join the member-list pool") } @@ -133,18 +123,18 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL return m, nil } -func (m *MemberListPool) joinPool(ctx context.Context, knownNodes []string, metadata memberListMetadata) error { +func (m *MemberListPool) joinPool(ctx context.Context, conf MemberListPoolConfig) error { // Get local node and set metadata node := m.memberList.LocalNode() - serializedMetadata, err := serializeMemberListMetadata(metadata) + b, err := json.Marshal(&conf.Advertise) if err != nil { - return err + return errors.Wrap(err, "error marshalling PeerInfo as JSON") } - node.Meta = serializedMetadata + node.Meta = b err = retry.Until(ctx, retry.Interval(clock.Millisecond*300), func(ctx context.Context, i int) error { // Join member list - _, err = m.memberList.Join(knownNodes) + _, err = m.memberList.Join(conf.KnownNodes) if err != nil { return errors.Wrap(err, "while joining member-list") } @@ -185,16 +175,11 @@ func newMemberListEventHandler(log logrus.FieldLogger, conf MemberListPoolConfig func (e *memberListEventHandler) addPeer(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { e.log.WithError(err).Warnf("while adding to peers") } else { - // Handle deprecated GubernatorPort - if metadata.AdvertiseAddress == "" { - metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) - } - e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter} + e.peers[ip] = peer e.callOnUpdate() } } @@ -202,20 +187,16 @@ func (e *memberListEventHandler) addPeer(node *ml.Node) { func (e *memberListEventHandler) NotifyJoin(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { - // This is called during memberlist initialization due to the fact that the local node + // This is called during member list initialization due to the fact that the local node // has no metadata yet - e.log.WithError(err).Warn("while deserialize member-list metadata") - } else { - // Handle deprecated GubernatorPort - if metadata.AdvertiseAddress == "" { - metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) - } - e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter} - e.callOnUpdate() + e.log.WithError(err).Warn("while deserialize member-list peer") + return } + peer.IsOwner = false + e.peers[ip] = peer + e.callOnUpdate() } func (e *memberListEventHandler) NotifyLeave(node *ml.Node) { @@ -230,23 +211,20 @@ func (e *memberListEventHandler) NotifyLeave(node *ml.Node) { func (e *memberListEventHandler) NotifyUpdate(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { - e.log.WithError(err).Warn("while updating member-list") - } else { - // Construct Gubernator address and create PeerInfo - gubernatorAddress := makeAddress(ip, metadata.GubernatorPort) - e.peers[ip] = PeerInfo{GRPCAddress: gubernatorAddress, DataCenter: metadata.DataCenter} - e.callOnUpdate() + e.log.WithError(err).Warn("while unmarshalling peer info") } + peer.IsOwner = false + e.peers[ip] = peer + e.callOnUpdate() } func (e *memberListEventHandler) callOnUpdate() { var peers []PeerInfo for _, p := range e.peers { - if p.GRPCAddress == e.conf.AdvertiseAddress { + if p.GRPCAddress == e.conf.Advertise.GRPCAddress { p.IsOwner = true } peers = append(peers, p) @@ -263,30 +241,28 @@ func makeAddress(ip string, port int) string { return net.JoinHostPort(ip, strconv.Itoa(port)) } +// Deprecated type memberListMetadata struct { DataCenter string AdvertiseAddress string - // Deprecated - GubernatorPort int -} - -func serializeMemberListMetadata(metadata memberListMetadata) ([]byte, error) { - b, err := json.Marshal(&metadata) - if err != nil { - return nil, errors.Wrap(err, "error marshalling metadata as JSON") - } - return b, nil + GubernatorPort int } -func deserializeMemberListMetadata(b []byte) (*memberListMetadata, error) { - var metadata memberListMetadata - if err := json.Unmarshal(b, &metadata); err != nil { +func unmarshallPeer(b []byte, ip string) (PeerInfo, error) { + var peer PeerInfo + if err := json.Unmarshal(b, &peer); err != nil { + var metadata memberListMetadata decoder := gob.NewDecoder(bytes.NewBuffer(b)) - if err := decoder.Decode(&metadata); err != nil { - return nil, errors.Wrap(err, "error decoding metadata") + if err := decoder.Decode(&peer); err != nil { + return peer, errors.Wrap(err, "error decoding peer") + } + // Handle deprecated GubernatorPort + if metadata.AdvertiseAddress == "" { + metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) } + return PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter}, nil } - return &metadata, nil + return peer, nil } func newLogWriter(log logrus.FieldLogger) *io.PipeWriter { diff --git a/version b/version index d17f0653..8d3e6e31 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.0.0-rc.4 +1.0.0-rc.5