Skip to content

Commit

Permalink
Resolving more PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse Geens committed Jan 27, 2025
1 parent 9cc87e9 commit e3129e7
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 51 deletions.
21 changes: 12 additions & 9 deletions share/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type ProtoShare struct {
ItemType ItemType // file | folder | reference | symlink
InitialPath string
Inode string
Permissions uint8
Instance string
Orphan bool
Description string
Expiration datatypes.Null[datatypes.Date]
Permissions uint8
Orphan bool
Expiration datatypes.NullTime
}

type Share struct {
Expand All @@ -60,7 +60,7 @@ type PublicLink struct {
NotifyUploadsExtraRecipients string
Password string
// Users can give a name to a share
ShareName string
LinkName string
}

type ShareState struct {
Expand All @@ -75,9 +75,12 @@ type ShareState struct {
}

func (s *Share) AsCS3Share(granteeType userpb.UserType) *collaboration.Share {
ts := &typespb.Timestamp{
creationTs := &typespb.Timestamp{
Seconds: uint64(s.CreatedAt.Unix()),
}
updateTs := &typespb.Timestamp{
Seconds: uint64(s.UpdatedAt.Unix()),
}
return &collaboration.Share{
Id: &collaboration.ShareId{
OpaqueId: strconv.FormatUint(uint64(s.ID), 10),
Expand All @@ -91,8 +94,8 @@ func (s *Share) AsCS3Share(granteeType userpb.UserType) *collaboration.Share {
Grantee: extractGrantee(s.SharedWithIsGroup, s.ShareWith, granteeType),
Owner: conversions.MakeUserID(s.UIDOwner),
Creator: conversions.MakeUserID(s.UIDInitiator),
Ctime: ts,
Mtime: ts,
Ctime: creationTs,
Mtime: updateTs,
}
}

Expand Down Expand Up @@ -123,7 +126,7 @@ func (p *PublicLink) AsCS3PublicShare() *link.PublicShare {
}
var expires *typespb.Timestamp
if p.Expiration.Valid {
exp, err := p.Expiration.V.Value()
exp, err := p.Expiration.Value()
if err == nil {
expiration := exp.(time.Time)
expires = &typespb.Timestamp{
Expand All @@ -144,7 +147,7 @@ func (p *PublicLink) AsCS3PublicShare() *link.PublicShare {
Owner: conversions.MakeUserID(p.UIDOwner),
Creator: conversions.MakeUserID(p.UIDInitiator),
Token: p.Token,
DisplayName: p.ShareName,
DisplayName: p.LinkName,
PasswordProtected: pwd,
Expiration: expires,
Ctime: ts,
Expand Down
320 changes: 320 additions & 0 deletions share/sql/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
package sql

import (
"context"
"database/sql"
"fmt"
"os"
"time"

model "github.com/cernbox/reva-plugins/share"
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"
"gorm.io/datatypes"
"gorm.io/gorm"
)

type Migrator struct {
NewDb *gorm.DB
OldDb *sql.DB
ShareMgr *mgr
}

type ShareOrLink struct {
IsShare bool
Share *model.Share
Link *model.PublicLink
}

type OldShareEntry struct {
ID int
UIDOwner string
UIDInitiator string
Prefix string
ItemSource string
ItemType string
ShareWith string
Token string
Expiration string
Permissions int
ShareType int
ShareName string
STime int
FileTarget string
State int
Quicklink bool
Description string
NotifyUploads bool
NotifyUploadsExtraRecipients sql.NullString
Orphan bool
}

type OldShareState struct {
id int
recipient string
state int
}

const (
bufferSize = 10
numWorkers = 10
)

func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) {
// Config
config := map[string]interface{}{
"engine": "mysql",
"db_username": username,
"db_password": password,
"db_host": host,
"db_port": port,
"db_name": name,
"gatewaysvc": gatewaysvc,
"dry_run": dryRun,
}
// Authenticate to gateway service
tokenlessCtx, cancel := context.WithCancel(context.Background())
ctx := appctx.ContextSetToken(tokenlessCtx, token)
ctx = metadata.AppendToOutgoingContext(ctx, appctx.TokenHeader, token)
defer cancel()

// Set up migrator
shareManager, err := New(ctx, config)
if err != nil {
fmt.Println("Failed to create shareManager: " + err.Error())
os.Exit(1)
}
sharemgr := shareManager.(*mgr)
oldDb, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, name))
if err != nil {
fmt.Println("Failed to create db: " + err.Error())
os.Exit(1)
}
migrator := Migrator{
OldDb: oldDb,
NewDb: sharemgr.db,
ShareMgr: sharemgr,
}

if dryRun {
migrator.NewDb = migrator.NewDb.Debug()
}

migrateShares(ctx, migrator)
fmt.Println("---------------------------------")
migrateShareStatuses(ctx, migrator)

}

func migrateShares(ctx context.Context, migrator Migrator) {
// Check how many shares are to be migrated
count, err := getCount(migrator, "oc_share")
if err != nil {
fmt.Println("Error getting count: " + err.Error())
return
}
fmt.Printf("Migrating %d shares\n", count)

// Get all old shares
query := "select id, coalesce(uid_owner, '') as uid_owner, coalesce(uid_initiator, '') as uid_initiator, lower(coalesce(share_with, '')) as share_with, coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source, coalesce(item_type, '') as item_type, stime, permissions, share_type, orphan FROM oc_share order by id desc" // AND id=?"
params := []interface{}{}

res, err := migrator.OldDb.Query(query, params...)

if err != nil {
fmt.Printf("Fatal error: %s", err.Error())
os.Exit(1)
}

// Create channel for workers
ch := make(chan *OldShareEntry, bufferSize)
defer close(ch)

// Start all workers
for range numWorkers {
go workerShare(ctx, migrator, ch)
}

for res.Next() {
var s OldShareEntry
res.Scan(&s.ID, &s.UIDOwner, &s.UIDInitiator, &s.ShareWith, &s.Prefix, &s.ItemSource, &s.ItemType, &s.STime, &s.Permissions, &s.ShareType, &s.Orphan)
if err == nil {
ch <- &s
} else {
fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error())
}
}
}

func migrateShareStatuses(ctx context.Context, migrator Migrator) {
// Check how many shares are to be migrated
count, err := getCount(migrator, "oc_share_status")
if err != nil {
fmt.Println("Error getting count: " + err.Error())
return
}
fmt.Printf("Migrating %d share statuses\n", count)

// Get all old shares
query := "select id, coalesce(recipient, '') as recipient, state FROM oc_share_status order by id desc"
params := []interface{}{}

res, err := migrator.OldDb.Query(query, params...)

if err != nil {
fmt.Printf("Fatal error: %s", err.Error())
os.Exit(1)
}

// Create channel for workers
ch := make(chan *OldShareState, bufferSize)
defer close(ch)

// Start all workers
for range numWorkers {
go workerState(ctx, migrator, ch)
}

for res.Next() {
var s OldShareState
res.Scan(&s.id, &s.recipient, &s.state)
if err == nil {
ch <- &s
} else {
fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error())
}
}
}

func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) {
for share := range ch {
handleSingleShare(ctx, migrator, share)
}
}

func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) {
for state := range ch {
handleSingleState(ctx, migrator, state)
}
}

func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) {
share, err := oldShareToNewShare(ctx, migrator, s)
if err != nil {
return
}
// TODO error handling
if share.IsShare {
migrator.NewDb.Create(&share.Share)
} else {
migrator.NewDb.Create(&share.Link)
}
}

func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) {
// case collaboration.ShareState_SHARE_STATE_REJECTED:
// state = -1
// case collaboration.ShareState_SHARE_STATE_ACCEPTED:
// state = 1

newShareState := &model.ShareState{
ShareID: uint(s.id),
Model: gorm.Model{
ID: uint(s.id),
},
User: s.recipient,
Hidden: s.state == -1, // Hidden if REJECTED
Synced: false,
}
migrator.NewDb.Create(&newShareState)
}

func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry) (*ShareOrLink, error) {
expirationDate, expirationError := time.Parse("2006-01-02 15:04:05", s.Expiration)

protoShare := model.ProtoShare{
Model: gorm.Model{
ID: uint(s.ID),
CreatedAt: time.Unix(int64(s.STime), 0),
UpdatedAt: time.Unix(int64(s.STime), 0),
},
UIDOwner: s.UIDOwner,
UIDInitiator: s.UIDInitiator,
Description: s.Description,
Permissions: uint8(s.Permissions),
Orphan: s.Orphan, // will be re-checked later
Expiration: datatypes.Null[time.Time]{
V: expirationDate,
Valid: expirationError == nil,
},
ItemType: model.ItemType(s.ItemType),
InitialPath: "", // set later
Inode: s.ItemSource,
Instance: s.Prefix,
}

// Getting InitialPath
if !protoShare.Orphan {
path, err := migrator.ShareMgr.getPath(ctx, &providerv1beta1.ResourceId{
StorageId: protoShare.Instance,
OpaqueId: protoShare.Inode,
})
if err == nil {
protoShare.InitialPath = path
} else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) {
protoShare.Orphan = true
} else {
// We do not set, because of a general error
fmt.Printf("An error occured for share %d while statting (%s, %s): %s\n", s.ID, protoShare.Instance, protoShare.Inode, err.Error())
}
}

// ShareTypeUser = 0
// ShareTypeGroup = 1
// ShareTypePublicLink = 3
// ShareTypeFederatedCloudShare = 6
// ShareTypeSpaceMembership = 7
if s.ShareType == 0 || s.ShareType == 1 {
return &ShareOrLink{
IsShare: true,
Share: &model.Share{
ProtoShare: protoShare,
ShareWith: s.ShareWith,
SharedWithIsGroup: s.ShareType == 1,
},
}, nil
} else if s.ShareType == 3 {
notifyUploadsExtraRecipients := ""
if s.NotifyUploadsExtraRecipients.Valid {
notifyUploadsExtraRecipients = s.NotifyUploadsExtraRecipients.String
}
return &ShareOrLink{
IsShare: false,
Link: &model.PublicLink{
ProtoShare: protoShare,
Token: s.Token,
Quicklink: s.Quicklink,
NotifyUploads: s.NotifyUploads,
NotifyUploadsExtraRecipients: notifyUploadsExtraRecipients,
Password: s.ShareWith,
LinkName: s.ShareName,
},
}, nil
} else {
return nil, errors.New("Invalid share type")
}
}

func getCount(migrator Migrator, table string) (int, error) {
res := 0
query := "select count(*) from " + table
params := []interface{}{}

if err := migrator.OldDb.QueryRow(query, params...).Scan(&res); err != nil {
return 0, err
}
return res, nil
}
Loading

0 comments on commit e3129e7

Please sign in to comment.