Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added facility for tracking and exposing basic database-level stats #58

Merged
merged 2 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions cluster_follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/getlantern/wal"
"github.com/getlantern/zenodb/common"
"github.com/getlantern/zenodb/encoding"
"github.com/getlantern/zenodb/metrics"
"github.com/spaolacci/murmur3"
"hash"
"runtime"
Expand Down Expand Up @@ -36,9 +37,10 @@ type followSpec struct {

type follower struct {
common.Follow
cb func(data []byte, offset wal.Offset) error
entries chan *walEntry
hasFailed int32
followerId int
cb func(data []byte, offset wal.Offset) error
entries chan *walEntry
hasFailed int32
}

func (f *follower) read() {
Expand Down Expand Up @@ -69,6 +71,7 @@ func (f *follower) submit(entry *walEntry) {

func (f *follower) markFailed() {
atomic.StoreInt32(&f.hasFailed, 1)
metrics.FollowerFailed(f.followerId)
}

func (f *follower) failed() bool {
Expand Down Expand Up @@ -109,6 +112,8 @@ func (db *DB) processFollowers() {
newlyJoinedStreams := make(map[string]bool)
onFollowerJoined := func(f *follower) {
nextFollowerID++
f.followerId = nextFollowerID
metrics.FollowerJoined(nextFollowerID, f.PartitionNumber)
log.Debugf("Follower joined: %d -> %d", nextFollowerID, f.PartitionNumber)
followers[nextFollowerID] = f

Expand Down Expand Up @@ -296,7 +301,9 @@ func (db *DB) processFollowers() {
stats = make([]int, db.opts.NumPartitions)

for _, f := range followers {
log.Debugf("Queued for follower %d: %v", f.PartitionNumber, humanize.Comma(int64(len(f.entries))))
queued := int64(len(f.entries))
metrics.QueuedForFollower(f.followerId, int(queued))
log.Debugf("Queued for follower %d: %v", f.PartitionNumber, humanize.Comma(queued))
}
}
}
Expand Down
138 changes: 138 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package metrics

import (
"sort"
"sync"
"time"

"github.com/getlantern/wal"
)

var (
leaderStats *LeaderStats
followerStats map[int]*FollowerStats
partitionStats map[int]*PartitionStats

mx sync.RWMutex
)

func init() {
reset()
}

func reset() {
leaderStats = &LeaderStats{}
followerStats = make(map[int]*FollowerStats, 0)
partitionStats = make(map[int]*PartitionStats, 0)
}

// Stats are the overall stats
type Stats struct {
Leader *LeaderStats
Followers sortedFollowerStats
Partitions sortedPartitionStats
}

// LeaderStats provides stats for the cluster leader
type LeaderStats struct {
ConnectedFollowers int
CurrentlyReadingWAL time.Time
}

// FollowerStats provides stats for a single follower
type FollowerStats struct {
followerId int
Partition int
Queued int
}

// PartitionStats provides stats for a single partition
type PartitionStats struct {
Partition int
NumFollowers int
}

type sortedFollowerStats []*FollowerStats

func (s sortedFollowerStats) Len() int { return len(s) }
func (s sortedFollowerStats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortedFollowerStats) Less(i, j int) bool {
if s[i].Partition < s[j].Partition {
return true
}
return s[i].followerId < s[j].followerId
}

type sortedPartitionStats []*PartitionStats

func (s sortedPartitionStats) Len() int { return len(s) }
func (s sortedPartitionStats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortedPartitionStats) Less(i, j int) bool {
return s[i].Partition < s[j].Partition
}

func CurrentlyReadingWAL(offset wal.Offset) {
ts := offset.TS()
mx.Lock()
leaderStats.CurrentlyReadingWAL = ts
mx.Unlock()
}

// FollowerJoined records the fact that a follower joined the leader
func FollowerJoined(followerID int, partition int) {
mx.Lock()
defer mx.Unlock()
leaderStats.ConnectedFollowers++
followerStats[followerID] = &FollowerStats{
followerId: followerID,
Partition: partition,
Queued: 0,
}
ps := partitionStats[partition]
if ps == nil {
ps = &PartitionStats{Partition: partition}
partitionStats[partition] = ps
}
ps.NumFollowers++
}

// FollowerFailed records the fact that a follower failed (which is analogous to leaving)
func FollowerFailed(followerID int) {
mx.Lock()
defer mx.Unlock()
// Only delete once
fs, found := followerStats[followerID]
if found {
leaderStats.ConnectedFollowers--
delete(followerStats, followerID)
partitionStats[fs.Partition].NumFollowers--
}
}

// QueuedForFollower records how many measurements are queued for a given Follower
func QueuedForFollower(followerID int, queued int) {
mx.Lock()
defer mx.Unlock()
followerStats[followerID].Queued = queued
Copy link

@forkner forkner Jun 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks okay by eyeball -- but could followerStats[followerID] be nil here if it was in a race for the lock with FollowerFailed and FollowerFailed wins?

It looks like markFailed can only be triggered from a read that starts strictly after processing that follower, but it's a bit subtle, so just wanted to check if that was the expectation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is the expectation. Great question!

}

func GetStats() *Stats {
mx.RLock()
s := &Stats{
Leader: leaderStats,
Followers: make(sortedFollowerStats, 0, len(followerStats)),
Partitions: make(sortedPartitionStats, 0, len(partitionStats)),
}

for _, fs := range followerStats {
s.Followers = append(s.Followers, fs)
}
for _, ps := range partitionStats {
s.Partitions = append(s.Partitions, ps)
}
mx.RUnlock()

sort.Sort(s.Followers)
sort.Sort(s.Partitions)
return s
}
62 changes: 62 additions & 0 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package metrics

import (
"testing"
"time"

"github.com/getlantern/wal"

"github.com/stretchr/testify/assert"
)

func TestMetrics(t *testing.T) {
reset()

ts := time.Now()
FollowerJoined(1, 1)
FollowerJoined(2, 1)
FollowerJoined(3, 2)
FollowerJoined(4, 2)
CurrentlyReadingWAL(wal.NewOffsetForTS(ts))
QueuedForFollower(1, 11)
QueuedForFollower(2, 22)
QueuedForFollower(3, 33)
QueuedForFollower(4, 44)

s := GetStats()
assert.Equal(t, 4, s.Leader.ConnectedFollowers)
assert.Equal(t, ts.Format(time.RFC3339), s.Leader.CurrentlyReadingWAL.Format(time.RFC3339))

assert.Equal(t, 1, s.Followers[0].Partition)
assert.Equal(t, 11, s.Followers[0].Queued)
assert.Equal(t, 1, s.Followers[1].Partition)
assert.Equal(t, 22, s.Followers[1].Queued)
assert.Equal(t, 2, s.Followers[2].Partition)
assert.Equal(t, 33, s.Followers[2].Queued)
assert.Equal(t, 2, s.Followers[3].Partition)
assert.Equal(t, 44, s.Followers[3].Queued)

assert.Equal(t, 1, s.Partitions[0].Partition)
assert.Equal(t, 2, s.Partitions[0].NumFollowers)
assert.Equal(t, 2, s.Partitions[1].Partition)
assert.Equal(t, 2, s.Partitions[1].NumFollowers)

// Fail a couple of followers. Fail each twice to make sure we don't double-
// subtract.
FollowerFailed(2)
FollowerFailed(2)
FollowerFailed(3)
FollowerFailed(3)

s = GetStats()
assert.Equal(t, 2, s.Leader.ConnectedFollowers)
assert.Equal(t, 1, s.Followers[0].Partition)
assert.Equal(t, 11, s.Followers[0].Queued)
assert.Equal(t, 2, s.Followers[1].Partition)
assert.Equal(t, 44, s.Followers[1].Queued)

assert.Equal(t, 1, s.Partitions[0].Partition)
assert.Equal(t, 1, s.Partitions[0].NumFollowers)
assert.Equal(t, 2, s.Partitions[1].Partition)
assert.Equal(t, 1, s.Partitions[1].NumFollowers)
}
1 change: 1 addition & 0 deletions web/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func Configure(db *zenodb.DB, router *mux.Router, opts *Opts) error {
router.PathPrefix("/cached/{permalink}").HandlerFunc(h.cachedQuery)
router.PathPrefix("/favicon").Handler(http.NotFoundHandler())
router.PathPrefix("/report/{permalink}").HandlerFunc(h.index)
router.PathPrefix("/metrics").HandlerFunc(h.metrics)
router.PathPrefix("/").HandlerFunc(h.index)

return nil
Expand Down
17 changes: 17 additions & 0 deletions web/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package web

import (
"encoding/json"
"net/http"

"github.com/getlantern/zenodb/metrics"
)

func (h *handler) metrics(resp http.ResponseWriter, req *http.Request) {
if !h.authenticate(resp, req) {
resp.WriteHeader(http.StatusForbidden)
return
}

json.NewEncoder(resp).Encode(metrics.GetStats())
}