Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Sedin committed Aug 17, 2024
0 parents commit de80931
Show file tree
Hide file tree
Showing 81 changed files with 23,224 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.git
.env
.vscode
.idea
.DS_Store
34 changes: 34 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package goconsist

const (
defaultSectionFactor = uint32(1)
defaultSectionCount = uint32(10)
)

// Config is configuration to Ring structure.
type Config struct {
// SectionFactor is a range of numbers included to single ring section.
//
// Example:
// Given a ring of 3 ranges:
// 0 - 2, 3 - 5, 6 - 0.
// In this case, shard factor equals to 2.
//
// Be default: defaultSectionFactor.
SectionFactor uint32
// SectionCount is a number of ranges located in the ring.
//
// Example:
// 0 - 1, 2 - 3, 4 - 5, 6 - 0.
// In this case ranges count equals to 4.
//
// By default: defaultSectionCount.
SectionCount uint32
}

func defaultConfig() Config {
return Config{
SectionFactor: defaultSectionFactor,
SectionCount: defaultSectionCount,
}
}
28 changes: 28 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"log/slog"
"net/netip"

"github.com/google/uuid"

"github.com/Melenium2/goconsist"
)

var servers = []netip.AddrPort{
netip.AddrPortFrom(netip.AddrFrom4([4]byte{10, 1, 1, 1}), 10),
netip.AddrPortFrom(netip.AddrFrom4([4]byte{10, 1, 1, 1}), 20),
netip.AddrPortFrom(netip.AddrFrom4([4]byte{10, 1, 1, 1}), 30),
}

func main() {
ring := goconsist.NewRing(goconsist.Config{}, servers...)

for i := 0; i < 100; i++ {
id, _ := uuid.New().MarshalBinary()

server := ring.Acquire(id)

slog.Info("server acquired", "server", server)
}
}
16 changes: 16 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module github.com/Melenium2/goconsist

go 1.23

require github.com/google/uuid v1.6.0

require (
github.com/stretchr/testify v1.9.0
github.com/twmb/murmur3 v1.1.8
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
161 changes: 161 additions & 0 deletions ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package goconsist

import (
"net/netip"

"github.com/twmb/murmur3"
)

// Ring is not concurrency safety implementation of consistency hash ring with
// virtual shards.
type Ring struct {
// A module value to prevent going beyond the ring.
//
// This value is calculated as the minimum value of the last section plus
// the section factor from configuration.
//
// Example:
// last section - start: 48, end 0, server: 2
// sectionFactor - 15
// hashMod - 48 + 15 = 63
hashMod uint32

// Ring sections.
//
// Example:
//
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 2,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 2
sections []section
// Servers that are distributed in a ring.
servers []netip.AddrPort
}

func NewRing(config Config, servers ...netip.AddrPort) *Ring {
defaultCfg := defaultConfig()

if config.SectionFactor > 0 {
defaultCfg.SectionFactor = config.SectionFactor
}

if config.SectionCount > 0 {
defaultCfg.SectionCount = config.SectionCount
}

distrubution := distribute(defaultCfg.SectionFactor, defaultCfg.SectionCount)
lastShard := distrubution[len(distrubution)-1]
hashMod := lastShard.min + defaultCfg.SectionFactor

s := &Ring{
servers: make([]netip.AddrPort, 0),
sections: distrubution,
hashMod: hashMod,
}

s.AddServers(servers...)

return s
}

// AddServers adds new servers and redistribute available
// servers across the ring.
//
// Example:
//
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 1,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 1
//
// Add server2.
//
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 2,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 2
func (s *Ring) AddServers(servers ...netip.AddrPort) {
s.servers = append(s.servers, servers...)

s.DistributeServers()
}

// DistributeServers starts server balancing across the ring.
// If no servers were previously distributed, then they
// will be distributed.
func (s *Ring) DistributeServers() {
if len(s.servers) == 0 {
return
}

for i, curr := range s.sections {
servIndex := i % len(s.servers)

curr.serverAddr = s.servers[servIndex]

s.sections[i] = curr
}
}

// Acquire searches for server which falls within the range
// calculated by hash function. The function uses murmur3 hash algorithm
// for calculating hash.
//
// Example:
//
// sections:
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 2,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 2
//
// key:
// []byte("4")
//
// hashMod = 63 (48 + 15)
// hash(50) = murmur3([]byte("4")) % hashMod
func (s *Ring) Acquire(key []byte) netip.AddrPort {
hash := murmur3.Sum32(key) % s.hashMod

result, ok := search(s.sections, hash)
if !ok {
return netip.AddrPort{}
}

return result.serverAddr
}

// RemoveServer removes the server with the provided IP address.
// Redistribute servers after removal.
//
// Example:
//
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 2,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 2
//
// Remove server2.
//
// start: 0, end 15, server: 1,
// start: 16, end 31, server: 1,
// start: 32, end 47, server: 1,
// start: 48, end 0, server: 1
func (s *Ring) RemoveServer(server netip.AddrPort) {
var index int

for i := 0; i < len(s.servers); i++ {
if s.servers[i] != server {
continue
}

index = i

break
}

s.servers = append(s.servers[:index], s.servers[index+1:]...)

s.DistributeServers()
}
Loading

0 comments on commit de80931

Please sign in to comment.