diff --git a/cmd/migrator.go b/cmd/migrator.go index 14d312f..f7c1973 100644 --- a/cmd/migrator.go +++ b/cmd/migrator.go @@ -15,9 +15,10 @@ func main() { name := flag.String("name", "cernboxngcopy", "Database name") gatewaysvc := flag.String("gatewaysvc", "localhost:9142", "Gateway service location") token := flag.String("token", "", "JWT token for gateway svc") + dryRun := flag.Bool("dryrun", true, "Use dry run?") flag.Parse() fmt.Printf("Connecting to %s@%s:%d\n", *username, *host, *port) - sql.RunMigration(*username, *password, *host, *name, *gatewaysvc, *token, *port) + sql.RunMigration(*username, *password, *host, *name, *gatewaysvc, *token, *port, *dryRun) } diff --git a/go.mod b/go.mod index d25d1f3..e691c45 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/cernbox/reva-plugins -go 1.21.0 +go 1.22.7 require ( github.com/Masterminds/sprig v2.22.0+incompatible diff --git a/share/sql/migrate.go b/share/sql/migrate.go index 04f60ca..bb180a3 100644 --- a/share/sql/migrate.go +++ b/share/sql/migrate.go @@ -29,7 +29,36 @@ type ShareOrLink struct { Link *model.PublicLink } -func RunMigration(username, password, host, name, gatewaysvc, token string, port int) { +type OldShareEntry struct { + ID int + UIDOwner string + UIDInitiator string + Prefix string + ItemSource string + ItemType string + ShareWith string + Token string + Expiration string + Permissions int + ShareType int + ShareName string + STime int + FileTarget string + State int + Quicklink bool + Description string + NotifyUploads bool + NotifyUploadsExtraRecipients sql.NullString + Orphan bool +} + +const ( + bufferSize = 10 + numWorkers = 10 +) + +func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) { + // Config config := map[string]interface{}{ "engine": "mysql", "db_username": username, @@ -38,13 +67,15 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port "db_port": port, "db_name": name, "gatewaysvc": gatewaysvc, - "dry_run": false, + "dry_run": dryRun, } + // Authenticate to gateway service tokenlessCtx, cancel := context.WithCancel(context.Background()) ctx := appctx.ContextSetToken(tokenlessCtx, token) ctx = metadata.AppendToOutgoingContext(ctx, appctx.TokenHeader, token) defer cancel() + // Set up migrator shareManager, err := New(ctx, config) if err != nil { fmt.Println("Failed to create shareManager: " + err.Error()) @@ -62,31 +93,15 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port ShareMgr: sharemgr, } - ch := make(chan *ShareOrLink, 100) - go getAllShares(ctx, migrator, ch) - for share := range ch { - // TODO error handling - if share.IsShare { - fmt.Printf("Creating share %d\n", share.Share.ID) - migrator.NewDb.Create(&share.Share) - } else { - fmt.Printf("Creating share %d\n", share.Link.ID) - migrator.NewDb.Create(&share.Link) - } - } - -} - -func getAllShares(ctx context.Context, migrator Migrator, ch chan *ShareOrLink) { - // First we find out what the highest ID is + // Check how many shares are to be migrated count, err := getCount(migrator) if err != nil { fmt.Println("Error getting highest id: " + err.Error()) - close(ch) return } fmt.Printf("Migrating %d shares\n", count) + // Get all old shares query := "select id, coalesce(uid_owner, '') as uid_owner, coalesce(uid_initiator, '') as uid_initiator, lower(coalesce(share_with, '')) as share_with, coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source, coalesce(item_type, '') as item_type, stime, permissions, share_type, orphan FROM oc_share order by id desc" // AND id=?" params := []interface{}{} @@ -94,48 +109,52 @@ func getAllShares(ctx context.Context, migrator Migrator, ch chan *ShareOrLink) if err != nil { fmt.Printf("Fatal error: %s", err.Error()) - close(ch) - return + os.Exit(1) + } + + // Create channel for workers + ch := make(chan *OldShareEntry, bufferSize) + defer close(ch) + + // Start all workers + for range numWorkers { + go worker(ctx, migrator, ch) } for res.Next() { var s OldShareEntry res.Scan(&s.ID, &s.UIDOwner, &s.UIDInitiator, &s.ShareWith, &s.Prefix, &s.ItemSource, &s.ItemType, &s.STime, &s.Permissions, &s.ShareType, &s.Orphan) - newShare, err := oldShareToNewShare(ctx, migrator, s) if err == nil { - ch <- newShare + ch <- &s } else { - fmt.Printf("Error occured for share %s: %s\n", s.ID, err.Error()) + fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error()) } } - close(ch) } -type OldShareEntry struct { - ID int - UIDOwner string - UIDInitiator string - Prefix string - ItemSource string - ItemType string - ShareWith string - Token string - Expiration string - Permissions int - ShareType int - ShareName string - STime int - FileTarget string - State int - Quicklink bool - Description string - NotifyUploads bool - NotifyUploadsExtraRecipients sql.NullString - Orphan bool +func worker(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) { + for share := range ch { + handleSingleShare(ctx, migrator, share) + } +} + +func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) { + share, err := oldShareToNewShare(ctx, migrator, s) + if err != nil { + return + } + // TODO error handling + if share.IsShare { + fmt.Printf("Creating share %d\n", share.Share.ID) + migrator.NewDb.Create(&share.Share) + } else { + fmt.Printf("Creating share %d\n", share.Link.ID) + migrator.NewDb.Create(&share.Link) + } } -func oldShareToNewShare(ctx context.Context, migrator Migrator, s OldShareEntry) (*ShareOrLink, error) { +func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry) (*ShareOrLink, error) { expirationDate, expirationError := time.Parse("2006-01-02 15:04:05", s.Expiration) protoShare := model.ProtoShare{