Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
PIP-1480: Refactor GubernatorPool hash ring implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Jan 20, 2022
1 parent 86d4a0a commit fd9d144
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 59 deletions.
7 changes: 0 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ type Config struct {
// (Optional) Number of worker goroutines to launch for request processing in GubernatorPool.
// Default is set to number of CPUs.
PoolWorkers int

// Number of nodes to add to hashring per worker.
// Higher number means more even distribution of hash ranges, but incurs
// potential performance burden in `getWorker()`.
PoolWorkerHashRingRedundancy int
}

func (c *Config) SetDefaults() error {
Expand Down Expand Up @@ -148,8 +143,6 @@ func (c *Config) SetDefaults() error {
c.PeerTLS = c.PeerTLS.Clone()
}

setter.SetDefault(&c.PoolWorkerHashRingRedundancy, 4)

return nil
}

Expand Down
71 changes: 20 additions & 51 deletions gubernator_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ package gubernator
// Uses hash ring design pattern to distribute requests to an assigned worker.
// No mutex locking necessary because each worker has its own data space and
// processes requests sequentially.
//
// Request workflow:
// - A 64-bit hash is generated from an incoming request by its Key/Name values.
// - Workers are assigned equal size hash ranges. The worker is selected by choosing the worker index associated with that linear hash value range.
// - The worker has command channels for each method call. The request is enqueued to the appropriate channel.
// - The worker pulls the request from the appropriate channel and executes the business logic for that method. Then, it sends a response back using the requester's provided response channel.

import (
"context"
"fmt"
"io"
"sort"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -44,12 +49,9 @@ import (
type GubernatorPool struct {
workers []*poolWorker
workerCacheSize int

// Workers in the hash ring. Must be sorted by hash value.
hashRing []poolHashRingNode

conf *Config
done chan struct{}
hashRingStep uint64
conf *Config
done chan struct{}
}

type poolWorker struct {
Expand All @@ -63,12 +65,7 @@ type poolWorker struct {
getCacheItemRequest chan poolGetCacheItemRequest
}

// Reference to a poolWorker in the hash ring.
type poolHashRingNode struct {
hash uint64
worker *poolWorker
}

// Method request/response structs.
type poolStoreRequest struct {
ctx context.Context
response chan poolStoreResponse
Expand Down Expand Up @@ -113,14 +110,17 @@ func NewGubernatorPool(conf *Config, concurrency int, cacheSize int) *Gubernator
setter.SetDefault(&cacheSize, 50_000)

chp := &GubernatorPool{
workers: make([]*poolWorker, concurrency),
workerCacheSize: cacheSize / concurrency,
hashRingStep: ^uint64(0) / uint64(concurrency),
conf: conf,
done: make(chan struct{}),
}

// Create workers.
for i := 0; i < concurrency; i++ {
worker := chp.addWorker()
go chp.worker(worker)
chp.workers[i] = chp.newWorker()
go chp.worker(chp.workers[i])
}

return chp
Expand All @@ -131,8 +131,8 @@ func (chp *GubernatorPool) Close() error {
return nil
}

// Add a request channel to the worker pool.
func (chp *GubernatorPool) addWorker() *poolWorker {
// Create a new pool worker instance.
func (chp *GubernatorPool) newWorker() *poolWorker {
const commandChannelSize = 10000

worker := &poolWorker{
Expand All @@ -145,46 +145,15 @@ func (chp *GubernatorPool) addWorker() *poolWorker {
}
workerNumber := atomic.AddInt64(&poolWorkerCounter, 1) - 1
worker.name = strconv.FormatInt(workerNumber, 10)
chp.workers = append(chp.workers, worker)

// Create redundant poolWorker references in the hash ring to improve even
// distribution of workload.
for i := 0; i < chp.conf.PoolWorkerHashRingRedundancy; i++ {
// Generate an arbitrary hash based off the worker pointer.
// This hash value is the beginning range of a hash ring node.
key := fmt.Sprintf("%p-%x", worker, i)
node := poolHashRingNode{
hash: xxhash.ChecksumString64S(key, 0),
worker: worker,
}

chp.hashRing = append(chp.hashRing, node)
}

// Sort hashRing array by hash value.
sort.Slice(chp.hashRing, func(a, b int) bool {
return chp.hashRing[a].hash < chp.hashRing[b].hash
})

return worker
}

// Returns the request channel associated with the key.
// Hash the key, then lookup hash ring to find the channel.
// Hash the key, then lookup hash ring to find the worker.
func (chp *GubernatorPool) getWorker(key string) *poolWorker {
hash := xxhash.ChecksumString64S(key, 0)

// Binary search for appropriate channel.
idx := sort.Search(len(chp.hashRing), func(i int) bool {
return chp.hashRing[i].hash >= hash
})

// Means we have cycled back to the first.
if idx >= len(chp.hashRing) {
idx = 0
}

return chp.hashRing[idx].worker
idx := hash / chp.hashRingStep
return chp.workers[idx]
}

// Pool worker for processing Gubernator requests.
Expand Down
1 change: 0 additions & 1 deletion store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestLoader(t *testing.T) {
GlobalTimeout: clock.Second,
},
Loader: loader,
PoolWorkerHashRingRedundancy: 1,
})

// loader.Load() should have been called for gubernator startup
Expand Down

0 comments on commit fd9d144

Please sign in to comment.