Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Peer info provided to etcd and memberlist pools is now consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Dec 22, 2020
1 parent 7f075f3 commit c77c8d0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 89 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0-rc.5] - 2020-12-21
### Change
* Peer info provided to etcd and memberlist pools is now consistent.

## [1.0.0-rc.4] - 2020-12-18
### Change
* Fix leaky bucket algorithm
Expand Down
16 changes: 8 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func (c *Config) SetDefaults() error {

type PeerInfo struct {
// (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support.
DataCenter string
DataCenter string `json:"data-center"`
// (Optional) The http address:port of the peer
HTTPAddress string
HTTPAddress string `json:"http-address"`
// (Required) The grpc address:port of the peer
GRPCAddress string
GRPCAddress string `json:"grpc-address"`
// (Optional) Is true if PeerInfo is for this instance of gubernator
IsOwner bool
IsOwner bool `json:"is-owner,omitempty"`
}

// Returns the hash key used to identify this peer in the Picker.
Expand Down Expand Up @@ -303,13 +303,13 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration(log, "GUBER_ETCD_DIAL_TIMEOUT"), clock.Second*5)
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER"))
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD"))
setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.EtcdPoolConf.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter)
setter.SetDefault(&conf.EtcdPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.EtcdPoolConf.Advertise.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter)

setter.SetDefault(&conf.MemberListPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.MemberListPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", advAddr))
setter.SetDefault(&conf.MemberListPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{})
setter.SetDefault(&conf.MemberListPoolConf.DataCenter, conf.DataCenter)
setter.SetDefault(&conf.MemberListPoolConf.Advertise.DataCenter, conf.DataCenter)

// Kubernetes Config
setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default")
Expand Down
30 changes: 12 additions & 18 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type EtcdPool struct {
}

type EtcdPoolConfig struct {
// (Required) The address etcd will advertise to other gubernator instances
AdvertiseAddress string
// (Required) This is the peer information that will be advertised to other gubernator instances
Advertise PeerInfo

// (Required) An etcd client currently connected to an etcd cluster
Client *etcd.Client
Expand All @@ -68,17 +68,14 @@ type EtcdPoolConfig struct {

// (Optional) An interface through which logging will occur (Usually *logrus.Entry)
Logger logrus.FieldLogger

// (Optional) The datacenter this instance belongs too
DataCenter string
}

func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) {
setter.SetDefault(&conf.KeyPrefix, defaultBaseKey)
setter.SetDefault(&conf.Logger, logrus.WithField("category", "gubernator"))

if conf.AdvertiseAddress == "" {
return nil, errors.New("AdvertiseAddress is required")
if conf.Advertise.GRPCAddress == "" {
return nil, errors.New("Advertise.GRPCAddress is required")
}

if conf.Client == nil {
Expand All @@ -93,13 +90,13 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) {
conf: conf,
ctx: ctx,
}
return pool, pool.run(conf.AdvertiseAddress)
return pool, pool.run(conf.Advertise)
}

func (e *EtcdPool) run(addr string) error {
func (e *EtcdPool) run(peer PeerInfo) error {

// Register our instance with etcd
if err := e.register(addr); err != nil {
if err := e.register(peer); err != nil {
return err
}

Expand Down Expand Up @@ -233,14 +230,11 @@ func (e *EtcdPool) watch() error {
return nil
}

func (e *EtcdPool) register(name string) error {
instanceKey := e.conf.KeyPrefix + name
e.log.Infof("Registering peer '%s' with etcd", name)
func (e *EtcdPool) register(peer PeerInfo) error {
instanceKey := e.conf.KeyPrefix + peer.GRPCAddress
e.log.Infof("Registering peer '%s' with etcd", peer)

b, err := json.Marshal(PeerInfo{
GRPCAddress: e.conf.AdvertiseAddress,
DataCenter: e.conf.DataCenter,
})
b, err := json.Marshal(peer)
if err != nil {
return errors.Wrap(err, "while marshalling PeerInfo")
}
Expand Down Expand Up @@ -341,7 +335,7 @@ func (e *EtcdPool) callOnUpdate() {
var peers []PeerInfo

for _, p := range e.peers {
if p.GRPCAddress == e.conf.AdvertiseAddress {
if p.GRPCAddress == e.conf.Advertise.GRPCAddress {
p.IsOwner = true
}
peers = append(peers, p)
Expand Down
100 changes: 38 additions & 62 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type MemberListPool struct {
}

type MemberListPoolConfig struct {
// (Required) This is the peer information that will be advertised to other members
Advertise PeerInfo

// (Required) This is the address:port the member list protocol listen for other members on
MemberListAddress string

Expand All @@ -60,9 +63,6 @@ type MemberListPoolConfig struct {

// (Optional) An interface through which logging will occur (Usually *logrus.Entry)
Logger logrus.FieldLogger

// (Optional) The datacenter this instance belongs too
DataCenter string
}

func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberListPool, error) {
Expand All @@ -89,11 +89,6 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL
host = addrs[0]
}

_, advPort, err := splitAddress(conf.AdvertiseAddress)
if err != nil {
return nil, errors.Wrap(err, "AdvertiseAddress=`%s` is invalid;")
}

// Configure member list event handler
m.events = newMemberListEventHandler(m.log, conf)

Expand All @@ -118,33 +113,28 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL

// Prep metadata
gob.Register(memberListMetadata{})
metadata := memberListMetadata{
DataCenter: conf.DataCenter,
AdvertiseAddress: conf.AdvertiseAddress,
GubernatorPort: advPort,
}

// Join member list pool
err = m.joinPool(ctx, conf.KnownNodes, metadata)
err = m.joinPool(ctx, conf)
if err != nil {
return nil, errors.Wrap(err, "while attempting to join the member-list pool")
}

return m, nil
}

func (m *MemberListPool) joinPool(ctx context.Context, knownNodes []string, metadata memberListMetadata) error {
func (m *MemberListPool) joinPool(ctx context.Context, conf MemberListPoolConfig) error {
// Get local node and set metadata
node := m.memberList.LocalNode()
serializedMetadata, err := serializeMemberListMetadata(metadata)
b, err := json.Marshal(&conf.Advertise)
if err != nil {
return err
return errors.Wrap(err, "error marshalling PeerInfo as JSON")
}
node.Meta = serializedMetadata
node.Meta = b

err = retry.Until(ctx, retry.Interval(clock.Millisecond*300), func(ctx context.Context, i int) error {
// Join member list
_, err = m.memberList.Join(knownNodes)
_, err = m.memberList.Join(conf.KnownNodes)
if err != nil {
return errors.Wrap(err, "while joining member-list")
}
Expand Down Expand Up @@ -185,37 +175,28 @@ func newMemberListEventHandler(log logrus.FieldLogger, conf MemberListPoolConfig
func (e *memberListEventHandler) addPeer(node *ml.Node) {
ip := getIP(node.Address())

// Deserialize metadata
metadata, err := deserializeMemberListMetadata(node.Meta)
peer, err := unmarshallPeer(node.Meta, ip)
if err != nil {
e.log.WithError(err).Warnf("while adding to peers")
} else {
// Handle deprecated GubernatorPort
if metadata.AdvertiseAddress == "" {
metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort)
}
e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter}
e.peers[ip] = peer
e.callOnUpdate()
}
}

func (e *memberListEventHandler) NotifyJoin(node *ml.Node) {
ip := getIP(node.Address())

// Deserialize metadata
metadata, err := deserializeMemberListMetadata(node.Meta)
peer, err := unmarshallPeer(node.Meta, ip)
if err != nil {
// This is called during memberlist initialization due to the fact that the local node
// This is called during member list initialization due to the fact that the local node
// has no metadata yet
e.log.WithError(err).Warn("while deserialize member-list metadata")
} else {
// Handle deprecated GubernatorPort
if metadata.AdvertiseAddress == "" {
metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort)
}
e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter}
e.callOnUpdate()
e.log.WithError(err).Warn("while deserialize member-list peer")
return
}
peer.IsOwner = false
e.peers[ip] = peer
e.callOnUpdate()
}

func (e *memberListEventHandler) NotifyLeave(node *ml.Node) {
Expand All @@ -230,23 +211,20 @@ func (e *memberListEventHandler) NotifyLeave(node *ml.Node) {
func (e *memberListEventHandler) NotifyUpdate(node *ml.Node) {
ip := getIP(node.Address())

// Deserialize metadata
metadata, err := deserializeMemberListMetadata(node.Meta)
peer, err := unmarshallPeer(node.Meta, ip)
if err != nil {
e.log.WithError(err).Warn("while updating member-list")
} else {
// Construct Gubernator address and create PeerInfo
gubernatorAddress := makeAddress(ip, metadata.GubernatorPort)
e.peers[ip] = PeerInfo{GRPCAddress: gubernatorAddress, DataCenter: metadata.DataCenter}
e.callOnUpdate()
e.log.WithError(err).Warn("while unmarshalling peer info")
}
peer.IsOwner = false
e.peers[ip] = peer
e.callOnUpdate()
}

func (e *memberListEventHandler) callOnUpdate() {
var peers []PeerInfo

for _, p := range e.peers {
if p.GRPCAddress == e.conf.AdvertiseAddress {
if p.GRPCAddress == e.conf.Advertise.GRPCAddress {
p.IsOwner = true
}
peers = append(peers, p)
Expand All @@ -263,30 +241,28 @@ func makeAddress(ip string, port int) string {
return net.JoinHostPort(ip, strconv.Itoa(port))
}

// Deprecated
type memberListMetadata struct {
DataCenter string
AdvertiseAddress string
// Deprecated
GubernatorPort int
}

func serializeMemberListMetadata(metadata memberListMetadata) ([]byte, error) {
b, err := json.Marshal(&metadata)
if err != nil {
return nil, errors.Wrap(err, "error marshalling metadata as JSON")
}
return b, nil
GubernatorPort int
}

func deserializeMemberListMetadata(b []byte) (*memberListMetadata, error) {
var metadata memberListMetadata
if err := json.Unmarshal(b, &metadata); err != nil {
func unmarshallPeer(b []byte, ip string) (PeerInfo, error) {
var peer PeerInfo
if err := json.Unmarshal(b, &peer); err != nil {
var metadata memberListMetadata
decoder := gob.NewDecoder(bytes.NewBuffer(b))
if err := decoder.Decode(&metadata); err != nil {
return nil, errors.Wrap(err, "error decoding metadata")
if err := decoder.Decode(&peer); err != nil {
return peer, errors.Wrap(err, "error decoding peer")
}
// Handle deprecated GubernatorPort
if metadata.AdvertiseAddress == "" {
metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort)
}
return PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter}, nil
}
return &metadata, nil
return peer, nil
}

func newLogWriter(log logrus.FieldLogger) *io.PipeWriter {
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0-rc.4
1.0.0-rc.5

0 comments on commit c77c8d0

Please sign in to comment.