diff --git a/config.go b/config.go index 9e392547..5ae132cd 100644 --- a/config.go +++ b/config.go @@ -107,11 +107,6 @@ type Config struct { // (Optional) Number of worker goroutines to launch for request processing in GubernatorPool. // Default is set to number of CPUs. PoolWorkers int - - // Number of nodes to add to hashring per worker. - // Higher number means more even distribution of hash ranges, but incurs - // potential performance burden in `getWorker()`. - PoolWorkerHashRingRedundancy int } func (c *Config) SetDefaults() error { @@ -148,8 +143,6 @@ func (c *Config) SetDefaults() error { c.PeerTLS = c.PeerTLS.Clone() } - setter.SetDefault(&c.PoolWorkerHashRingRedundancy, 4) - return nil } diff --git a/gubernator_pool.go b/gubernator_pool.go index f2dc820a..b4ef03db 100644 --- a/gubernator_pool.go +++ b/gubernator_pool.go @@ -23,12 +23,17 @@ package gubernator // Uses hash ring design pattern to distribute requests to an assigned worker. // No mutex locking necessary because each worker has its own data space and // processes requests sequentially. +// +// Request workflow: +// - A 64-bit hash is generated from an incoming request by its Key/Name values. +// - Workers are assigned equal size hash ranges. The worker is selected by choosing the worker index associated with that linear hash value range. +// - The worker has command channels for each method call. The request is enqueued to the appropriate channel. +// - The worker pulls the request from the appropriate channel and executes the business logic for that method. Then, it sends a response back using the requester's provided response channel. import ( "context" "fmt" "io" - "sort" "strconv" "sync" "sync/atomic" @@ -44,12 +49,9 @@ import ( type GubernatorPool struct { workers []*poolWorker workerCacheSize int - - // Workers in the hash ring. Must be sorted by hash value. - hashRing []poolHashRingNode - - conf *Config - done chan struct{} + hashRingStep uint64 + conf *Config + done chan struct{} } type poolWorker struct { @@ -63,12 +65,7 @@ type poolWorker struct { getCacheItemRequest chan poolGetCacheItemRequest } -// Reference to a poolWorker in the hash ring. -type poolHashRingNode struct { - hash uint64 - worker *poolWorker -} - +// Method request/response structs. type poolStoreRequest struct { ctx context.Context response chan poolStoreResponse @@ -113,14 +110,17 @@ func NewGubernatorPool(conf *Config, concurrency int, cacheSize int) *Gubernator setter.SetDefault(&cacheSize, 50_000) chp := &GubernatorPool{ + workers: make([]*poolWorker, concurrency), workerCacheSize: cacheSize / concurrency, + hashRingStep: ^uint64(0) / uint64(concurrency), conf: conf, done: make(chan struct{}), } + // Create workers. for i := 0; i < concurrency; i++ { - worker := chp.addWorker() - go chp.worker(worker) + chp.workers[i] = chp.newWorker() + go chp.worker(chp.workers[i]) } return chp @@ -131,8 +131,8 @@ func (chp *GubernatorPool) Close() error { return nil } -// Add a request channel to the worker pool. -func (chp *GubernatorPool) addWorker() *poolWorker { +// Create a new pool worker instance. +func (chp *GubernatorPool) newWorker() *poolWorker { const commandChannelSize = 10000 worker := &poolWorker{ @@ -145,46 +145,15 @@ func (chp *GubernatorPool) addWorker() *poolWorker { } workerNumber := atomic.AddInt64(&poolWorkerCounter, 1) - 1 worker.name = strconv.FormatInt(workerNumber, 10) - chp.workers = append(chp.workers, worker) - - // Create redundant poolWorker references in the hash ring to improve even - // distribution of workload. - for i := 0; i < chp.conf.PoolWorkerHashRingRedundancy; i++ { - // Generate an arbitrary hash based off the worker pointer. - // This hash value is the beginning range of a hash ring node. - key := fmt.Sprintf("%p-%x", worker, i) - node := poolHashRingNode{ - hash: xxhash.ChecksumString64S(key, 0), - worker: worker, - } - - chp.hashRing = append(chp.hashRing, node) - } - - // Sort hashRing array by hash value. - sort.Slice(chp.hashRing, func(a, b int) bool { - return chp.hashRing[a].hash < chp.hashRing[b].hash - }) - return worker } // Returns the request channel associated with the key. -// Hash the key, then lookup hash ring to find the channel. +// Hash the key, then lookup hash ring to find the worker. func (chp *GubernatorPool) getWorker(key string) *poolWorker { hash := xxhash.ChecksumString64S(key, 0) - - // Binary search for appropriate channel. - idx := sort.Search(len(chp.hashRing), func(i int) bool { - return chp.hashRing[i].hash >= hash - }) - - // Means we have cycled back to the first. - if idx >= len(chp.hashRing) { - idx = 0 - } - - return chp.hashRing[idx].worker + idx := hash / chp.hashRingStep + return chp.workers[idx] } // Pool worker for processing Gubernator requests. diff --git a/store_test.go b/store_test.go index d2ac6b9c..8f5b2bea 100644 --- a/store_test.go +++ b/store_test.go @@ -82,7 +82,6 @@ func TestLoader(t *testing.T) { GlobalTimeout: clock.Second, }, Loader: loader, - PoolWorkerHashRingRedundancy: 1, }) // loader.Load() should have been called for gubernator startup