diff --git a/collector/go.mod b/collector/go.mod index f9678aec4..c9727d704 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -1,6 +1,6 @@ module github.com/wavefronthq/observability-for-kubernetes/collector -go 1.20 +go 1.21 require ( github.com/coreos/go-systemd/v22 v22.5.0 diff --git a/collector/go.sum b/collector/go.sum index 5ac13266a..cf0103972 100644 --- a/collector/go.sum +++ b/collector/go.sum @@ -51,6 +51,7 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= @@ -101,6 +102,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/cadvisor v0.48.1 h1:eyYTxKBd+KxI1kh6rst4JSTLUhfHQM34qGpp+0AMlSg= github.com/google/cadvisor v0.48.1/go.mod h1:ZkYbiiVdyoqBmI2ahZI8GlmirT78OAOER0z4EQugkxQ= github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54= @@ -117,6 +119,7 @@ github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/hashicorp/consul/api v1.9.1 h1:SngrdG2L62qqLsUz85qcPhFZ78rPf8tcD5qjMgs6MME= github.com/hashicorp/consul/api v1.9.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M= @@ -232,10 +235,13 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -433,11 +439,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/collector/internal/metrics/types.go b/collector/internal/metrics/types.go index e67db1d32..40edbad9b 100644 --- a/collector/internal/metrics/types.go +++ b/collector/internal/metrics/types.go @@ -130,9 +130,16 @@ func (l *LabeledValue) GetValue() interface{} { // Source produces metric batches type Source interface { + + // No idea what this is AutoDiscovered() bool + Name() string + + // Perhaps this grabs metrics? Scrape() (*Batch, error) + + // Seems to free resources this object uses Cleanup() } @@ -161,7 +168,11 @@ type Processor interface { // ProviderHandler is an interface for dynamically adding and removing MetricSourceProviders type ProviderHandler interface { + + // AddProvider adds a new SourceProvider if one already exists with same name it deletes + // that one first. AddProvider(provider SourceProvider) + DeleteProvider(name string) } diff --git a/collector/internal/util/clock.go b/collector/internal/util/clock.go new file mode 100644 index 000000000..6bd4e9067 --- /dev/null +++ b/collector/internal/util/clock.go @@ -0,0 +1,66 @@ +package util + +import ( + "sync" + "time" +) + +// Interface Clock represents the clock functions this package needs. +// SystemClock implements this interface. +type Clock interface { + + // Now returns the current time in the local time zone. + Now() time.Time + + // After waits for given duration to elapse and then sends current time on + // the returned channel. + After(d time.Duration) <-chan time.Time +} + +// RegularInterval returns start, start + interval, start + 2*interval etc. +// on result at those times. For example, if interval is 7*time.Minute, +// RegularInterval returns a time every 7 minutes on result. When caller wants +// to quit reading from result, the caller must call the stop function. After +// calling stop, the caller must read any times in the past from the result channel. +// Once the caller reads all of those, RegularInterval closes the result +// channel. +func RegularInterval( + clock Clock, start time.Time, interval time.Duration) (result <-chan time.Time, stop func()) { + resultCh := make(chan time.Time) + stopCh := make(chan struct{}) + result = resultCh + stop = sync.OnceFunc(func() { + close(stopCh) + }) + go func() { + nextTime := start + for { + timeToWait := nextTime.Sub(clock.Now()) + if timeToWait > 0 { + select { + case <-stopCh: + close(resultCh) + return + case <-clock.After(timeToWait): + } + } + resultCh <- nextTime + nextTime = nextTime.Add(interval) + } + }() + return +} + +// SystemClock represents the system clock. SystemClock implements Clock. +type SystemClock struct { +} + +// Now calls time.Now() +func (s SystemClock) Now() time.Time { + return time.Now() +} + +// After calls time.After() +func (s SystemClock) After(d time.Duration) <-chan time.Time { + return time.After(d) +} diff --git a/collector/internal/util/clock_test.go b/collector/internal/util/clock_test.go new file mode 100644 index 000000000..da89b0b28 --- /dev/null +++ b/collector/internal/util/clock_test.go @@ -0,0 +1,63 @@ +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + timeAndTz = "15:04 MST" +) + +type clockForTesting struct { + + // The current time. + Current time.Time +} + +func (c *clockForTesting) Now() time.Time { + return c.Current +} + +// After immediately advances current time by d and sends that currnet time +// on the returned channel. +func (c *clockForTesting) After(d time.Duration) <-chan time.Time { + c.Current = c.Current.Add(d) + result := make(chan time.Time, 1) + result <- c.Current + close(result) + return result +} + +func TestEverySeven(t *testing.T) { + startTime := time.Date(2024, 10, 28, 16, 30, 0, 0, time.UTC) + fakeClock := &clockForTesting{Current: startTime} + everySeven, stop := RegularInterval(fakeClock, startTime, 7*time.Minute) + assert.Equal(t, "16:30 UTC", (<-everySeven).Format(timeAndTz)) + assert.Equal(t, "16:37 UTC", (<-everySeven).Format(timeAndTz)) + + // Pretend that we wait a long time before reading the channel. + // All the hours should still show up. + fakeClock.Current = time.Date(2024, 10, 28, 17, 0, 0, 0, time.UTC) + + assert.Equal(t, "16:44 UTC", (<-everySeven).Format(timeAndTz)) + assert.Equal(t, "16:51 UTC", (<-everySeven).Format(timeAndTz)) + + // Make sure time is still 17:00 + assert.Equal(t, "17:00 UTC", fakeClock.Now().Format(timeAndTz)) + + assert.Equal(t, "16:58 UTC", (<-everySeven).Format(timeAndTz)) + assert.Equal(t, "17:05 UTC", (<-everySeven).Format(timeAndTz)) + stop() + stop() + for range everySeven { + } +} + +func TestSystemClock(t *testing.T) { + clock := SystemClock{} + assert.NotZero(t, clock.Now()) + assert.NotZero(t, <-clock.After(1*time.Nanosecond)) +} diff --git a/collector/internal/util/interval.go b/collector/internal/util/interval.go new file mode 100644 index 000000000..b209e7949 --- /dev/null +++ b/collector/internal/util/interval.go @@ -0,0 +1,70 @@ +package util + +import ( + gometrics "github.com/rcrowley/go-metrics" + "sync" + "time" +) + +// Interval starts a particular function at regular time intervals. If Interval is to +// start a function every 7 seconds, and the function takes 2 seconds to run, it will pause +// 5 seconds between runs. If Interval is to start the function every 7 seconds, and the function +// takes 10 seconds to run, that is called an overrun. Interval will log an overrun and wait +// 4 seconds for the start of the following interval to run the function again. If the function +// takes 20 seconds to run on 7 second intervals, Interval will log 2 overruns and wait 1 +// second for the start of the following interval to run the function again. +type Interval struct { + clock Clock + interval time.Duration + runTimes <-chan time.Time + f func() + stop func() + overrunCount gometrics.Counter + wg sync.WaitGroup +} + +// NewInterval returns a new Interval. interval is the amount of time between +// function starts e.g 7 seconds, f is the function to run periodically, +// overrunCount is where overruns are logged. +func NewInterval(interval time.Duration, f func(), overrunCount gometrics.Counter) *Interval { + return newInterval(SystemClock{}, interval, f, overrunCount) +} + +// StopAndWait stops the periodic execution of f. In case f is currently running, +// StopAndWait waits for f to finish before returning. +func (in *Interval) StopAndWait() { + in.stop() + in.wg.Wait() +} + +func (in *Interval) loop() { + for nextRunTime := range in.runTimes { + now := in.clock.Now() + + // If nextRunTime is more than 1/30 of an interval earlier than now, + // we had an overrun. The 1/30 is arbitrary + if now.Sub(nextRunTime) > in.interval/30 { + in.overrunCount.Inc(1) + continue + } + in.f() + } + in.wg.Done() +} + +func newInterval( + clock Clock, interval time.Duration, f func(), overrunCount gometrics.Counter) *Interval { + now := clock.Now() + runTimes, stop := RegularInterval(clock, now.Add(interval), interval) + result := &Interval{ + clock: clock, + interval: interval, + runTimes: runTimes, + f: f, + stop: stop, + overrunCount: overrunCount, + } + result.wg.Add(1) + go result.loop() + return result +} diff --git a/collector/internal/util/interval_test.go b/collector/internal/util/interval_test.go new file mode 100644 index 000000000..dd3adcca3 --- /dev/null +++ b/collector/internal/util/interval_test.go @@ -0,0 +1,82 @@ +package util + +import ( + gm "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/assert" + "sync" + "testing" + "time" +) + +func TestIntervalNoOverrun(t *testing.T) { + var overruns counterForTesting + var calls counterForTesting + interval := NewInterval(70*time.Millisecond, sleepFunc(50*time.Millisecond, &calls), &overruns) + time.Sleep(150 * time.Millisecond) + start := time.Now() + interval.StopAndWait() + + // We expect StopAndWait to take around 40ms + assert.Less(t, 30*time.Millisecond, time.Since(start)) + + // Runs 70ms, 140ms after interval created + assert.Equal(t, int64(2), calls.Count()) + assert.Equal(t, int64(0), overruns.Count()) +} + +func TestIntervalOverrun(t *testing.T) { + var overruns counterForTesting + var calls counterForTesting + interval := NewInterval(70*time.Millisecond, sleepFunc(100*time.Millisecond, &calls), &overruns) + time.Sleep(300 * time.Millisecond) + start := time.Now() + interval.StopAndWait() + + // We expect StopAndWait to take around 10ms + assert.Less(t, 5*time.Millisecond, time.Since(start)) + + // Runs 70ms, 210ms after interval created + assert.Equal(t, int64(2), calls.Count()) + assert.Equal(t, int64(2), overruns.Count()) +} + +func sleepFunc(d time.Duration, counter gm.Counter) func() { + return func() { + time.Sleep(d) + counter.Inc(1) + } +} + +type counterForTesting struct { + mu sync.Mutex + counter int64 +} + +func (c *counterForTesting) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + c.counter = 0 +} + +func (c *counterForTesting) Count() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.counter +} + +func (c *counterForTesting) Dec(i int64) { + c.mu.Lock() + defer c.mu.Unlock() + c.counter -= i +} + +func (c *counterForTesting) Inc(i int64) { + c.mu.Lock() + defer c.mu.Unlock() + c.counter += i +} + +func (c *counterForTesting) Snapshot() gm.Counter { + //TODO implement me + panic("implement me") +} diff --git a/collector/internal/util/wait.go b/collector/internal/util/wait.go index 55af6e7fa..74b88f95e 100644 --- a/collector/internal/util/wait.go +++ b/collector/internal/util/wait.go @@ -10,6 +10,7 @@ var ( ) // TODO: this needs a test! Tricky... +// TODO: Rewrite callsites of this to use util.Interval // Retry makes the function run infinitely after certain time period func Retry(f func(), duration time.Duration, stopCh <-chan struct{}) { t := time.NewTicker(duration) diff --git a/collector/plugins/sources/interval_timer.go b/collector/plugins/sources/interval_timer.go deleted file mode 100644 index e6fa05482..000000000 --- a/collector/plugins/sources/interval_timer.go +++ /dev/null @@ -1,50 +0,0 @@ -package sources - -import ( - "time" -) - -type IntervalTimer struct { - *time.Timer - interval time.Duration - startTime time.Time - lastResetTime time.Time -} - -func (t *IntervalTimer) Reset() (intervalsMissed int64) { - now := time.Now() - intervals := t.intervalsMissed(now) - t.lastResetTime = now - waitTime := t.waitToNextInterval(now.Sub(t.startTime)) - t.Timer.Reset(waitTime) - return intervals -} - -func NewIntervalTimer(interval time.Duration) *IntervalTimer { - now := time.Now() - return &IntervalTimer{ - Timer: time.NewTimer(interval), - interval: interval, - startTime: now, - lastResetTime: now, - } -} - -func (t *IntervalTimer) intervalsMissed(now time.Time) (intervalsMissed int64) { - if now.Sub(t.lastResetTime) < t.interval { - return 0 - } - return int64((now.Sub(t.lastResetTime) / t.interval) - 1) -} - -func (t *IntervalTimer) waitToNextInterval(diff time.Duration) time.Duration { - wait := t.interval - (diff % t.interval) - if wait < scaleInterval(t.interval, 0.0333) { // 3.33%. This was chosen arbitrarily. If you have a better idea, change it! - wait += t.interval - } - return wait -} - -func scaleInterval(interval time.Duration, ratio float64) time.Duration { - return time.Duration(float64(interval) * ratio) -} diff --git a/collector/plugins/sources/interval_timer_test.go b/collector/plugins/sources/interval_timer_test.go deleted file mode 100644 index 531580447..000000000 --- a/collector/plugins/sources/interval_timer_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package sources - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestWaitInterval(t *testing.T) { - t.Run("zero time", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, 15*time.Second, timer.waitToNextInterval(0)) - }) - - t.Run("inside first interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, 10*time.Second, timer.waitToNextInterval(5*time.Second)) - }) - - t.Run("outside first interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, 14*time.Second, timer.waitToNextInterval(16*time.Second)) - }) - - t.Run("many intervals late", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, 14*time.Second, timer.waitToNextInterval(61*time.Second)) - }) - - t.Run("near but still before the interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, 15499*time.Millisecond, timer.waitToNextInterval(14501*time.Millisecond)) - }) -} - -func TestReset(t *testing.T) { - t.Run("lastResetTime is set", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - timer.Reset() - assert.NotZero(t, timer.lastResetTime) - }) -} - -func TestMissedCount(t *testing.T) { - now := time.Now() - t.Run("no resets", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - assert.Equal(t, int64(0), timer.intervalsMissed(time.Now())) - }) - - t.Run("first interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - timer.lastResetTime = now.Add(-15 * time.Second) - assert.Equal(t, int64(0), timer.intervalsMissed(now)) - }) - - t.Run("outside second interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - timer.lastResetTime = now.Add(-31 * time.Second) - assert.Equal(t, int64(1), timer.intervalsMissed(now)) - }) - - t.Run("many intervals late", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - timer.lastResetTime = now.Add(-61 * time.Second) - assert.Equal(t, int64(3), timer.intervalsMissed(now)) - }) - - t.Run("near but still before the interval", func(t *testing.T) { - timer := NewIntervalTimer(15 * time.Second) - timer.lastResetTime = now.Add(-14501 * time.Millisecond) - assert.Equal(t, int64(0), timer.intervalsMissed(now)) - }) -} diff --git a/collector/plugins/sources/manager.go b/collector/plugins/sources/manager.go index 955a9c54e..e06ddc918 100644 --- a/collector/plugins/sources/manager.go +++ b/collector/plugins/sources/manager.go @@ -19,6 +19,7 @@ package sources import ( "fmt" + "github.com/wavefronthq/observability-for-kubernetes/collector/internal/util" "math/rand" "sort" "sync" @@ -71,30 +72,37 @@ func init() { type SourceManager interface { metrics.ProviderHandler + // Deletes each provider. StopProviders() + + // Seems to get all the batches read since the last call and ensures they are sorted by + // timestamp GetPendingMetrics() []*metrics.Batch + SetDefaultCollectionInterval(time.Duration) + + // BuildProviders uses the client but never reads its value. BuildProviders seems to add + // all the SourceProviders described in config by calling AddProvider on each one. BuildProviders(config configuration.SourceConfig) error + + // Sets the client. Must be called before calling BuildProviders. SetClient(kubernetes.Interface) } type sourceManagerImpl struct { - responseChannel chan *metrics.Batch - defaultCollectionInterval time.Duration + responseChannel chan *metrics.Batch - metricsSourcesMtx sync.Mutex - metricsSourceProviders map[string]metrics.SourceProvider - metricsSourceTimers map[string]*IntervalTimer - metricsSourceQuits map[string]chan struct{} + mtx sync.Mutex + defaultCollectionInterval time.Duration + metricsSourceProviders map[string]metrics.SourceProvider + metricsSourceTimers map[string]*util.Interval responseMtx sync.Mutex response []*metrics.Batch - - client kubernetes.Interface } func (sm *sourceManagerImpl) SetClient(client kubernetes.Interface) { - sm.client = client + // Do nothing for now } // Manager return the SourceManager @@ -103,8 +111,7 @@ func Manager() SourceManager { singleton = &sourceManagerImpl{ responseChannel: make(chan *metrics.Batch), metricsSourceProviders: make(map[string]metrics.SourceProvider), - metricsSourceTimers: make(map[string]*IntervalTimer), - metricsSourceQuits: make(map[string]chan struct{}), + metricsSourceTimers: make(map[string]*util.Interval), defaultCollectionInterval: time.Minute, } singleton.rotateResponse() @@ -115,7 +122,7 @@ func Manager() SourceManager { // BuildProviders creates a new source manager with the configured MetricsSourceProviders func (sm *sourceManagerImpl) BuildProviders(cfg configuration.SourceConfig) error { - sources := buildProviders(sm.client, cfg) + sources := buildProviders(cfg) for _, runtime := range sources { sm.AddProvider(runtime) } @@ -126,6 +133,8 @@ func (sm *sourceManagerImpl) BuildProviders(cfg configuration.SourceConfig) erro } func (sm *sourceManagerImpl) SetDefaultCollectionInterval(defaultCollectionInterval time.Duration) { + sm.mtx.Lock() + defer sm.mtx.Unlock() sm.defaultCollectionInterval = defaultCollectionInterval } @@ -139,14 +148,14 @@ func (sm *sourceManagerImpl) AddProvider(provider metrics.SourceProvider) { "timeout": provider.Timeout(), }).Info("Adding provider") + sm.mtx.Lock() + defer sm.mtx.Unlock() + if _, found := sm.metricsSourceProviders[name]; found { log.WithField("name", name).Info("deleting existing provider") - sm.DeleteProvider(name) + sm.deleteProvider(name) } - sm.metricsSourcesMtx.Lock() - defer sm.metricsSourcesMtx.Unlock() - var interval time.Duration if provider.CollectionInterval() > 0 { interval = provider.CollectionInterval() @@ -158,53 +167,36 @@ func (sm *sourceManagerImpl) AddProvider(provider metrics.SourceProvider) { "collection_interval": sm.defaultCollectionInterval, }).Info("Using default collection interval") } - intervalTimer := NewIntervalTimer(interval) - - quit := make(chan struct{}) + f := func() { + scrape(provider, sm.responseChannel) + } sm.metricsSourceProviders[name] = provider - sm.metricsSourceTimers[name] = intervalTimer - sm.metricsSourceQuits[name] = quit + sm.metricsSourceTimers[name] = util.NewInterval(interval, f, scrapesMissed) providerCount.Update(int64(len(sm.metricsSourceProviders))) - - go func() { - for { - select { - case <-intervalTimer.C: - scrape(provider, sm.responseChannel) - scrapesMissed.Inc(intervalTimer.Reset()) - case <-quit: - return - } - } - }() } func (sm *sourceManagerImpl) DeleteProvider(name string) { - provider, found := sm.metricsSourceProviders[name] + sm.mtx.Lock() + defer sm.mtx.Unlock() + + _, found := sm.metricsSourceProviders[name] if !found { log.Debugf("Metrics Source Provider '%s' not found", name) return } + sm.deleteProvider(name) +} - sm.metricsSourcesMtx.Lock() - defer sm.metricsSourcesMtx.Unlock() - +func (sm *sourceManagerImpl) deleteProvider(name string) { + provider := sm.metricsSourceProviders[name] delete(sm.metricsSourceProviders, name) - if ticker, ok := sm.metricsSourceTimers[name]; ok { - ticker.Stop() - delete(sm.metricsSourceTimers, name) - } - if quit, ok := sm.metricsSourceQuits[name]; ok { - close(quit) - delete(sm.metricsSourceQuits, name) - } - + sm.metricsSourceTimers[name].StopAndWait() + delete(sm.metricsSourceTimers, name) for _, source := range provider.GetMetricsSources() { source.Cleanup() } - log.WithField("name", name).Info("Deleted provider") } @@ -285,7 +277,7 @@ func (sm *sourceManagerImpl) GetPendingMetrics() []*metrics.Batch { return response } -func buildProviders(client kubernetes.Interface, cfg configuration.SourceConfig) (result []metrics.SourceProvider) { +func buildProviders(cfg configuration.SourceConfig) (result []metrics.SourceProvider) { if cfg.SummaryConfig != nil { provider, err := summary.NewSummaryProvider(*cfg.SummaryConfig) result = appendProvider(result, provider, err, cfg.SummaryConfig.Collection) diff --git a/collector/plugins/sources/manager_test.go b/collector/plugins/sources/manager_test.go index daf3e0a64..af7da3e5f 100644 --- a/collector/plugins/sources/manager_test.go +++ b/collector/plugins/sources/manager_test.go @@ -18,6 +18,7 @@ package sources import ( + "sync" "testing" "time" @@ -53,6 +54,34 @@ func TestNoTimeout(t *testing.T) { assert.True(t, present["nto_2"], "nto_2 not found - present:%v", present) } +// This test should pass when run with go test -race +func TestRace(t *testing.T) { + provider1 := util.NewDummyMetricsSourceProvider("dummy_nt1", + 100*time.Millisecond, 100*time.Millisecond, + util.NewDummyMetricsSource("nto_1", 10*time.Millisecond), + util.NewDummyMetricsSource("nto_2", 10*time.Millisecond)) + provider2 := util.NewDummyMetricsSourceProvider("dummy_nt2", + 100*time.Millisecond, 100*time.Millisecond, + util.NewDummyMetricsSource("nto_1", 10*time.Millisecond), + util.NewDummyMetricsSource("nto_2", 10*time.Millisecond)) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + Manager().AddProvider(provider1) + Manager().DeleteProvider("dummy_nt1") + wg.Done() + }() + wg.Add(1) + go func() { + Manager().AddProvider(provider2) + Manager().DeleteProvider("dummy_nt2") + wg.Done() + }() + wg.Wait() +} + func TestScrapeMetrics(t *testing.T) { t.Run("Test Scrape Errors with Non AutoDiscovered Source", func(t *testing.T) {