Skip to content

Commit

Permalink
Creating worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
jessegeens committed Jan 9, 2025
1 parent 0eb56f4 commit 9a7baaf
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 50 deletions.
3 changes: 2 additions & 1 deletion cmd/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ func main() {
name := flag.String("name", "cernboxngcopy", "Database name")
gatewaysvc := flag.String("gatewaysvc", "localhost:9142", "Gateway service location")
token := flag.String("token", "", "JWT token for gateway svc")
dryRun := flag.Bool("dryrun", true, "Use dry run?")

flag.Parse()

fmt.Printf("Connecting to %s@%s:%d\n", *username, *host, *port)
sql.RunMigration(*username, *password, *host, *name, *gatewaysvc, *token, *port)
sql.RunMigration(*username, *password, *host, *name, *gatewaysvc, *token, *port, *dryRun)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/cernbox/reva-plugins

go 1.21.0
go 1.22.7

require (
github.com/Masterminds/sprig v2.22.0+incompatible
Expand Down
115 changes: 67 additions & 48 deletions share/sql/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,36 @@ type ShareOrLink struct {
Link *model.PublicLink
}

func RunMigration(username, password, host, name, gatewaysvc, token string, port int) {
type OldShareEntry struct {
ID int
UIDOwner string
UIDInitiator string
Prefix string
ItemSource string
ItemType string
ShareWith string
Token string
Expiration string
Permissions int
ShareType int
ShareName string
STime int
FileTarget string
State int
Quicklink bool
Description string
NotifyUploads bool
NotifyUploadsExtraRecipients sql.NullString
Orphan bool
}

const (
bufferSize = 10
numWorkers = 10
)

func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) {
// Config
config := map[string]interface{}{
"engine": "mysql",
"db_username": username,
Expand All @@ -38,13 +67,15 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
"db_port": port,
"db_name": name,
"gatewaysvc": gatewaysvc,
"dry_run": false,
"dry_run": dryRun,
}
// Authenticate to gateway service
tokenlessCtx, cancel := context.WithCancel(context.Background())
ctx := appctx.ContextSetToken(tokenlessCtx, token)
ctx = metadata.AppendToOutgoingContext(ctx, appctx.TokenHeader, token)
defer cancel()

// Set up migrator
shareManager, err := New(ctx, config)
if err != nil {
fmt.Println("Failed to create shareManager: " + err.Error())
Expand All @@ -62,80 +93,68 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
ShareMgr: sharemgr,
}

ch := make(chan *ShareOrLink, 100)
go getAllShares(ctx, migrator, ch)
for share := range ch {
// TODO error handling
if share.IsShare {
fmt.Printf("Creating share %d\n", share.Share.ID)
migrator.NewDb.Create(&share.Share)
} else {
fmt.Printf("Creating share %d\n", share.Link.ID)
migrator.NewDb.Create(&share.Link)
}
}

}

func getAllShares(ctx context.Context, migrator Migrator, ch chan *ShareOrLink) {
// First we find out what the highest ID is
// Check how many shares are to be migrated
count, err := getCount(migrator)
if err != nil {
fmt.Println("Error getting highest id: " + err.Error())
close(ch)
return
}
fmt.Printf("Migrating %d shares\n", count)

// Get all old shares
query := "select id, coalesce(uid_owner, '') as uid_owner, coalesce(uid_initiator, '') as uid_initiator, lower(coalesce(share_with, '')) as share_with, coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source, coalesce(item_type, '') as item_type, stime, permissions, share_type, orphan FROM oc_share order by id desc" // AND id=?"
params := []interface{}{}

res, err := migrator.OldDb.Query(query, params...)

if err != nil {
fmt.Printf("Fatal error: %s", err.Error())
close(ch)
return
os.Exit(1)
}

// Create channel for workers
ch := make(chan *OldShareEntry, bufferSize)
defer close(ch)

// Start all workers
for range numWorkers {
go worker(ctx, migrator, ch)
}

for res.Next() {
var s OldShareEntry
res.Scan(&s.ID, &s.UIDOwner, &s.UIDInitiator, &s.ShareWith, &s.Prefix, &s.ItemSource, &s.ItemType, &s.STime, &s.Permissions, &s.ShareType, &s.Orphan)
newShare, err := oldShareToNewShare(ctx, migrator, s)
if err == nil {
ch <- newShare
ch <- &s
} else {
fmt.Printf("Error occured for share %s: %s\n", s.ID, err.Error())
fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error())
}
}

close(ch)
}

type OldShareEntry struct {
ID int
UIDOwner string
UIDInitiator string
Prefix string
ItemSource string
ItemType string
ShareWith string
Token string
Expiration string
Permissions int
ShareType int
ShareName string
STime int
FileTarget string
State int
Quicklink bool
Description string
NotifyUploads bool
NotifyUploadsExtraRecipients sql.NullString
Orphan bool
func worker(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) {
for share := range ch {
handleSingleShare(ctx, migrator, share)
}
}

func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) {
share, err := oldShareToNewShare(ctx, migrator, s)
if err != nil {
return
}
// TODO error handling
if share.IsShare {
fmt.Printf("Creating share %d\n", share.Share.ID)
migrator.NewDb.Create(&share.Share)
} else {
fmt.Printf("Creating share %d\n", share.Link.ID)
migrator.NewDb.Create(&share.Link)
}
}

func oldShareToNewShare(ctx context.Context, migrator Migrator, s OldShareEntry) (*ShareOrLink, error) {
func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry) (*ShareOrLink, error) {
expirationDate, expirationError := time.Parse("2006-01-02 15:04:05", s.Expiration)

protoShare := model.ProtoShare{
Expand Down

0 comments on commit 9a7baaf

Please sign in to comment.