From d3171b951d9b2fa7589416def5ebedbcb6ae38dd Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 12:41:25 +0100 Subject: [PATCH 01/12] run asset inventory fetchers periodically --- internal/flavors/assetinventory/strategy.go | 2 +- internal/inventory/inventory.go | 23 ++++++++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/internal/flavors/assetinventory/strategy.go b/internal/flavors/assetinventory/strategy.go index 3878b20c0a..ef688967ba 100644 --- a/internal/flavors/assetinventory/strategy.go +++ b/internal/flavors/assetinventory/strategy.go @@ -73,7 +73,7 @@ func (s *strategy) NewAssetInventory(ctx context.Context, client beat.Client) (i s.logger.Infof("Creating %s AssetInventory", strings.ToUpper(s.cfg.AssetInventoryProvider)) now := func() time.Time { return time.Now() } //nolint:gocritic - return inventory.NewAssetInventory(s.logger, fetchers, client, now), nil + return inventory.NewAssetInventory(s.logger, fetchers, client, now, s.cfg.Period), nil } func (s *strategy) initAzureFetchers(_ context.Context) ([]inventory.AssetFetcher, error) { diff --git a/internal/inventory/inventory.go b/internal/inventory/inventory.go index 3cc7438127..1cdb250c68 100644 --- a/internal/inventory/inventory.go +++ b/internal/inventory/inventory.go @@ -29,13 +29,17 @@ import ( "github.com/samber/lo" ) -const indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s_%s_%s-default" +const ( + indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s_%s_%s-default" + minimalPeriod = 30 * time.Second +) type AssetInventory struct { fetchers []AssetFetcher publisher AssetPublisher bufferFlushInterval time.Duration bufferMaxSize int + period time.Duration logger *logp.Logger assetCh chan AssetEvent now func() time.Time @@ -49,8 +53,11 @@ type AssetPublisher interface { PublishAll([]beat.Event) } -func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time) AssetInventory { - logger.Info("Initializing Asset Inventory POC") +func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time, period time.Duration) AssetInventory { + if period < minimalPeriod { + period = minimalPeriod + } + logger.Infof("Initializing Asset Inventory POC with period of %s", period) return AssetInventory{ logger: logger, fetchers: fetchers, @@ -58,6 +65,7 @@ func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher A // move to a configuration parameter bufferFlushInterval: 10 * time.Second, bufferMaxSize: 1600, + period: period, assetCh: make(chan AssetEvent), now: now, } @@ -72,6 +80,7 @@ func (a *AssetInventory) Run(ctx context.Context) { assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize) flushTicker := time.NewTicker(a.bufferFlushInterval) + fetcherPeriod := time.NewTicker(a.period) for { select { case <-ctx.Done(): @@ -79,6 +88,14 @@ func (a *AssetInventory) Run(ctx context.Context) { a.publish(assetsBuffer) return + case <-fetcherPeriod.C: + a.logger.Debug("starting a new fetch cycle") + for _, fetcher := range a.fetchers { + go func(fetcher AssetFetcher) { + fetcher.Fetch(ctx, a.assetCh) + }(fetcher) + } + case <-flushTicker.C: if len(assetsBuffer) == 0 { a.logger.Debugf("Interval reached without events") From c36509789d3704fd3ac5c55fa61e82b66e9078e0 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 12:44:53 +0100 Subject: [PATCH 02/12] update test setup --- internal/inventory/inventory_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 685d951634..1dee09b125 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -150,6 +150,7 @@ func TestAssetInventory_Run(t *testing.T) { publisher: publisher, bufferFlushInterval: 10 * time.Millisecond, bufferMaxSize: 1, + period: 24 * time.Hour, assetCh: make(chan AssetEvent), now: now, } From 7f8b20bf7cbfdc24e246be7bf662a7056d4acc0a Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 12:56:10 +0100 Subject: [PATCH 03/12] test asset inventory cycles --- internal/inventory/inventory_test.go | 37 ++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 1dee09b125..130ce4ded0 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -170,3 +170,40 @@ func TestAssetInventory_Run(t *testing.T) { assert.ElementsMatch(t, received, expected) } } + +func TestAssetInventory_Period(t *testing.T) { + now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) } + + var cycleCounter int = 0 + publisher := NewMockAssetPublisher(t) + + fetcher := NewMockAssetFetcher(t) + fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) { + cycleCounter += 1 + }) + + logger := logp.NewLogger("test_run") + inventory := AssetInventory{ + logger: logger, + fetchers: []AssetFetcher{fetcher}, + publisher: publisher, + bufferFlushInterval: 10 * time.Millisecond, + bufferMaxSize: 1, + period: 500 * time.Millisecond, + assetCh: make(chan AssetEvent), + now: now, + } + + // Run it enough for 2 cycles to finish; one starts immediately, the other after 500 milliseconds + ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond) + defer cancel() + + go func() { + inventory.Run(ctx) + }() + + select { + case <-ctx.Done(): + assert.Equal(t, 2, cycleCounter, "Expected to run 2 cycles, got %d", cycleCounter) + } +} From 638fcef6cf70d2a0c382ee34230a8fbee51ed3a9 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 13:14:04 +0100 Subject: [PATCH 04/12] handle PublishAll calls --- internal/inventory/inventory_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 130ce4ded0..ae9536a44e 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -176,6 +176,7 @@ func TestAssetInventory_Period(t *testing.T) { var cycleCounter int = 0 publisher := NewMockAssetPublisher(t) + publisher.Mock.On("PublishAll").Run(func(mock.Arguments) {}).Maybe() fetcher := NewMockAssetFetcher(t) fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) { From e6fd5af66a060ca01f901d588d5afa74cff099fa Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 13:17:41 +0100 Subject: [PATCH 05/12] handle PublishAll calls --- internal/inventory/inventory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index ae9536a44e..088b3674cc 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -176,7 +176,7 @@ func TestAssetInventory_Period(t *testing.T) { var cycleCounter int = 0 publisher := NewMockAssetPublisher(t) - publisher.Mock.On("PublishAll").Run(func(mock.Arguments) {}).Maybe() + publisher.EXPECT().PublishAll(mock.Anything).Maybe() fetcher := NewMockAssetFetcher(t) fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) { From 2071af8822e44b400874f4a36e4a5340115779f2 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 13:42:06 +0100 Subject: [PATCH 06/12] make counter atomic --- internal/inventory/inventory_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 088b3674cc..908fce9e8d 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -19,6 +19,7 @@ package inventory import ( "context" + "sync/atomic" "testing" "time" @@ -174,13 +175,14 @@ func TestAssetInventory_Run(t *testing.T) { func TestAssetInventory_Period(t *testing.T) { now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) } - var cycleCounter int = 0 + var cycleCounter int64 = 0 + publisher := NewMockAssetPublisher(t) publisher.EXPECT().PublishAll(mock.Anything).Maybe() fetcher := NewMockAssetFetcher(t) fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) { - cycleCounter += 1 + atomic.AddInt64(&cycleCounter, 1) }) logger := logp.NewLogger("test_run") @@ -203,8 +205,7 @@ func TestAssetInventory_Period(t *testing.T) { inventory.Run(ctx) }() - select { - case <-ctx.Done(): - assert.Equal(t, 2, cycleCounter, "Expected to run 2 cycles, got %d", cycleCounter) - } + <-ctx.Done() + val := atomic.LoadInt64(&cycleCounter) + assert.Equal(t, val, cycleCounter, "Expected to run 2 cycles, got %d", cycleCounter) } From 9059026cfdf06393009707f7b19989dedfd2b4ec Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 14:12:59 +0100 Subject: [PATCH 07/12] skip in short test suite --- internal/inventory/inventory_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 908fce9e8d..b3d2b41f2d 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/elastic/cloudbeat/internal/resources/utils/pointers" + "github.com/elastic/cloudbeat/internal/resources/utils/testhelper" ) func TestAssetInventory_Run(t *testing.T) { @@ -173,6 +174,7 @@ func TestAssetInventory_Run(t *testing.T) { } func TestAssetInventory_Period(t *testing.T) { + testhelper.SkipLong(t) now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) } var cycleCounter int64 = 0 From f431f4e248c204c44561e3ac92750478680c8d72 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 14:34:54 +0100 Subject: [PATCH 08/12] drop explicit zero --- internal/inventory/inventory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index b3d2b41f2d..3bcad837ff 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -177,7 +177,7 @@ func TestAssetInventory_Period(t *testing.T) { testhelper.SkipLong(t) now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) } - var cycleCounter int64 = 0 + var cycleCounter int64 publisher := NewMockAssetPublisher(t) publisher.EXPECT().PublishAll(mock.Anything).Maybe() From 2e32a94fc4730c05b8675e773cc0ee7dbc35a204 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 14:54:06 +0100 Subject: [PATCH 09/12] fix expected value --- internal/inventory/inventory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 3bcad837ff..4da7185424 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -209,5 +209,5 @@ func TestAssetInventory_Period(t *testing.T) { <-ctx.Done() val := atomic.LoadInt64(&cycleCounter) - assert.Equal(t, val, cycleCounter, "Expected to run 2 cycles, got %d", cycleCounter) + assert.Equal(t, int64(2), val, "Expected to run 2 cycles, got %d", cycleCounter) } From 7f1495f8bbe3aba70def47feab966d443a909fc8 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Tue, 7 Jan 2025 14:55:36 +0100 Subject: [PATCH 10/12] be consistent, use val --- internal/inventory/inventory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 4da7185424..590f90d874 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -209,5 +209,5 @@ func TestAssetInventory_Period(t *testing.T) { <-ctx.Done() val := atomic.LoadInt64(&cycleCounter) - assert.Equal(t, int64(2), val, "Expected to run 2 cycles, got %d", cycleCounter) + assert.Equal(t, int64(2), val, "Expected to run 2 cycles, got %d", val) } From 8d6a00afcacb77bf97f64e4107438b173afd7b5b Mon Sep 17 00:00:00 2001 From: kubasobon Date: Wed, 8 Jan 2025 10:34:08 +0100 Subject: [PATCH 11/12] extract duplicate code to runAllFetchersOnce() --- internal/inventory/inventory.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/inventory/inventory.go b/internal/inventory/inventory.go index 1cdb250c68..3cb6d88c12 100644 --- a/internal/inventory/inventory.go +++ b/internal/inventory/inventory.go @@ -72,11 +72,7 @@ func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher A } func (a *AssetInventory) Run(ctx context.Context) { - for _, fetcher := range a.fetchers { - go func(fetcher AssetFetcher) { - fetcher.Fetch(ctx, a.assetCh) - }(fetcher) - } + a.runAllFetchersOnce(ctx) assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize) flushTicker := time.NewTicker(a.bufferFlushInterval) @@ -89,12 +85,7 @@ func (a *AssetInventory) Run(ctx context.Context) { return case <-fetcherPeriod.C: - a.logger.Debug("starting a new fetch cycle") - for _, fetcher := range a.fetchers { - go func(fetcher AssetFetcher) { - fetcher.Fetch(ctx, a.assetCh) - }(fetcher) - } + a.runAllFetchersOnce(ctx) case <-flushTicker.C: if len(assetsBuffer) == 0 { @@ -118,6 +109,17 @@ func (a *AssetInventory) Run(ctx context.Context) { } } +// runAllFetchersOnce runs every fetcher to collect assets to assetCh ONCE. It +// should be called every cycle, once every `a.period`. +func (a *AssetInventory) runAllFetchersOnce(ctx context.Context) { + a.logger.Debug("Running all fetchers once") + for _, fetcher := range a.fetchers { + go func(fetcher AssetFetcher) { + fetcher.Fetch(ctx, a.assetCh) + }(fetcher) + } +} + func (a *AssetInventory) publish(assets []AssetEvent) { events := lo.Map(assets, func(e AssetEvent, _ int) beat.Event { var relatedEntity []string From dcf2c4b775394fb6367048dc88adc18740bea2d9 Mon Sep 17 00:00:00 2001 From: kubasobon Date: Wed, 8 Jan 2025 10:43:47 +0100 Subject: [PATCH 12/12] add test for runAllFetchersOnce --- internal/inventory/inventory_test.go | 42 ++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/internal/inventory/inventory_test.go b/internal/inventory/inventory_test.go index 590f90d874..30e4bb3deb 100644 --- a/internal/inventory/inventory_test.go +++ b/internal/inventory/inventory_test.go @@ -211,3 +211,45 @@ func TestAssetInventory_Period(t *testing.T) { val := atomic.LoadInt64(&cycleCounter) assert.Equal(t, int64(2), val, "Expected to run 2 cycles, got %d", val) } + +func TestAssetInventory_RunAllFetchersOnce(t *testing.T) { + now := func() time.Time { return time.Date(2024, 1, 1, 1, 1, 1, 0, time.Local) } + publisher := NewMockAssetPublisher(t) + publisher.EXPECT().PublishAll(mock.Anything).Maybe() + + fetchers := []AssetFetcher{} + fetcherCounters := [](*int64){} + for i := 0; i < 5; i++ { + fetcher := NewMockAssetFetcher(t) + counter := int64(0) + fetcher.EXPECT().Fetch(mock.Anything, mock.Anything).Run(func(_ context.Context, _ chan<- AssetEvent) { + atomic.AddInt64(&counter, 1) + }) + fetchers = append(fetchers, fetcher) + fetcherCounters = append(fetcherCounters, &counter) + } + + logger := logp.NewLogger("test_run") + inventory := AssetInventory{ + logger: logger, + fetchers: fetchers, + publisher: publisher, + bufferFlushInterval: 10 * time.Millisecond, + bufferMaxSize: 1, + period: 24 * time.Hour, + assetCh: make(chan AssetEvent), + now: now, + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + inventory.runAllFetchersOnce(ctx) + <-ctx.Done() + + // Check that EVERY fetcher has been called EXACTLY ONCE + for _, counter := range fetcherCounters { + val := atomic.LoadInt64(counter) + assert.Equal(t, int64(1), val, "Expected to run once, got %d", val) + } +}