Skip to content

Commit

Permalink
Merge pull request #58 from getlantern/metrics
Browse files Browse the repository at this point in the history
Added facility for tracking and exposing basic database-level stats
  • Loading branch information
oxtoacart authored Jun 7, 2018
2 parents 57ba9d8 + 6815143 commit ca2f6a8
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions cluster_follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/getlantern/wal"
"github.com/getlantern/zenodb/common"
"github.com/getlantern/zenodb/encoding"
"github.com/getlantern/zenodb/metrics"
"github.com/spaolacci/murmur3"
"hash"
"runtime"
Expand Down Expand Up @@ -36,9 +37,10 @@ type followSpec struct {

type follower struct {
common.Follow
cb func(data []byte, offset wal.Offset) error
entries chan *walEntry
hasFailed int32
followerId int
cb func(data []byte, offset wal.Offset) error
entries chan *walEntry
hasFailed int32
}

func (f *follower) read() {
Expand Down Expand Up @@ -69,6 +71,7 @@ func (f *follower) submit(entry *walEntry) {

// markFailed atomically sets this follower's hasFailed flag to 1 and then
// reports the failure to the metrics package using the follower's ID.
func (f *follower) markFailed() {
atomic.StoreInt32(&f.hasFailed, 1)
metrics.FollowerFailed(f.followerId)
}

func (f *follower) failed() bool {
Expand Down Expand Up @@ -109,6 +112,8 @@ func (db *DB) processFollowers() {
newlyJoinedStreams := make(map[string]bool)
onFollowerJoined := func(f *follower) {
nextFollowerID++
f.followerId = nextFollowerID
metrics.FollowerJoined(nextFollowerID, f.PartitionNumber)
log.Debugf("Follower joined: %d -> %d", nextFollowerID, f.PartitionNumber)
followers[nextFollowerID] = f

Expand Down Expand Up @@ -296,7 +301,9 @@ func (db *DB) processFollowers() {
stats = make([]int, db.opts.NumPartitions)

for _, f := range followers {
log.Debugf("Queued for follower %d: %v", f.PartitionNumber, humanize.Comma(int64(len(f.entries))))
queued := int64(len(f.entries))
metrics.QueuedForFollower(f.followerId, int(queued))
log.Debugf("Queued for follower %d: %v", f.PartitionNumber, humanize.Comma(queued))
}
}
}
Expand Down
138 changes: 138 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package metrics

import (
"sort"
"sync"
"time"

"github.com/getlantern/wal"
)

var (
// leaderStats holds the stats for the cluster leader.
leaderStats *LeaderStats
// followerStats holds the stats of each tracked follower, keyed by follower ID.
followerStats map[int]*FollowerStats
// partitionStats holds the stats of each partition, keyed by partition number.
partitionStats map[int]*PartitionStats

// mx is held by the exported functions of this package while they read or
// modify the variables above. Note that reset does not take it.
mx sync.RWMutex
)

func init() {
reset()
}

func reset() {
leaderStats = &LeaderStats{}
followerStats = make(map[int]*FollowerStats, 0)
partitionStats = make(map[int]*PartitionStats, 0)
}

// Stats are the overall stats, bundling the leader stats with the per-follower
// and per-partition stats as returned by GetStats.
type Stats struct {
// Leader holds the leader-level stats.
Leader *LeaderStats
// Followers holds one entry per tracked follower; GetStats sorts it before
// returning.
Followers sortedFollowerStats
// Partitions holds one entry per partition that has had a follower join;
// GetStats sorts it by partition number before returning.
Partitions sortedPartitionStats
}

// LeaderStats provides stats for the cluster leader
type LeaderStats struct {
// ConnectedFollowers is incremented by FollowerJoined and decremented by
// FollowerFailed (once per known follower).
ConnectedFollowers int
// CurrentlyReadingWAL is the timestamp of the WAL offset most recently
// passed to the CurrentlyReadingWAL function.
CurrentlyReadingWAL time.Time
}

// FollowerStats provides stats for a single follower
type FollowerStats struct {
	// followerId identifies the follower. It is unexported, so encoding/json
	// leaves it out of serialized stats; it is used as the sort tiebreaker.
	followerId int
	// Partition is the partition number the follower joined with.
	Partition int
	// Queued is the value most recently recorded via QueuedForFollower.
	Queued int
}

// PartitionStats provides stats for a single partition
type PartitionStats struct {
	// Partition is the partition number.
	Partition int
	// NumFollowers is the number of currently tracked followers on this
	// partition.
	NumFollowers int
}

// sortedFollowerStats implements sort.Interface, ordering followers by
// partition and then by follower ID within the same partition.
type sortedFollowerStats []*FollowerStats

// Len returns the number of followers.
func (s sortedFollowerStats) Len() int { return len(s) }

// Swap exchanges the followers at positions i and j.
func (s sortedFollowerStats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less reports whether the follower at i sorts before the follower at j.
func (s sortedFollowerStats) Less(i, j int) bool {
	// Only fall back to the follower ID when the partitions are equal.
	// Comparing IDs whenever the partition is merely "not less" would let a
	// higher partition sort before a lower one and break the ordering contract
	// that sort.Sort relies on.
	if s[i].Partition != s[j].Partition {
		return s[i].Partition < s[j].Partition
	}
	return s[i].followerId < s[j].followerId
}

// sortedPartitionStats implements sort.Interface, ordering partitions by
// ascending partition number.
type sortedPartitionStats []*PartitionStats

// Len returns the number of partitions.
func (s sortedPartitionStats) Len() int {
	return len(s)
}

// Swap exchanges the partitions at positions i and j.
func (s sortedPartitionStats) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the partition at i has a smaller number than the one at j.
func (s sortedPartitionStats) Less(i, j int) bool {
	left, right := s[i], s[j]
	return left.Partition < right.Partition
}

// CurrentlyReadingWAL records the timestamp of the given WAL offset in the
// leader stats.
func CurrentlyReadingWAL(offset wal.Offset) {
	// Extract the timestamp before taking the lock to keep the critical
	// section minimal.
	readingAt := offset.TS()

	mx.Lock()
	defer mx.Unlock()
	leaderStats.CurrentlyReadingWAL = readingAt
}

// FollowerJoined registers a newly joined follower: it bumps the leader's
// connected-follower count, starts tracking the follower with an empty queue,
// and increments the follower count of its partition (creating the partition
// entry on first use).
func FollowerJoined(followerID int, partition int) {
	mx.Lock()
	defer mx.Unlock()

	followerStats[followerID] = &FollowerStats{followerId: followerID, Partition: partition}
	leaderStats.ConnectedFollowers++

	if partitionStats[partition] == nil {
		partitionStats[partition] = &PartitionStats{Partition: partition}
	}
	partitionStats[partition].NumFollowers++
}

// FollowerFailed stops tracking a follower that has failed (which is
// analogous to leaving) and decrements the leader and partition counts
// accordingly. Calling it again for the same follower is a no-op.
func FollowerFailed(followerID int) {
	mx.Lock()
	defer mx.Unlock()

	failed, known := followerStats[followerID]
	if !known {
		// Already removed, so there is nothing left to subtract.
		return
	}

	delete(followerStats, followerID)
	leaderStats.ConnectedFollowers--
	partitionStats[failed.Partition].NumFollowers--
}

// QueuedForFollower records how many measurements are queued for a given
// Follower.
//
// Followers that are not currently tracked are ignored. FollowerFailed removes
// a follower's entry, so a caller that still reports queue depth for such a
// follower would otherwise dereference a nil map entry and panic.
func QueuedForFollower(followerID int, queued int) {
	mx.Lock()
	defer mx.Unlock()
	if fs, found := followerStats[followerID]; found {
		fs.Queued = queued
	}
}

// GetStats returns a point-in-time copy of the leader, follower and partition
// stats. Followers are sorted by partition and then follower ID; partitions
// are sorted by partition number.
//
// The returned values are copies taken while holding the read lock, so the
// caller may read (e.g. JSON-encode) them without racing against concurrent
// updates made through the other functions in this package.
func GetStats() *Stats {
	mx.RLock()
	leader := *leaderStats
	s := &Stats{
		Leader:     &leader,
		Followers:  make(sortedFollowerStats, 0, len(followerStats)),
		Partitions: make(sortedPartitionStats, 0, len(partitionStats)),
	}
	for _, fs := range followerStats {
		followerCopy := *fs
		s.Followers = append(s.Followers, &followerCopy)
	}
	for _, ps := range partitionStats {
		partitionCopy := *ps
		s.Partitions = append(s.Partitions, &partitionCopy)
	}
	mx.RUnlock()

	// Sorting only touches the private copies, so it can run outside the lock.
	sort.Sort(s.Followers)
	sort.Sort(s.Partitions)
	return s
}
62 changes: 62 additions & 0 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package metrics

import (
"testing"
"time"

"github.com/getlantern/wal"

"github.com/stretchr/testify/assert"
)

// TestMetrics exercises the package end to end: it joins four followers across
// two partitions, records a WAL offset and queue depths, checks the reported
// stats, then fails two followers (twice each) and checks the stats again.
func TestMetrics(t *testing.T) {
// Start from empty stats so that earlier state cannot affect the counts.
reset()

ts := time.Now()
// Followers 1 and 2 join partition 1, followers 3 and 4 join partition 2.
FollowerJoined(1, 1)
FollowerJoined(2, 1)
FollowerJoined(3, 2)
FollowerJoined(4, 2)
CurrentlyReadingWAL(wal.NewOffsetForTS(ts))
QueuedForFollower(1, 11)
QueuedForFollower(2, 22)
QueuedForFollower(3, 33)
QueuedForFollower(4, 44)

s := GetStats()
assert.Equal(t, 4, s.Leader.ConnectedFollowers)
// Timestamps are compared after formatting as RFC3339 rather than as
// time.Time values.
assert.Equal(t, ts.Format(time.RFC3339), s.Leader.CurrentlyReadingWAL.Format(time.RFC3339))

// Followers are expected ordered by partition, then follower ID.
assert.Equal(t, 1, s.Followers[0].Partition)
assert.Equal(t, 11, s.Followers[0].Queued)
assert.Equal(t, 1, s.Followers[1].Partition)
assert.Equal(t, 22, s.Followers[1].Queued)
assert.Equal(t, 2, s.Followers[2].Partition)
assert.Equal(t, 33, s.Followers[2].Queued)
assert.Equal(t, 2, s.Followers[3].Partition)
assert.Equal(t, 44, s.Followers[3].Queued)

// Partitions are expected ordered by partition number.
assert.Equal(t, 1, s.Partitions[0].Partition)
assert.Equal(t, 2, s.Partitions[0].NumFollowers)
assert.Equal(t, 2, s.Partitions[1].Partition)
assert.Equal(t, 2, s.Partitions[1].NumFollowers)

// Fail a couple of followers. Fail each twice to make sure we don't double-
// subtract.
FollowerFailed(2)
FollowerFailed(2)
FollowerFailed(3)
FollowerFailed(3)

// Only followers 1 (partition 1) and 4 (partition 2) should remain.
s = GetStats()
assert.Equal(t, 2, s.Leader.ConnectedFollowers)
assert.Equal(t, 1, s.Followers[0].Partition)
assert.Equal(t, 11, s.Followers[0].Queued)
assert.Equal(t, 2, s.Followers[1].Partition)
assert.Equal(t, 44, s.Followers[1].Queued)

// Both partitions are still listed, each with one follower left.
assert.Equal(t, 1, s.Partitions[0].Partition)
assert.Equal(t, 1, s.Partitions[0].NumFollowers)
assert.Equal(t, 2, s.Partitions[1].Partition)
assert.Equal(t, 1, s.Partitions[1].NumFollowers)
}
1 change: 1 addition & 0 deletions web/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func Configure(db *zenodb.DB, router *mux.Router, opts *Opts) error {
router.PathPrefix("/cached/{permalink}").HandlerFunc(h.cachedQuery)
router.PathPrefix("/favicon").Handler(http.NotFoundHandler())
router.PathPrefix("/report/{permalink}").HandlerFunc(h.index)
router.PathPrefix("/metrics").HandlerFunc(h.metrics)
router.PathPrefix("/").HandlerFunc(h.index)

return nil
Expand Down
17 changes: 17 additions & 0 deletions web/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package web

import (
"encoding/json"
"net/http"

"github.com/getlantern/zenodb/metrics"
)

// metrics serves the current database-level stats as a JSON document.
//
// Requests that fail authentication receive a 403. If the stats cannot be
// encoded, a 500 with the error text is returned instead of a partial body.
func (h *handler) metrics(resp http.ResponseWriter, req *http.Request) {
	if !h.authenticate(resp, req) {
		resp.WriteHeader(http.StatusForbidden)
		return
	}

	// Marshal before writing anything so that an encoding failure can still be
	// reported with a proper error status rather than a truncated 200.
	body, err := json.Marshal(metrics.GetStats())
	if err != nil {
		http.Error(resp, err.Error(), http.StatusInternalServerError)
		return
	}

	resp.Header().Set("Content-Type", "application/json")
	// The trailing newline matches what json.Encoder.Encode produced before.
	// A write error here means the client has gone away, so there is nobody
	// left to report it to.
	_, _ = resp.Write(append(body, '\n'))
}

0 comments on commit ca2f6a8

Please sign in to comment.