Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build Blockstore stats from config #8814

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,13 @@ var runCmd = &cobra.Command{
authenticationService = authentication.NewDummyService()
}

cloudMetadataProvider := stats.BuildMetadataProvider(logger, baseCfg)
blockstoreType := baseCfg.Blockstore.Type
if blockstoreType == "mem" {
printLocalWarning(os.Stderr, fmt.Sprintf("blockstore type %s", blockstoreType))
logger.WithField("adapter_type", blockstoreType).Warn("Block adapter NOT SUPPORTED for production use")
}

metadata := stats.NewMetadata(ctx, logger, blockstoreType, authMetadataManager, cloudMetadataProvider)
metadata := stats.NewMetadata(ctx, logger, authMetadataManager, baseCfg.StorageConfig())
bufferedCollector := stats.NewBufferedCollector(metadata.InstallationID, stats.Config(baseCfg.Stats),
stats.WithLogger(logger.WithField("service", "stats_collector")))

Expand Down Expand Up @@ -307,7 +306,6 @@ var runCmd = &cobra.Command{
authMetadataManager,
migrator,
bufferedCollector,
cloudMetadataProvider,
actionsService,
auditChecker,
logger.WithField("service", "api_gateway"),
Expand Down
3 changes: 1 addition & 2 deletions cmd/lakefs/cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ var setupCmd = &cobra.Command{
logger := logging.FromContext(ctx)
authMetadataManage := auth.NewKVMetadataManager(version.Version, cfg.Installation.FixedID, cfg.Database.Type, kvStore)
authService = NewAuthService(ctx, cfg, logger, kvStore, authMetadataManage)
cloudMetadataProvider := stats.BuildMetadataProvider(logger, cfg)
metadata := stats.NewMetadata(ctx, logger, cfg.Blockstore.Type, authMetadataManage, cloudMetadataProvider)
metadata := stats.NewMetadata(ctx, logger, authMetadataManage, cfg.StorageConfig())

credentials, err := setupLakeFS(ctx, cfg, authMetadataManage, authService, userName, accessKeyID, secretAccessKey, noCheck)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/lakefs/cmd/superuser.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ If the wrong user or credentials were chosen it is possible to delete the user a

authMetadataManager := auth.NewKVMetadataManager(version.Version, cfg.Installation.FixedID, cfg.Database.Type, kvStore)

metadataProvider := stats.BuildMetadataProvider(logger, cfg)
metadata := stats.NewMetadata(ctx, logger, cfg.Blockstore.Type, authMetadataManager, metadataProvider)
credentials, err := setup.AddAdminUser(ctx, authService, &model.SuperuserConfiguration{
User: model.User{
CreatedAt: time.Now(),
Expand All @@ -107,12 +105,15 @@ If the wrong user or credentials were chosen it is possible to delete the user a
os.Exit(1)
}

metadata := stats.NewMetadata(ctx, logger, authMetadataManager, cfg.StorageConfig())

ctx, cancelFn := context.WithCancel(ctx)
collector := stats.NewBufferedCollector(metadata.InstallationID, stats.Config(cfg.Stats),
stats.WithLogger(logger.WithField("service", "stats_collector")))
collector.Start(ctx)
defer collector.Close()

collector.SetInstallationID(metadata.InstallationID)
collector.CollectMetadata(metadata)
collector.CollectEvent(stats.Event{Class: "global", Name: "superuser"})

Expand Down
63 changes: 40 additions & 23 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,24 +112,39 @@ type Controller struct {

var usageCounter = stats.NewUsageCounter()

func NewController(cfg config.Config, catalog *catalog.Catalog, authenticator auth.Authenticator, authService auth.Service, authenticationService authentication.Service, blockAdapter block.Adapter, metadataManager auth.MetadataManager, migrator Migrator, collector stats.Collector, cloudMetadataProvider cloud.MetadataProvider, actions actionsHandler, auditChecker AuditChecker, logger logging.Logger, sessionStore sessions.Store, pathProvider upload.PathProvider, usageReporter stats.UsageReporterOperations) *Controller {
func NewController(
Copy link
Contributor Author

@itaigilo itaigilo Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was too damn long for a single line.
Removed cloudMetadataProvider.

cfg config.Config,
catalog *catalog.Catalog,
authenticator auth.Authenticator,
authService auth.Service,
authenticationService authentication.Service,
blockAdapter block.Adapter,
metadataManager auth.MetadataManager,
migrator Migrator,
collector stats.Collector,
actions actionsHandler,
auditChecker AuditChecker,
logger logging.Logger,
sessionStore sessions.Store,
pathProvider upload.PathProvider,
usageReporter stats.UsageReporterOperations,
) *Controller {
return &Controller{
Config: cfg,
Catalog: catalog,
Authenticator: authenticator,
Auth: authService,
Authentication: authenticationService,
BlockAdapter: blockAdapter,
MetadataManager: metadataManager,
Migrator: migrator,
Collector: collector,
CloudMetadataProvider: cloudMetadataProvider,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only removed this one. The rest is whitespaces.

Actions: actions,
AuditChecker: auditChecker,
Logger: logger,
sessionStore: sessionStore,
PathProvider: pathProvider,
usageReporter: usageReporter,
Config: cfg,
Catalog: catalog,
Authenticator: authenticator,
Auth: authService,
Authentication: authenticationService,
BlockAdapter: blockAdapter,
MetadataManager: metadataManager,
Migrator: migrator,
Collector: collector,
Actions: actions,
AuditChecker: auditChecker,
Logger: logger,
sessionStore: sessionStore,
PathProvider: pathProvider,
usageReporter: usageReporter,
}
}

Expand Down Expand Up @@ -5155,26 +5170,28 @@ func (c *Controller) Setup(w http.ResponseWriter, r *http.Request, body apigen.S
return
}

if c.Config.GetBaseConfig().Auth.UIConfig.RBAC == config.AuthRBACExternal {
baseConfig := c.Config.GetBaseConfig()

if baseConfig.Auth.UIConfig.RBAC == config.AuthRBACExternal {
// nothing to do - users are managed elsewhere
writeResponse(w, r, http.StatusOK, apigen.CredentialsWithSecret{})
return
}

var cred *model.Credential
if body.Key == nil {
cred, err = setup.CreateInitialAdminUser(ctx, c.Auth, c.Config.GetBaseConfig(), c.MetadataManager, body.Username)
cred, err = setup.CreateInitialAdminUser(ctx, c.Auth, baseConfig, c.MetadataManager, body.Username)
} else {
cred, err = setup.CreateInitialAdminUserWithKeys(ctx, c.Auth, c.Config.GetBaseConfig(), c.MetadataManager, body.Username, &body.Key.AccessKeyId, &body.Key.SecretAccessKey)
cred, err = setup.CreateInitialAdminUserWithKeys(ctx, c.Auth, baseConfig, c.MetadataManager, body.Username, &body.Key.AccessKeyId, &body.Key.SecretAccessKey)
}
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
}

meta := stats.NewMetadata(ctx, c.Logger, c.BlockAdapter.BlockstoreType(), c.MetadataManager, c.CloudMetadataProvider)
c.Collector.SetInstallationID(meta.InstallationID)
c.Collector.CollectMetadata(meta)
metadata := stats.NewMetadata(ctx, c.Logger, c.MetadataManager, baseConfig.StorageConfig())
c.Collector.SetInstallationID(metadata.InstallationID)
c.Collector.CollectMetadata(metadata)
c.Collector.CollectEvent(stats.Event{Class: "global", Name: "init", UserID: body.Username, Client: httputil.GetRequestLakeFSClient(r)})

response := apigen.CredentialsWithSecret{
Expand Down
24 changes: 20 additions & 4 deletions pkg/api/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/treeverse/lakefs/pkg/authentication"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/cloud"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/httputil"
"github.com/treeverse/lakefs/pkg/logging"
Expand All @@ -33,7 +32,24 @@ const (
extensionValidationExcludeBody = "x-validation-exclude-body"
)

func Serve(cfg config.Config, catalog *catalog.Catalog, middlewareAuthenticator auth.Authenticator, authService auth.Service, authenticationService authentication.Service, blockAdapter block.Adapter, metadataManager auth.MetadataManager, migrator Migrator, collector stats.Collector, cloudMetadataProvider cloud.MetadataProvider, actions actionsHandler, auditChecker AuditChecker, logger logging.Logger, gatewayDomains []string, snippets []params.CodeSnippet, pathProvider upload.PathProvider, usageReporter stats.UsageReporterOperations) http.Handler {
func Serve(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was too damn long for a single line.
Removed cloudMetadataProvider, and renamed middlewareAuthenticator to authenticator (not related, but simplifies).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please provide the style template you are using, if we change to something like this I need to break it down manually on every refactor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this to the list of topics for the Code-Style effort -
So this will be done systematically.

cfg config.Config,
catalog *catalog.Catalog,
authenticator auth.Authenticator,
authService auth.Service,
authenticationService authentication.Service,
blockAdapter block.Adapter,
metadataManager auth.MetadataManager,
migrator Migrator,
collector stats.Collector,
actions actionsHandler,
auditChecker AuditChecker,
logger logging.Logger,
gatewayDomains []string,
snippets []params.CodeSnippet,
pathProvider upload.PathProvider,
usageReporter stats.UsageReporterOperations,
) http.Handler {
logger.Info("initialize OpenAPI server")
swagger, err := apigen.GetSwagger()
if err != nil {
Expand All @@ -53,10 +69,10 @@ func Serve(cfg config.Config, catalog *catalog.Catalog, middlewareAuthenticator
cfg.GetBaseConfig().Logging.AuditLogLevel,
cfg.GetBaseConfig().Logging.TraceRequestHeaders,
cfg.GetBaseConfig().IsAdvancedAuth()),
AuthMiddleware(logger, swagger, middlewareAuthenticator, authService, sessionStore, &oidcConfig, &cookieAuthConfig),
AuthMiddleware(logger, swagger, authenticator, authService, sessionStore, &oidcConfig, &cookieAuthConfig),
MetricsMiddleware(swagger),
)
controller := NewController(cfg, catalog, middlewareAuthenticator, authService, authenticationService, blockAdapter, metadataManager, migrator, collector, cloudMetadataProvider, actions, auditChecker, logger, sessionStore, pathProvider, usageReporter)
controller := NewController(cfg, catalog, authenticator, authService, authenticationService, blockAdapter, metadataManager, migrator, collector, actions, auditChecker, logger, sessionStore, pathProvider, usageReporter)
apigen.HandlerFromMuxWithBaseURL(controller, apiRouter, apiutil.BaseURL)

r.Mount("/_health", httputil.ServeHealth())
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func setupHandler(t testing.TB) (http.Handler, *dependencies) {
auditChecker := version.NewDefaultAuditChecker(cfg.Security.AuditCheckURL, "", nil)

authenticationService := authentication.NewDummyService()
handler := api.Serve(cfg, c, authenticator, authService, authenticationService, c.BlockAdapter, meta, migrator, collector, nil, actionsService, auditChecker, logging.ContextUnavailable(), nil, nil, upload.DefaultPathProvider, stats.DefaultUsageReporter)
handler := api.Serve(cfg, c, authenticator, authService, authenticationService, c.BlockAdapter, meta, migrator, collector, actionsService, auditChecker, logging.ContextUnavailable(), nil, nil, upload.DefaultPathProvider, stats.DefaultUsageReporter)

return handler, &dependencies{
blocks: c.BlockAdapter,
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadtest/local_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestLocalLoad(t *testing.T) {
})
auditChecker := version.NewDefaultAuditChecker(cfg.Security.AuditCheckURL, "", nil)
authenticationService := authentication.NewDummyService()
handler := api.Serve(cfg, c, authenticator, authService, authenticationService, blockAdapter, meta, migrator, &stats.NullCollector{}, nil, actionsService, auditChecker, logging.ContextUnavailable(), nil, nil, upload.DefaultPathProvider, stats.DefaultUsageReporter)
handler := api.Serve(cfg, c, authenticator, authService, authenticationService, blockAdapter, meta, migrator, &stats.NullCollector{}, actionsService, auditChecker, logging.ContextUnavailable(), nil, nil, upload.DefaultPathProvider, stats.DefaultUsageReporter)

ts := httptest.NewServer(handler)
defer ts.Close()
Expand Down
49 changes: 39 additions & 10 deletions pkg/stats/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MetadataProvider interface {
GetMetadata(context.Context) (map[string]string, error)
}

func NewMetadata(ctx context.Context, logger logging.Logger, blockstoreType string, metadataProvider MetadataProvider, cloudMetadataProvider cloud.MetadataProvider) *Metadata {
func NewMetadata(ctx context.Context, logger logging.Logger, metadataProvider MetadataProvider, storageConfig config.StorageConfig) *Metadata {
Copy link
Contributor

@guy-har guy-har Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internals of the provider should stay externally, I would expect all this code to be in a storageAdapterMetadata provider. Maybe even provided by the block adapter factory.

If we're already here I would suggest replacing the current signature of this function.
func NewMetadata(ctx context.Context, logger logging.Logger, blockstoreType string, metadataProvider []MetadataProvider) *Metadata

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is -
blockstoreType should also be an array.
So it'll be a bunch of repeated code outside of this function.

Note that BuildMetadataProvider in this file was also "aware" of blockstore logic -
It wasn't part of the NewMetadata() function, but each NewMetadata() caller was calling this function to pas its result as a param.
So I don't think that using the same trick gives much better encapsulation, only more repeated code...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following, why should blockstore type be an array?
I know, it was done in order to not have circular dependency and not to build a factory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guy-har I tried what you suggested, which is basically moving buildMetadataProvider to the blockfactory, so it can be implemented for Enterprise as well.
However, as you suspected, this creates a circular dependency -
Since blockfactory is using the stats package when building the block.Adapter.
This is probably why buildMetadataProvider is in this file in first place.

We can extract the metadata builder into a separate module (in modules) that will be implement by Enterprise, and I think this can solve the circular dependencies (though it requires validation) - but I think that's a huge overkill for this.

Any other suggestions?

res := &Metadata{}
authMetadata, err := metadataProvider.GetMetadata(ctx)
if err != nil {
Expand All @@ -39,28 +39,57 @@ func NewMetadata(ctx context.Context, logger logging.Logger, blockstoreType stri
}
res.Entries = append(res.Entries, MetadataEntry{Name: k, Value: v})
}
if cloudMetadataProvider != nil {
cloudMetadata := cloudMetadataProvider.GetMetadata()
for k, v := range cloudMetadata {
res.Entries = append(res.Entries, MetadataEntry{Name: k, Value: v})

for _, id := range storageConfig.GetStorageIDs() {
if adapterCfg := storageConfig.GetStorageByID(id); adapterCfg != nil {
appendOrUpdateEntry(res, BlockstoreTypeKey, adapterCfg.BlockstoreType())

provider := buildMetadataProvider(logger, adapterCfg)
cloudMetadata := provider.GetMetadata()
if cloudMetadata == nil {
cloudMetadata = getErrorMetadata()
}
for k, v := range cloudMetadata {
appendOrUpdateEntry(res, k, v)
}
}
}
res.Entries = append(res.Entries, MetadataEntry{Name: BlockstoreTypeKey, Value: blockstoreType})

return res
}

func appendOrUpdateEntry(metadata *Metadata, key, value string) {
for i, entry := range metadata.Entries {
if entry.Name == key {
metadata.Entries[i].Value = entry.Value + "," + value
return
}
}
metadata.Entries = append(metadata.Entries, MetadataEntry{Name: key, Value: value})
}

func getErrorMetadata() map[string]string {
return map[string]string{
cloud.IDKey: "err",
cloud.IDTypeKey: "err",
}
}

Comment on lines +71 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's ErrorMetadata? Is it stats we want to collect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases we know the blockstore type, but failing to get the metadata from the cloud-provider.
In this case, we'll report err to the BI (instead of nil).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this requested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not as a product requirement, but from a functional pov -
You can check the tests here, and see that once it's nil and not err, the BI consumer will think it's not configured, instead of this being configured but failing.

I can return nil here, but this also means we won't be able to unit-test these params.

type noopMetadataProvider struct{}

func (n *noopMetadataProvider) GetMetadata() map[string]string {
return nil
return map[string]string{
cloud.IDKey: "nil",
cloud.IDTypeKey: "nil",
}
}

func BuildMetadataProvider(logger logging.Logger, c *config.BaseConfig) cloud.MetadataProvider {
switch c.Blockstore.Type {
func buildMetadataProvider(logger logging.Logger, c config.AdapterConfig) cloud.MetadataProvider {
switch c.BlockstoreType() {
case block.BlockstoreTypeGS:
return gcp.NewMetadataProvider(logger)
case block.BlockstoreTypeS3:
s3Params, err := c.Blockstore.BlockstoreS3Params()
s3Params, err := c.BlockstoreS3Params()
if err != nil {
logger.WithError(err).Warn("Failed to create S3 client for MetadataProvider")
return &noopMetadataProvider{}
Expand Down
95 changes: 95 additions & 0 deletions pkg/stats/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package stats_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/cloud"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/stats"
)

type mockMetadataProvider struct {
metadata map[string]string
err error
}

func (m *mockMetadataProvider) GetMetadata(ctx context.Context) (map[string]string, error) {
return m.metadata, m.err
}

const (
installationIDKey = "installation_id"
installationIDValue = "test_installation_id"
testEntryKey = "test_key"
testEntryValue = "test_value"
)

func TestNewMetadata(t *testing.T) {
tests := []struct {
name string
blockstore config.Blockstore
expectedBlockstoreType string
expectedIDKeyType string
expectedIDKey string
}{
{
name: "s3",
blockstore: config.Blockstore{
Type: "s3",
S3: &config.BlockstoreS3{},
},
expectedBlockstoreType: "s3",
expectedIDKeyType: "err", // no s3 cloud provider in the test env, hence err
expectedIDKey: "err", // no s3 cloud provider in the test env, hence err
},
{
name: "gs",
blockstore: config.Blockstore{
Type: "gs",
GS: &config.BlockstoreGS{},
},
expectedBlockstoreType: "gs",
expectedIDKeyType: "err", // no gs cloud provider in the test env, hence err
expectedIDKey: "err", // no gs cloud provider in the test env, hence err
},
{
name: "local",
blockstore: config.Blockstore{
Type: "local",
},
expectedBlockstoreType: "local",
expectedIDKeyType: "nil",
expectedIDKey: "nil",
},
}

for _, tt := range tests {
ctx := context.Background()
logger := logging.FromContext(ctx)
provider := &mockMetadataProvider{
metadata: map[string]string{
installationIDKey: installationIDValue,
testEntryKey: testEntryValue,
},
}

t.Run(tt.name, func(t *testing.T) {
cfg := &config.BaseConfig{
Blockstore: tt.blockstore,
}

metadata := stats.NewMetadata(ctx, logger, provider, cfg.StorageConfig())

require.Equal(t, installationIDValue, metadata.InstallationID)
require.Len(t, metadata.Entries, 5)
require.Contains(t, metadata.Entries, stats.MetadataEntry{Name: installationIDKey, Value: installationIDValue})
require.Contains(t, metadata.Entries, stats.MetadataEntry{Name: testEntryKey, Value: testEntryValue})
require.Contains(t, metadata.Entries, stats.MetadataEntry{Name: stats.BlockstoreTypeKey, Value: tt.expectedBlockstoreType})
require.Contains(t, metadata.Entries, stats.MetadataEntry{Name: cloud.IDTypeKey, Value: tt.expectedIDKeyType})
require.Contains(t, metadata.Entries, stats.MetadataEntry{Name: cloud.IDKey, Value: tt.expectedIDKey})
})
}
}
Loading