Skip to content

Commit

Permalink
Use sync.WaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
jessegeens committed Feb 3, 2025
1 parent 89d7075 commit 8abdb05
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
1 change: 0 additions & 1 deletion share/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type ProtoShare struct {
Permissions uint8
Orphan bool
Expiration datatypes.NullTime
PreviousID uint `gorm:"uniqueIndex"`
}

type Share struct {
Expand Down
38 changes: 25 additions & 13 deletions share/sql/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"os"
"sync"
"time"

model "github.com/cernbox/reva-plugins/share"
Expand Down Expand Up @@ -60,7 +61,7 @@ type OldShareState struct {

const (
bufferSize = 10
numWorkers = 10
numWorkers = 5
)

func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) {
Expand All @@ -75,6 +76,7 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
"gatewaysvc": gatewaysvc,
"dry_run": dryRun,
}

// Authenticate to gateway service
tokenlessCtx, cancel := context.WithCancel(context.Background())
ctx := appctx.ContextSetToken(tokenlessCtx, token)
Expand Down Expand Up @@ -103,6 +105,8 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
migrator.NewDb = migrator.NewDb.Debug()
}

migrator.NewDb.AutoMigrate(&model.Share{}, &model.PublicLink{}, &model.ShareState{})

migrateShares(ctx, migrator)
fmt.Println("---------------------------------")
migrateShareStatuses(ctx, migrator)
Expand Down Expand Up @@ -131,11 +135,11 @@ func migrateShares(ctx context.Context, migrator Migrator) {

// Create channel for workers
ch := make(chan *OldShareEntry, bufferSize)
defer close(ch)
var wg sync.WaitGroup

// Start all workers
for range numWorkers {
go workerShare(ctx, migrator, ch)
go workerShare(ctx, migrator, ch, &wg)
}

for res.Next() {
Expand All @@ -147,6 +151,9 @@ func migrateShares(ctx context.Context, migrator Migrator) {
fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error())
}
}

close(ch)
wg.Wait()
}

func migrateShareStatuses(ctx context.Context, migrator Migrator) {
Expand All @@ -171,11 +178,12 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) {

// Create channel for workers
ch := make(chan *OldShareState, bufferSize)
defer close(ch)

var wg sync.WaitGroup

// Start all workers
for range numWorkers {
go workerState(ctx, migrator, ch)
go workerState(ctx, migrator, ch, &wg)
}

for res.Next() {
Expand All @@ -187,18 +195,24 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) {
fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error())
}
}
close(ch)
wg.Wait()
}

func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) {
func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry, wg *sync.WaitGroup) {
wg.Add(1)
for share := range ch {
handleSingleShare(ctx, migrator, share)
}
wg.Done()
}

func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) {
func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState, wg *sync.WaitGroup) {
wg.Add(1)
for state := range ch {
handleSingleState(ctx, migrator, state)
}
wg.Done()
}

func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) {
Expand All @@ -222,12 +236,9 @@ func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry)
func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) {
newShareState := &model.ShareState{
ShareID: uint(s.id),
Model: gorm.Model{
ID: uint(s.id),
},
User: s.recipient,
Hidden: s.state == -1, // Hidden if REJECTED
Synced: false,
User: s.recipient,
Hidden: s.state == -1, // Hidden if REJECTED
Synced: false,
}
res := migrator.NewDb.Create(&newShareState)
if res.Error != nil {
Expand Down Expand Up @@ -267,6 +278,7 @@ func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry
if err == nil {
protoShare.InitialPath = path
} else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) {
fmt.Printf("Marked share %d as an orphan (%s, %s)\n", s.ID, protoShare.Instance, protoShare.Inode)
protoShare.Orphan = true
} else {
// We do not set, because of a general error
Expand Down

0 comments on commit 8abdb05

Please sign in to comment.