diff --git a/share/model.go b/share/model.go index 8521c7f..772ea54 100644 --- a/share/model.go +++ b/share/model.go @@ -42,7 +42,6 @@ type ProtoShare struct { Permissions uint8 Orphan bool Expiration datatypes.NullTime - PreviousID uint `gorm:"uniqueIndex"` } type Share struct { diff --git a/share/sql/migrate.go b/share/sql/migrate.go index 018b8db..82e97ac 100644 --- a/share/sql/migrate.go +++ b/share/sql/migrate.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "os" + "sync" "time" model "github.com/cernbox/reva-plugins/share" @@ -60,7 +61,7 @@ type OldShareState struct { const ( bufferSize = 10 - numWorkers = 10 + numWorkers = 5 ) func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) { @@ -75,6 +76,7 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port "gatewaysvc": gatewaysvc, "dry_run": dryRun, } + // Authenticate to gateway service tokenlessCtx, cancel := context.WithCancel(context.Background()) ctx := appctx.ContextSetToken(tokenlessCtx, token) @@ -103,6 +105,8 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port migrator.NewDb = migrator.NewDb.Debug() } + migrator.NewDb.AutoMigrate(&model.Share{}, &model.PublicLink{}, &model.ShareState{}) + migrateShares(ctx, migrator) fmt.Println("---------------------------------") migrateShareStatuses(ctx, migrator) @@ -131,11 +135,11 @@ func migrateShares(ctx context.Context, migrator Migrator) { // Create channel for workers ch := make(chan *OldShareEntry, bufferSize) - defer close(ch) + var wg sync.WaitGroup // Start all workers for range numWorkers { - go workerShare(ctx, migrator, ch) + go workerShare(ctx, migrator, ch, &wg) } for res.Next() { @@ -147,6 +151,9 @@ func migrateShares(ctx context.Context, migrator Migrator) { fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error()) } } + + close(ch) + wg.Wait() } func migrateShareStatuses(ctx context.Context, migrator Migrator) { @@ -171,11 +178,12 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) { // Create channel for workers ch := make(chan *OldShareState, bufferSize) - defer close(ch) + + var wg sync.WaitGroup // Start all workers for range numWorkers { - go workerState(ctx, migrator, ch) + go workerState(ctx, migrator, ch, &wg) } for res.Next() { @@ -187,18 +195,24 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) { fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error()) } } + close(ch) + wg.Wait() } -func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) { +func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry, wg *sync.WaitGroup) { + wg.Add(1) for share := range ch { handleSingleShare(ctx, migrator, share) } + wg.Done() } -func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) { +func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState, wg *sync.WaitGroup) { + wg.Add(1) for state := range ch { handleSingleState(ctx, migrator, state) } + wg.Done() } func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) { @@ -222,12 +236,9 @@ func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) { newShareState := &model.ShareState{ ShareID: uint(s.id), - Model: gorm.Model{ - ID: uint(s.id), - }, - User: s.recipient, - Hidden: s.state == -1, // Hidden if REJECTED - Synced: false, + User: s.recipient, + Hidden: s.state == -1, // Hidden if REJECTED + Synced: false, } res := migrator.NewDb.Create(&newShareState) if res.Error != nil { @@ -267,6 +278,7 @@ func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry if err == nil { protoShare.InitialPath = path } else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) { + fmt.Printf("Marked share %d as an orphan (%s, %s)\n", s.ID, protoShare.Instance, protoShare.Inode) protoShare.Orphan = true } else { // We do not set, because of a general error