diff --git a/internal/grpc/services/usershareprovider/usershareprovider.go b/internal/grpc/services/usershareprovider/usershareprovider.go index eaab359763e..1b801f58128 100644 --- a/internal/grpc/services/usershareprovider/usershareprovider.go +++ b/internal/grpc/services/usershareprovider/usershareprovider.go @@ -84,9 +84,9 @@ type service struct { allowedPathsForShares []*regexp.Regexp } -func getShareManager(c *config) (share.Manager, error) { +func getShareManager(c *config, logger *zerolog.Logger) (share.Manager, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { - return f(c.Drivers[c.Driver]) + return f(c.Drivers[c.Driver], logger) } return nil, errtypes.NotFound("driver not found: " + c.Driver) } @@ -114,7 +114,7 @@ func parseConfig(m map[string]interface{}) (*config, error) { } // New creates a new user share provider svc initialized from defaults -func NewDefault(m map[string]interface{}, ss *grpc.Server, _ *zerolog.Logger) (rgrpc.Service, error) { +func NewDefault(m map[string]any, ss *grpc.Server, logger *zerolog.Logger) (rgrpc.Service, error) { c, err := parseConfig(m) if err != nil { @@ -123,7 +123,7 @@ func NewDefault(m map[string]interface{}, ss *grpc.Server, _ *zerolog.Logger) (r c.init() - sm, err := getShareManager(c) + sm, err := getShareManager(c, logger) if err != nil { return nil, err } diff --git a/internal/grpc/services/usershareprovider/usershareprovider_test.go b/internal/grpc/services/usershareprovider/usershareprovider_test.go index 0d46625388f..6dd5b75af75 100644 --- a/internal/grpc/services/usershareprovider/usershareprovider_test.go +++ b/internal/grpc/services/usershareprovider/usershareprovider_test.go @@ -32,6 +32,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -91,7 +92,7 @@ var _ = Describe("user share provider service", func() { BeforeEach(func() { manager = &mocks.Manager{} - registry.Register("mockManager", func(m map[string]interface{}) (share.Manager, error) { + registry.Register("mockManager", func(m map[string]interface{}, l *zerolog.Logger) (share.Manager, error) { return manager, nil }) manager.On("UpdateShare", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collaborationpb.Share{}, nil) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index ba995639b8a..cf336ecb026 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -36,9 +36,9 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/errtypes" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" - "github.com/opencloud-eu/reva/v2/pkg/logger" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/share" + migration "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/migrations" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/providercache" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache" "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/sharecache" @@ -48,6 +48,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" "github.com/pkg/errors" + "github.com/rs/zerolog" "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" @@ -122,14 +123,20 @@ var ( ) type 
config struct { - GatewayAddr string `mapstructure:"gateway_addr"` - MaxConcurrency int `mapstructure:"max_concurrency"` - ProviderAddr string `mapstructure:"provider_addr"` - ServiceUserID string `mapstructure:"service_user_id"` - ServiceUserIdp string `mapstructure:"service_user_idp"` - MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` - CacheTTL int `mapstructure:"ttl"` - Events EventOptions `mapstructure:"events"` + GatewayAddr string `mapstructure:"gateway_addr"` + MaxConcurrency int `mapstructure:"max_concurrency"` + ProviderAddr string `mapstructure:"provider_addr"` + SystemUserID string `mapstructure:"system_user_id"` + SystemUserIdp string `mapstructure:"system_user_idp"` + MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` + ServiceAccountID string `mapstructure:"service_account_id"` + ServiceAccountSecret string `mapstructure:"service_account_secret"` + // ProviderRegistryAddr is the address of the storage registry used during + // migrations. Defaults to GatewayAddr when empty, because in the default + // OpenCloud deployment the registry is co-located with the gateway. + ProviderRegistryAddr string `mapstructure:"provider_registry_addr"` + CacheTTL int `mapstructure:"ttl"` + Events EventOptions `mapstructure:"events"` } // EventOptions are the configurable options for events @@ -145,8 +152,6 @@ type EventOptions struct { // Manager implements a share manager using a cs3 storage backend with local caching type Manager struct { - sync.RWMutex - Cache providercache.Cache // holds all shares, sharded by provider id and space id CreatedCache sharecache.Cache // holds the list of shares a user has created, sharded by user id GroupReceivedCache sharecache.Cache // holds the list of shares a group has access to, sharded by group id @@ -155,23 +160,25 @@ type Manager struct { storage metadata.Storage SpaceRoot *provider.ResourceId - initialized bool + ready chan struct{} // closed once initialize() has completed successfully + migrationsDone chan struct{} // closed once doMigrations() has returned on this instance MaxConcurrency int gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient] eventStream events.Stream + logger *zerolog.Logger } // NewDefault returns a new manager instance with default dependencies -func NewDefault(m map[string]interface{}) (share.Manager, error) { +func NewDefault(m map[string]interface{}, logger *zerolog.Logger) (share.Manager, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { err = errors.Wrap(err, "error creating a new manager") return nil, err } - s, err := metadata.NewCS3Storage(c.ProviderAddr, c.ProviderAddr, c.ServiceUserID, c.ServiceUserIdp, c.MachineAuthAPIKey) + s, err := metadata.NewCS3Storage(c.ProviderAddr, c.ProviderAddr, c.SystemUserID, c.SystemUserIdp, c.MachineAuthAPIKey) if err != nil { return nil, err } @@ -189,11 +196,34 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { } } - return New(s, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) + mgr, err := New(s, logger, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) + if err != nil { + return nil, err + } + providerRegistryAddr := c.ProviderRegistryAddr + if providerRegistryAddr == "" { + providerRegistryAddr = c.GatewayAddr + } + mgr.RunMigrations(migration.MigrationConfig{ + ServiceAccountID: c.ServiceAccountID, + ServiceAccountSecret: c.ServiceAccountSecret, + ProviderRegistryAddr: providerRegistryAddr, + }) + return mgr, nil } // New returns a new manager instance. 
-func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { +func New(s metadata.Storage, + logger *zerolog.Logger, + gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], + ttlSeconds int, + es events.Stream, + maxconcurrency int, +) (*Manager, error) { + if logger == nil { + nop := zerolog.Nop() + logger = &nop + } ttl := time.Duration(ttlSeconds) * time.Second m := &Manager{ @@ -205,13 +235,38 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, + logger: logger, + ready: make(chan struct{}), + // migrationsDone is open (blocking) by default. It is closed by + // doMigrations when all migrations complete, or by SkipMigrations for + // callers (e.g. tests) that do not run migrations at all. + migrationsDone: make(chan struct{}), } + // Initialize the metadata storage connection in the background, retrying + // with exponential backoff if the backend is not yet available. + go func() { + backoff := time.Second + for { + if err := m.initialize(context.Background()); err != nil { + logger.Info().Err(err).Dur("backoff", backoff).Msg("share manager: metadata storage initialization failed, retrying") + time.Sleep(backoff) + if backoff < 30*time.Second { + backoff *= 2 + } + continue + } + logger.Debug().Msg("share manager: initialization succeeded") + close(m.ready) + return + } + }() + // listen for events if m.eventStream != nil { ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...) if err != nil { - appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events") + logger.Error().Err(err).Msg("error consuming events") } go m.ProcessEvents(ch) } @@ -219,23 +274,13 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate return m, nil } +// initialize connects to the metadata storage backend and ensures the required +// directory structure exists. It is called once at startup from a background +// goroutine (see New) and must not be called concurrently. func (m *Manager) initialize(ctx context.Context) error { _, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "initialize") defer span.End() - if m.initialized { - span.SetStatus(codes.Ok, "already initialized") - return nil - } - - m.Lock() - defer m.Unlock() - - if m.initialized { // check if initialization happened while grabbing the lock - span.SetStatus(codes.Ok, "initialized while grabbing lock") - return nil - } - ctx = context.Background() err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata") if err != nil { span.RecordError(err) @@ -261,21 +306,85 @@ func (m *Manager) initialize(ctx context.Context) error { span.SetStatus(codes.Error, err.Error()) return err } + err = m.storage.MakeDirIfNotExist(ctx, "migrations") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } - m.initialized = true span.SetStatus(codes.Ok, "initialized") return nil } +// waitForInit blocks until the background initialization goroutine has +// successfully completed, or until ctx is cancelled. 
+func (m *Manager) waitForInit(ctx context.Context) error { + select { + case <-m.ready: + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager not yet initialized") + } +} + +// waitForMigrations blocks until both storage initialization and all data +// migrations have completed on this instance, or until ctx is cancelled. +// It is a strict superset of waitForInit and should be used by write operations +// to ensure no writes race with an in-progress migration. +func (m *Manager) waitForMigrations(ctx context.Context) error { + select { + case <-m.ready: + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager not yet initialized") + } + select { + case <-m.migrationsDone: + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "share manager migrations not yet complete") + } +} + +// RunMigrations starts data migrations in a background goroutine. It should be +// called once after New() in production server startup. Callers that do not +// need migrations should call SkipMigrations instead to unblock write operations. +func (m *Manager) RunMigrations(cfg migration.MigrationConfig) { + go m.doMigrations(cfg) +} + +// SkipMigrations unblocks write operations on this instance without running +// any migrations. It must be called when RunMigrations will not be called, +// for example in tests. +func (m *Manager) SkipMigrations() { + close(m.migrationsDone) +} + +func (m *Manager) doMigrations(cfg migration.MigrationConfig) { + // Always close migrationsDone when this goroutine exits, whether migrations + // ran, were skipped, or failed. This unblocks write operations on this + // instance. Non-winning instances are held here by acquireLock until the + // winning instance finishes, so the close happens only after the storage + // state is fully migrated. 
+ defer close(m.migrationsDone) + if err := m.waitForInit(context.Background()); err != nil { + m.logger.Error().Err(err).Msg("share manager: aborting migrations, manager did not initialize") + return + } + m.logger.Debug().Msg("migrations start") + migrations := migration.New(*m.logger, m.gatewaySelector, m.storage, cfg, m, m) + migrations.RunMigrations() +} + func (m *Manager) ProcessEvents(ch <-chan events.Event) { - log := logger.New() + log := m.logger + ctx := context.Background() + if err := m.waitForInit(ctx); err != nil { + log.Error().Err(err).Msg("share manager: error waiting for initialization") + return + } for event := range ch { ctx := context.Background() - - if err := m.initialize(ctx); err != nil { - log.Error().Err(err).Msg("error initializing manager") - } - if ev, ok := event.Event.(events.SpaceDeleted); ok { log.Debug().Msgf("space deleted event: %v", ev) go func() { m.purgeSpace(ctx, ev.ID) }() @@ -287,7 +396,7 @@ func (m *Manager) ProcessEvents(ch <-chan events.Event) { func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -436,7 +545,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare") defer span.End() sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "GetShare").Logger() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -494,7 +603,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return err } @@ -511,7 +620,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return nil, err } @@ -599,7 +708,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -816,7 +925,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati defer span.End() sublog := appctx.GetLogger(ctx).With().Str("driver", "jsoncs3").Str("handler", "ListReceivedShares").Logger() - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -1012,7 +1121,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S // GetReceivedShare returns the information for a received share. 
func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) { - if err := m.initialize(ctx); err != nil { + if err := m.waitForInit(ctx); err != nil { return nil, err } @@ -1056,7 +1165,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare") defer span.End() - if err := m.initialize(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return nil, err } @@ -1103,8 +1212,8 @@ func updateShareID(share *collaboration.Share) { // Load imports shares and received shares from channels (e.g. during migration) func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error { - log := appctx.GetLogger(ctx) - if err := m.initialize(ctx); err != nil { + l := m.logger + if err := m.waitForInit(ctx); err != nil { return err } @@ -1119,14 +1228,14 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar updateShareID(s) } if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { - log.Error().Err(err).Interface("share", s).Msg("error persisting share") + l.Error().Err(err).Interface("share", s).Msg("error persisting share") } else { - log.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") + l.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") } if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil { - log.Error().Err(err).Interface("share", s).Msg("error persisting created cache") + l.Error().Err(err).Interface("share", s).Msg("error persisting created cache") } else { - log.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") + l.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") } } wg.Done() @@ -1137,18 +1246,19 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if !shareIsRoutable(s.ReceivedShare.GetShare()) { updateShareID(s.ReceivedShare.GetShare()) } - switch s.ReceivedShare.Share.Grantee.Type { - case provider.GranteeType_GRANTEE_TYPE_USER: - if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { - log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") + if s.UserID != nil { + spaceid := s.ReceivedShare.GetShare().GetResourceId().GetStorageId() + shareid.IDDelimiter + s.ReceivedShare.GetShare().GetResourceId().GetSpaceId() + if err := m.UserReceivedStates.Add(context.Background(), s.UserID.GetOpaqueId(), spaceid, s.ReceivedShare); err != nil { + l.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") } else { - log.Debug().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share 
userdata") + l.Debug().Str("userid", s.UserID.GetOpaqueId()).Str("spaceid", spaceid).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") } - case provider.GranteeType_GRANTEE_TYPE_GROUP: + } + if s.ReceivedShare.Share.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP && s.UserID == nil { if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { - log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") + l.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") } else { - log.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") + l.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } } } @@ -1220,7 +1330,7 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipS func (m *Manager) CleanupStaleShares(ctx context.Context) { log := appctx.GetLogger(ctx) - if err := m.initialize(ctx); err != nil { + if err := m.waitForMigrations(ctx); err != nil { return } diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index 011347de500..28118391a21 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -166,8 +166,9 @@ var _ = Describe("Jsoncs3", func() { return client }, ) - m, err = jsoncs3.New(storage, gatewaySelector, 0, nil, 0) + m, err = jsoncs3.New(storage, nil, gatewaySelector, 0, nil, 0) Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() }) AfterEach(func() { @@ -261,8 +262,9 @@ var _ = Describe("Jsoncs3", func() { }) Expect(s).ToNot(BeNil()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s = shareBykey(&collaboration.ShareKey{ ResourceId: sharedResource.Id, @@ -451,8 +453,9 @@ var _ = Describe("Jsoncs3", func() { }) It("loads the cache when it doesn't have an entry", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s, err := m.GetShare(ctx, shareRef) Expect(err).ToNot(HaveOccurred()) @@ -499,8 +502,9 @@ var _ = Describe("Jsoncs3", func() { }) Expect(err).ToNot(HaveOccurred()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s, err := m.GetShare(ctx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Key{ @@ -612,8 +616,9 @@ var _ = Describe("Jsoncs3", func() { Expect(us).ToNot(BeNil()) Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) - m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() s = shareBykey(&collaboration.ShareKey{ ResourceId: sharedResource.Id, @@ -751,8 
+756,9 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the user received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) Expect(err).ToNot(HaveOccurred()) @@ -819,8 +825,9 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the group received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}, nil) Expect(err).ToNot(HaveOccurred()) @@ -863,8 +870,9 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -897,8 +905,9 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1050,8 +1059,9 @@ var _ = Describe("Jsoncs3", func() { Expect(err).ToNot(HaveOccurred()) Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1078,8 +1088,9 @@ var _ = Describe("Jsoncs3", func() { Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) Expect(rs.Hidden).To(Equal(true)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -1107,8 +1118,9 @@ var _ = Describe("Jsoncs3", func() { Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) Expect(rs.Hidden).To(Equal(true)) - m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) + m.SkipMigrations() rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go new file mode 100644 index 00000000000..8f15a563eba --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers.go @@ -0,0 +1,435 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, 
Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package migration + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/cenkalti/backoff" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" + typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/google/uuid" + ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx" + "github.com/opencloud-eu/reva/v2/pkg/errtypes" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "github.com/opencloud-eu/reva/v2/pkg/share" + "github.com/opencloud-eu/reva/v2/pkg/share/manager/jsoncs3/shareid" + "github.com/opencloud-eu/reva/v2/pkg/utils" + "github.com/rs/zerolog" + "google.golang.org/grpc" +) + +// storageProvider is the narrow subset of provider.ProviderAPIClient that the +// migration actually uses. Keeping it narrow makes test stubs trivial to write. +type storageProvider interface { + ListGrants(ctx context.Context, in *provider.ListGrantsRequest, opts ...grpc.CallOption) (*provider.ListGrantsResponse, error) +} + +type ImportSpaceMembersMigration struct { + cfg config + sharesChan chan *collaboration.Share + receivedChan chan share.ReceivedShareWithUser + userCache map[string]*userpb.UserId + groupCache map[string]*grouppb.GroupId + providerResolver func(context.Context, *provider.StorageSpace) (storageProvider, error) +} + +func init() { + registerMigration(&ImportSpaceMembersMigration{}) +} + +func (m *ImportSpaceMembersMigration) Initialize(cfg config) { + m.cfg = cfg + m.sharesChan = make(chan *collaboration.Share) + m.receivedChan = make(chan share.ReceivedShareWithUser) + m.userCache = make(map[string]*userpb.UserId) + m.groupCache = make(map[string]*grouppb.GroupId) + m.providerResolver = func(ctx context.Context, space *provider.StorageSpace) (storageProvider, error) { + return m.storageProviderForSpace(ctx, space) + } +} + +func (m *ImportSpaceMembersMigration) Name() string { + return "import_space_members" +} + +func (m *ImportSpaceMembersMigration) Version() int { + return 1 +} + +func (m *ImportSpaceMembersMigration) Migrate() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + + svcCtx, err := utils.GetServiceUserContextWithContext(context.Background(), gwc, m.cfg.serviceAccountID, m.cfg.serviceAccountSecret) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("failed to get service user context for migration") + return err + } + // List all project spaces. 
+ listRes, err := gwc.ListStorageSpaces(svcCtx, &provider.ListStorageSpacesRequest{ + Opaque: utils.AppendPlainToOpaque(nil, "unrestricted", "true"), + Filters: []*provider.ListStorageSpacesRequest_Filter{ + { + Type: provider.ListStorageSpacesRequest_Filter_TYPE_SPACE_TYPE, + Term: &provider.ListStorageSpacesRequest_Filter_SpaceType{SpaceType: "project"}, + }, + }, + }) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("space-membership migration: failed to list storage spaces") + return err + } + + if listRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + m.cfg.logger.Error().Str("status", listRes.GetStatus().GetMessage()).Msg("space-membership migration: ListStorageSpaces returned non-OK status") + return errtypes.InternalError("ListStorageSpaces") + } + + spaces := listRes.GetStorageSpaces() + m.cfg.logger.Info().Int("spaces", len(spaces)).Msg("Starting migration") + + // loadCtx is cancelled when the producer finishes (or fails) so that the + // Load goroutine — which blocks reading from the channels — is not left + // waiting forever if we return early from an error. + loadCtx, cancelLoad := context.WithCancel(svcCtx) + defer cancelLoad() + + var wg sync.WaitGroup + var loaderError error + wg.Go(func() { + loaderError = m.cfg.loader.Load(loadCtx, m.sharesChan, m.receivedChan) + }) + + migrated := 0 + for _, space := range spaces { + sharesCreated, err := m.migrateSpace(loadCtx, space) + if err != nil { + m.cfg.logger.Error().Err(err).Str("space", space.GetId().GetOpaqueId()).Msg("failed to migrate space; continuing with remaining spaces") + continue + } + migrated++ + m.cfg.logger.Debug(). + Str("space", space.GetId().GetOpaqueId()). + Int("shares_created", sharesCreated). + Msg("space migrated") + if migrated%10 == 0 { + m.cfg.logger.Info(). + Int("migrated", migrated). + Int("total", len(spaces)). + Msg("migration progress") + } + } + close(m.receivedChan) + close(m.sharesChan) + + wg.Wait() + m.cfg.logger.Info().Err(loaderError).Int("migrated", migrated).Int("total", len(spaces)).Msg("Migration finished") + return loaderError +} + +func (m *ImportSpaceMembersMigration) migrateSpace(ctx context.Context, space *provider.StorageSpace) (int, error) { + spClient, err := m.providerResolver(ctx, space) + if err != nil { + return 0, err + } + + ref := &provider.Reference{ResourceId: space.GetRoot()} + grantsRes, err := spClient.ListGrants(ctx, &provider.ListGrantsRequest{Ref: ref}) + if err != nil { + return 0, err + } + if grantsRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + return 0, errtypes.NewErrtypeFromStatus(grantsRes.GetStatus()) + } + + sharesCreated := 0 + for _, grant := range grantsRes.GetGrants() { + share, receivedShares, err := m.spaceGrantToShares(ctx, grant, space) + if err != nil { + m.cfg.logger.Error().Err(err). + Interface("grant", grant). + Msg("Failed to convert grant to shares") + continue + } + if share == nil { + // share already existed; nothing to import for this grant + continue + } + + select { + case m.sharesChan <- share: + case <-ctx.Done(): + return sharesCreated, ctx.Err() + } + for _, rs := range receivedShares { + select { + case m.receivedChan <- rs: + case <-ctx.Done(): + return sharesCreated, ctx.Err() + } + } + sharesCreated++ + } + return sharesCreated, nil +} + +// resolveRetries is the maximum number of times resolveUserID / resolveGroupID +// will retry after receiving an errtypes.Unavailable response (LDAP down). 
+const resolveRetries = 10 + +// retryOnUnavailable calls op, retrying with exponential backoff whenever op +// returns errtypes.Unavailable. Any other error (including context +// cancellation) stops the loop immediately and is returned as-is. +// Retries are capped at resolveRetries attempts and respect ctx cancellation. +func retryOnUnavailable(ctx context.Context, log zerolog.Logger, op func() error) error { + b := backoff.WithContext( + backoff.WithMaxRetries(backoff.NewExponentialBackOff(), resolveRetries), + ctx, + ) + notify := func(err error, d time.Duration) { + log.Warn().Err(err).Dur("retry_in", d).Msg("identity provider temporarily unavailable, retrying") + } + return backoff.RetryNotify(func() error { + err := op() + if err == nil { + return nil + } + if _, ok := err.(errtypes.Unavailable); ok { + return err // transient — keep retrying + } + return backoff.Permanent(err) // permanent — stop immediately + }, b, notify) +} + +func (m *ImportSpaceMembersMigration) resolveUserID(ctx context.Context, opaqueID string) (*userpb.UserId, error) { + if id, ok := m.userCache[opaqueID]; ok { + return id, nil + } + var id *userpb.UserId + err := retryOnUnavailable(ctx, m.cfg.logger, func() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + res, err := gwc.GetUser(ctx, &userpb.GetUserRequest{ + UserId: &userpb.UserId{OpaqueId: opaqueID}, + SkipFetchingUserGroups: true, + }) + if err != nil { + return err + } + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + // errtypes.NewErrtypeFromStatus maps CODE_UNAVAILABLE → errtypes.Unavailable, + // which retryOnUnavailable will retry; all other codes are treated as permanent. + return errtypes.NewErrtypeFromStatus(res.GetStatus()) + } + id = res.GetUser().GetId() + return nil + }) + if err != nil { + return nil, err + } + m.userCache[opaqueID] = id + return id, nil +} + +func (m *ImportSpaceMembersMigration) resolveGroupID(ctx context.Context, opaqueID string) (*grouppb.GroupId, error) { + if id, ok := m.groupCache[opaqueID]; ok { + return id, nil + } + var id *grouppb.GroupId + err := retryOnUnavailable(ctx, m.cfg.logger, func() error { + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + return err + } + res, err := gwc.GetGroup(ctx, &grouppb.GetGroupRequest{ + GroupId: &grouppb.GroupId{OpaqueId: opaqueID}, + SkipFetchingMembers: true, + }) + if err != nil { + return err + } + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + return errtypes.NewErrtypeFromStatus(res.GetStatus()) + } + id = res.GetGroup().GetId() + return nil + }) + if err != nil { + return nil, err + } + m.groupCache[opaqueID] = id + return id, nil +} + +func (m *ImportSpaceMembersMigration) spaceGrantToShares(ctx context.Context, grant *provider.Grant, space *provider.StorageSpace) (*collaboration.Share, []share.ReceivedShareWithUser, error) { + // The grantee ids as persisted on disk do not have an IDP or type stored as + // part of the userid/groupid. 
Resolve them via the gateway so we get the + // full userid + switch grant.GetGrantee().GetType() { + case provider.GranteeType_GRANTEE_TYPE_GROUP: + groupID, err := m.resolveGroupID(ctx, grant.GetGrantee().GetGroupId().GetOpaqueId()) + if err != nil { + return nil, nil, fmt.Errorf("resolve group %s: %w", grant.GetGrantee().GetGroupId().GetOpaqueId(), err) + } + grant.Grantee.Id = &provider.Grantee_GroupId{GroupId: groupID} + case provider.GranteeType_GRANTEE_TYPE_USER: + userID, err := m.resolveUserID(ctx, grant.GetGrantee().GetUserId().GetOpaqueId()) + if err != nil { + return nil, nil, fmt.Errorf("resolve user %s: %w", grant.GetGrantee().GetUserId().GetOpaqueId(), err) + } + grant.Grantee.Id = &provider.Grantee_UserId{UserId: userID} + } + + ref := &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: space.GetRoot(), + Grantee: grant.GetGrantee(), + }, + }, + } + + ctx = ctxpkg.ContextSetUser(ctx, &userpb.User{Id: grant.Creator}) + if s, err := m.cfg.manager.GetShare(ctx, ref); err == nil { + // FIXME: Verify the actual grants? + m.cfg.logger.Debug().Interface("share", s).Msg("share already exists") + return nil, nil, nil + } + + ts := utils.TSNow() + shareID := shareid.Encode(space.GetRoot().GetStorageId(), space.GetRoot().GetSpaceId(), uuid.NewString()) + + creator := grant.GetCreator() + if creator.Type == userpb.UserType_USER_TYPE_INVALID { + creator = nil + } + newShare := &collaboration.Share{ + Id: &collaboration.ShareId{OpaqueId: shareID}, + ResourceId: space.GetRoot(), + Permissions: &collaboration.SharePermissions{Permissions: grant.GetPermissions()}, + Grantee: grant.GetGrantee(), + Expiration: grant.GetExpiration(), + Owner: creator, + Creator: creator, + Ctime: ts, + Mtime: ts, + } + + var newReceivedShares []share.ReceivedShareWithUser + switch grant.GetGrantee().GetType() { + case provider.GranteeType_GRANTEE_TYPE_GROUP: + gwc, err := m.cfg.gatewaySelector.Next() + if err != nil { + m.cfg.logger.Error().Err(err).Msg("Failed to get gateway client") + return nil, nil, err + } + + gr, err := gwc.GetMembers(ctx, &grouppb.GetMembersRequest{ + GroupId: grant.GetGrantee().GetGroupId(), + }) + if err != nil { + m.cfg.logger.Error().Err(err).Msg("Failed to expand group membership") + return nil, nil, err + } + if gr.GetStatus().GetCode() != rpc.Code_CODE_OK { + m.cfg.logger.Error().Str("Status", gr.GetStatus().GetMessage()).Msg("Failed to expand group membership") + return nil, nil, errtypes.NewErrtypeFromStatus(gr.GetStatus()) + } + for _, u := range gr.GetMembers() { + newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: u, + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + } + // Also add a group-level entry (UserID == nil) so the group cache is populated. 
+ newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: nil, + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + case provider.GranteeType_GRANTEE_TYPE_USER: + newReceivedShares = append(newReceivedShares, share.ReceivedShareWithUser{ + UserID: grant.GetGrantee().GetUserId(), + ReceivedShare: &collaboration.ReceivedShare{ + Share: newShare, + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }) + } + return newShare, newReceivedShares, nil +} + +// storageProviderForSpace resolves the storageprovider responsible for the +// given storage space and returns a dialled client. In the default opencloud +// deployment the storage registry is co-located with the gateway, so +// the GatewayAddr is used as the registry address. +func (m *ImportSpaceMembersMigration) storageProviderForSpace(ctx context.Context, space *provider.StorageSpace) (provider.ProviderAPIClient, error) { + + srClient, err := pool.GetStorageRegistryClient(m.cfg.providerRegistryAddr) + if err != nil { + return nil, fmt.Errorf("get storage registry client: %w", err) + } + + spaceJSON, err := json.Marshal(space) + if err != nil { + return nil, fmt.Errorf("marshal space: %w", err) + } + + res, err := srClient.GetStorageProviders(ctx, ®istry.GetStorageProvidersRequest{ + Opaque: &typesv1beta1.Opaque{ + Map: map[string]*typesv1beta1.OpaqueEntry{ + "space": { + Decoder: "json", + Value: spaceJSON, + }, + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("GetStorageProviders: %w", err) + } + if len(res.GetProviders()) == 0 { + return nil, fmt.Errorf("no storage provider found for space %s", space.GetId().GetOpaqueId()) + } + + c, err := pool.GetStorageProviderServiceClient(res.GetProviders()[0].GetAddress()) + if err != nil { + return nil, fmt.Errorf("dial storage provider: %w", err) + } + return c, nil +} diff --git a/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go new file mode 100644 index 00000000000..40bdc45726d --- /dev/null +++ b/pkg/share/manager/jsoncs3/migrations/0001_import_spacemembers_test.go @@ -0,0 +1,580 @@ +// Copyright 2026 OpenCloud GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "context" + "errors" + "os" + "sync" + + gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + + csmocks "github.com/opencloud-eu/reva/v2/tests/cs3mocks/mocks" + + "github.com/opencloud-eu/reva/v2/pkg/errtypes" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "github.com/opencloud-eu/reva/v2/pkg/share" + shareMocks "github.com/opencloud-eu/reva/v2/pkg/share/mocks" + "github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata" +) + +// ── helpers ──────────────────────────────────────────────────────────────────── + +// mockStorageProvider is a hand-written stub that satisfies the narrow +// storageProvider interface (only ListGrants). This avoids generating a full +// ProviderAPIClient mock. +type mockStorageProvider struct { + grants []*provider.Grant + err error + status *rpc.Status +} + +func (s *mockStorageProvider) ListGrants(_ context.Context, _ *provider.ListGrantsRequest, _ ...grpc.CallOption) (*provider.ListGrantsResponse, error) { + if s.err != nil { + return nil, s.err + } + st := s.status + if st == nil { + st = &rpc.Status{Code: rpc.Code_CODE_OK} + } + return &provider.ListGrantsResponse{Status: st, Grants: s.grants}, nil +} + +// mockLoader records all shares delivered to it through the channels. +type mockLoader struct { + shares []*collaboration.Share + received []share.ReceivedShareWithUser + err error // returned from Load; set before calling Migrate() +} + +func (l *mockLoader) Load(_ context.Context, sc <-chan *collaboration.Share, rc <-chan share.ReceivedShareWithUser) error { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for s := range sc { + l.shares = append(l.shares, s) + } + }() + go func() { + defer wg.Done() + for r := range rc { + l.received = append(l.received, r) + } + }() + wg.Wait() + return l.err +} + +// okAuthenticateResponse returns an AuthenticateResponse with CODE_OK and a +// placeholder token, sufficient to let GetServiceUserContextWithContext proceed. +func okAuthenticateResponse() *gatewayv1beta1.AuthenticateResponse { + return &gatewayv1beta1.AuthenticateResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Token: "test-token", + } +} + +// buildMigration creates an ImportSpaceMembersMigration wired to the supplied +// gateway mock, share-manager mock, and loader, using a temporary DiskStorage. 
+func buildMigration(tmpdir string, gwMock *csmocks.GatewayAPIClient, mgr *shareMocks.Manager, ldr share.LoadableManager) *ImportSpaceMembersMigration { + stor, err := metadata.NewDiskStorage(tmpdir) + Expect(err).NotTo(HaveOccurred()) + + pool.RemoveSelector("GatewaySelector" + "eu.opencloud.api.gateway") + gsel := pool.GetSelector[gatewayv1beta1.GatewayAPIClient]( + "GatewaySelector", + "eu.opencloud.api.gateway", + func(_ grpc.ClientConnInterface) gatewayv1beta1.GatewayAPIClient { + return gwMock + }, + ) + + m := &ImportSpaceMembersMigration{} + m.Initialize(config{ + logger: zerolog.Nop(), + gatewaySelector: gsel, + storage: stor, + serviceAccountID: "svc", + serviceAccountSecret: "secret", + manager: mgr, + loader: ldr, + }) + return m +} + +// ── retryOnUnavailable ──────────────────────────────────────────────────────── + +var _ = Describe("retryOnUnavailable", func() { + var log zerolog.Logger + + BeforeEach(func() { + log = zerolog.Nop() + }) + + It("returns nil when op succeeds immediately", func() { + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + return nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(calls).To(Equal(1)) + }) + + It("retries on errtypes.Unavailable and succeeds when op eventually succeeds", func() { + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + if calls < 3 { + return errtypes.Unavailable("ldap down") + } + return nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(calls).To(Equal(3)) + }) + + It("stops immediately on a permanent (non-Unavailable) error", func() { + sentinel := errors.New("permanent failure") + calls := 0 + err := retryOnUnavailable(context.Background(), log, func() error { + calls++ + return sentinel + }) + Expect(err).To(MatchError(sentinel)) + Expect(calls).To(Equal(1)) + }) + + It("returns an error when context is cancelled before op succeeds", func() { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // already cancelled + err := retryOnUnavailable(ctx, log, func() error { + return errtypes.Unavailable("ldap down") + }) + Expect(err).To(HaveOccurred()) + }) + +}) + +// ── resolveUserID ───────────────────────────────────────────────────────────── + +var _ = Describe("resolveUserID", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "resolve-user-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + m = buildMigration(tmpdir, gwMock, &shareMocks.Manager{}, &mockLoader{}) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("successful lookup", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.MatchedBy(func(r *userpb.GetUserRequest) bool { + return r.UserId.OpaqueId == "alice" + })).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}}, + }, nil) + }) + + It("returns the full UserId", func() { + id, err := m.resolveUserID(context.Background(), "alice") + Expect(err).NotTo(HaveOccurred()) + Expect(id.Idp).To(Equal("idp.example.com")) + Expect(id.OpaqueId).To(Equal("alice")) + }) + + It("caches the result and only calls the gateway once", func() { + _, _ = m.resolveUserID(context.Background(), "alice") + _, _ = m.resolveUserID(context.Background(), "alice") + 
gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 1) + }) + }) + + Context("gateway returns CODE_NOT_FOUND", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_NOT_FOUND, Message: "no such user"}, + }, nil) + }) + + It("returns an error without retrying", func() { + _, err := m.resolveUserID(context.Background(), "ghost") + Expect(err).To(HaveOccurred()) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 1) + }) + }) + + Context("gateway returns CODE_UNAVAILABLE on first call then succeeds", func() { + BeforeEach(func() { + calls := 0 + gwMock.On("GetUser", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *userpb.GetUserRequest, _ ...grpc.CallOption) *userpb.GetUserResponse { + calls++ + if calls == 1 { + return &userpb.GetUserResponse{Status: &rpc.Status{Code: rpc.Code_CODE_UNAVAILABLE, Message: "ldap down"}} + } + return &userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "bob", Idp: "idp.example.com"}}, + } + }, + func(_ context.Context, _ *userpb.GetUserRequest, _ ...grpc.CallOption) error { return nil }, + ) + }) + + It("retries and eventually returns the resolved ID", func() { + id, err := m.resolveUserID(context.Background(), "bob") + Expect(err).NotTo(HaveOccurred()) + Expect(id.OpaqueId).To(Equal("bob")) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetUser", 2) + }) + }) +}) + +// ── resolveGroupID ──────────────────────────────────────────────────────────── + +var _ = Describe("resolveGroupID", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "resolve-group-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + m = buildMigration(tmpdir, gwMock, &shareMocks.Manager{}, &mockLoader{}) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("successful lookup", func() { + BeforeEach(func() { + gwMock.On("GetGroup", mock.Anything, mock.MatchedBy(func(r *grouppb.GetGroupRequest) bool { + return r.GroupId.OpaqueId == "admins" + })).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Group: &grouppb.Group{Id: &grouppb.GroupId{OpaqueId: "admins", Idp: "idp.example.com"}}, + }, nil) + }) + + It("returns the full GroupId", func() { + id, err := m.resolveGroupID(context.Background(), "admins") + Expect(err).NotTo(HaveOccurred()) + Expect(id.Idp).To(Equal("idp.example.com")) + Expect(id.OpaqueId).To(Equal("admins")) + }) + + It("caches the result and only calls the gateway once", func() { + _, _ = m.resolveGroupID(context.Background(), "admins") + _, _ = m.resolveGroupID(context.Background(), "admins") + gwMock.AssertNumberOfCalls(GinkgoT(), "GetGroup", 1) + }) + }) + + Context("gateway returns CODE_NOT_FOUND", func() { + BeforeEach(func() { + gwMock.On("GetGroup", mock.Anything, mock.Anything).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_NOT_FOUND, Message: "no such group"}, + }, nil) + }) + + It("returns an error without retrying", func() { + _, err := m.resolveGroupID(context.Background(), "ghost-group") + Expect(err).To(HaveOccurred()) + gwMock.AssertNumberOfCalls(GinkgoT(), "GetGroup", 1) + }) + }) +}) + +// ── spaceGrantToShares ──────────────────────────────────────────────────────── + +var _ = Describe("spaceGrantToShares", func() { + var ( + tmpdir string + gwMock 
*csmocks.GatewayAPIClient + mgrMock *shareMocks.Manager + m *ImportSpaceMembersMigration + + space *provider.StorageSpace + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "grant-to-shares-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + mgrMock = &shareMocks.Manager{} + m = buildMigration(tmpdir, gwMock, mgrMock, &mockLoader{}) + + space = &provider.StorageSpace{ + Id: &provider.StorageSpaceId{OpaqueId: "space1"}, + Root: &provider.ResourceId{StorageId: "stor1", SpaceId: "space1", OpaqueId: "space1"}, + } + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("user grant — share does not yet exist", func() { + BeforeEach(func() { + // resolveUserID + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com"}}, + }, nil) + // GetShare returns not-found so the share needs to be created + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("not found")) + }) + + It("returns a new share and one received-share entry for the user", func() { + grant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}, + } + + sh, rs, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).NotTo(BeNil()) + Expect(rs).To(HaveLen(1)) + Expect(rs[0].UserID.OpaqueId).To(Equal("alice")) + }) + }) + + Context("user grant — share already exists", func() { + BeforeEach(func() { + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp.example.com"}}, + }, nil) + // GetShare returns an existing share + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(&collaboration.Share{}, nil) + }) + + It("returns nil share (skip) without error", func() { + grant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + } + + sh, _, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).To(BeNil()) + }) + }) + + Context("group grant — share does not yet exist", func() { + BeforeEach(func() { + // resolveGroupID + gwMock.On("GetGroup", mock.Anything, mock.Anything).Return(&grouppb.GetGroupResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Group: &grouppb.Group{Id: &grouppb.GroupId{OpaqueId: "admins", Idp: "idp.example.com"}}, + }, nil) + // GetShare → not found + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("not found")) + // GetMembers → two members + gwMock.On("GetMembers", mock.Anything, mock.Anything).Return(&grouppb.GetMembersResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + Members: []*userpb.UserId{{OpaqueId: "u1", Idp: "idp"}, {OpaqueId: "u2", Idp: "idp"}}, + }, nil) + }) + + It("returns a share and received-share entries for each member plus a group-level nil entry", func() { + grant := &provider.Grant{ + 
Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{GroupId: &grouppb.GroupId{OpaqueId: "admins"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp.example.com", Type: userpb.UserType_USER_TYPE_PRIMARY}, + } + + sh, rs, err := m.spaceGrantToShares(context.Background(), grant, space) + Expect(err).NotTo(HaveOccurred()) + Expect(sh).NotTo(BeNil()) + // 2 member entries + 1 group-level nil entry + Expect(rs).To(HaveLen(3)) + Expect(rs[2].UserID).To(BeNil()) + }) + }) +}) + +// ── Migrate (integration-level) ─────────────────────────────────────────────── + +var _ = Describe("ImportSpaceMembersMigration.Migrate", func() { + var ( + tmpdir string + gwMock *csmocks.GatewayAPIClient + mgrMock *shareMocks.Manager + loader *mockLoader + m *ImportSpaceMembersMigration + ) + + BeforeEach(func() { + var err error + tmpdir, err = os.MkdirTemp("", "migrate-*") + Expect(err).NotTo(HaveOccurred()) + + gwMock = &csmocks.GatewayAPIClient{} + mgrMock = &shareMocks.Manager{} + loader = &mockLoader{} + m = buildMigration(tmpdir, gwMock, mgrMock, loader) + + // Authenticate — needed by GetServiceUserContextWithContext + gwMock.On("Authenticate", mock.Anything, mock.Anything).Return( + okAuthenticateResponse(), nil) + }) + + AfterEach(func() { + os.RemoveAll(tmpdir) + }) + + Context("no project spaces exist", func() { + BeforeEach(func() { + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + StorageSpaces: []*provider.StorageSpace{}, + }, nil) + }) + + It("completes without error and the loader records nothing", func() { + Expect(m.Migrate()).To(Succeed()) + Expect(loader.shares).To(BeEmpty()) + Expect(loader.received).To(BeEmpty()) + }) + }) + + Context("one space with one user grant", func() { + BeforeEach(func() { + space := &provider.StorageSpace{ + Id: &provider.StorageSpaceId{OpaqueId: "s1"}, + Root: &provider.ResourceId{StorageId: "stor", SpaceId: "s1", OpaqueId: "s1"}, + } + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + &provider.ListStorageSpacesResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + StorageSpaces: []*provider.StorageSpace{space}, + }, nil) + + // injected provider — returns one grant + m.providerResolver = func(_ context.Context, _ *provider.StorageSpace) (storageProvider, error) { + return &mockStorageProvider{ + grants: []*provider.Grant{ + { + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{OpaqueId: "alice"}}, + }, + Permissions: &provider.ResourcePermissions{}, + Creator: &userpb.UserId{OpaqueId: "owner", Idp: "idp", Type: userpb.UserType_USER_TYPE_PRIMARY}, + }, + }, + }, nil + } + + gwMock.On("GetUser", mock.Anything, mock.Anything).Return(&userpb.GetUserResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_OK}, + User: &userpb.User{Id: &userpb.UserId{OpaqueId: "alice", Idp: "idp"}}, + }, nil) + + mgrMock.On("GetShare", mock.Anything, mock.Anything).Return(nil, errtypes.NotFound("")) + }) + + It("sends one share and one received-share to the loader", func() { + Expect(m.Migrate()).To(Succeed()) + Expect(loader.shares).To(HaveLen(1)) + Expect(loader.received).To(HaveLen(1)) + }) + }) + + Context("ListStorageSpaces returns an error", func() { + BeforeEach(func() { + gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return( + nil, 
+				nil, errors.New("network error"))
+		})
+
+		It("returns an error", func() {
+			Expect(m.Migrate()).To(HaveOccurred())
+		})
+	})
+
+	Context("ListStorageSpaces returns a non-OK status", func() {
+		BeforeEach(func() {
+			gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(
+				&provider.ListStorageSpacesResponse{
+					Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL, Message: "storage crashed"},
+				}, nil)
+		})
+
+		It("returns an error", func() {
+			Expect(m.Migrate()).To(HaveOccurred())
+		})
+	})
+
+	Context("loader returns an error", func() {
+		BeforeEach(func() {
+			loader.err = errors.New("load failure")
+
+			gwMock.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(
+				&provider.ListStorageSpacesResponse{
+					Status:        &rpc.Status{Code: rpc.Code_CODE_OK},
+					StorageSpaces: []*provider.StorageSpace{},
+				}, nil)
+		})
+
+		It("propagates the loader error", func() {
+			Expect(m.Migrate()).To(MatchError("load failure"))
+		})
+	})
+})
diff --git a/pkg/share/manager/jsoncs3/migrations/migration.go b/pkg/share/manager/jsoncs3/migrations/migration.go
new file mode 100644
index 00000000000..e30b4c81801
--- /dev/null
+++ b/pkg/share/manager/jsoncs3/migrations/migration.go
@@ -0,0 +1,349 @@
+// Copyright 2026 OpenCloud GmbH
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// In applying this license, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+package migration
+
+import (
+	"cmp"
+	"context"
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"slices"
+	"time"
+
+	gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
+	"github.com/opencloud-eu/reva/v2/pkg/errtypes"
+	"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
+	"github.com/opencloud-eu/reva/v2/pkg/share"
+	"github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata"
+	"github.com/rs/zerolog"
+)
+
+const stateFile = "migrations/state.json"
+
+const (
+	lockFile              = "migrations/lock.json"
+	lockTTL               = time.Minute
+	lockHeartbeatInterval = 20 * time.Second
+)
+
+// lockPollInterval is how long acquireLock sleeps between retries when the
+// lock is held by another instance. Declared as a variable so tests can
+// shorten it without rebuilding.
+var lockPollInterval = 5 * time.Second
+
+// lockData is the content written to the lock file.
+type lockData struct {
+	Timestamp  time.Time `json:"timestamp"`
+	InstanceID string    `json:"instance_id"`
+}
+
+type migration interface {
+	Name() string
+	Version() int
+	Initialize(config)
+	Migrate() error
+}
+
+// persistedState is the on-disk representation of the migration state.
+type persistedState struct {
+	Version int `json:"version"`
+}
+
+type state struct {
+	version int
+}
+
+// MigrationConfig holds all caller-supplied options for a migration run.
+// It is intentionally a plain struct so that new fields can be added without
+// changing function signatures throughout the call chain.
+type MigrationConfig struct {
+	ServiceAccountID     string
+	ServiceAccountSecret string
+	ProviderRegistryAddr string
+}
+
+type config struct {
+	logger               zerolog.Logger
+	gatewaySelector      pool.Selectable[gatewayv1beta1.GatewayAPIClient]
+	storage              metadata.Storage
+	serviceAccountID     string
+	serviceAccountSecret string
+	providerRegistryAddr string
+	manager              share.Manager
+	loader               share.LoadableManager
+}
+
+type Migrations struct {
+	config
+	state      state
+	instanceID string
+}
+
+var migrations []migration
+
+// registerMigration is only supposed to be called from init(), which runs sequentially,
+// so we don't need to protect migrations with a lock.
+func registerMigration(m migration) {
+	migrations = append(migrations, m)
+}
+
+func New(logger zerolog.Logger,
+	gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient],
+	storage metadata.Storage,
+	cfg MigrationConfig,
+	manager share.Manager,
+	loader share.LoadableManager,
+) Migrations {
+
+	slices.SortFunc(migrations, func(a, b migration) int {
+		return cmp.Compare(a.Version(), b.Version())
+	})
+
+	b := make([]byte, 8)
+	_, _ = rand.Read(b)
+	instanceID := fmt.Sprintf("%x", b)
+
+	return Migrations{
+		config{
+			logger:               logger.With().Str("jsoncs3", "migrations").Logger(),
+			gatewaySelector:      gatewaySelector,
+			storage:              storage,
+			serviceAccountID:     cfg.ServiceAccountID,
+			serviceAccountSecret: cfg.ServiceAccountSecret,
+			providerRegistryAddr: cfg.ProviderRegistryAddr,
+			manager:              manager,
+			loader:               loader,
+		},
+		state{},
+		instanceID,
+	}
+}
+
+// acquireLock tries to atomically create the lock file, blocking until the lock
+// is obtained. It returns the etag of the lock file on success. It retries
+// indefinitely until ctx is cancelled. A lock whose timestamp is older than
+// lockTTL is considered stale and will be taken over.
+func (m *Migrations) acquireLock(ctx context.Context) (string, error) {
+	m.logger.Debug().Str("instance", m.instanceID).Msg("acquiring migration lock")
+	for {
+		// Fast path: create the lock file only if it does not exist yet.
+		data, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID})
+		if err != nil {
+			return "", err
+		}
+		res, err := m.storage.Upload(ctx, metadata.UploadRequest{
+			Path:        lockFile,
+			Content:     data,
+			IfNoneMatch: []string{"*"},
+		})
+		if err == nil {
+			m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock acquired")
+			return res.Etag, nil
+		}
+
+		// Propagate context cancellation immediately.
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		default:
+		}
+
+		// Any error other than a conflict means something unexpected happened.
+		if !isConflict(err) {
+			return "", err
+		}
+
+		// Lock file already exists — read it to decide whether it is stale.
+		dl, err := m.storage.Download(ctx, metadata.DownloadRequest{Path: lockFile})
+		if err != nil {
+			if _, ok := err.(errtypes.IsNotFound); ok {
+				// Lock was released between our upload attempt and the download;
+				// retry acquiring it immediately.
+				m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock vanished during read; retrying")
+				continue
+			}
+			return "", err
+		}
+
+		var existing lockData
+		stale := true
+		if err := json.Unmarshal(dl.Content, &existing); err == nil {
+			stale = time.Since(existing.Timestamp) > lockTTL
+		}
+
+		if stale {
+			m.logger.Debug().
+				Str("instance", m.instanceID).
+				Str("held_by", existing.InstanceID).
+				Time("lock_timestamp", existing.Timestamp).
+				Msg("migration lock is stale; attempting takeover")
+
+			// Atomically take over the stale lock using the etag we just read.
+			newData, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID})
+			if err != nil {
+				return "", err
+			}
+			res, err := m.storage.Upload(ctx, metadata.UploadRequest{
+				Path:        lockFile,
+				Content:     newData,
+				IfMatchEtag: dl.Etag,
+			})
+			if err == nil {
+				m.logger.Debug().Str("instance", m.instanceID).Msg("migration lock acquired via stale takeover")
+				return res.Etag, nil
+			}
+			// Another instance took the stale lock before us; loop and retry.
+			m.logger.Debug().Str("instance", m.instanceID).Err(err).Msg("stale lock takeover lost race; retrying")
+			continue
+		}
+
+		m.logger.Debug().
+			Str("instance", m.instanceID).
+			Str("held_by", existing.InstanceID).
+			Time("lock_timestamp", existing.Timestamp).
+			Dur("poll_interval", lockPollInterval).
+			Msg("migration lock held by another instance; waiting")
+
+		// Lock is fresh and held by another instance; wait before retrying.
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case <-time.After(lockPollInterval):
+		}
+	}
+}
+
+// startHeartbeat spawns a goroutine that periodically renews the lock file so
+// that it is not considered stale while a long migration is running. Call the
+// returned cancel function to stop the heartbeat.
+func (m *Migrations) startHeartbeat(ctx context.Context, etag string) context.CancelFunc {
+	hbCtx, cancel := context.WithCancel(ctx)
+	go func() {
+		ticker := time.NewTicker(lockHeartbeatInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-hbCtx.Done():
+				return
+			case <-ticker.C:
+				data, err := json.Marshal(lockData{Timestamp: time.Now(), InstanceID: m.instanceID})
+				if err != nil {
+					m.logger.Warn().Err(err).Msg("failed to marshal heartbeat data for migration lock")
+					return
+				}
+				res, err := m.storage.Upload(hbCtx, metadata.UploadRequest{
+					Path:        lockFile,
+					Content:     data,
+					IfMatchEtag: etag,
+				})
+				if err != nil {
+					m.logger.Warn().Err(err).Msg("failed to renew migration lock; another instance may take over")
+					return
+				}
+				etag = res.Etag
+			}
+		}
+	}()
+	return cancel
+}
+
+// releaseLock deletes the lock file unconditionally.
+func (m *Migrations) releaseLock(ctx context.Context) {
+	if err := m.storage.Delete(ctx, lockFile); err != nil {
+		m.logger.Warn().Err(err).Msg("failed to release migration lock")
+	}
+}
+
+// isConflict returns true for errors that signal a conditional-upload conflict,
+// i.e. the lock file already exists or the etag did not match.
+func isConflict(err error) bool {
+	switch err.(type) {
+	case errtypes.IsAlreadyExists, errtypes.IsAborted, errtypes.IsPreconditionFailed:
+		return true
+	}
+	return false
+}
+
+// loadState reads the persisted migration version from storage. If no state
+// file exists yet (fresh deployment) it returns version 0 without error.
+func (m *Migrations) loadState(ctx context.Context) error {
+	data, err := m.storage.SimpleDownload(ctx, stateFile)
+	if err != nil {
+		if _, ok := err.(errtypes.IsNotFound); ok {
+			m.state = state{version: 0}
+			return nil
+		}
+		return err
+	}
+	var ps persistedState
+	if err := json.Unmarshal(data, &ps); err != nil {
+		return err
+	}
+	m.state = state{version: ps.Version}
+	return nil
+}
+
+// saveState writes the current migration version to storage so that already-
+// applied migrations are not re-run on the next server start.
+func (m *Migrations) saveState(ctx context.Context) error {
+	data, err := json.Marshal(persistedState{Version: m.state.version})
+	if err != nil {
+		return err
+	}
+	return m.storage.SimpleUpload(ctx, stateFile, data)
+}
+
+func (m *Migrations) RunMigrations() {
+	ctx := context.Background()
+
+	etag, err := m.acquireLock(ctx)
+	if err != nil {
+		m.logger.Error().Err(err).Msg("failed to acquire migration lock; skipping migrations")
+		return
+	}
+	cancelHB := m.startHeartbeat(ctx, etag)
+	defer cancelHB()
+	defer m.releaseLock(ctx)
+
+	if err := m.loadState(ctx); err != nil {
+		m.logger.Error().Err(err).Msg("failed to load migration state; skipping migrations")
+		return
+	}
+
+	m.logger.Info().Int("current state", m.state.version).Msg("checking migrations")
+
+	for _, mig := range migrations {
+		if mig.Version() > m.state.version {
+			m.logger.Info().Str("migration", mig.Name()).Int("version", mig.Version()).Msg("running migration")
+			mig.Initialize(m.config)
+			if err := mig.Migrate(); err != nil {
+				m.logger.Error().Err(err).Str("migration", mig.Name()).Msg("migration failed; stopping")
+				return
+			}
+			m.state.version = mig.Version()
+			if err := m.saveState(ctx); err != nil {
+				m.logger.Error().Err(err).Msg("failed to save migration state; stopping")
+				return
+			}
+		} else {
+			m.logger.Info().Str("migration", mig.Name()).Int("version", mig.Version()).Msg("skipping migration")
+		}
+	}
+}
diff --git a/pkg/share/manager/jsoncs3/migrations/migration_state_test.go b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go
new file mode 100644
index 00000000000..0c624a3025d
--- /dev/null
+++ b/pkg/share/manager/jsoncs3/migrations/migration_state_test.go
@@ -0,0 +1,358 @@
+// Copyright 2026 OpenCloud GmbH
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package migration
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"github.com/rs/zerolog"
+
+	"github.com/opencloud-eu/reva/v2/pkg/storage/utils/metadata"
+)
+
+// fakeMigration is a configurable stub that satisfies the migration interface.
+type fakeMigration struct {
+	name    string
+	version int
+	err     error // non-nil → Migrate() returns this error
+	called  bool
+}
+
+func (f *fakeMigration) Name() string        { return f.name }
+func (f *fakeMigration) Version() int        { return f.version }
+func (f *fakeMigration) Initialize(_ config) {}
+func (f *fakeMigration) Migrate() error      { f.called = true; return f.err }
+
+// countingMigration is a migration stub that invokes a callback on every
+// Migrate() call, allowing callers to observe how many times it ran.
+type countingMigration struct {
+	name    string
+	version int
+	onCall  func()
+}
+
+func (c *countingMigration) Name() string        { return c.name }
+func (c *countingMigration) Version() int        { return c.version }
+func (c *countingMigration) Initialize(_ config) {}
+func (c *countingMigration) Migrate() error      { c.onCall(); return nil }
+
+// marshalLockData is a test helper that JSON-encodes a lockData value.
+func marshalLockData(d lockData) ([]byte, error) {
+	return json.Marshal(d)
+}
+
+var _ = Describe("RunMigrations / loadState / saveState", func() {
+	var (
+		tmpdir string
+		stor   metadata.Storage
+		m      Migrations
+		log    zerolog.Logger
+
+		// saved & restored around each test so our fakeMigrations don't leak
+		originalMigrations []migration
+	)
+
+	BeforeEach(func() {
+		var err error
+		tmpdir, err = os.MkdirTemp("", "migration-state-test-*")
+		Expect(err).NotTo(HaveOccurred())
+
+		stor, err = metadata.NewDiskStorage(tmpdir)
+		Expect(err).NotTo(HaveOccurred())
+
+		// Replicate the directory setup that jsoncs3 Manager.initialize() does
+		// so that tests driving storage directly behave like production.
+		Expect(stor.MakeDirIfNotExist(context.Background(), "migrations")).To(Succeed())
+
+		log = zerolog.Nop()
+		m = Migrations{
+			config: config{
+				logger:  log,
+				storage: stor,
+			},
+		}
+
+		originalMigrations = migrations
+		migrations = nil
+	})
+
+	AfterEach(func() {
+		migrations = originalMigrations
+		os.RemoveAll(tmpdir)
+	})
+
+	// ── loadState ──────────────────────────────────────────────────────────────
+
+	Describe("loadState", func() {
+		Context("when no state file exists yet", func() {
+			It("returns version 0 without error (fresh deployment)", func() {
+				Expect(m.loadState(context.Background())).To(Succeed())
+				Expect(m.state.version).To(Equal(0))
+			})
+		})
+
+		Context("when the state file contains valid JSON", func() {
+			BeforeEach(func() {
+				ctx := context.Background()
+				Expect(stor.MakeDirIfNotExist(ctx, "migrations")).To(Succeed())
+				Expect(stor.SimpleUpload(ctx, stateFile, []byte(`{"version":3}`))).To(Succeed())
+			})
+
+			It("restores the persisted version", func() {
+				Expect(m.loadState(context.Background())).To(Succeed())
+				Expect(m.state.version).To(Equal(3))
+			})
+		})
+
+		Context("when the state file contains invalid JSON", func() {
+			BeforeEach(func() {
+				ctx := context.Background()
+				Expect(stor.MakeDirIfNotExist(ctx, "migrations")).To(Succeed())
+				Expect(stor.SimpleUpload(ctx, stateFile, []byte(`not-json`))).To(Succeed())
+			})
+
+			It("returns an error", func() {
+				Expect(m.loadState(context.Background())).To(HaveOccurred())
+			})
+		})
+	})
+
+	// ── saveState ──────────────────────────────────────────────────────────────
+
+	Describe("saveState", func() {
+		It("persists the current version and can be round-tripped by loadState", func() {
+			m.state.version = 7
+			Expect(m.saveState(context.Background())).To(Succeed())
+
+			m2 := Migrations{config: config{logger: log, storage: stor}}
+			Expect(m2.loadState(context.Background())).To(Succeed())
+			Expect(m2.state.version).To(Equal(7))
+		})
+	})
+
+	// ── RunMigrations ──────────────────────────────────────────────────────────
+
+	Describe("RunMigrations", func() {
+		Context("when there are no registered migrations", func() {
+			It("completes without error and leaves version at 0", func() {
+				m.RunMigrations()
+				Expect(m.state.version).To(Equal(0))
+			})
+		})
+
+		Context("when no migration has been applied yet", func() {
+			var (
+				mig1 *fakeMigration
+				mig2 *fakeMigration
+			)
+
+			BeforeEach(func() {
+				mig1 = &fakeMigration{name: "first", version: 1}
+				mig2 = &fakeMigration{name: "second", version: 2}
+				migrations = []migration{mig1, mig2}
+			})
+
+			It("runs all migrations in order and saves the latest version", func() {
+				m.RunMigrations()
+				Expect(mig1.called).To(BeTrue())
+				Expect(mig2.called).To(BeTrue())
+				Expect(m.state.version).To(Equal(2))
+			})
+		})
+
+		Context("when the state is already at version 1", func() {
+			var (
+				mig1 *fakeMigration
+				mig2 *fakeMigration
+			)
+
+			BeforeEach(func() {
+				// Persist version 1 so loadState inside RunMigrations picks it up.
+				m.state.version = 1
+				Expect(m.saveState(context.Background())).To(Succeed())
+				m.state.version = 0 // reset; RunMigrations will reload from storage
+
+				mig1 = &fakeMigration{name: "first", version: 1}
+				mig2 = &fakeMigration{name: "second", version: 2}
+				migrations = []migration{mig1, mig2}
+			})
+
+			It("skips the already-applied migration and only runs the pending one", func() {
+				m.RunMigrations()
+				Expect(mig1.called).To(BeFalse())
+				Expect(mig2.called).To(BeTrue())
+				Expect(m.state.version).To(Equal(2))
+			})
+		})
+
+		Context("when a migration returns an error", func() {
+			var (
+				mig1 *fakeMigration
+				mig2 *fakeMigration
+			)
+
+			BeforeEach(func() {
+				mig1 = &fakeMigration{name: "broken", version: 1, err: errors.New("boom")}
+				mig2 = &fakeMigration{name: "later", version: 2}
+				migrations = []migration{mig1, mig2}
+			})
+
+			It("stops after the failing migration and does not run subsequent ones", func() {
+				m.RunMigrations()
+				Expect(mig1.called).To(BeTrue())
+				Expect(mig2.called).To(BeFalse())
+				// version must not advance past the failed migration
+				Expect(m.state.version).To(Equal(0))
+			})
+		})
+
+		Context("when all migrations are already applied (state == highest version)", func() {
+			var mig *fakeMigration
+
+			BeforeEach(func() {
+				// Persist version 5 so loadState inside RunMigrations picks it up.
+				m.state.version = 5
+				Expect(m.saveState(context.Background())).To(Succeed())
+				m.state.version = 0
+
+				mig = &fakeMigration{name: "done", version: 5}
+				migrations = []migration{mig}
+			})
+
+			It("skips every migration", func() {
+				m.RunMigrations()
+				Expect(mig.called).To(BeFalse())
+			})
+		})
+	})
+
+	// ── Locking ────────────────────────────────────────────────────────────────
+
+	Describe("distributed lock", func() {
+		// Use a very short poll interval so the tests complete quickly.
+		var origPollInterval time.Duration
+		BeforeEach(func() {
+			origPollInterval = lockPollInterval
+			lockPollInterval = 10 * time.Millisecond
+
+			m.instanceID = "instance-a"
+		})
+		AfterEach(func() {
+			lockPollInterval = origPollInterval
+		})
+
+		Context("acquireLock", func() {
+			It("succeeds and returns a non-empty etag when no lock exists", func() {
+				etag, err := m.acquireLock(context.Background())
+				Expect(err).NotTo(HaveOccurred())
+				Expect(etag).NotTo(BeEmpty())
+			})
+
+			It("blocks until the first holder releases the lock", func() {
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+
+				// Instance A acquires the lock.
+				_, err := m.acquireLock(ctx)
+				Expect(err).NotTo(HaveOccurred())
+
+				// Instance B tries to acquire in the background.
+				m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"}
+				acquired := make(chan struct{})
+				go func() {
+					defer GinkgoRecover()
+					_, err := m2.acquireLock(ctx)
+					Expect(err).NotTo(HaveOccurred())
+					close(acquired)
+				}()
+
+				// B must not acquire while A still holds the lock.
+				Consistently(acquired, 50*time.Millisecond, 10*time.Millisecond).ShouldNot(BeClosed())
+
+				// A releases; B should now acquire.
+				m.releaseLock(context.Background())
+				Eventually(acquired, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed())
+			})
+
+			It("immediately takes over a stale lock", func() {
+				// Write a lock file whose timestamp is well past lockTTL.
+				staleData, err := marshalLockData(lockData{
+					Timestamp:  time.Now().Add(-2 * lockTTL),
+					InstanceID: "crashed-instance",
+				})
+				Expect(err).NotTo(HaveOccurred())
+				Expect(stor.SimpleUpload(context.Background(), lockFile, staleData)).To(Succeed())
+
+				// acquireLock should detect the stale lock and take it over without waiting.
+				done := make(chan struct{})
+				go func() {
+					defer GinkgoRecover()
+					etag, err := m.acquireLock(context.Background())
+					Expect(err).NotTo(HaveOccurred())
+					Expect(etag).NotTo(BeEmpty())
+					close(done)
+				}()
+				Eventually(done, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed())
+			})
+		})
+
+		Context("releaseLock", func() {
+			It("removes the lock file so a second caller can acquire immediately", func() {
+				ctx := context.Background()
+				_, err := m.acquireLock(ctx)
+				Expect(err).NotTo(HaveOccurred())
+
+				m.releaseLock(ctx)
+
+				// A second instance should acquire without delay.
+				m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"}
+				done := make(chan struct{})
+				go func() {
+					defer GinkgoRecover()
+					_, err := m2.acquireLock(ctx)
+					Expect(err).NotTo(HaveOccurred())
+					close(done)
+				}()
+				Eventually(done, 500*time.Millisecond, 10*time.Millisecond).Should(BeClosed())
+			})
+		})
+
+		Context("RunMigrations with two concurrent instances", func() {
+			It("runs each pending migration exactly once", func() {
+				var callCount atomic.Int32
+				mig := &countingMigration{name: "once", version: 1, onCall: func() { callCount.Add(1) }}
+				migrations = []migration{mig}
+
+				m2 := Migrations{config: config{logger: log, storage: stor}, instanceID: "instance-b"}
+
+				var wg sync.WaitGroup
+				wg.Add(2)
+				go func() { defer wg.Done(); m.RunMigrations() }()
+				go func() { defer wg.Done(); m2.RunMigrations() }()
+				wg.Wait()
+
+				Expect(callCount.Load()).To(Equal(int32(1)))
+			})
+		})
+	})
+})
diff --git a/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go b/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go
new file mode 100644
index 00000000000..eccce746f0c
--- /dev/null
+++ b/pkg/share/manager/jsoncs3/migrations/migrations_suite_test.go
@@ -0,0 +1,27 @@
+// Copyright 2026 OpenCloud GmbH
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package migration
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestMigrations(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Migrations Suite")
+}
diff --git a/pkg/share/manager/memory/memory.go b/pkg/share/manager/memory/memory.go
index 8ade5926f10..0f34d3a68b7 100644
--- a/pkg/share/manager/memory/memory.go
+++ b/pkg/share/manager/memory/memory.go
@@ -28,6 +28,7 @@ import (
 	ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx"
 	"github.com/opencloud-eu/reva/v2/pkg/share"
+	"github.com/rs/zerolog"
 	"google.golang.org/genproto/protobuf/field_mask"
 
 	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -46,7 +47,7 @@ func init() {
 }
 
 // New returns a new manager.
-func New(c map[string]interface{}) (share.Manager, error) {
+func New(c map[string]any, _ *zerolog.Logger) (share.Manager, error) {
 	state := map[string]map[*collaboration.ShareId]collaboration.ShareState{}
 	mp := map[string]map[*collaboration.ShareId]*provider.Reference{}
 	return &manager{
diff --git a/pkg/share/manager/registry/registry.go b/pkg/share/manager/registry/registry.go
index 16fc627ffff..6b22beb7799 100644
--- a/pkg/share/manager/registry/registry.go
+++ b/pkg/share/manager/registry/registry.go
@@ -18,11 +18,14 @@
 package registry
 
-import "github.com/opencloud-eu/reva/v2/pkg/share"
+import (
+	"github.com/opencloud-eu/reva/v2/pkg/share"
+	"github.com/rs/zerolog"
+)
 
 // NewFunc is the function that share managers
 // should register at init time.
-type NewFunc func(map[string]interface{}) (share.Manager, error)
+type NewFunc func(map[string]any, *zerolog.Logger) (share.Manager, error)
 
 // NewFuncs is a map containing all the registered share managers.
 var NewFuncs = map[string]NewFunc{}
diff --git a/pkg/storage/pkg/decomposedfs/node/permissions.go b/pkg/storage/pkg/decomposedfs/node/permissions.go
index 6b1beb37d3a..9fe2a459fa9 100644
--- a/pkg/storage/pkg/decomposedfs/node/permissions.go
+++ b/pkg/storage/pkg/decomposedfs/node/permissions.go
@@ -101,6 +101,7 @@ func ServiceAccountPermissions() *provider.ResourcePermissions {
 		Delete:          true, // for cli restore command with replace option
 		CreateContainer: true, // for space provisioning
 		AddGrant:        true, // for initial project space member assignment
+		ListGrants:      true, // for initial project space member assignment
 	}
 }
diff --git a/pkg/storage/utils/metadata/disk.go b/pkg/storage/utils/metadata/disk.go
index 6eae43e6381..5caef632d30 100644
--- a/pkg/storage/utils/metadata/disk.go
+++ b/pkg/storage/utils/metadata/disk.go
@@ -93,6 +93,40 @@ func (disk *Disk) SimpleUpload(ctx context.Context, uploadpath string, content [
 // Upload stores a file on disk
 func (disk *Disk) Upload(_ context.Context, req UploadRequest) (*UploadResponse, error) {
 	p := disk.targetPath(req.Path)
+
+	// IfNoneMatch: ["*"] means create the file only if it does not already
+	// exist. Use O_EXCL so the check and the create are atomic on the local
+	// filesystem.
+	for _, tag := range req.IfNoneMatch {
+		if tag != "*" {
+			continue
+		}
+		f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
+		if err != nil {
+			if errors.Is(err, os.ErrExist) {
+				return nil, errtypes.AlreadyExists(p)
+			}
+			return nil, err
+		}
+		if _, err := f.Write(req.Content); err != nil {
+			_ = f.Close()
+			return nil, err
+		}
+		if err := f.Close(); err != nil {
+			return nil, err
+		}
+		info, err := os.Stat(p)
+		if err != nil {
+			return nil, err
+		}
+		res := &UploadResponse{}
+		res.Etag, err = calcEtag(info.ModTime(), info.Size())
+		if err != nil {
+			return nil, err
+		}
+		return res, nil
+	}
+
 	if req.IfMatchEtag != "" {
 		info, err := os.Stat(p)
 		if err != nil && !errors.Is(err, os.ErrNotExist) {
@@ -170,7 +204,10 @@ func (disk *Disk) Download(_ context.Context, req DownloadRequest) (*DownloadRes
 // SimpleDownload reads a file from disk
 func (disk *Disk) SimpleDownload(ctx context.Context, downloadpath string) ([]byte, error) {
 	res, err := disk.Download(ctx, DownloadRequest{Path: downloadpath})
-	return res.Content, err
+	if err != nil {
+		return nil, err
+	}
+	return res.Content, nil
 }
 
 // Delete deletes a path
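Reviewer note, not part of the patch: a minimal sketch of how a concrete migration would plug into the runner introduced in migration.go, assuming it lives in the same package. The noopMigration name and its version number are invented for illustration only.

package migration

// noopMigration is a hypothetical example, not part of this change. It only
// demonstrates the contract: register in init(), receive the shared config
// via Initialize, and report a Version that RunMigrations compares against
// the persisted state.
type noopMigration struct {
	cfg config
}

func init() {
	// registerMigration appends to the package-level slice; New() later sorts
	// all registered migrations by Version().
	registerMigration(&noopMigration{})
}

func (n *noopMigration) Name() string        { return "noop-example" }
func (n *noopMigration) Version() int        { return 2 }
func (n *noopMigration) Initialize(c config) { n.cfg = c }

// Migrate would do the actual work; here it only logs and succeeds, so the
// runner records version 2 in migrations/state.json and never reruns it.
func (n *noopMigration) Migrate() error {
	n.cfg.logger.Info().Msg("noop example migration ran")
	return nil
}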