Skip to content

Commit

Permalink
roshi-walker: use new tb.Bucket API
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Mar 29, 2014
1 parent 0b83bfc commit 1e7325f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 54 deletions.
33 changes: 9 additions & 24 deletions roshi-walker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func main() {
go func() { log.Print(http.ListenAndServe(*httpAddress, nil)) }()

// Set up our rate limiter. Remember: it's per-key, not per-request.
throttle := newThrottle(*maxKeysPerSecond)
freq := time.Duration(1/(*maxKeysPerSecond)) * time.Second
bucket := tb.NewBucket(*maxKeysPerSecond, freq)

// Build the farm
readStrategy := farm.SendAllReadAll
Expand All @@ -102,7 +103,7 @@ func main() {
begin := time.Now()
for {
src := scan(clusters, *batchSize, *scanLogInterval) // new key set
walkOnce(dst, throttle, src, *maxSize, instr)
walkOnce(dst, bucket, src, *maxSize, instr)
if *once {
break
}
Expand Down Expand Up @@ -163,41 +164,21 @@ func scan(clusters []cluster.Cluster, batchSize int, logInterval time.Duration)

func walkOnce(
dst farm.Selecter,
throttle *throttle,
wait waiter,
src <-chan []string,
maxSize int,
instr instrumentation.WalkInstrumentation,
) {
for batch := range src {
log.Printf("walk: received batch of %d, requesting tokens", len(batch))
throttle.wait(int64(len(batch)))
wait.Wait(int64(len(batch)))
log.Printf("walk: received tokens, performing Select")
dst.Select(batch, 0, maxSize)
instr.WalkKeys(len(batch))
log.Printf("walk: performed Select, waiting for next batch")
}
}

type throttle struct {
bucket *tb.Bucket
waitInterval time.Duration
}

func newThrottle(maxPerSecond int64) *throttle {
return &throttle{
bucket: tb.NewBucket(maxPerSecond, -1),
waitInterval: (1 * time.Second) / time.Duration(maxPerSecond),
}
}

func (t *throttle) wait(n int64) {
got := t.bucket.Take(n)
for got < n {
time.Sleep(t.waitInterval)
got += t.bucket.Take(n - got)
}
}

func stripBlank(src []string) []string {
dst := []string{}
for _, s := range src {
Expand All @@ -208,3 +189,7 @@ func stripBlank(src []string) []string {
}
return dst
}

type waiter interface {
Wait(int64) time.Duration
}
30 changes: 0 additions & 30 deletions roshi-walker/main_test.go

This file was deleted.

0 comments on commit 1e7325f

Please sign in to comment.