Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instantiate data sources from the policy engine #5060

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/datasources/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (d *dataSourceService) BuildDataSourceRegistry(
instantiations := rt.GetDef().GetEval().GetDataSources()
reg := v1datasources.NewDataSourceRegistry()

// return early so we don't need to do useless work
if len(instantiations) == 0 {
return reg, nil
}

stx, err := d.txBuilder(d, opts)
if err != nil {
return nil, fmt.Errorf("failed to start transaction: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions internal/engine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/open-feature/go-sdk/openfeature"
"github.com/rs/zerolog"

datasourceservice "github.com/mindersec/minder/internal/datasources/service"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/actions"
"github.com/mindersec/minder/internal/engine/actions/alert"
Expand Down Expand Up @@ -112,6 +113,8 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo

defer e.releaseLockAndFlush(ctx, inf)

dssvc := datasourceservice.NewDataSourceService(e.querier)

entityType := entities.EntityTypeToDB(inf.Type)
// Load all the relevant rule type engines for this entity
ruleEngineCache, err := rtengine.NewRuleEngineCache(
Expand All @@ -121,6 +124,7 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo
inf.ProjectID,
provider,
ingestCache,
dssvc,
eoptions.WithFlagsClient(e.featureFlags),
)
if err != nil {
Expand Down
36 changes: 32 additions & 4 deletions internal/engine/rtengine/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/google/uuid"

datasourceservice "github.com/mindersec/minder/internal/datasources/service"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/ingestcache"
eoptions "github.com/mindersec/minder/internal/engine/options"
Expand All @@ -32,6 +33,7 @@ type ruleEngineCache struct {
provider provinfv1.Provider
ingestCache ingestcache.Cache
engines cacheType
dssvc datasourceservice.DataSourcesService
opts []eoptions.Option
}

Expand All @@ -40,13 +42,15 @@ type ruleEngineCache struct {
// for this entity and project hierarchy.
func NewRuleEngineCache(
ctx context.Context,
store db.Querier,
store db.Store,
entityType db.Entities,
projectID uuid.UUID,
provider provinfv1.Provider,
ingestCache ingestcache.Cache,
dssvc datasourceservice.DataSourcesService,
opts ...eoptions.Option,
) (Cache, error) {

// Get the full project hierarchy
hierarchy, err := store.GetParentProjects(ctx, projectID)
if err != nil {
Expand All @@ -66,14 +70,22 @@ func NewRuleEngineCache(
// Populate the cache with rule type engines for the rule types we found.
engines := make(cacheType, len(ruleTypes))
for _, ruleType := range ruleTypes {
ruleEngine, err := cacheRuleEngine(ctx, &ruleType, provider, ingestCache, engines, opts...)
ruleEngine, err := cacheRuleEngine(
ctx, &ruleType, provider, ingestCache, engines, dssvc, opts...)
if err != nil {
return nil, err
}
engines[ruleType.ID] = ruleEngine
}

return &ruleEngineCache{engines: engines, opts: opts}, nil
return &ruleEngineCache{
store: store,
provider: provider,
ingestCache: ingestCache,
engines: engines,
opts: opts,
dssvc: dssvc,
}, nil
}

func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUID) (*rtengine2.RuleTypeEngine, error) {
Expand All @@ -100,7 +112,7 @@ func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUI
}

// If we find the rule type, insert into the cache and return.
ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.opts...)
ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.dssvc, r.opts...)
if err != nil {
return nil, fmt.Errorf("error while caching rule type engine: %w", err)
}
Expand All @@ -113,6 +125,7 @@ func cacheRuleEngine(
provider provinfv1.Provider,
ingestCache ingestcache.Cache,
engineCache cacheType,
dssvc datasourceservice.DataSourcesService,
opts ...eoptions.Option,
) (*rtengine2.RuleTypeEngine, error) {
// Parse the rule type
Expand All @@ -121,6 +134,21 @@ func cacheRuleEngine(
return nil, fmt.Errorf("error parsing rule type when parsing rule type %s: %w", ruleType.ID, err)
}

// Build a registry instance per rule type. This allows us to have an
// isolated data source list per instance of the rule type engine which is
// what we want. We don't want rule types using data sources they haven't
// instantiated. It is in this spot that we would add something like a cache
// so data sources could optimize in a per-execution context.
//
// TODO: Do we need to pass in a transaction here?
// TODO: We _might_ want to pass in a slice of the hierarchy here.
dsreg, err := dssvc.BuildDataSourceRegistry(ctx, pbRuleType, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible for this to conflict right? I.e. one ruletype having access to the data sources for another one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. This is why I make the instantiation of the registry here and per rule type. The registry is not shared.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect! 💯 😃

if err != nil {
return nil, fmt.Errorf("error building data source registry: %w", err)
}

opts = append(opts, eoptions.WithDataSources(dsreg))

// Create the rule type engine
ruleEngine, err := rtengine2.NewRuleTypeEngine(ctx, pbRuleType, provider, opts...)
if err != nil {
Expand Down
146 changes: 142 additions & 4 deletions internal/engine/rtengine/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,145 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

mockdssvc "github.com/mindersec/minder/internal/datasources/service/mock"
"github.com/mindersec/minder/internal/db"
dbf "github.com/mindersec/minder/internal/db/fixtures"
"github.com/mindersec/minder/internal/engine/ingestcache"
"github.com/mindersec/minder/internal/providers/testproviders"
v1datasources "github.com/mindersec/minder/pkg/datasources/v1"
rtengine2 "github.com/mindersec/minder/pkg/engine/v1/rtengine"
)

func TestNewRuleTypeEngineCacheConstructor(t *testing.T) {
t.Parallel()

scenarios := []struct {
Name string
DBSetup dbf.DBMockBuilder
DSServiceSetup func(service *mockdssvc.MockDataSourcesService)
ExpectedError string
}{
{
Name: "Returns error when getting parent projects fails",
DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).Return(nil, errTest)
}),
ExpectedError: "error getting parent projects",
},
{
Name: "Returns error when getting rule types fails",
DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
Return([]uuid.UUID{uuid.New()}, nil)
mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
Return(nil, errTest)
}),
ExpectedError: "error while retrieving rule types",
},
{
Name: "Returns error when getting rule type with no def",
DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
Return([]uuid.UUID{uuid.New()}, nil)
mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
Return([]db.RuleType{{ID: uuid.New()}}, nil)
}),
ExpectedError: "cannot unmarshal rule type definition",
},
{
Name: "Returns error when building data source registry fails",
DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
hierarchy := []uuid.UUID{uuid.New(), uuid.New()}
// Calls from the engine builder itself
mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
Return(hierarchy, nil)
mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
Return([]db.RuleType{{
ID: uuid.New(),
ProjectID: hierarchy[0],
Definition: []byte(ruleDefJSON),
}}, nil)
}),
DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) {
service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, errTest)
},
ExpectedError: errTest.Error(),
},
{
Name: "Creates rule engine cache",
DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
hierarchy := []uuid.UUID{uuid.New(), uuid.New()}
// Calls from the engine builder itself
mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
Return(hierarchy, nil)
mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
Return([]db.RuleType{{
ID: uuid.New(),
ProjectID: hierarchy[0],
Definition: []byte(ruleDefJSON),
}}, nil)
}),
DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) {
service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
Return(v1datasources.NewDataSourceRegistry(), nil)
},
},
}

for _, scenario := range scenarios {
t.Run(scenario.Name, func(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()

dssvc := mockdssvc.NewMockDataSourcesService(ctrl)

var store db.Store
if scenario.DBSetup != nil {
store = scenario.DBSetup(ctrl)
}

if scenario.DSServiceSetup != nil {
scenario.DSServiceSetup(dssvc)
}

cache, err := NewRuleEngineCache(
ctx, store, db.EntitiesRepository, uuid.New(),
testproviders.NewGitProvider(nil), ingestcache.NewNoopCache(),
dssvc)
if scenario.ExpectedError != "" {
require.ErrorContains(t, err, scenario.ExpectedError)
require.Nil(t, cache)
} else {
require.NoError(t, err)
require.NotNil(t, cache)

// Ensure members are not null so we don't fall on the same issue
// we had of not initializing them.
impl, ok := cache.(*ruleEngineCache)
require.True(t, ok)
require.NotNil(t, impl.store)
require.NotNil(t, impl.provider)
require.NotNil(t, impl.ingestCache)
require.NotNil(t, impl.engines)
require.NotNil(t, impl.dssvc)
}
})
}
}

func TestGetRuleEngine(t *testing.T) {
t.Parallel()

scenarios := []struct {
Name string
Cache cacheType
DBSetup dbf.DBMockBuilder
ExpectedError string
Name string
Cache cacheType
DBSetup dbf.DBMockBuilder
ExpectedError string
dsRegistryError error
}{
{
Name: "Retrieves rule engine from cache",
Expand Down Expand Up @@ -62,6 +186,13 @@ func TestGetRuleEngine(t *testing.T) {
Cache: cacheType{},
DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)),
},
{
Name: "Returns error when building data source registry fails",
Cache: cacheType{},
DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)),
dsRegistryError: errTest,
ExpectedError: errTest.Error(),
},
}

for _, scenario := range scenarios {
Expand All @@ -77,11 +208,18 @@ func TestGetRuleEngine(t *testing.T) {
store = scenario.DBSetup(ctrl)
}

dssvc := mockdssvc.NewMockDataSourcesService(ctrl)
reg := v1datasources.NewDataSourceRegistry()

dssvc.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
Return(reg, scenario.dsRegistryError).AnyTimes()

cache := ruleEngineCache{
store: store,
provider: testproviders.NewGitProvider(nil),
ingestCache: ingestcache.NewNoopCache(),
engines: scenario.Cache,
dssvc: dssvc,
}

result, err := cache.GetRuleEngine(ctx, ruleTypeID)
Expand Down