From df3b74ee80475439a91642bb671a599484997824 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 29 Apr 2026 10:41:17 +0200 Subject: [PATCH 01/12] shares(jsoncs3): Initialize metadata connection at startup Move away from lazily initializing the connection to the metadata storage and try to initialize it at service startup. This is groundwork for the upcoming migration that will import the space memberships into the jsoncs3 share manager. --- .../usershareprovider/usershareprovider.go | 8 +- pkg/share/manager/jsoncs3/jsoncs3.go | 91 +++++++++++-------- pkg/share/manager/memory/memory.go | 3 +- pkg/share/manager/registry/registry.go | 7 +- 4 files changed, 66 insertions(+), 43 deletions(-) diff --git a/internal/grpc/services/usershareprovider/usershareprovider.go b/internal/grpc/services/usershareprovider/usershareprovider.go index eaab359763e..1b801f58128 100644 --- a/internal/grpc/services/usershareprovider/usershareprovider.go +++ b/internal/grpc/services/usershareprovider/usershareprovider.go @@ -84,9 +84,9 @@ type service struct { allowedPathsForShares []*regexp.Regexp } -func getShareManager(c *config) (share.Manager, error) { +func getShareManager(c *config, logger *zerolog.Logger) (share.Manager, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { - return f(c.Drivers[c.Driver]) + return f(c.Drivers[c.Driver], logger) } return nil, errtypes.NotFound("driver not found: " + c.Driver) } @@ -114,7 +114,7 @@ func parseConfig(m map[string]interface{}) (*config, error) { } // New creates a new user share provider svc initialized from defaults -func NewDefault(m map[string]interface{}, ss *grpc.Server, _ *zerolog.Logger) (rgrpc.Service, error) { +func NewDefault(m map[string]any, ss *grpc.Server, logger *zerolog.Logger) (rgrpc.Service, error) { c, err := parseConfig(m) if err != nil { return nil, err @@ -123,7 +123,7 @@ func NewDefault(m map[string]interface{}, ss *grpc.Server, _ *zerolog.Logger) (r c.init() - sm, err := getShareManager(c) + sm, err := getShareManager(c, logger) if err != nil { return nil, err } diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index ba995639b8a..e097f9faa01 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -48,6 +48,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" "github.com/pkg/errors" + "github.com/rs/zerolog" "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" @@ -145,8 +146,6 @@ type EventOptions struct { // Manager implements a share manager using a cs3 storage backend with local caching type Manager struct { - sync.RWMutex - Cache providercache.Cache // holds all shares, sharded by provider id and space id CreatedCache sharecache.Cache // holds the list of shares a user has created, sharded by user id GroupReceivedCache sharecache.Cache // holds the list of shares a group has access to, sharded by group id @@ -155,7 +154,7 @@ type Manager struct { storage metadata.Storage SpaceRoot *provider.ResourceId - initialized bool + ready chan struct{} // closed once initialize() has completed successfully MaxConcurrency int @@ -164,7 +163,7 @@ type Manager struct { } // NewDefault returns a new manager instance with default dependencies -func NewDefault(m map[string]interface{}) (share.Manager, error) { +func NewDefault(m map[string]interface{}, logger *zerolog.Logger) (share.Manager, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { err = errors.Wrap(err, 
"error creating a new manager") @@ -189,11 +188,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { } } - return New(s, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) + return New(s, logger, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) } // New returns a new manager instance. -func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { +func New(s metadata.Storage, logger *zerolog.Logger, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { ttl := time.Duration(ttlSeconds) * time.Second m := &Manager{ @@ -205,13 +204,33 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, + ready: make(chan struct{}), } + // Initialize the metadata storage connection in the background, retrying + // with exponential backoff if the backend is not yet available. + go func() { + backoff := time.Second + for { + if err := m.initialize(context.Background()); err != nil { + logger.Info().Err(err).Dur("backoff", backoff).Msg("share manager: metadata storage initialization failed, retrying") + time.Sleep(backoff) + if backoff < 30*time.Second { + backoff *= 2 + } + continue + } + logger.Debug().Msg("share manager: initialization succeeded") + close(m.ready) + return + } + }() + // listen for events if m.eventStream != nil { ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...) if err != nil { - appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events") + logger.Error().Err(err).Msg("error consuming events") } go m.ProcessEvents(ch) } @@ -219,23 +238,13 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate return m, nil } +// initialize connects to the metadata storage backend and ensures the required +// directory structure exists. It is called once at startup from a background +// goroutine (see New) and must not be called concurrently. func (m *Manager) initialize(ctx context.Context) error { _, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "initialize") defer span.End() - if m.initialized { - span.SetStatus(codes.Ok, "already initialized") - return nil - } - - m.Lock() - defer m.Unlock() - if m.initialized { // check if initialization happened while grabbing the lock - span.SetStatus(codes.Ok, "initialized while grabbing lock") - return nil - } - - ctx = context.Background() err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata") if err != nil { span.RecordError(err) @@ -262,20 +271,30 @@ func (m *Manager) initialize(ctx context.Context) error { return err } - m.initialized = true span.SetStatus(codes.Ok, "initialized") return nil } +// waitForInit blocks until the background initialization goroutine has +// successfully completed, or until ctx is cancelled. 
+func (m *Manager) waitForInit(ctx context.Context) error { + select { + case <-m.ready: + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager not yet initialized") + } +} + func (m *Manager) ProcessEvents(ch <-chan events.Event) { log := logger.New() + ctx := context.Background() + if err := m.waitForInit(ctx); err != nil { + log.Error().Err(err).Msg("share manager: error waiting for initialization") + return + } for event := range ch { ctx := context.Background() - - if err := m.initialize(ctx); err != nil { - log.Error().Err(err).Msg("error initializing manager") - } - if ev, ok := event.Event.(events.SpaceDeleted); ok { log.Debug().Msgf("space deleted event: %v", ev) go func() { m.purgeSpace(ctx, ev.ID) }() @@ -287,7 +306,7 @@ func (m *Manager) ProcessEvents(ch <-chan events.Event) { func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -436,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare") defer span.End() sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "GetShare").Logger() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -494,7 +513,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return err } @@ -511,7 +530,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -599,7 +618,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -816,7 +835,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati defer span.End() sublog := appctx.GetLogger(ctx).With().Str("driver", "jsoncs3").Str("handler", "ListReceivedShares").Logger() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -1012,7 +1031,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S // GetReceivedShare returns the information for a received share. 
func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) { - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -1056,7 +1075,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -1104,7 +1123,7 @@ func updateShareID(share *collaboration.Share) { // Load imports shares and received shares from channels (e.g. during migration) func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error { log := appctx.GetLogger(ctx) - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return err } @@ -1220,7 +1239,7 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipS func (m *Manager) CleanupStaleShares(ctx context.Context) { log := appctx.GetLogger(ctx) - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return } diff --git a/pkg/share/manager/memory/memory.go b/pkg/share/manager/memory/memory.go index 8ade5926f10..0f34d3a68b7 100644 --- a/pkg/share/manager/memory/memory.go +++ b/pkg/share/manager/memory/memory.go @@ -28,6 +28,7 @@ import ( ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/share" + "github.com/rs/zerolog" "google.golang.org/genproto/protobuf/field_mask" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -46,7 +47,7 @@ func init() { } // New returns a new manager. -func New(c map[string]interface{}) (share.Manager, error) { +func New(c map[string]any, _ *zerolog.Logger) (share.Manager, error) { state := map[string]map[*collaboration.ShareId]collaboration.ShareState{} mp := map[string]map[*collaboration.ShareId]*provider.Reference{} return &manager{ diff --git a/pkg/share/manager/registry/registry.go b/pkg/share/manager/registry/registry.go index 16fc627ffff..6b22beb7799 100644 --- a/pkg/share/manager/registry/registry.go +++ b/pkg/share/manager/registry/registry.go @@ -18,11 +18,14 @@ package registry -import "github.com/opencloud-eu/reva/v2/pkg/share" +import ( + "github.com/opencloud-eu/reva/v2/pkg/share" + "github.com/rs/zerolog" +) // NewFunc is the function that share managers // should register at init time. -type NewFunc func(map[string]interface{}) (share.Manager, error) +type NewFunc func(map[string]any, *zerolog.Logger) (share.Manager, error) // NewFuncs is a map containing all the registered share managers. var NewFuncs = map[string]NewFunc{} From bdc3cfa25466f7346a705ec4b88e6c2da97e0d57 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 29 Apr 2026 16:03:18 +0200 Subject: [PATCH 02/12] shares(jsoncs3): fix naming of system user config This is not a service user (i.e. one managed by the service-auth authentication), but a special user used just for the system metadata storage.
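For illustration, a minimal sketch of constructing the driver directly with
the renamed keys (all values below are placeholders, not shipped defaults):

    logger := zerolog.Nop()
    mgr, err := jsoncs3.NewDefault(map[string]interface{}{
        "provider_addr":       "localhost:9215",          // placeholder
        "system_user_id":      "some-system-user-uuid",   // placeholder
        "system_user_idp":     "https://idp.example.org", // placeholder
        "machine_auth_apikey": "some-api-key",            // placeholder
    }, &logger)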
--- pkg/share/manager/jsoncs3/jsoncs3.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index e097f9faa01..a699a69a084 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -126,8 +126,8 @@ type config struct { GatewayAddr string `mapstructure:"gateway_addr"` MaxConcurrency int `mapstructure:"max_concurrency"` ProviderAddr string `mapstructure:"provider_addr"` - ServiceUserID string `mapstructure:"service_user_id"` - ServiceUserIdp string `mapstructure:"service_user_idp"` + SystemUserID string `mapstructure:"system_user_id"` + SystemUserIdp string `mapstructure:"system_user_idp"` MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` CacheTTL int `mapstructure:"ttl"` Events EventOptions `mapstructure:"events"` @@ -170,7 +170,7 @@ func NewDefault(m map[string]interface{}, logger *zerolog.Logger) (share.Manager return nil, err } - s, err := metadata.NewCS3Storage(c.ProviderAddr, c.ProviderAddr, c.ServiceUserID, c.ServiceUserIdp, c.MachineAuthAPIKey) + s, err := metadata.NewCS3Storage(c.ProviderAddr, c.ProviderAddr, c.SystemUserID, c.SystemUserIdp, c.MachineAuthAPIKey) if err != nil { return nil, err } From 44e4deea423842f8132adc1b552b6bbfa1e494e1 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 08:45:15 +0200 Subject: [PATCH 03/12] fix(jsoncs3 sharemanager): improve import of received shares When the received share to import has a userid set, we need to update the UserReceiveState even if the Grantee is a group (to correctly import the "accepted" state). --- pkg/share/manager/jsoncs3/jsoncs3.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index a699a69a084..f6c1fd2e130 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -1156,14 +1156,15 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if !shareIsRoutable(s.ReceivedShare.GetShare()) { updateShareID(s.ReceivedShare.GetShare()) } - switch s.ReceivedShare.Share.Grantee.Type { - case provider.GranteeType_GRANTEE_TYPE_USER: - if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { + if s.UserID != nil { + spaceid := s.ReceivedShare.GetShare().GetResourceId().GetStorageId() + shareid.IDDelimiter + s.ReceivedShare.GetShare().GetResourceId().GetSpaceId() + if err := m.UserReceivedStates.Add(context.Background(), s.UserID.GetOpaqueId(), spaceid, s.ReceivedShare); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") } else { - log.Debug().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") + log.Debug().Str("userid", s.UserID.GetOpaqueId()).Str("spaceid", spaceid).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") } - case provider.GranteeType_GRANTEE_TYPE_GROUP: + } + if s.ReceivedShare.Share.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP && s.UserID == nil { if err := m.GroupReceivedCache.Add(context.Background(), 
s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") } else { log.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } From 8d72015a035280d76dfa9a5dadafcc5f8baf4e5a Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 08:49:15 +0200 Subject: [PATCH 04/12] Give ListGrants permission to service users For the upcoming migration of space memberships to the shareprovider, the service user needs to be able to read the Grants of all space roots. --- pkg/storage/pkg/decomposedfs/node/permissions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/pkg/decomposedfs/node/permissions.go b/pkg/storage/pkg/decomposedfs/node/permissions.go index 6b1beb37d3a..9fe2a459fa9 100644 --- a/pkg/storage/pkg/decomposedfs/node/permissions.go +++ b/pkg/storage/pkg/decomposedfs/node/permissions.go @@ -101,6 +101,7 @@ func ServiceAccountPermissions() *provider.ResourcePermissions { Delete: true, // for cli restore command with replace option CreateContainer: true, // for space provisioning AddGrant: true, // for initial project space member assignment + ListGrants: true, // for reading space-root grants during the share-manager migration } } From d781020c51454b236ad1faaca0d0c5a651bdbc7a Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 09:59:18 +0200 Subject: [PATCH 05/12] feat(jsoncs3-sharemanager): introduce migration framework and import space-member shares The migration state (currently only the version number) is persisted on the metadata storage (migrations/state.json). Add a first migration, "0001_import_spacemembers", to import the space-root Grants from the storageprovider as shares into the manager. Issue: https://github.com/opencloud-eu/opencloud/issues/2612 --- .../usershareprovider_test.go | 3 +- pkg/share/manager/jsoncs3/jsoncs3.go | 90 +++-- pkg/share/manager/jsoncs3/jsoncs3_test.go | 32 +- .../migrations/0001_import_spacemembers.go | 332 ++++++++++++++++++ .../manager/jsoncs3/migrations/migration.go | 172 +++++++++ 5 files changed, 591 insertions(+), 38 deletions(-) create mode 100644 pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go create mode 100644 pkg/share/manager/jsoncs3/migrations/migration.go diff --git a/internal/grpc/services/usershareprovider/usershareprovider_test.go b/internal/grpc/services/usershareprovider/usershareprovider_test.go index 0d46625388f..6dd5b75af75 100644 --- a/internal/grpc/services/usershareprovider/usershareprovider_test.go +++ b/internal/grpc/services/usershareprovider/usershareprovider_test.go @@ -32,6 +32,7 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -91,7 +92,7 @@ var _ = Describe("user share provider service", func() { BeforeEach(func() { manager = &mocks.Manager{} - registry.Register("mockManager", func(m map[string]interface{}) (share.Manager, error) { + registry.Register("mockManager", func(m map[string]interface{}, l *zerolog.Logger) (share.Manager, error) { return manager, nil }) manager.On("UpdateShare", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collaborationpb.Share{}, nil) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index f6c1fd2e130..89c1b903e59 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -36,9 +36,9 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/errtypes" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" - "github.com/opencloud-eu/reva/v2/pkg/logger" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/share" + migration "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/migrations" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/providercache" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/sharecache" @@ -123,14 +123,20 @@ var ( ) type config struct { - GatewayAddr string `mapstructure:"gateway_addr"` - MaxConcurrency int `mapstructure:"max_concurrency"` - ProviderAddr string `mapstructure:"provider_addr"` - SystemUserID string `mapstructure:"system_user_id"` - SystemUserIdp string `mapstructure:"system_user_idp"` - MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` - CacheTTL int `mapstructure:"ttl"` - Events EventOptions `mapstructure:"events"` + GatewayAddr string `mapstructure:"gateway_addr"` + MaxConcurrency int `mapstructure:"max_concurrency"` + ProviderAddr string `mapstructure:"provider_addr"` + SystemUserID string `mapstructure:"system_user_id"` + SystemUserIdp string `mapstructure:"system_user_idp"` + MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` + ServiceAccountID string `mapstructure:"service_account_id"` + ServiceAccountSecret string `mapstructure:"service_account_secret"` + // ProviderRegistryAddr is the address of the storage registry used during + // migrations. Defaults to GatewayAddr when empty, because in the default + // OpenCloud deployment the registry is co-located with the gateway. 
+ ProviderRegistryAddr string `mapstructure:"provider_registry_addr"` + CacheTTL int `mapstructure:"ttl"` + Events EventOptions `mapstructure:"events"` } // EventOptions are the configurable options for events @@ -160,6 +166,7 @@ type Manager struct { gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient] eventStream events.Stream + logger *zerolog.Logger } // NewDefault returns a new manager instance with default dependencies @@ -188,11 +195,34 @@ func NewDefault(m map[string]interface{}, logger *zerolog.Logger) (share.Manager } } - return New(s, logger, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) + mgr, err := New(s, logger, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) + if err != nil { + return nil, err + } + providerRegistryAddr := c.ProviderRegistryAddr + if providerRegistryAddr == "" { + providerRegistryAddr = c.GatewayAddr + } + mgr.RunMigrations(migration.MigrationConfig{ + ServiceAccountID: c.ServiceAccountID, + ServiceAccountSecret: c.ServiceAccountSecret, + ProviderRegistryAddr: providerRegistryAddr, + }) + return mgr, nil } // New returns a new manager instance. -func New(s metadata.Storage, logger *zerolog.Logger, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { +func New(s metadata.Storage, + logger *zerolog.Logger, + gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], + ttlSeconds int, + es events.Stream, + maxconcurrency int, +) (*Manager, error) { + if logger == nil { + nop := zerolog.Nop() + logger = &nop + } ttl := time.Duration(ttlSeconds) * time.Second m := &Manager{ @@ -204,6 +234,7 @@ func New(s metadata.Storage, logger *zerolog.Logger, gatewaySelector pool.Select gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, + logger: logger, ready: make(chan struct{}), } @@ -286,8 +317,25 @@ func (m *Manager) waitForInit(ctx context.Context) error { } } +// RunMigrations starts data migrations in a background goroutine. It should be +// called once after New() in production server startup. Tests that do not need +// migration behaviour can omit this call entirely. +func (m *Manager) RunMigrations(cfg migration.MigrationConfig) { + go m.doMigrations(cfg) +} + +func (m *Manager) doMigrations(cfg migration.MigrationConfig) { + if err := m.waitForInit(context.Background()); err != nil { + m.logger.Error().Err(err).Msg("share manager: aborting migrations, manager did not initialize") + return + } + m.logger.Debug().Msg("migrations start") + migrations := migration.New(*m.logger, m.gatewaySelector, m.storage, cfg, m, m) + migrations.RunMigrations() +} + func (m *Manager) ProcessEvents(ch <-chan events.Event) { - log := logger.New() + log := m.logger ctx := context.Background() if err := m.waitForInit(ctx); err != nil { log.Error().Err(err).Msg("share manager: error waiting for initialization") @@ -1122,7 +1170,7 @@ func updateShareID(share *collaboration.Share) { // Load imports shares and received shares from channels (e.g. 
during migration) func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error { - log := appctx.GetLogger(ctx) + l := m.logger if err := m.waitForInit(ctx); err != nil { return err } @@ -1138,14 +1186,14 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar updateShareID(s) } if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { - log.Error().Err(err).Interface("share", s).Msg("error persisting share") + l.Error().Err(err).Interface("share", s).Msg("error persisting share") } else { - log.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") + l.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") } if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil { - log.Error().Err(err).Interface("share", s).Msg("error persisting created cache") + l.Error().Err(err).Interface("share", s).Msg("error persisting created cache") } else { - log.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") + l.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") } } wg.Done() @@ -1159,16 +1207,16 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if s.UserID != nil { spaceid := s.ReceivedShare.GetShare().GetResourceId().GetStorageId() + shareid.IDDelimiter + s.ReceivedShare.GetShare().GetResourceId().GetSpaceId() if err := m.UserReceivedStates.Add(context.Background(), s.UserID.GetOpaqueId(), spaceid, s.ReceivedShare); err != nil { - log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") + l.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") } else { - log.Debug().Str("userid", s.UserID.GetOpaqueId()).Str("spaceid", spaceid).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") + l.Debug().Str("userid", s.UserID.GetOpaqueId()).Str("spaceid", spaceid).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") } } if s.ReceivedShare.Share.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP && s.UserID == nil { if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { - log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") + l.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") } else { - log.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") + l.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } } } diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index 011347de500..9a64c8a3757 100644 --- 
a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -102,10 +102,10 @@ var _ = Describe("Jsoncs3", func() { }, }, } - storage metadata.Storage - client *mocks.GatewayAPIClient - tmpdir string - m *jsoncs3.Manager + storage metadata.Storage + client *mocks.GatewayAPIClient + tmpdir string + m *jsoncs3.Manager ctx = ctxpkg.ContextSetUser(context.Background(), user1) granteeCtx = ctxpkg.ContextSetUser(context.Background(), user2) @@ -166,7 +166,7 @@ var _ = Describe("Jsoncs3", func() { return client }, ) - m, err = jsoncs3.New(storage, gatewaySelector, 0, nil, 0) + m, err = jsoncs3.New(storage, nil, gatewaySelector, 0, nil, 0) Expect(err).ToNot(HaveOccurred()) }) @@ -261,7 +261,7 @@ var _ = Describe("Jsoncs3", func() { }) Expect(s).ToNot(BeNil()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s = shareBykey(&collaboration.ShareKey{ @@ -451,7 +451,7 @@ var _ = Describe("Jsoncs3", func() { }) It("loads the cache when it doesn't have an entry", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s, err := m.GetShare(ctx, shareRef) @@ -499,7 +499,7 @@ var _ = Describe("Jsoncs3", func() { }) Expect(err).ToNot(HaveOccurred()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s, err := m.GetShare(ctx, &collaboration.ShareReference{ @@ -612,7 +612,7 @@ var _ = Describe("Jsoncs3", func() { Expect(us).ToNot(BeNil()) Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s = shareBykey(&collaboration.ShareKey{ @@ -751,7 +751,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the user received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) @@ -819,7 +819,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the group received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) @@ -863,7 +863,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ @@ -897,7 +897,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err := m.GetReceivedShare(granteeCtx, 
&collaboration.ShareReference{ @@ -1050,7 +1050,7 @@ var _ = Describe("Jsoncs3", func() { Expect(err).ToNot(HaveOccurred()) Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ @@ -1078,7 +1078,7 @@ var _ = Describe("Jsoncs3", func() { Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) Expect(rs.Hidden).To(Equal(true)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ @@ -1107,7 +1107,7 @@ var _ = Describe("Jsoncs3", func() { Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) Expect(rs.Hidden).To(Equal(true)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go new file mode 100644 index 00000000000..55560cbbab2 --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go @@ -0,0 +1,332 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +package migration + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" + typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/google/uuid" + ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx" + "github.com/opencloud-eu/reva/v2/pkg/errtypes" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "github.com/opencloud-eu/reva/v2/pkg/share" + "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/shareid" + "github.com/opencloud-eu/reva/v2/pkg/utils" +) + +type ImportSpaceMembersMigration struct { + cfg config + sharesChan chan *collaboration.Share + receivedChan chan share.ReceivedShareWithUser +} + +func init() { + registerMigration(&ImportSpaceMembersMigration{}) +} + +func (m *ImportSpaceMembersMigration) Initialize(cfg config) { + m.cfg = cfg + m.sharesChan = make(chan *collaboration.Share) + m.receivedChan = make(chan share.ReceivedShareWithUser) +} + +func (m *ImportSpaceMembersMigration) Name() string { + return "import_space_members" +} + +func (m *ImportSpaceMembersMigration) Version() int { + return 1 +} + +func (m *ImportSpaceMembersMigration) Migrate() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + + svcCtx, err := utils.GetServiceUserContextWithContext(context.Background(), gwc, m.cfg.serviceAccountID, m.cfg.serviceAccountSecret) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("failed to get service user context for migration") + return err + } + // List all project spaces. + listRes, err := gwc.ListStorageSpaces(svcCtx, &provider.ListStorageSpacesRequest{ + Opaque: utils.AppendPlainToOpaque(nil, "unrestricted", "true"), + Filters: []*provider.ListStorageSpacesRequest_Filter{ + { + Type: provider.ListStorageSpacesRequest_Filter_TYPE_SPACE_TYPE, + Term: &provider.ListStorageSpacesRequest_Filter_SpaceType{SpaceType: "project"}, + }, + }, + }) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("space-membership migration: failed to list storage spaces") + return err + } + + if listRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + m.cfg.logger.Error().Str("status", listRes.GetStatus().GetMessage()).Msg("space-membership migration: ListStorageSpaces returned non-OK status") + return errtypes.InternalError("ListStorageSpaces") + } + + spaces := listRes.GetStorageSpaces() + m.cfg.logger.Info().Int("spaces", len(spaces)).Msg("Starting migration") + + // loadCtx is cancelled when the producer finishes (or fails) so that the + // Load goroutine — which blocks reading from the channels — is not left + // waiting forever if we return early from an error. + loadCtx, cancelLoad := context.WithCancel(svcCtx) + defer cancelLoad() + + var wg sync.WaitGroup + var loaderError error + wg.Go(func() { + loaderError = m.cfg.loader.Load(loadCtx, m.sharesChan, m.receivedChan) + }) + + migrated := 0 + for _, space := range spaces { + sharesCreated, err := m.migrateSpace(loadCtx, space) + if err != nil { + m.cfg.logger.Error().Err(err).Str("space", space.GetId().GetOpaqueId()).Msg("failed to migrate space; continuing with remaining spaces") + continue + } + migrated++ + m.cfg.logger.Debug(). 
+ Str("space", space.GetId().GetOpaqueId()). + Int("shares_created", sharesCreated). + Msg("space migrated") + if migrated%10 == 0 { + m.cfg.logger.Info(). + Int("migrated", migrated). + Int("total", len(spaces)). + Msg("migration progress") + } + } + close(m.receivedChan) + close(m.sharesChan) + + wg.Wait() + m.cfg.logger.Info().Err(loaderError).Int("migrated", migrated).Int("total", len(spaces)).Msg("Migration finished") + return loaderError +} + +func (m *ImportSpaceMembersMigration) migrateSpace(ctx context.Context, space *provider.StorageSpace) (int, error) { + spClient, err := m.storageProviderForSpace(ctx, space) + if err != nil { + return 0, err + } + + ref := &provider.Reference{ResourceId: space.GetRoot()} + grantsRes, err := spClient.ListGrants(ctx, &provider.ListGrantsRequest{Ref: ref}) + if err != nil { + return 0, err + } + if grantsRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + return 0, errtypes.NewErrtypeFromStatus(grantsRes.GetStatus()) + } + + sharesCreated := 0 + for _, grant := range grantsRes.GetGrants() { + share, receivedShares, err := m.spaceGrantToShares(ctx, grant, space) + if err != nil { + m.cfg.logger.Error().Err(err). + Interface("grant", grant). + Msg("Failed to convert grant to shares") + continue + } + if share == nil { + // share already existed; nothing to import for this grant + continue + } + + select { + case m.sharesChan <- share: + case <-ctx.Done(): + return sharesCreated, ctx.Err() + } + for _, rs := range receivedShares { + select { + case m.receivedChan <- rs: + case <-ctx.Done(): + return sharesCreated, ctx.Err() + } + } + sharesCreated++ + } + return sharesCreated, nil +} + +func (m *ImportSpaceMembersMigration) spaceGrantToShares(ctx context.Context, grant *provider.Grant, space *provider.StorageSpace) (*collaboration.Share, []share.ReceivedShareWithUser, error) { + // The grantee ids as peristed on disk do not have a IDP and type stored as part of the userid/groupid. + // We either need to do a lookup via the user/groupprovider or set a default IDP value via config + // FIXME get rid of hardcoded IDP value + idp := "https://localhost:9200" + switch grant.GetGrantee().GetType() { + case provider.GranteeType_GRANTEE_TYPE_GROUP: + grant.Grantee.Id = &provider.Grantee_GroupId{ + GroupId: &grouppb.GroupId{ + OpaqueId: grant.GetGrantee().GetGroupId().GetOpaqueId(), + Type: grouppb.GroupType_GROUP_TYPE_REGULAR, + Idp: idp, + }, + } + case provider.GranteeType_GRANTEE_TYPE_USER: + grant.Grantee.Id = &provider.Grantee_UserId{ + UserId: &userpb.UserId{ + OpaqueId: grant.GetGrantee().GetUserId().GetOpaqueId(), + Type: userpb.UserType_USER_TYPE_PRIMARY, + Idp: idp, + }, + } + } + + ref := &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: space.GetRoot(), + Grantee: grant.GetGrantee(), + }, + }, + } + + ctx = ctxpkg.ContextSetUser(ctx, &userpb.User{Id: grant.Creator}) + if s, err := m.cfg.manager.GetShare(ctx, ref); err == nil { + // FIXME: Verify the actual grants? 
+ m.cfg.logger.Debug().Interface("share", s).Msg("share already exists") + return nil, nil, nil + } + + ts := utils.TSNow() + shareID := shareid.Encode(space.GetRoot().GetStorageId(), space.GetRoot().GetSpaceId(), uuid.NewString()) + + creator := grant.GetCreator() + if creator.Type == userpb.UserType_USER_TYPE_INVALID { + creator = nil + } + newShare := &collaboration.Share{ + Id: &collaboration.ShareId{OpaqueId: shareID}, + ResourceId: space.GetRoot(), + Permissions: &collaboration.SharePermissions{Permissions: grant.GetPermissions()}, + Grantee: grant.GetGrantee(), + Expiration: grant.GetExpiration(), + Owner: creator, + Creator: creator, + Ctime: ts, + Mtime: ts, + } + + var newReceivedShares []share.ReceivedShareWithUser + switch grant.GetGrantee().GetType() { + case provider.GranteeType_GRANTEE_TYPE_GROUP: + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + m.cfg.logger.Error().Err(err).Msg("Failed to get gateway client") + return nil, nil, err + } + + gr, err := gwc.GetMembers(ctx, &grouppb.GetMembersRequest{ + GroupId: grant.GetGrantee().GetGroupId(), + }) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("Failed to expand group membership") + return nil, nil, err + } + if gr.GetStatus().GetCode() != rpc.Code_CODE_OK { + m.cfg.logger.Error().Str("Status", gr.GetStatus().GetMessage()).Msg("Failed to expand group membership") + return nil, nil, errtypes.NewErrtypeFromStatus(gr.GetStatus()) + } + for _, u := range gr.GetMembers() { + newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: u, + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + } + // Also add a group-level entry (UserID == nil) so the group cache is populated. + newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: nil, + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + case provider.GranteeType_GRANTEE_TYPE_USER: + newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: grant.GetGrantee().GetUserId(), + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + } + return newShare, newReceivedShares, nil +} + +// storageProviderForSpace resolves the storageprovider responsible for the +// given storage space and returns a dialled client. In the default opencloud +// deployment the storage registry is co-located with the gateway, so +// the GatewayAddr is used as the registry address. 
+func (m *ImportSpaceMembersMigration) storageProviderForSpace(ctx context.Context, space *provider.StorageSpace) (provider.ProviderAPIClient, error) { + + srClient, err := pool.GetStorageRegistryClient(m.cfg.providerRegistryAddr) + if err != nil { + return nil, fmt.Errorf("get storage registry client: %w", err) + } + + spaceJSON, err := json.Marshal(space) + if err != nil { + return nil, fmt.Errorf("marshal space: %w", err) + } + + res, err := srClient.GetStorageProviders(ctx, ®istry.GetStorageProvidersRequest{ + Opaque: &typesv1beta1.Opaque{ + Map: map[string]*typesv1beta1.OpaqueEntry{ + "space": { + Decoder: "json", + Value: spaceJSON, + }, + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("GetStorageProviders: %w", err) + } + if len(res.GetProviders()) == 0 { + return nil, fmt.Errorf("no storage provider found for space %s", space.GetId().GetOpaqueId()) + } + + c, err := pool.GetStorageProviderServiceClient(res.GetProviders()[0].GetAddress()) + if err != nil { + return nil, fmt.Errorf("dial storage provider: %w", err) + } + return c, nil +} diff --git a/pkg/share/manager/jsoncs3/migrations/migration.go b/pkg/share/manager/jsoncs3/migrations/migration.go new file mode 100644 index 00000000000..2382391ee4c --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/migration.go @@ -0,0 +1,172 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package migration + +import ( + "cmp" + "context" + "encoding/json" + "slices" + + gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "github.com/opencloud-eu/reva/v2/pkg/errtypes" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "github.com/opencloud-eu/reva/v2/pkg/share" + "github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata" + "github.com/rs/zerolog" +) + +const stateFile = "migrations/state.json" + +type migration interface { + Name() string + Version() int + Initialize(config) + Migrate() error +} + +// persistedState is the on-disk representation of the migration state. +type persistedState struct { + Version int `json:"version"` +} + +type state struct { + version int +} + +// MigrationConfig holds all caller-supplied options for a migration run. +// It is intentionally a plain struct so that new fields can be added without +// changing function signatures throughout the call chain. 
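+//
+// A minimal wiring sketch (addresses and credentials are placeholder values);
+// the manager passes this config through to the migrations once its own
+// initialization has finished:
+//
+//	mgr.RunMigrations(migration.MigrationConfig{
+//		ServiceAccountID:     "service-account-id",
+//		ServiceAccountSecret: "service-account-secret",
+//		ProviderRegistryAddr: "localhost:9142", // NewDefault falls back to the gateway address when empty
+//	})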
+type MigrationConfig struct { + ServiceAccountID string + ServiceAccountSecret string + ProviderRegistryAddr string +} + +type config struct { + logger zerolog.Logger + gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient] + storage metadata.Storage + serviceAccountID string + serviceAccountSecret string + providerRegistryAddr string + manager share.Manager + loader share.LoadableManager +} + +type Migrations struct { + config + state state +} + +var migrations []migration + +// registerMigration is only supposed to be called from init(), which runs sequentially, +// so we don't need to protect migrations with a lock +func registerMigration(m migration) { + migrations = append(migrations, m) +} + +func New(logger zerolog.Logger, + gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], + storage metadata.Storage, + cfg MigrationConfig, + manager share.Manager, + loader share.LoadableManager, +) Migrations { + + slices.SortFunc(migrations, func(a, b migration) int { + return cmp.Compare(a.Version(), b.Version()) + }) + + return Migrations{ + config{ + logger: logger.With().Str("jsoncs3", "migrations").Logger(), + gatewaySelector: gatewaySelector, + storage: storage, + serviceAccountID: cfg.ServiceAccountID, + serviceAccountSecret: cfg.ServiceAccountSecret, + providerRegistryAddr: cfg.ProviderRegistryAddr, + manager: manager, + loader: loader, + }, + state{}, + } +} + +// loadState reads the persisted migration version from storage. If no state +// file exists yet (fresh deployment) it returns version 0 without error. +func (m *Migrations) loadState(ctx context.Context) error { + data, err := m.storage.SimpleDownload(ctx, stateFile) + if err != nil { + if _, ok := err.(errtypes.IsNotFound); ok { + m.state = state{version: 0} + return nil + } + return err + } + var ps persistedState + if err := json.Unmarshal(data, &ps); err != nil { + return err + } + m.state = state{version: ps.Version} + return nil +} + +// saveState writes the current migration version to storage so that already- +// applied migrations are not re-run on the next server start. 
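+//
+// The state file is simply the JSON encoding of persistedState; after the
+// space-membership migration (version 1) has been applied it reads:
+//
+//	{"version": 1}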
+func (m *Migrations) saveState(ctx context.Context) error { + if err := m.storage.MakeDirIfNotExist(ctx, "migrations"); err != nil { + return err + } + data, err := json.Marshal(persistedState{Version: m.state.version}) + if err != nil { + return err + } + return m.storage.SimpleUpload(ctx, stateFile, data) +} + +func (m *Migrations) RunMigrations() { + ctx := context.Background() + + if err := m.loadState(ctx); err != nil { + m.logger.Error().Err(err).Msg("failed to load migration state; skipping migrations") + return + } + + m.logger.Info().Int("current state", m.state.version).Msg("checking migrations") + + for _, mig := range migrations { + if mig.Version() > m.state.version { + m.logger.Info().Str("migration", mig.Name()).Int("version", mig.Version()).Msg("running migration") + mig.Initialize(m.config) + if err := mig.Migrate(); err != nil { + m.logger.Error().Err(err).Str("migration", mig.Name()).Msg("migration failed; stopping") + return + } + m.state.version = mig.Version() + if err := m.saveState(ctx); err != nil { + m.logger.Error().Err(err).Msg("failed to save migration state; stopping") + return + } + } else { + m.logger.Info().Str("migration", mig.Name()).Int("version", mig.Version()).Msg("skipping migration") + } + } +} From add692dc94baced114f471bd8e04989db0349ff9 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 14:36:08 +0200 Subject: [PATCH 06/12] fix(migration): resolve the user's IDP via lookup When importing space memberships, the user IDs from the grants do not have an IDP value set. Do a user/group lookup to get the correct IDP value. Cache results to avoid repeated lookups. --- pkg/share/manager/jsoncs3/jsoncs3_test.go | 8 +- .../migrations/0001_import_spacemembers.go | 124 +++++++++++++++--- 2 files changed, 112 insertions(+), 20 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index 9a64c8a3757..9910a518f18 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -102,10 +102,10 @@ var _ = Describe("Jsoncs3", func() { }, }, } - storage metadata.Storage - client *mocks.GatewayAPIClient - tmpdir string - m *jsoncs3.Manager + storage metadata.Storage + client *mocks.GatewayAPIClient + tmpdir string + m *jsoncs3.Manager ctx = ctxpkg.ContextSetUser(context.Background(), user1) granteeCtx = ctxpkg.ContextSetUser(context.Background(), user2) diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go index 55560cbbab2..1ed2680950c 100644 --- a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go @@ -23,7 +23,9 @@ import ( "encoding/json" "fmt" "sync" + "time" + "github.com/cenkalti/backoff" grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" @@ -38,12 +40,15 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/share" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/shareid" "github.com/opencloud-eu/reva/v2/pkg/utils" + "github.com/rs/zerolog" ) type ImportSpaceMembersMigration struct { cfg config sharesChan chan *collaboration.Share receivedChan chan share.ReceivedShareWithUser + userCache map[string]*userpb.UserId + groupCache map[string]*grouppb.GroupId } func init() { @@ -54,6 +59,8 @@ func (m *ImportSpaceMembersMigration) Initialize(cfg config) { 
m.cfg = cfg m.sharesChan = make(chan *collaboration.Share) m.receivedChan = make(chan share.ReceivedShareWithUser) + m.userCache = make(map[string]*userpb.UserId) + m.groupCache = make(map[string]*grouppb.GroupId) } func (m *ImportSpaceMembersMigration) Name() string { @@ -183,28 +190,113 @@ func (m *ImportSpaceMembersMigration) migrateSpace(ctx context.Context, space *p return sharesCreated, nil } +// resolveRetries is the maximum number of times resolveUserID / resolveGroupID +// will retry after receiving an errtypes.Unavailable response (LDAP down). +const resolveRetries = 10 + +// retryOnUnavailable calls op, retrying with exponential backoff whenever op +// returns errtypes.Unavailable. Any other error (including context +// cancellation) stops the loop immediately and is returned as-is. +// Retries are capped at resolveRetries attempts and respect ctx cancellation. +func retryOnUnavailable(ctx context.Context, log zerolog.Logger, op func() error) error { + b := backoff.WithContext( + backoff.WithMaxRetries(backoff.NewExponentialBackOff(), resolveRetries), + ctx, + ) + notify := func(err error, d time.Duration) { + log.Warn().Err(err).Dur("retry_in", d).Msg("identity provider temporarily unavailable, retrying") + } + return backoff.RetryNotify(func() error { + err := op() + if err == nil { + return nil + } + if _, ok := err.(errtypes.Unavailable); ok { + return err // transient — keep retrying + } + return backoff.Permanent(err) // permanent — stop immediately + }, b, notify) +} + +func (m *ImportSpaceMembersMigration) resolveUserID(ctx context.Context, opaqueID string) (*userpb.UserId, error) { + if id, ok := m.userCache[opaqueID]; ok { + return id, nil + } + var id *userpb.UserId + err := retryOnUnavailable(ctx, m.cfg.logger, func() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + res, err := gwc.GetUser(ctx, &userpb.GetUserRequest{ + UserId: &userpb.UserId{OpaqueId: opaqueID}, + SkipFetchingUserGroups: true, + }) + if err != nil { + return err + } + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + // errtypes.NewErrtypeFromStatus maps CODE_UNAVAILABLE → errtypes.Unavailable, + // which retryOnUnavailable will retry; all other codes are treated as permanent. + return errtypes.NewErrtypeFromStatus(res.GetStatus()) + } + id = res.GetUser().GetId() + return nil + }) + if err != nil { + return nil, err + } + m.userCache[opaqueID] = id + return id, nil +} + +func (m *ImportSpaceMembersMigration) resolveGroupID(ctx context.Context, opaqueID string) (*grouppb.GroupId, error) { + if id, ok := m.groupCache[opaqueID]; ok { + return id, nil + } + var id *grouppb.GroupId + err := retryOnUnavailable(ctx, m.cfg.logger, func() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + res, err := gwc.GetGroup(ctx, &grouppb.GetGroupRequest{ + GroupId: &grouppb.GroupId{OpaqueId: opaqueID}, + SkipFetchingMembers: true, + }) + if err != nil { + return err + } + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + return errtypes.NewErrtypeFromStatus(res.GetStatus()) + } + id = res.GetGroup().GetId() + return nil + }) + if err != nil { + return nil, err + } + m.groupCache[opaqueID] = id + return id, nil +} + func (m *ImportSpaceMembersMigration) spaceGrantToShares(ctx context.Context, grant *provider.Grant, space *provider.StorageSpace) (*collaboration.Share, []share.ReceivedShareWithUser, error) { - // The grantee ids as peristed on disk do not have a IDP and type stored as part of the userid/groupid. 
- // We either need to do a lookup via the user/groupprovider or set a default IDP value via config - // FIXME get rid of hardcoded IDP value - idp := "https://localhost:9200" + // The grantee ids as persisted on disk do not have an IDP or type stored as + // part of the userid/groupid. Resolve them via the gateway so we get the + // full userid switch grant.GetGrantee().GetType() { case provider.GranteeType_GRANTEE_TYPE_GROUP: - grant.Grantee.Id = &provider.Grantee_GroupId{ - GroupId: &grouppb.GroupId{ - OpaqueId: grant.GetGrantee().GetGroupId().GetOpaqueId(), - Type: grouppb.GroupType_GROUP_TYPE_REGULAR, - Idp: idp, - }, + groupID, err := m.resolveGroupID(ctx, grant.GetGrantee().GetGroupId().GetOpaqueId()) + if err != nil { + return nil, nil, fmt.Errorf("resolve group %s: %w", grant.GetGrantee().GetGroupId().GetOpaqueId(), err) } + grant.Grantee.Id = &provider.Grantee_GroupId{GroupId: groupID} case provider.GranteeType_GRANTEE_TYPE_USER: - grant.Grantee.Id = &provider.Grantee_UserId{ - UserId: &userpb.UserId{ - OpaqueId: grant.GetGrantee().GetUserId().GetOpaqueId(), - Type: userpb.UserType_USER_TYPE_PRIMARY, - Idp: idp, - }, + userID, err := m.resolveUserID(ctx, grant.GetGrantee().GetUserId().GetOpaqueId()) + if err != nil { + return nil, nil, fmt.Errorf("resolve user %s: %w", grant.GetGrantee().GetUserId().GetOpaqueId(), err) } + grant.Grantee.Id = &provider.Grantee_UserId{UserId: userID} } ref := &collaboration.ShareReference{ From 24c48bd77b7c907e9cf7c84ba40f3532c7d89371 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 17:04:07 +0200 Subject: [PATCH 07/12] fix(metadata/disk): avoid nil pointer panic res might be nil when disk.Download returned an error --- pkg/storage/utils/metadata/disk.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/storage/utils/metadata/disk.go b/pkg/storage/utils/metadata/disk.go index 6eae43e6381..f07642b953f 100644 --- a/pkg/storage/utils/metadata/disk.go +++ b/pkg/storage/utils/metadata/disk.go @@ -170,7 +170,10 @@ func (disk *Disk) Download(_ context.Context, req DownloadRequest) (*DownloadRes // SimpleDownload reads a file from disk func (disk *Disk) SimpleDownload(ctx context.Context, downloadpath string) ([]byte, error) { res, err := disk.Download(ctx, DownloadRequest{Path: downloadpath}) - return res.Content, err + if err != nil { + return nil, err + } + return res.Content, nil } // Delete deletes a path From 34335f590f6ab594042f4e51ffbacee66a815bd0 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Wed, 6 May 2026 17:06:38 +0200 Subject: [PATCH 08/12] tests(jsoncs3/migrations): Add unit test for migrations --- .../migrations/0001_import_spacemembers.go | 23 +- .../0001_import_spacemembers_test.go | 580 ++++++++++++++++++ .../migrations/migration_state_test.go | 222 +++++++ .../migrations/migrations_suite_test.go | 27 + 4 files changed, 846 insertions(+), 6 deletions(-) create mode 100644 pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go create mode 100644 pkg/share/manager/jsoncs3/migrations/migration_state_test.go create mode 100644 pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go index 1ed2680950c..8f15a563eba 100644 --- a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go @@ -41,14 +41,22 @@ import ( 
"github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/shareid" "github.com/opencloud-eu/reva/v2/pkg/utils" "github.com/rs/zerolog" + "google.golang.org/grpc" ) +// storageProvider is the narrow subset of provider.ProviderAPIClient that the +// migration actually uses. Keeping it narrow makes test stubs trivial to write. +type storageProvider interface { + ListGrants(ctx context.Context, in *provider.ListGrantsRequest, opts ...grpc.CallOption) (*provider.ListGrantsResponse, error) +} + type ImportSpaceMembersMigration struct { - cfg config - sharesChan chan *collaboration.Share - receivedChan chan share.ReceivedShareWithUser - userCache map[string]*userpb.UserId - groupCache map[string]*grouppb.GroupId + cfg config + sharesChan chan *collaboration.Share + receivedChan chan share.ReceivedShareWithUser + userCache map[string]*userpb.UserId + groupCache map[string]*grouppb.GroupId + providerResolver func(context.Context, *provider.StorageSpace) (storageProvider, error) } func init() { @@ -61,6 +69,9 @@ func (m *ImportSpaceMembersMigration) Initialize(cfg config) { m.receivedChan = make(chan share.ReceivedShareWithUser) m.userCache = make(map[string]*userpb.UserId) m.groupCache = make(map[string]*grouppb.GroupId) + m.providerResolver = func(ctx context.Context, space *provider.StorageSpace) (storageProvider, error) { + return m.storageProviderForSpace(ctx, space) + } } func (m *ImportSpaceMembersMigration) Name() string { @@ -145,7 +156,7 @@ func (m *ImportSpaceMembersMigration) Migrate() error { } func (m *ImportSpaceMembersMigration) migrateSpace(ctx context.Context, space *provider.StorageSpace) (int, error) { - spClient, err := m.storageProviderForSpace(ctx, space) + spClient, err := m.providerResolver(ctx, space) if err != nil { return 0, err } diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go new file mode 100644 index 00000000000..40bdc45726d --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go @@ -0,0 +1,580 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "context" + "errors" + "os" + "sync" + + gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + + csmocks "github.com/opencloud-eu/reva/v2/tests/cs3mocks/mocks" + + "github.com/opencloud-eu/reva/v2/pkg/errtypes" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "github.com/opencloud-eu/reva/v2/pkg/share" + shareMocks "github.com/opencloud-eu/reva/v2/pkg/share/mocks" + "github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata" +) + +// ── helpers ──────────────────────────────────────────────────────────────────── + +// mockStorageProvider is a hand-written stub that satisfies the narrow +// storageProvider interface (only ListGrants). This avoids generating a full +// ProviderAPIClient mock. +type mockStorageProvider struct { + grants []*provider.Grant + err error + status *rpc.Status +} + +func (s *mockStorageProvider) ListGrants(_ context.Context, _ *provider.ListGrantsRequest, _ ...grpc.CallOption) (*provider.ListGrantsResponse, error) { + if s.err != nil { + return nil, s.err + } + st := s.status + if st == nil { + st = &rpc.Status{Code: rpc.Code_CODE_OK} + } + return &provider.ListGrantsResponse{Status: st, Grants: s.grants}, nil +} + +// mockLoader records all shares delivered to it through the channels. +type mockLoader struct { + shares []*collaboration.Share + received []share.ReceivedShareWithUser + err error // returned from Load; set before calling Migrate() +} + +func (l *mockLoader) Load(_ context.Context, sc <-chan *collaboration.Share, rc <-chan share.ReceivedShareWithUser) error { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for s := range sc { + l.shares = append(l.shares, s) + } + }() + go func() { + defer wg.Done() + for r := range rc { + l.received = append(l.received, r) + } + }() + wg.Wait() + return l.err +} + +// okAuthenticateResponse returns an AuthenticateResponse with CODE_OK and a +// placeholder token, sufficient to let GetServiceUserContextWithContext proceed. +func okAuthenticateResponse() *gatewayv1beta1.AuthenticateResponse { + return &gatewayv1beta1.AuthenticateResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Token: "test-token", + } +} + +// buildMigration creates an ImportSpaceMembersMigration wired to the supplied +// gateway mock, share-manager mock, and loader, using a temporary DiskStorage. 
+func buildMigration(tmpdir string, gwMock *csmocks.GatewayAPIClient, mgr *shareMocks.Manager, ldr share.LoadableManager) *ImportSpaceMembersMigration { + stor, err := metadata.NewDiskStorage(tmpdir) + Expect(err).NotTo(HaveOccurred()) + + pool.RemoveSelector("GatewaySelector" + "eu.opencloud.api.gateway") + gsel := pool.GetSelector[gatewayv1beta1.GatewayAPIClient]( + "GatewaySelector", + "eu.opencloud.api.gateway", + func(_ grpc.ClientConnInterface) gatewayv1beta1.GatewayAPIClient { + return gwMock + }, + ) + + m := &ImportSpaceMembersMigration{} + m.Initialize(config{ + logger: zerolog.Nop(), + gatewaySelector: gsel, + storage: stor, + serviceAccountID: "svc", + serviceAccountSecret: "secret", + manager: mgr, + loader: ldr, + }) + return m +} + +// ── retryOnUnavailable ──────────────────────────────────────────────────────── + +var _ = Describe("retryOnUnavailable", func() { + var log zerolog.Logger + + BeforeEach(func() { + log = zerolog.Nop() + }) + + It("returns nil when op succeeds immediately", func() { + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + return nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(calls).To(Equal(1)) + }) + + It("retries on errtypes.Unavailable and succeeds when op eventually succeeds", func() { + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + if calls < 3 { + return errtypes.Unavailable("ldap down") + } + return nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(calls).To(Equal(3)) + }) + + It("stops immediately on a permanent (non-Unavailable) error", func() { + sentinel := errors.New("permanent failure") + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + return sentinel + }) + Expect(err).To(MatchError(sentinel)) + Expect(calls).To(Equal(1)) + }) + + It("returns an error when context is cancelled before op succeeds", func() { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // already cancelled + err := retryOnUnavailable(ctx, log, func() error { + return errtypes.Unavailable("ldap down") + }) + Expect(err).To(HaveOccurred()) + }) + +}) + +// ── resolveUserID ───────────────────────────────────────────────────────────── + +var _ = Describe("resolveUserID", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "resolve-user-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + m = buildMigration(tmpdir, gwMock, &shareMocks.Manager{}, &mockLoader{}) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("successful lookup", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.MatchedBy(func(r *userpb.GetUserRequest) bool { + return r.UserId.OpaqueId == "alice" + })).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}}, + }, nil) + }) + + It("returns the full UserId", func() { + id, err := m.resolveUserID(context.Background(), "alice") + Expect(err).NotTo(HaveOccurred()) + Expect(id.Idp).To(Equal("idp.example.com")) + Expect(id.OpaqueId).To(Equal("alice")) + }) + + It("caches the result and only calls the gateway once", func() { + _, _ = m.resolveUserID(context.Background(), "alice") + _, _ = m.resolveUserID(context.Background(), "alice") + 
gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 1) + }) + }) + + Context("gateway returns CODE_NOT_FOUND", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_NOT_FOUND, Message: "no such user"}, + }, nil) + }) + + It("returns an error without retrying", func() { + _, err := m.resolveUserID(context.Background(), "ghost") + Expect(err).To(HaveOccurred()) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 1) + }) + }) + + Context("gateway returns CODE_UNAVAILABLE on first call then succeeds", func() { + BeforeEach(func() { + calls := 0 + gwMock.On("GetUser", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *userpb.GetUserRequest, _ ...grpc.CallOption) *userpb.GetUserResponse { + calls++ + if calls == 1 { + return &userpb.GetUserResponse{Status: &rpc.Status{Code: rpc.Code_CODE_UNAVAILABLE, Message: "ldap down"}} + } + return &userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "bob", Idp: "idp.example.com"}}, + } + }, + func(_ context.Context, _ *userpb.GetUserRequest, _ ...grpc.CallOption) error { return nil }, + ) + }) + + It("retries and eventually returns the resolved ID", func() { + id, err := m.resolveUserID(context.Background(), "bob") + Expect(err).NotTo(HaveOccurred()) + Expect(id.OpaqueId).To(Equal("bob")) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 2) + }) + }) +}) + +// ── resolveGroupID ──────────────────────────────────────────────────────────── + +var _ = Describe("resolveGroupID", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "resolve-group-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + m = buildMigration(tmpdir, gwMock, &shareMocks.Manager{}, &mockLoader{}) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("successful lookup", func() { + BeforeEach(func() { + gwMock.On("GetGroup", mock.Anything, mock.MatchedBy(func(r *grouppb.GetGroupRequest) bool { + return r.GroupId.OpaqueId == "admins" + })).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Group: &grouppb.Group{Id: &grouppb.GroupId{OpaqueId: "admins", Idp: "idp.example.com"}}, + }, nil) + }) + + It("returns the full GroupId", func() { + id, err := m.resolveGroupID(context.Background(), "admins") + Expect(err).NotTo(HaveOccurred()) + Expect(id.Idp).To(Equal("idp.example.com")) + Expect(id.OpaqueId).To(Equal("admins")) + }) + + It("caches the result and only calls the gateway once", func() { + _, _ = m.resolveGroupID(context.Background(), "admins") + _, _ = m.resolveGroupID(context.Background(), "admins") + gwMock.AssertNumberOfCalls(GinkgoT(), "GetGroup", 1) + }) + }) + + Context("gateway returns CODE_NOT_FOUND", func() { + BeforeEach(func() { + gwMock.On("GetGroup", mock.Anything, mock.Anything).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_NOT_FOUND, Message: "no such group"}, + }, nil) + }) + + It("returns an error without retrying", func() { + _, err := m.resolveGroupID(context.Background(), "ghost-group") + Expect(err).To(HaveOccurred()) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetGroup", 1) + }) + }) +}) + +// ── spaceGrantToShares ──────────────────────────────────────────────────────── + +var _ = Describe("spaceGrantToShares", func() { + var ( + tmpdir string + gwMock 
*csmocks.GatewayAPIClient + mgrMock *shareMocks.Manager + m *ImportSpaceMembersMigration + + space *provider.StorageSpace + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "grant-to-shares-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + mgrMock = &shareMocks.Manager{} + m = buildMigration(tmpdir, gwMock, mgrMock, &mockLoader{}) + + space = &provider.StorageSpace{ + Id: &provider.StorageSpaceId{OpaqueId: "space1"}, + Root: &provider.ResourceId{StorageId: "stor1", SpaceId: "space1", OpaqueId: "space1"}, + } + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("user grant — share does not yet exist", func() { + BeforeEach(func() { + // resolveUserID + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com"}}, + }, nil) + // GetShare returns not-found so the share needs to be created + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("not found")) + }) + + It("returns a new share and one received-share entry for the user", func() { + grant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}, + } + + sh, rs, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).NotTo(BeNil()) + Expect(rs).To(HaveLen(1)) + Expect(rs[0].UserID.OpaqueId).To(Equal("alice")) + }) + }) + + Context("user grant — share already exists", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com"}}, + }, nil) + // GetShare returns an existing share + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(&collaboration.Share{}, nil) + }) + + It("returns nil share (skip) without error", func() { + grant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + } + + sh, _, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).To(BeNil()) + }) + }) + + Context("group grant — share does not yet exist", func() { + BeforeEach(func() { + // resolveGroupID + gwMock.On("GetGroup", mock.Anything, mock.Anything).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Group: &grouppb.Group{Id: &grouppb.GroupId{OpaqueId: "admins", Idp: "idp.example.com"}}, + }, nil) + // GetShare → not found + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("not found")) + // GetMembers → two members + gwMock.On("GetMembers", mock.Anything, mock.Anything).Return(&grouppb.GetMembersResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Members: []*userpb.UserId{{OpaqueId: "u1", Idp: "idp"}, {OpaqueId: "u2", Idp: "idp"}}, + }, nil) + }) + + It("returns a share and received-share entries for each member plus a group-level nil entry", func() { + grant := &provider.Grant{ + 
Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{GroupId: &grouppb.GroupId{OpaqueId: "admins"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}, + } + + sh, rs, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).NotTo(BeNil()) + // 2 member entries + 1 group-level nil entry + Expect(rs).To(HaveLen(3)) + Expect(rs[2].UserID).To(BeNil()) + }) + }) +}) + +// ── Migrate (integration-level) ─────────────────────────────────────────────── + +var _ = Describe("ImportSpaceMembersMigration.Migrate", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + mgrMock *shareMocks.Manager + loader *mockLoader + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "migrate-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + mgrMock = &shareMocks.Manager{} + loader = &mockLoader{} + m = buildMigration(tmpdir, gwMock, mgrMock, loader) + + // Authenticate — needed by GetServiceUserContextWithContext + gwMock.On("Authenticate", mock.Anything, mock.Anything).Return( + okAuthenticateResponse(), nil) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("no project spaces exist", func() { + BeforeEach(func() { + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + StorageSpaces: []*provider.StorageSpace{}, + }, nil) + }) + + It("completes without error and the loader records nothing", func() { + Expect(m.Migrate()).To(Succeed()) + Expect(loader.shares).To(BeEmpty()) + Expect(loader.received).To(BeEmpty()) + }) + }) + + Context("one space with one user grant", func() { + BeforeEach(func() { + space := &provider.StorageSpace{ + Id: &provider.StorageSpaceId{OpaqueId: "s1"}, + Root: &provider.ResourceId{StorageId: "stor", SpaceId: "s1", OpaqueId: "s1"}, + } + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + StorageSpaces: []*provider.StorageSpace{space}, + }, nil) + + // injected provider — returns one grant + m.providerResolver = func(_ context.Context, _ *provider.StorageSpace) (storageProvider, error) { + return &mockStorageProvider{ + grants: []*provider.Grant{ + { + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp", Type: userpb.UserType_USER_TYPE_PRIMARY}, + }, + }, + }, nil + } + + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp"}}, + }, nil) + + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("")) + }) + + It("sends one share and one received-share to the loader", func() { + Expect(m.Migrate()).To(Succeed()) + Expect(loader.shares).To(HaveLen(1)) + Expect(loader.received).To(HaveLen(1)) + }) + }) + + Context("ListStorageSpaces returns an error", func() { + BeforeEach(func() { + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + nil, 
errors.New("network error")) + }) + + It("returns an error", func() { + Expect(m.Migrate()).To(HaveOccurred()) + }) + }) + + Context("ListStorageSpaces returns a non-OK status", func() { + BeforeEach(func() { + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL, Message: "storage crashed"}, + }, nil) + }) + + It("returns an error", func() { + Expect(m.Migrate()).To(HaveOccurred()) + }) + }) + + Context("loader returns an error", func() { + BeforeEach(func() { + loader.err = errors.New("load failure") + + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + StorageSpaces: []*provider.StorageSpace{}, + }, nil) + }) + + It("propagates the loader error", func() { + Expect(m.Migrate()).To(MatchError("load failure")) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/migrations/migration_state_test.go b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go new file mode 100644 index 00000000000..8784a7a6b72 --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go @@ -0,0 +1,222 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "context" + "errors" + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rs/zerolog" + + "github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata" +) + +// fakeMigration is a configurable stub that satisfies the migration interface. 
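
The fakeMigration stub defined next feeds the state tests, which pin down the runner's contract: apply only migrations whose version is above the persisted state, in ascending order, advance the state on success, and stop at the first failure. A compact sketch of that contract, with hypothetical names and persistence reduced to an int:

// Sketch of the version-gated runner contract the tests below exercise.
// Names are hypothetical; storage is reduced to an in-memory version number.
package main

import (
	"cmp"
	"errors"
	"fmt"
	"slices"
)

type step struct {
	version int
	run     func() error
}

// runPending applies every step whose version is above the persisted one, in
// ascending order, advancing the version after each success and stopping at
// the first failure so that the failed step is retried on the next start.
func runPending(current int, steps []step) (int, error) {
	slices.SortFunc(steps, func(a, b step) int { return cmp.Compare(a.version, b.version) })
	for _, s := range steps {
		if s.version <= current {
			continue // already applied during an earlier run
		}
		if err := s.run(); err != nil {
			return current, err // the version never advances past a failure
		}
		current = s.version // a real runner would persist this point (cf. saveState)
	}
	return current, nil
}

func main() {
	steps := []step{
		{version: 2, run: func() error { fmt.Println("second ran"); return nil }},
		{version: 1, run: func() error { return errors.New("boom") }},
	}
	v, err := runPending(0, steps)
	fmt.Println(v, err) // 0 boom; the version-2 step never ran
}
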
+type fakeMigration struct { + name string + version int + err error // non-nil → Migrate() returns this error + called bool +} + +func (f *fakeMigration) Name() string { return f.name } +func (f *fakeMigration) Version() int { return f.version } +func (f *fakeMigration) Initialize(_ config) {} +func (f *fakeMigration) Migrate() error { f.called = true; return f.err } + +var _ = Describe("RunMigrations / loadState / saveState", func() { + var ( + tmpdir string + stor metadata.Storage + m Migrations + log zerolog.Logger + + // saved & restored around each test so our fakeMigrations don't leak + originalMigrations []migration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "migration-state-test-*") + Expect(err).NotTo(HaveOccurred()) + + stor, err = metadata.NewDiskStorage(tmpdir) + Expect(err).NotTo(HaveOccurred()) + + log = zerolog.Nop() + m = Migrations{ + config: config{ + logger: log, + storage: stor, + }, + } + + originalMigrations = migrations + migrations = nil + }) + + AfterEach(func() { + migrations = originalMigrations + os.RemoveAll(tmpdir) + }) + + // ── loadState ────────────────────────────────────────────────────────────── + + Describe("loadState", func() { + Context("when no state file exists yet", func() { + It("returns version 0 without error (fresh deployment)", func() { + Expect(m.loadState(context.Background())).To(Succeed()) + Expect(m.state.version).To(Equal(0)) + }) + }) + + Context("when the state file contains valid JSON", func() { + BeforeEach(func() { + ctx := context.Background() + Expect(stor.MakeDirIfNotExist(ctx, "migrations")).To(Succeed()) + Expect(stor.SimpleUpload(ctx, stateFile, []byte(`{"version":3}`))).To(Succeed()) + }) + + It("restores the persisted version", func() { + Expect(m.loadState(context.Background())).To(Succeed()) + Expect(m.state.version).To(Equal(3)) + }) + }) + + Context("when the state file contains invalid JSON", func() { + BeforeEach(func() { + ctx := context.Background() + Expect(stor.MakeDirIfNotExist(ctx, "migrations")).To(Succeed()) + Expect(stor.SimpleUpload(ctx, stateFile, []byte(`not-json`))).To(Succeed()) + }) + + It("returns an error", func() { + Expect(m.loadState(context.Background())).To(HaveOccurred()) + }) + }) + }) + + // ── saveState ────────────────────────────────────────────────────────────── + + Describe("saveState", func() { + It("persists the current version and can be round-tripped by loadState", func() { + m.state.version = 7 + Expect(m.saveState(context.Background())).To(Succeed()) + + m2 := Migrations{config: config{logger: log, storage: stor}} + Expect(m2.loadState(context.Background())).To(Succeed()) + Expect(m2.state.version).To(Equal(7)) + }) + }) + + // ── RunMigrations ────────────────────────────────────────────────────────── + + Describe("RunMigrations", func() { + Context("when there are no registered migrations", func() { + It("completes without error and leaves version at 0", func() { + m.RunMigrations() + Expect(m.state.version).To(Equal(0)) + }) + }) + + Context("when no migration has been applied yet", func() { + var ( + mig1 *fakeMigration + mig2 *fakeMigration + ) + + BeforeEach(func() { + mig1 = &fakeMigration{name: "first", version: 1} + mig2 = &fakeMigration{name: "second", version: 2} + migrations = []migration{mig1, mig2} + }) + + It("runs all migrations in order and saves the latest version", func() { + m.RunMigrations() + Expect(mig1.called).To(BeTrue()) + Expect(mig2.called).To(BeTrue()) + Expect(m.state.version).To(Equal(2)) + }) + }) + + Context("when 
the state is already at version 1", func() { + var ( + mig1 *fakeMigration + mig2 *fakeMigration + ) + + BeforeEach(func() { + // Persist version 1 so loadState inside RunMigrations picks it up. + m.state.version = 1 + Expect(m.saveState(context.Background())).To(Succeed()) + m.state.version = 0 // reset; RunMigrations will reload from storage + + mig1 = &fakeMigration{name: "first", version: 1} + mig2 = &fakeMigration{name: "second", version: 2} + migrations = []migration{mig1, mig2} + }) + + It("skips the already-applied migration and only runs the pending one", func() { + m.RunMigrations() + Expect(mig1.called).To(BeFalse()) + Expect(mig2.called).To(BeTrue()) + Expect(m.state.version).To(Equal(2)) + }) + }) + + Context("when a migration returns an error", func() { + var ( + mig1 *fakeMigration + mig2 *fakeMigration + ) + + BeforeEach(func() { + mig1 = &fakeMigration{name: "broken", version: 1, err: errors.New("boom")} + mig2 = &fakeMigration{name: "later", version: 2} + migrations = []migration{mig1, mig2} + }) + + It("stops after the failing migration and does not run subsequent ones", func() { + m.RunMigrations() + Expect(mig1.called).To(BeTrue()) + Expect(mig2.called).To(BeFalse()) + // version must not advance past the failed migration + Expect(m.state.version).To(Equal(0)) + }) + }) + + Context("when all migrations are already applied (state == highest version)", func() { + var mig *fakeMigration + + BeforeEach(func() { + // Persist version 5 so loadState inside RunMigrations picks it up. + m.state.version = 5 + Expect(m.saveState(context.Background())).To(Succeed()) + m.state.version = 0 + + mig = &fakeMigration{name: "done", version: 5} + migrations = []migration{mig} + }) + + It("skips every migration", func() { + m.RunMigrations() + Expect(mig.called).To(BeFalse()) + }) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go b/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go new file mode 100644 index 00000000000..eccce746f0c --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go @@ -0,0 +1,27 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestMigrations(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Migrations Suite") +} From f301894296b55bce83a7b3cc26a1e79fbaf4d0e7 Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Thu, 7 May 2026 11:27:49 +0200 Subject: [PATCH 09/12] jsoncs3/migraton: Create "migrations" directory at startup --- pkg/share/manager/jsoncs3/jsoncs3.go | 6 ++++++ pkg/share/manager/jsoncs3/migrations/migration.go | 3 --- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 89c1b903e59..5548d81cf16 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -301,6 +301,12 @@ func (m *Manager) initialize(ctx context.Context) error { span.SetStatus(codes.Error, err.Error()) return err } + err = m.storage.MakeDirIfNotExist(ctx, "migrations") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } span.SetStatus(codes.Ok, "initialized") return nil diff --git a/pkg/share/manager/jsoncs3/migrations/migration.go b/pkg/share/manager/jsoncs3/migrations/migration.go index 2382391ee4c..e8c444e8a18 100644 --- a/pkg/share/manager/jsoncs3/migrations/migration.go +++ b/pkg/share/manager/jsoncs3/migrations/migration.go @@ -132,9 +132,6 @@ func (m *Migrations) loadState(ctx context.Context) error { // saveState writes the current migration version to storage so that already- // applied migrations are not re-run on the next server start. func (m *Migrations) saveState(ctx context.Context) error { - if err := m.storage.MakeDirIfNotExist(ctx, "migrations"); err != nil { - return err - } data, err := json.Marshal(persistedState{Version: m.state.version}) if err != nil { return err From 87aaef847ba808b81f9e9bc1d38005337b62652a Mon Sep 17 00:00:00 2001 From: Ralf Haferkamp Date: Thu, 7 May 2026 11:28:49 +0200 Subject: [PATCH 10/12] jsoncs3/migration: Use locking to run migrations on a single instance This add locking around the migration runner to prevent the migrations being executed in parallel when multiple instances of the sharing service are running. The lock has a ttl of 1 minute and is refreshed every 20 seconds. Other instances are trying to acquire the look evert 5 seconds until it was either released or became stale. --- .../manager/jsoncs3/migrations/migration.go | 182 +++++++++++++++++- .../migrations/migration_state_test.go | 136 +++++++++++++ 2 files changed, 317 insertions(+), 1 deletion(-) diff --git a/pkg/share/manager/jsoncs3/migrations/migration.go b/pkg/share/manager/jsoncs3/migrations/migration.go index e8c444e8a18..e30b4c81801 100644 --- a/pkg/share/manager/jsoncs3/migrations/migration.go +++ b/pkg/share/manager/jsoncs3/migrations/migration.go @@ -21,8 +21,11 @@ package migration import ( "cmp" "context" + "crypto/rand" "encoding/json" + "fmt" "slices" + "time" gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/opencloud-eu/reva/v2/pkg/errtypes" @@ -34,6 +37,23 @@ import ( const stateFile = "migrations/state.json" +const ( + lockFile = "migrations/lock.json" + lockTTL = time.Minute + lockHeartbeatInterval = 20 * time.Second +) + +// lockPollInterval is how long acquireLock sleeps between retries when the +// lock is held by another instance. Declared as a variable so tests can +// shorten it without rebuilding. +var lockPollInterval = 5 * time.Second + +// lockData is the content written to the lock file. 
+type lockData struct { + Timestamp time.Time `json:"timestamp"` + InstanceID string `json:"instance_id"` +} + type migration interface { Name() string Version() int @@ -72,7 +92,8 @@ type config struct { type Migrations struct { config - state state + state state + instanceID string } var migrations []migration @@ -95,6 +116,10 @@ func New(logger zerolog.Logger, return cmp.Compare(a.Version(), b.Version()) }) + b := make([]byte, 8) + _, _ = rand.Read(b) + instanceID := fmt.Sprintf("%x", b) + return Migrations{ config{ logger: logger.With().Str("jsoncs3", "migrations").Logger(), @@ -107,9 +132,155 @@ func New(logger zerolog.Logger, loader: loader, }, state{}, + instanceID, } } +// acquireLock tries to atomically create the lock file, blocking until the lock +// is obtained. It returns the etag of the lock file on success. It retries +// indefinitely until ctx is cancelled. A lock whose timestamp is older than +// lockTTL is considered stale and will be taken over. +func (m *Migrations) acquireLock(ctx context.Context) (string, error) { + m.logger.Debug().Str("instance", m.instanceID).Msg("acquiring migration lock") + for { + // Fast path: create the lock file only if it does not exist yet. + data, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID}) + if err != nil { + return "", err + } + res, err := m.storage.Upload(ctx, metadata.UploadRequest{ + Path: lockFile, + Content: data, + IfNoneMatch: []string{"*"}, + }) + if err == nil { + m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock acquired") + return res.Etag, nil + } + + // Propagate context cancellation immediately. + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + + // Any error other than a conflict means something unexpected happened. + if !isConflict(err) { + return "", err + } + + // Lock file already exists — read it to decide whether it is stale. + dl, err := m.storage.Download(ctx, metadata.DownloadRequest{Path: lockFile}) + if err != nil { + if _, ok := err.(errtypes.IsNotFound); ok { + // Lock was released between our upload attempt and the download; + // retry acquiring it immediately. + m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock vanished during read; retrying") + continue + } + return "", err + } + + var existing lockData + stale := true + if err := json.Unmarshal(dl.Content, &existing); err == nil { + stale = time.Since(existing.Timestamp) > lockTTL + } + + if stale { + m.logger.Debug(). + Str("instance", m.instanceID). + Str("held_by", existing.InstanceID). + Time("lock_timestamp", existing.Timestamp). + Msg("migration lock is stale; attempting takeover") + + // Atomically take over the stale lock using the etag we just read. + newData, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID}) + if err != nil { + return "", err + } + res, err := m.storage.Upload(ctx, metadata.UploadRequest{ + Path: lockFile, + Content: newData, + IfMatchEtag: dl.Etag, + }) + if err == nil { + m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock acquired via stale takeover") + return res.Etag, nil + } + // Another instance took the stale lock before us; loop and retry. + m.logger.Debug().Str("instance", m.instanceID).Err(err).Msg("stale lock takeover lost race; retrying") + continue + } + + m.logger.Debug(). + Str("instance", m.instanceID). + Str("held_by", existing.InstanceID). + Time("lock_timestamp", existing.Timestamp). + Dur("poll_interval", lockPollInterval). 
+ Msg("migration lock held by another instance; waiting") + + // Lock is fresh and held by another instance; wait before retrying. + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(lockPollInterval): + } + } +} + +// startHeartbeat spawns a goroutine that periodically renews the lock file so +// that it is not considered stale while a long migration is running. Call the +// returned cancel function to stop the heartbeat. +func (m *Migrations) startHeartbeat(ctx context.Context, etag string) context.CancelFunc { + hbCtx, cancel := context.WithCancel(ctx) + go func() { + ticker := time.NewTicker(lockHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-hbCtx.Done(): + return + case <-ticker.C: + data, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID}) + if err != nil { + m.logger.Warn().Err(err).Msg("failed to marshal heartbeat data for migration lock") + return + } + res, err := m.storage.Upload(hbCtx, metadata.UploadRequest{ + Path: lockFile, + Content: data, + IfMatchEtag: etag, + }) + if err != nil { + m.logger.Warn().Err(err).Msg("failed to renew migration lock; another instance may take over") + return + } + etag = res.Etag + } + } + }() + return cancel +} + +// releaseLock deletes the lock file unconditionally. +func (m *Migrations) releaseLock(ctx context.Context) { + if err := m.storage.Delete(ctx, lockFile); err != nil { + m.logger.Warn().Err(err).Msg("failed to release migration lock") + } +} + +// isConflict returns true for errors that signal a conditional-upload conflict, +// i.e. the lock file already exists or the etag did not match. +func isConflict(err error) bool { + switch err.(type) { + case errtypes.IsAlreadyExists, errtypes.IsAborted, errtypes.IsPreconditionFailed: + return true + } + return false +} + // loadState reads the persisted migration version from storage. If no state // file exists yet (fresh deployment) it returns version 0 without error. func (m *Migrations) loadState(ctx context.Context) error { @@ -142,6 +313,15 @@ func (m *Migrations) saveState(ctx context.Context) error { func (m *Migrations) RunMigrations() { ctx := context.Background() + etag, err := m.acquireLock(ctx) + if err != nil { + m.logger.Error().Err(err).Msg("failed to acquire migration lock; skipping migrations") + return + } + cancelHB := m.startHeartbeat(ctx, etag) + defer cancelHB() + defer m.releaseLock(ctx) + if err := m.loadState(ctx); err != nil { m.logger.Error().Err(err).Msg("failed to load migration state; skipping migrations") return diff --git a/pkg/share/manager/jsoncs3/migrations/migration_state_test.go b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go index 8784a7a6b72..0c624a3025d 100644 --- a/pkg/share/manager/jsoncs3/migrations/migration_state_test.go +++ b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go @@ -16,8 +16,12 @@ package migration import ( "context" + "encoding/json" "errors" "os" + "sync" + "sync/atomic" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -39,6 +43,24 @@ func (f *fakeMigration) Version() int { return f.version } func (f *fakeMigration) Initialize(_ config) {} func (f *fakeMigration) Migrate() error { f.called = true; return f.err } +// countingMigration is a migration stub that invokes a callback on every +// Migrate() call, allowing callers to observe how many times it ran. 
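
The startHeartbeat helper above follows a common ticker-plus-CancelFunc shape: the goroutine exits either when the caller cancels or when a renewal fails and ownership can no longer be guaranteed. A reduced sketch of that shape, with a plain callback and a short interval standing in for the conditional upload and lockHeartbeatInterval (the countingMigration test stub continues just below):

// Sketch of the heartbeat lifecycle: the caller receives a CancelFunc, and
// the goroutine stops on cancellation or on the first failed renewal.
package main

import (
	"context"
	"fmt"
	"time"
)

func startHeartbeat(ctx context.Context, renew func() error) context.CancelFunc {
	hbCtx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond) // stands in for lockHeartbeatInterval
		defer ticker.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return
			case <-ticker.C:
				if err := renew(); err != nil {
					// Stop renewing; another instance may now take the lock over.
					return
				}
			}
		}
	}()
	return cancel
}

func main() {
	cancel := startHeartbeat(context.Background(), func() error {
		fmt.Println("renewed")
		return nil
	})
	time.Sleep(180 * time.Millisecond) // expect roughly three renewals
	cancel()
}
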
+type countingMigration struct { + name string + version int + onCall func() +} + +func (c *countingMigration) Name() string { return c.name } +func (c *countingMigration) Version() int { return c.version } +func (c *countingMigration) Initialize(_ config) {} +func (c *countingMigration) Migrate() error { c.onCall(); return nil } + +// marshalLockData is a test helper that JSON-encodes a lockData value. +func marshalLockData(d lockData) ([]byte, error) { + return json.Marshal(d) +} + var _ = Describe("RunMigrations / loadState / saveState", func() { var ( tmpdir string @@ -58,6 +80,10 @@ var _ = Describe("RunMigrations / loadState / saveState", func() { stor, err = metadata.NewDiskStorage(tmpdir) Expect(err).NotTo(HaveOccurred()) + // Replicate the directory setup that jsoncs3 Manager.initialize() does + // so that tests driving storage directly behave like production. + Expect(stor.MakeDirIfNotExist(context.Background(), "migrations")).To(Succeed()) + log = zerolog.Nop() m = Migrations{ config: config{ @@ -219,4 +245,114 @@ var _ = Describe("RunMigrations / loadState / saveState", func() { }) }) }) + + // ── Locking ──────────────────────────────────────────────────────────────── + + Describe("distributed lock", func() { + // Use a very short poll interval so the tests complete quickly. + var origPollInterval time.Duration + BeforeEach(func() { + origPollInterval = lockPollInterval + lockPollInterval = 10 * time.Millisecond + + m.instanceID = "instance-a" + }) + AfterEach(func() { + lockPollInterval = origPollInterval + }) + + Context("acquireLock", func() { + It("succeeds and returns a non-empty etag when no lock exists", func() { + etag, err := m.acquireLock(context.Background()) + Expect(err).NotTo(HaveOccurred()) + Expect(etag).NotTo(BeEmpty()) + }) + + It("blocks until the first holder releases the lock", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Instance A acquires the lock. + _, err := m.acquireLock(ctx) + Expect(err).NotTo(HaveOccurred()) + + // Instance B tries to acquire in the background. + m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"} + acquired := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := m2.acquireLock(ctx) + Expect(err).NotTo(HaveOccurred()) + close(acquired) + }() + + // B must not acquire while A still holds the lock. + Consistently(acquired, 50*time.Millisecond, 10*time.Millisecond).ShouldNot(BeClosed()) + + // A releases; B should now acquire. + m.releaseLock(context.Background()) + Eventually(acquired, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed()) + }) + + It("immediately takes over a stale lock", func() { + // Write a lock file whose timestamp is well past lockTTL. + staleData, err := marshalLockData(lockData{ + Timestamp: time.Now().Add(-2 * lockTTL), + InstanceID: "crashed-instance", + }) + Expect(err).NotTo(HaveOccurred()) + Expect(stor.SimpleUpload(context.Background(), lockFile, staleData)).To(Succeed()) + + // acquireLock should detect the stale lock and take it over without waiting. 
+				done := make(chan struct{})
+				go func() {
+					defer GinkgoRecover()
+					etag, err := m.acquireLock(context.Background())
+					Expect(err).NotTo(HaveOccurred())
+					Expect(etag).NotTo(BeEmpty())
+					close(done)
+				}()
+				Eventually(done, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed())
+			})
+		})
+
+		Context("releaseLock", func() {
+			It("removes the lock file so a second caller can acquire immediately", func() {
+				ctx := context.Background()
+				_, err := m.acquireLock(ctx)
+				Expect(err).NotTo(HaveOccurred())
+
+				m.releaseLock(ctx)
+
+				// A second instance should acquire without delay.
+				m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"}
+				done := make(chan struct{})
+				go func() {
+					defer GinkgoRecover()
+					_, err := m2.acquireLock(ctx)
+					Expect(err).NotTo(HaveOccurred())
+					close(done)
+				}()
+				Eventually(done, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed())
+			})
+		})
+
+		Context("RunMigrations with two concurrent instances", func() {
+			It("runs each pending migration exactly once", func() {
+				var callCount atomic.Int32
+				mig := &countingMigration{name: "once", version: 1, onCall: func() { callCount.Add(1) }}
+				migrations = []migration{mig}
+
+				m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"}
+
+				var wg sync.WaitGroup
+				wg.Add(2)
+				go func() { defer wg.Done(); m.RunMigrations() }()
+				go func() { defer wg.Done(); m2.RunMigrations() }()
+				wg.Wait()
+
+				Expect(callCount.Load()).To(Equal(int32(1)))
+			})
+		})
+	})
 })

From c2f519fe91da76a3da249346991ce540f3b783fd Mon Sep 17 00:00:00 2001
From: Ralf Haferkamp
Date: Thu, 7 May 2026 11:34:05 +0200
Subject: [PATCH 11/12] metadata/disk: Add support for IfNoneMatch on Upload

This is needed to be able to run the unit tests of the jsoncs3
share manager.
---
 pkg/storage/utils/metadata/disk.go | 34 ++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/pkg/storage/utils/metadata/disk.go b/pkg/storage/utils/metadata/disk.go
index f07642b953f..5caef632d30 100644
--- a/pkg/storage/utils/metadata/disk.go
+++ b/pkg/storage/utils/metadata/disk.go
@@ -93,6 +93,40 @@ func (disk *Disk) SimpleUpload(ctx context.Context, uploadpath string, content [
 // Upload stores a file on disk
 func (disk *Disk) Upload(_ context.Context, req UploadRequest) (*UploadResponse, error) {
 	p := disk.targetPath(req.Path)
+
+	// IfNoneMatch: ["*"] means create the file only if it does not already
+	// exist. Use O_EXCL so the check and the create are atomic on the local
+	// filesystem.
+	for _, tag := range req.IfNoneMatch {
+		if tag != "*" {
+			continue
+		}
+		f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
+		if err != nil {
+			if errors.Is(err, os.ErrExist) {
+				return nil, errtypes.AlreadyExists(p)
+			}
+			return nil, err
+		}
+		if _, err := f.Write(req.Content); err != nil {
+			_ = f.Close()
+			return nil, err
+		}
+		if err := f.Close(); err != nil {
+			return nil, err
+		}
+		info, err := os.Stat(p)
+		if err != nil {
+			return nil, err
+		}
+		res := &UploadResponse{}
+		res.Etag, err = calcEtag(info.ModTime(), info.Size())
+		if err != nil {
+			return nil, err
+		}
+		return res, nil
+	}
+
 	if req.IfMatchEtag != "" {
 		info, err := os.Stat(p)
 		if err != nil && !errors.Is(err, os.ErrNotExist) {

From f863cda2ac9c014a729272e608f0ff4b98268d14 Mon Sep 17 00:00:00 2001
From: Ralf Haferkamp
Date: Thu, 7 May 2026 13:24:55 +0200
Subject: [PATCH 12/12] jsoncs3/migrations: Block Writes during migration

While migrations are running, block any Write call on the share
manager.
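
The gate is a pair of channels that are each closed exactly once, so a write pays only two cheap selects after startup. A minimal sketch of the pattern the diff below implements (the field names match; context-error wrapping is omitted):

// Sketch of channel-based write gating: writers wait on two channels that
// are each closed exactly once, so the gate is free after the first check.
package main

import (
	"context"
	"fmt"
)

type manager struct {
	ready          chan struct{} // closed when storage init finished
	migrationsDone chan struct{} // closed when migrations finished (or were skipped)
}

func (m *manager) waitForMigrations(ctx context.Context) error {
	select {
	case <-m.ready:
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case <-m.migrationsDone:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	m := &manager{ready: make(chan struct{}), migrationsDone: make(chan struct{})}
	close(m.ready)
	close(m.migrationsDone) // what SkipMigrations does
	fmt.Println(m.waitForMigrations(context.Background())) // <nil>
}

Tests that never run migrations simply close the gate up front, which is what the SkipMigrations helper below is for.
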
--- pkg/share/manager/jsoncs3/jsoncs3.go | 52 +++++++++++++++++++---- pkg/share/manager/jsoncs3/jsoncs3_test.go | 12 ++++++ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 5548d81cf16..cf336ecb026 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -160,7 +160,8 @@ type Manager struct { storage metadata.Storage SpaceRoot *provider.ResourceId - ready chan struct{} // closed once initialize() has completed successfully + ready chan struct{} // closed once initialize() has completed successfully + migrationsDone chan struct{} // closed once doMigrations() has returned on this instance MaxConcurrency int @@ -236,6 +237,10 @@ func New(s metadata.Storage, MaxConcurrency: maxconcurrency, logger: logger, ready: make(chan struct{}), + // migrationsDone is open (blocking) by default. It is closed by + // doMigrations when all migrations complete, or by SkipMigrations for + // callers (e.g. tests) that do not run migrations at all. + migrationsDone: make(chan struct{}), } // Initialize the metadata storage connection in the background, retrying @@ -323,14 +328,45 @@ func (m *Manager) waitForInit(ctx context.Context) error { } } +// waitForMigrations blocks until both storage initialization and all data +// migrations have completed on this instance, or until ctx is cancelled. +// It is a strict superset of waitForInit and should be used by write operations +// to ensure no writes race with an in-progress migration. +func (m *Manager) waitForMigrations(ctx context.Context) error { + select { + case <-m.ready: + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager not yet initialized") + } + select { + case <-m.migrationsDone: + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager migrations not yet complete") + } +} + // RunMigrations starts data migrations in a background goroutine. It should be -// called once after New() in production server startup. Tests that do not need -// migration behaviour can omit this call entirely. +// called once after New() in production server startup. Callers that do not +// need migrations should call SkipMigrations instead to unblock write operations. func (m *Manager) RunMigrations(cfg migration.MigrationConfig) { go m.doMigrations(cfg) } +// SkipMigrations unblocks write operations on this instance without running +// any migrations. It must be called when RunMigrations will not be called, +// for example in tests. +func (m *Manager) SkipMigrations() { + close(m.migrationsDone) +} + func (m *Manager) doMigrations(cfg migration.MigrationConfig) { + // Always close migrationsDone when this goroutine exits, whether migrations + // ran, were skipped, or failed. This unblocks write operations on this + // instance. Non-winning instances are held here by acquireLock until the + // winning instance finishes, so the close happens only after the storage + // state is fully migrated. 
+ defer close(m.migrationsDone) if err := m.waitForInit(context.Background()); err != nil { m.logger.Error().Err(err).Msg("share manager: aborting migrations, manager did not initialize") return @@ -360,7 +396,7 @@ func (m *Manager) ProcessEvents(ch <-chan events.Event) { func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share") defer span.End() - if err := m.waitForInit(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -567,7 +603,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare") defer span.End() - if err := m.waitForInit(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return err } @@ -584,7 +620,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare") defer span.End() - if err := m.waitForInit(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return nil, err } @@ -1129,7 +1165,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare") defer span.End() - if err := m.waitForInit(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return nil, err } @@ -1294,7 +1330,7 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipS func (m *Manager) CleanupStaleShares(ctx context.Context) { log := appctx.GetLogger(ctx) - if err := m.waitForInit(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return } diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index 9910a518f18..28118391a21 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -168,6 +168,7 @@ var _ = Describe("Jsoncs3", func() { ) m, err = jsoncs3.New(storage, nil, gatewaySelector, 0, nil, 0) Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() }) AfterEach(func() { @@ -263,6 +264,7 @@ var _ = Describe("Jsoncs3", func() { m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s = shareBykey(&collaboration.ShareKey{ ResourceId: sharedResource.Id, @@ -453,6 +455,7 @@ var _ = Describe("Jsoncs3", func() { It("loads the cache when it doesn't have an entry", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s, err := m.GetShare(ctx, shareRef) Expect(err).ToNot(HaveOccurred()) @@ -501,6 +504,7 @@ var _ = Describe("Jsoncs3", func() { m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s, err := m.GetShare(ctx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Key{ @@ -614,6 +618,7 @@ var _ = Describe("Jsoncs3", func() { m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s = shareBykey(&collaboration.ShareKey{ ResourceId: sharedResource.Id, @@ -753,6 +758,7 @@ var _ = Describe("Jsoncs3", func() { 
It("syncronizes the user received cache before listing", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) Expect(err).ToNot(HaveOccurred()) @@ -821,6 +827,7 @@ var _ = Describe("Jsoncs3", func() { It("syncronizes the group received cache before listing", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) Expect(err).ToNot(HaveOccurred()) @@ -865,6 +872,7 @@ var _ = Describe("Jsoncs3", func() { It("syncs the cache", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -899,6 +907,7 @@ var _ = Describe("Jsoncs3", func() { It("syncs the cache", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1052,6 +1061,7 @@ var _ = Describe("Jsoncs3", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1080,6 +1090,7 @@ var _ = Describe("Jsoncs3", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1109,6 +1120,7 @@ var _ = Describe("Jsoncs3", func() { m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{