Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/wavefronthq/observability-for-kubernetes/collector

go 1.20
go 1.21

require (
github.com/coreos/go-systemd/v22 v22.5.0
Expand Down
8 changes: 8 additions & 0 deletions collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
Expand Down Expand Up @@ -101,6 +102,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/cadvisor v0.48.1 h1:eyYTxKBd+KxI1kh6rst4JSTLUhfHQM34qGpp+0AMlSg=
github.com/google/cadvisor v0.48.1/go.mod h1:ZkYbiiVdyoqBmI2ahZI8GlmirT78OAOER0z4EQugkxQ=
github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54=
Expand All @@ -117,6 +119,7 @@ github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/consul/api v1.9.1 h1:SngrdG2L62qqLsUz85qcPhFZ78rPf8tcD5qjMgs6MME=
github.com/hashicorp/consul/api v1.9.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=
Expand Down Expand Up @@ -232,10 +235,13 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -433,11 +439,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
11 changes: 11 additions & 0 deletions collector/internal/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,16 @@ func (l *LabeledValue) GetValue() interface{} {

// Source produces metric batches
type Source interface {

// No idea what this is
AutoDiscovered() bool

Name() string

// Perhaps this grabs metrics?
Scrape() (*Batch, error)

// Seems to free resources this object uses
Cleanup()
}

Expand Down Expand Up @@ -161,7 +168,11 @@ type Processor interface {

// ProviderHandler is an interface for dynamically adding and removing MetricSourceProviders
type ProviderHandler interface {

// AddProvider adds a new SourceProvider if one already exists with same name it deletes
// that one first.
AddProvider(provider SourceProvider)

DeleteProvider(name string)
}

Expand Down
66 changes: 66 additions & 0 deletions collector/internal/util/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package util

import (
"sync"
"time"
)

// Interface Clock represents the clock functions this package needs.
// SystemClock implements this interface.
type Clock interface {

// Now returns the current time in the local time zone.
Now() time.Time

// After waits for given duration to elapse and then sends current time on
// the returned channel.
After(d time.Duration) <-chan time.Time
}

// RegularInterval returns start, start + interval, start + 2*interval etc.
// on result at those times. For example, if interval is 7*time.Minute,
// RegularInterval returns a time every 7 minutes on result. When caller wants
// to quit reading from result, the caller must call the stop function. After
// calling stop, the caller must read any times in the past from the result channel.
// Once the caller reads all of those, RegularInterval closes the result
// channel.
func RegularInterval(
clock Clock, start time.Time, interval time.Duration) (result <-chan time.Time, stop func()) {
resultCh := make(chan time.Time)
stopCh := make(chan struct{})
result = resultCh
stop = sync.OnceFunc(func() {
close(stopCh)
})
go func() {
nextTime := start
for {
timeToWait := nextTime.Sub(clock.Now())
if timeToWait > 0 {
select {
case <-stopCh:
close(resultCh)
return
case <-clock.After(timeToWait):
}
}
resultCh <- nextTime
nextTime = nextTime.Add(interval)
}
}()
return
}

// SystemClock represents the system clock. SystemClock implements Clock.
type SystemClock struct {
}

// Now calls time.Now()
func (s SystemClock) Now() time.Time {
return time.Now()
}

// After calls time.After()
func (s SystemClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
63 changes: 63 additions & 0 deletions collector/internal/util/clock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
timeAndTz = "15:04 MST"
)

type clockForTesting struct {

// The current time.
Current time.Time
}

func (c *clockForTesting) Now() time.Time {
return c.Current
}

// After immediately advances current time by d and sends that currnet time
// on the returned channel.
func (c *clockForTesting) After(d time.Duration) <-chan time.Time {
c.Current = c.Current.Add(d)
result := make(chan time.Time, 1)
result <- c.Current
close(result)
return result
}

func TestEverySeven(t *testing.T) {
startTime := time.Date(2024, 10, 28, 16, 30, 0, 0, time.UTC)
fakeClock := &clockForTesting{Current: startTime}
everySeven, stop := RegularInterval(fakeClock, startTime, 7*time.Minute)
assert.Equal(t, "16:30 UTC", (<-everySeven).Format(timeAndTz))
assert.Equal(t, "16:37 UTC", (<-everySeven).Format(timeAndTz))

// Pretend that we wait a long time before reading the channel.
// All the hours should still show up.
fakeClock.Current = time.Date(2024, 10, 28, 17, 0, 0, 0, time.UTC)

assert.Equal(t, "16:44 UTC", (<-everySeven).Format(timeAndTz))
assert.Equal(t, "16:51 UTC", (<-everySeven).Format(timeAndTz))

// Make sure time is still 17:00
assert.Equal(t, "17:00 UTC", fakeClock.Now().Format(timeAndTz))

assert.Equal(t, "16:58 UTC", (<-everySeven).Format(timeAndTz))
assert.Equal(t, "17:05 UTC", (<-everySeven).Format(timeAndTz))
stop()
stop()
for range everySeven {
}
}

func TestSystemClock(t *testing.T) {
clock := SystemClock{}
assert.NotZero(t, clock.Now())
assert.NotZero(t, <-clock.After(1*time.Nanosecond))
}
70 changes: 70 additions & 0 deletions collector/internal/util/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package util

import (
gometrics "github.com/rcrowley/go-metrics"
"sync"
"time"
)

// Interval starts a particular function at regular time intervals. If Interval is to
// start a function every 7 seconds, and the function takes 2 seconds to run, it will pause
// 5 seconds between runs. If Interval is to start the function every 7 seconds, and the function
// takes 10 seconds to run, that is called an overrun. Interval will log an overrun and wait
// 4 seconds for the start of the following interval to run the function again. If the function
// takes 20 seconds to run on 7 second intervals, Interval will log 2 overruns and wait 1
// second for the start of the following interval to run the function again.
type Interval struct {
clock Clock
interval time.Duration
runTimes <-chan time.Time
f func()
stop func()
overrunCount gometrics.Counter
wg sync.WaitGroup
}

// NewInterval returns a new Interval. interval is the amount of time between
// function starts e.g 7 seconds, f is the function to run periodically,
// overrunCount is where overruns are logged.
func NewInterval(interval time.Duration, f func(), overrunCount gometrics.Counter) *Interval {
return newInterval(SystemClock{}, interval, f, overrunCount)
}

// StopAndWait stops the periodic execution of f. In case f is currently running,
// StopAndWait waits for f to finish before returning.
func (in *Interval) StopAndWait() {
in.stop()
in.wg.Wait()
}

func (in *Interval) loop() {
for nextRunTime := range in.runTimes {
now := in.clock.Now()

// If nextRunTime is more than 1/30 of an interval earlier than now,
// we had an overrun. The 1/30 is arbitrary
if now.Sub(nextRunTime) > in.interval/30 {
in.overrunCount.Inc(1)
continue
}
in.f()
}
in.wg.Done()
}

func newInterval(
clock Clock, interval time.Duration, f func(), overrunCount gometrics.Counter) *Interval {
now := clock.Now()
runTimes, stop := RegularInterval(clock, now.Add(interval), interval)
result := &Interval{
clock: clock,
interval: interval,
runTimes: runTimes,
f: f,
stop: stop,
overrunCount: overrunCount,
}
result.wg.Add(1)
go result.loop()
return result
}
82 changes: 82 additions & 0 deletions collector/internal/util/interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package util

import (
gm "github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)

func TestIntervalNoOverrun(t *testing.T) {
var overruns counterForTesting
var calls counterForTesting
interval := NewInterval(70*time.Millisecond, sleepFunc(50*time.Millisecond, &calls), &overruns)
time.Sleep(150 * time.Millisecond)
start := time.Now()
interval.StopAndWait()

// We expect StopAndWait to take around 40ms
assert.Less(t, 30*time.Millisecond, time.Since(start))

// Runs 70ms, 140ms after interval created
assert.Equal(t, int64(2), calls.Count())
assert.Equal(t, int64(0), overruns.Count())
}

func TestIntervalOverrun(t *testing.T) {
var overruns counterForTesting
var calls counterForTesting
interval := NewInterval(70*time.Millisecond, sleepFunc(100*time.Millisecond, &calls), &overruns)
time.Sleep(300 * time.Millisecond)
start := time.Now()
interval.StopAndWait()

// We expect StopAndWait to take around 10ms
assert.Less(t, 5*time.Millisecond, time.Since(start))

// Runs 70ms, 210ms after interval created
assert.Equal(t, int64(2), calls.Count())
assert.Equal(t, int64(2), overruns.Count())
}

func sleepFunc(d time.Duration, counter gm.Counter) func() {
return func() {
time.Sleep(d)
counter.Inc(1)
}
}

type counterForTesting struct {
mu sync.Mutex
counter int64
}

func (c *counterForTesting) Clear() {
c.mu.Lock()
defer c.mu.Unlock()
c.counter = 0
}

func (c *counterForTesting) Count() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.counter
}

func (c *counterForTesting) Dec(i int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.counter -= i
}

func (c *counterForTesting) Inc(i int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.counter += i
}

func (c *counterForTesting) Snapshot() gm.Counter {
//TODO implement me
panic("implement me")
}
1 change: 1 addition & 0 deletions collector/internal/util/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var (
)

// TODO: this needs a test! Tricky...
// TODO: Rewrite callsites of this to use util.Interval
// Retry makes the function run infinitely after certain time period
func Retry(f func(), duration time.Duration, stopCh <-chan struct{}) {
t := time.NewTicker(duration)
Expand Down
Loading
Loading