Skip to content

Commit

Permalink
Implement aggregated datacenter metrics
Browse files Browse the repository at this point in the history
Add flag to use aggregated datacenter metrics instead of per-datacenter.

Closes: #152
Co-authored-by: Lukas Eklund <[email protected]>
Signed-off-by: Michael Lorant <[email protected]>
  • Loading branch information
mikelorant and leklund committed May 27, 2024
1 parent 1158c47 commit 5fc5c39
Show file tree
Hide file tree
Showing 7 changed files with 647 additions and 16 deletions.
3 changes: 3 additions & 0 deletions cmd/fastly-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func main() {
serviceRefresh time.Duration
apiTimeout time.Duration
rtTimeout time.Duration
aggregateOnly bool
debug bool
versionFlag bool
configFileExample bool
Expand All @@ -67,6 +68,7 @@ func main() {
fs.DurationVar(&serviceRefresh, "api-refresh", 1*time.Minute, "DEPRECATED -- use service-refresh instead")
fs.DurationVar(&apiTimeout, "api-timeout", 15*time.Second, "HTTP client timeout for api.fastly.com requests (5–60s)")
fs.DurationVar(&rtTimeout, "rt-timeout", 45*time.Second, "HTTP client timeout for rt.fastly.com requests (45–120s)")
fs.BoolVar(&aggregateOnly, "aggregate-only", false, "Use aggregated data rather than per-datacenter")
fs.BoolVar(&debug, "debug", false, "log debug information")
fs.BoolVar(&versionFlag, "version", false, "print version information and exit")
fs.String("config-file", "", "config file (optional)")
Expand Down Expand Up @@ -341,6 +343,7 @@ func main() {
subscriberOptions = []rt.SubscriberOption{
rt.WithLogger(rtLogger),
rt.WithMetadataProvider(serviceCache),
rt.WithAggregateOnly(aggregateOnly),
}
)
manager = rt.NewManager(serviceCache, rtClient, token, registry, subscriberOptions, productCache, rtLogger)
Expand Down
12 changes: 11 additions & 1 deletion pkg/domain/process.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package domain

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics, aggregateOnly bool) {

Check failure on line 4 in pkg/domain/process.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 4 in pkg/domain/process.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 4 in pkg/domain/process.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
for domain, stats := range d.Aggregated {
process(serviceID, serviceName, aggregateDC, domain, stats, m)
}

continue
}

for datacenter, byDomain := range d.Datacenter {
for domain, stats := range byDomain {
process(serviceID, serviceName, datacenter, domain, stats, m)
Expand Down
12 changes: 11 additions & 1 deletion pkg/origin/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ const (
)

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics, aggregateOnly bool) {

Check failure on line 10 in pkg/origin/process.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 10 in pkg/origin/process.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 10 in pkg/origin/process.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
for origin, stats := range d.Aggregated {
process(serviceID, serviceName, aggregateDC, origin, stats, m)
}

continue
}

for datacenter, byOrigin := range d.Datacenter {
for origin, stats := range byOrigin {
process(serviceID, serviceName, datacenter, origin, stats, m)
Expand Down
10 changes: 9 additions & 1 deletion pkg/realtime/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ import (
)

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics, aggregateOnly bool) {

Check failure on line 10 in pkg/realtime/process.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 10 in pkg/realtime/process.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _

Check failure on line 10 in pkg/realtime/process.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

parameter 'serviceVersion' seems to be unused, consider removing or renaming it as _
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
process(serviceID, serviceName, aggregateDC, d.Aggregated, m)

continue
}

for datacenter, stats := range d.Datacenter {
process(serviceID, serviceName, datacenter, stats, m)
}
Expand Down
450 changes: 450 additions & 0 deletions pkg/rt/common_test.go

Large diffs are not rendered by default.

32 changes: 19 additions & 13 deletions pkg/rt/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ type MetadataProvider interface {
// Subscriber polls rt.fastly.com endpoints for a single service ID. It emits
// the received stats data to Prometheus metrics.
type Subscriber struct {
client HTTPClient
token string
serviceID string
provider MetadataProvider
metrics *prom.Metrics
postprocess func()
logger log.Logger
rtDelayCount int
oiDelayCount int
diDelayCount int
client HTTPClient
token string
serviceID string
provider MetadataProvider
metrics *prom.Metrics
postprocess func()
logger log.Logger
rtDelayCount int
oiDelayCount int
diDelayCount int
aggregateOnly bool
}

// SubscriberOption provides some additional behavior to a subscriber.
Expand All @@ -71,6 +72,11 @@ func WithPostprocess(f func()) SubscriberOption {
return func(s *Subscriber) { s.postprocess = f }
}

// TODO add doc comment

Check failure on line 75 in pkg/rt/subscriber.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

comment on exported function WithAggregateOnly should be of the form "WithAggregateOnly ..."

Check failure on line 75 in pkg/rt/subscriber.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

comment on exported function WithAggregateOnly should be of the form "WithAggregateOnly ..."

Check failure on line 75 in pkg/rt/subscriber.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

comment on exported function WithAggregateOnly should be of the form "WithAggregateOnly ..."
func WithAggregateOnly(aggregateOnly bool) SubscriberOption {
return func(s *Subscriber) { s.aggregateOnly = aggregateOnly }
}

// NewSubscriber returns a ready-to-use subscriber. Callers must be sure to
// invoke the Run method of the returned subscriber in order to actually update
// any metrics.
Expand Down Expand Up @@ -223,7 +229,7 @@ func (s *Subscriber) queryRealtime(ctx context.Context, ts uint64) (currentName
s.rtDelayCount = 0
result = apiResultSuccess
}
realtime.Process(&response, s.serviceID, name, version, s.metrics.Realtime)
realtime.Process(&response, s.serviceID, name, version, s.metrics.Realtime, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down Expand Up @@ -287,7 +293,7 @@ func (s *Subscriber) queryOrigins(ctx context.Context, ts uint64) (currentName s
s.oiDelayCount = 0
result = apiResultSuccess
}
origin.Process(&response, s.serviceID, name, version, s.metrics.Origin)
origin.Process(&response, s.serviceID, name, version, s.metrics.Origin, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down Expand Up @@ -351,7 +357,7 @@ func (s *Subscriber) queryDomains(ctx context.Context, ts uint64) (currentName s
s.diDelayCount = 0
result = apiResultSuccess
}
domain.Process(&response, s.serviceID, name, version, s.metrics.Domain)
domain.Process(&response, s.serviceID, name, version, s.metrics.Domain, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down
144 changes: 144 additions & 0 deletions pkg/rt/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,54 @@ func TestRTSubscriberFixture(t *testing.T) {
}
}

func TestRTSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsystem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(rtResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunRealtime(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_"+subsystem+"_")
assertMetricOutput(t, expectedRTMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestOriginSubscriberFixture(t *testing.T) {
var (
namespace = "testspace"
Expand Down Expand Up @@ -110,6 +158,54 @@ func TestOriginSubscriberFixture(t *testing.T) {
}
}

func TestOriginSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsytem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(originsResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunOrigins(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_origin_")
assertMetricOutput(t, expectedOriginsMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestDomainSubscriberFixture(t *testing.T) {
var (
namespace = "testspace"
Expand Down Expand Up @@ -158,6 +254,54 @@ func TestDomainSubscriberFixture(t *testing.T) {
}
}

func TestDomainSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsytem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(domainsResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunDomains(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_domain_")
assertMetricOutput(t, expectedDomainsMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestSubscriberNoData(t *testing.T) {
var (
client = newMockRealtimeClient(`{"Error": "No data available, please retry"}`, `{}`)
Expand Down

0 comments on commit 5fc5c39

Please sign in to comment.