Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #56 from joshbohde/consistent-hashing
Browse files Browse the repository at this point in the history
Make ConsistantHash have a better distribution
  • Loading branch information
thrawn01 authored May 14, 2020
2 parents 171909a + 3afc9f2 commit cd5588d
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 15 deletions.
5 changes: 1 addition & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ func FromUnixMilliseconds(ts int64) time.Time {

// Given a list of peers, return a random peer
func RandomPeer(peers []string) string {
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
return peers[0]
return peers[rand.Intn(len(peers))]
}

// Return a random alpha string of 'n' length
Expand Down
10 changes: 10 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func InstanceAt(idx int) *instance {
return instances[idx]
}

// Return the specific instance for a host
func InstanceForHost(host string) *instance {
for i := range instances {
if instances[i].Address == host {
return instances[i]
}
}
return nil
}

// Start a local cluster of gubernator servers
func Start(numInstances int) error {
addresses := make([]string, numInstances, numInstances)
Expand Down
56 changes: 56 additions & 0 deletions cmd/gubernator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/setter"
"github.com/pkg/errors"
"github.com/segmentio/fasthash/fnv1"
"github.com/segmentio/fasthash/fnv1a"
"github.com/sirupsen/logrus"
"k8s.io/klog"
)
Expand All @@ -53,6 +55,9 @@ type ServerConfig struct {

// K8s configuration used to find peers inside a K8s cluster
K8PoolConf gubernator.K8sPoolConfig

// The PeerPicker as selected by `GUBER_PEER_PICKER`
Picker gubernator.PeerPicker
}

func confFromEnv() (ServerConfig, error) {
Expand Down Expand Up @@ -114,6 +119,41 @@ func confFromEnv() (ServerConfig, error) {
conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT")
conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR")

// PeerPicker Config
if pp := os.Getenv("GUBER_PEER_PICKER"); pp != "" {
var replicas int
var hash string

switch pp {
case "consistent-hash":
setter.SetDefault(&hash, os.Getenv("GUBER_PEER_PICKER_HASH"), "crc32")
hashFuncs := map[string]gubernator.HashFunc{
"fnv1a": fnv1a.HashBytes32,
"fnv1": fnv1.HashBytes32,
"crc32": nil,
}
if fn, ok := hashFuncs[hash]; ok {
conf.Picker = gubernator.NewConsistantHash(fn)
}
return conf, errors.Errorf("'GUBER_PEER_PICKER_HASH=%s' is invalid; choices are [%s]",
hash, validHashKeys(hashFuncs))

case "replicated-hash":
setter.SetDefault(&replicas, getEnvInteger("GUBER_REPLICATED_HASH_REPLICAS"), 1)
conf.Picker = gubernator.NewReplicatedConsistantHash(nil, replicas)
setter.SetDefault(&hash, os.Getenv("GUBER_PEER_PICKER_HASH"), "fnv1a")
hashFuncs := map[string]gubernator.HashFunc64{
"fnv1a": fnv1a.HashBytes64,
"fnv1": fnv1.HashBytes64,
}
if fn, ok := hashFuncs[hash]; ok {
conf.Picker = gubernator.NewReplicatedConsistantHash(fn, replicas)
}
return conf, errors.Errorf("'GUBER_PEER_PICKER_HASH=%s' is invalid; choices are [%s]",
hash, validHash64Keys(hashFuncs))
}
}

if anyHasPrefix("GUBER_K8S_", os.Environ()) {
logrus.Debug("K8s peer pool config found")
conf.K8PoolConf.Enabled = true
Expand Down Expand Up @@ -264,3 +304,19 @@ func fromEnvFile(configFile string) error {
}
return nil
}

func validHashKeys(m map[string]gubernator.HashFunc) string {
var rs []string
for k, _ := range m {
rs = append(rs, k)
}
return strings.Join(rs, ",")
}

func validHash64Keys(m map[string]gubernator.HashFunc64) string {
var rs []string
for k, _ := range m {
rs = append(rs, k)
}
return strings.Join(rs, ",")
}
1 change: 1 addition & 0 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func main() {

// Registers a new gubernator instance with the GRPC server
guber, err := gubernator.New(gubernator.Config{
Picker: conf.Picker,
GRPCServer: grpcSrv,
Cache: cache,
})
Expand Down
16 changes: 16 additions & 0 deletions example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,21 @@ GUBER_ETCD_ADVERTISE_ADDRESS=localhost:81
# Skip CERT verification
#GUBER_ETCD_TLS_SKIP_VERIFY=true

############################
# Picker Config
############################
# Choose which picker algorithm to use
# GUBER_PEER_PICKER=consistent-hash

# Choose the hash algorithm for `consistent-hash` (crc32, fnv1a, fnv1)
# GUBER_PEER_PICKER_HASH = crc32

# Choose which picker algorithm to use
# GUBER_PEER_PICKER=replicated-hash

# Choose the hash algorithm for `replicated-hash` (fnv1a, fnv1)
# GUBER_PEER_PICKER_HASH = fnv1a

# Choose the number of replications
# GUBER_REPLICATED_HASH_REPLICAS = 1

23 changes: 16 additions & 7 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func TestTokenBucket(t *testing.T) {

rl := resp.Responses[0]

assert.Empty(t, rl.Error)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)
Expand Down Expand Up @@ -270,14 +271,15 @@ func TestMissingFields(t *testing.T) {
}

func TestGlobalRateLimits(t *testing.T) {
peer := cluster.PeerAt(0)
const clientInstance = 1
peer := cluster.PeerAt(clientInstance)
client, errs := guber.DialV1Server(peer)
require.Nil(t, errs)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

sendHit := func(status guber.Status, remain int64, i int) {
sendHit := func(status guber.Status, remain int64, i int) string {
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Expand All @@ -296,9 +298,15 @@ func TestGlobalRateLimits(t *testing.T) {
assert.Equal(t, status, resp.Responses[0].Status, i)
assert.Equal(t, remain, resp.Responses[0].Remaining, i)
assert.Equal(t, int64(5), resp.Responses[0].Limit, i)

// ensure that we have a canonical host
assert.NotEmpty(t, resp.Responses[0].Metadata["owner"])

// name/key should ensure our connected peer is NOT the owner,
// the peer we are connected to should forward requests asynchronously to the owner.
assert.NotEqual(t, peer, resp.Responses[0].Metadata["owner"])

return resp.Responses[0].Metadata["owner"]
}

// Our first hit should create the request on the peer and queue for async forward
Expand All @@ -311,10 +319,13 @@ func TestGlobalRateLimits(t *testing.T) {

// After sleeping this response should be from the updated async call from our owner. Notice the
// remaining is still 3 as the hit is queued for update to the owner
sendHit(guber.Status_UNDER_LIMIT, 3, 3)
canonicalHost := sendHit(guber.Status_UNDER_LIMIT, 3, 3)

canonicalInstance := cluster.InstanceForHost(canonicalHost)

// Inspect our metrics, ensure they collected the counts we expected during this test
instance := cluster.InstanceAt(0)
instance := cluster.InstanceForHost(peer)

metricCh := make(chan prometheus.Metric, 5)
instance.Guber.Collect(metricCh)

Expand All @@ -323,10 +334,8 @@ func TestGlobalRateLimits(t *testing.T) {
assert.Nil(t, m.Write(&buf))
assert.Equal(t, uint64(1), *buf.Histogram.SampleCount)

// Instance 3 should be the owner of our global rate limit
instance = cluster.InstanceAt(3)
metricCh = make(chan prometheus.Metric, 5)
instance.Guber.Collect(metricCh)
canonicalInstance.Guber.Collect(metricCh)

m = <-metricCh // Async metric
m = <-metricCh // Broadcast metric
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/segmentio/fasthash v1.0.2
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/segmentio/fasthash v1.0.2 h1:86fGDl2hB+iSHYlccB/FP9qRGvLNuH/fhEEFn6gnQUs=
github.com/segmentio/fasthash v1.0.2/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
22 changes: 18 additions & 4 deletions hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package gubernator

import (
"github.com/pkg/errors"
"hash/crc32"
"reflect"
"sort"
"unsafe"

"github.com/pkg/errors"
)

type HashFunc func(data []byte) uint32
Expand Down Expand Up @@ -60,7 +63,7 @@ func (ch *ConsistantHash) Peers() []*PeerClient {

// Adds a peer to the hash
func (ch *ConsistantHash) Add(peer *PeerClient) {
hash := int(ch.hashFunc([]byte(peer.host)))
hash := int(ch.hashFunc(bytes(peer.host)))
ch.peerKeys = append(ch.peerKeys, hash)
ch.peerMap[hash] = peer
sort.Ints(ch.peerKeys)
Expand All @@ -73,7 +76,7 @@ func (ch *ConsistantHash) Size() int {

// Returns the peer by hostname
func (ch *ConsistantHash) GetPeerByHost(host string) *PeerClient {
return ch.peerMap[int(ch.hashFunc([]byte(host)))]
return ch.peerMap[int(ch.hashFunc(bytes(host)))]
}

// Given a key, return the peer that key is assigned too
Expand All @@ -82,7 +85,7 @@ func (ch *ConsistantHash) Get(key string) (*PeerClient, error) {
return nil, errors.New("unable to pick a peer; pool is empty")
}

hash := int(ch.hashFunc([]byte(key)))
hash := int(ch.hashFunc(bytes(key)))

// Binary search for appropriate peer
idx := sort.Search(len(ch.peerKeys), func(i int) bool { return ch.peerKeys[i] >= hash })
Expand All @@ -94,3 +97,14 @@ func (ch *ConsistantHash) Get(key string) (*PeerClient, error) {

return ch.peerMap[ch.peerKeys[idx]], nil
}

// unsafely return the underlying bytes of a string
// the caller cannot alter the returned byte slice
func bytes(str string) []byte {
hdr := *(*reflect.StringHeader)(unsafe.Pointer(&str))
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
Data: hdr.Data,
Len: hdr.Len,
Cap: hdr.Len,
}))
}
Loading

0 comments on commit cd5588d

Please sign in to comment.