Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Asset Inventory] Run fetchers periodically #2902

Merged
merged 13 commits into from
Jan 8, 2025
2 changes: 1 addition & 1 deletion internal/flavors/assetinventory/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *strategy) NewAssetInventory(ctx context.Context, client beat.Client) (i
s.logger.Infof("Creating %s AssetInventory", strings.ToUpper(s.cfg.AssetInventoryProvider))

now := func() time.Time { return time.Now() } //nolint:gocritic
return inventory.NewAssetInventory(s.logger, fetchers, client, now), nil
return inventory.NewAssetInventory(s.logger, fetchers, client, now, s.cfg.Period), nil
}

func (s *strategy) initAzureFetchers(_ context.Context) ([]inventory.AssetFetcher, error) {
Expand Down
35 changes: 27 additions & 8 deletions internal/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ import (
"github.com/samber/lo"
)

const indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s_%s_%s-default"
const (
indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s_%s_%s-default"
minimalPeriod = 30 * time.Second
Copy link
Collaborator

@orouz orouz Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather keep it. It's not a default value, but a guardian value. If the customer sets value too low, this will start to spawn many goroutines that hit API. I can lower it if you think it makes sense, but we definitely need to make sure there is a reasonable minimum threshold. Obviously the worst-case scenario is period: 0s, where we get an infinite loop of new goroutines, but even 5s or 10s is too small IMHO.

Copy link
Member Author

@kubasobon kubasobon Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np with keeping it imo. but this value isn't configurable by the users, it's hardcoded in the integration package.

)

type AssetInventory struct {
fetchers []AssetFetcher
publisher AssetPublisher
bufferFlushInterval time.Duration
bufferMaxSize int
period time.Duration
logger *logp.Logger
assetCh chan AssetEvent
now func() time.Time
Expand All @@ -49,36 +53,40 @@ type AssetPublisher interface {
PublishAll([]beat.Event)
}

func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time) AssetInventory {
logger.Info("Initializing Asset Inventory POC")
func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time, period time.Duration) AssetInventory {
if period < minimalPeriod {
period = minimalPeriod
}
logger.Infof("Initializing Asset Inventory POC with period of %s", period)
return AssetInventory{
logger: logger,
fetchers: fetchers,
publisher: publisher,
// move to a configuration parameter
bufferFlushInterval: 10 * time.Second,
bufferMaxSize: 1600,
period: period,
assetCh: make(chan AssetEvent),
now: now,
}
}

func (a *AssetInventory) Run(ctx context.Context) {
for _, fetcher := range a.fetchers {
go func(fetcher AssetFetcher) {
fetcher.Fetch(ctx, a.assetCh)
}(fetcher)
}
a.runAllFetchersOnce(ctx)

assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize)
flushTicker := time.NewTicker(a.bufferFlushInterval)
fetcherPeriod := time.NewTicker(a.period)
for {
select {
case <-ctx.Done():
a.logger.Warnf("Asset Inventory context is done: %v", ctx.Err())
a.publish(assetsBuffer)
return

case <-fetcherPeriod.C:
a.runAllFetchersOnce(ctx)

case <-flushTicker.C:
if len(assetsBuffer) == 0 {
a.logger.Debugf("Interval reached without events")
Expand All @@ -101,6 +109,17 @@ func (a *AssetInventory) Run(ctx context.Context) {
}
}

// runAllFetchersOnce runs every fetcher to collect assets to assetCh ONCE. It
// should be called every cycle, once every `a.period`.
func (a *AssetInventory) runAllFetchersOnce(ctx context.Context) {
a.logger.Debug("Running all fetchers once")
for _, fetcher := range a.fetchers {
go func(fetcher AssetFetcher) {
fetcher.Fetch(ctx, a.assetCh)
}(fetcher)
}
}

func (a *AssetInventory) publish(assets []AssetEvent) {
events := lo.Map(assets, func(e AssetEvent, _ int) beat.Event {
var relatedEntity []string
Expand Down
84 changes: 84 additions & 0 deletions internal/inventory/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inventory

import (
"context"
"sync/atomic"
"testing"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/stretchr/testify/mock"

"github.com/elastic/cloudbeat/internal/resources/utils/pointers"
"github.com/elastic/cloudbeat/internal/resources/utils/testhelper"
)

func TestAssetInventory_Run(t *testing.T) {
Expand Down Expand Up @@ -150,6 +152,7 @@ func TestAssetInventory_Run(t *testing.T) {
publisher: publisher,
bufferFlushInterval: 10 * time.Millisecond,
bufferMaxSize: 1,
period: 24 * time.Hour,
assetCh: make(chan AssetEvent),
now: now,
}
Expand All @@ -169,3 +172,84 @@ func TestAssetInventory_Run(t *testing.T) {
assert.ElementsMatch(t, received, expected)
}
}

func TestAssetInventory_Period(t *testing.T) {
testhelper.SkipLong(t)
now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) }

var cycleCounter int64

publisher := NewMockAssetPublisher(t)
publisher.EXPECT().PublishAll(mock.Anything).Maybe()

fetcher := NewMockAssetFetcher(t)
fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) {
atomic.AddInt64(&cycleCounter, 1)
})

logger := logp.NewLogger("test_run")
inventory := AssetInventory{
logger: logger,
fetchers: []AssetFetcher{fetcher},
publisher: publisher,
bufferFlushInterval: 10 * time.Millisecond,
bufferMaxSize: 1,
period: 500 * time.Millisecond,
assetCh: make(chan AssetEvent),
now: now,
}

// Run it enough for 2 cycles to finish; one starts immediately, the other after 500 milliseconds
ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
defer cancel()

go func() {
inventory.Run(ctx)
}()

<-ctx.Done()
val := atomic.LoadInt64(&cycleCounter)
assert.Equal(t, int64(2), val, "Expected to run 2 cycles, got %d", val)
}

func TestAssetInventory_RunAllFetchersOnce(t *testing.T) {
now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) }
publisher := NewMockAssetPublisher(t)
publisher.EXPECT().PublishAll(mock.Anything).Maybe()

fetchers := []AssetFetcher{}
fetcherCounters := [](*int64){}
for i := 0; i < 5; i++ {
fetcher := NewMockAssetFetcher(t)
counter := int64(0)
fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) {
atomic.AddInt64(&counter, 1)
})
fetchers = append(fetchers, fetcher)
fetcherCounters = append(fetcherCounters, &counter)
}

logger := logp.NewLogger("test_run")
inventory := AssetInventory{
logger: logger,
fetchers: fetchers,
publisher: publisher,
bufferFlushInterval: 10 * time.Millisecond,
bufferMaxSize: 1,
period: 24 * time.Hour,
assetCh: make(chan AssetEvent),
now: now,
}

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

inventory.runAllFetchersOnce(ctx)
<-ctx.Done()

// Check that EVERY fetcher has been called EXACTLY ONCE
for _, counter := range fetcherCounters {
val := atomic.LoadInt64(counter)
assert.Equal(t, int64(1), val, "Expected to run once, got %d", val)
}
}
Loading