From e3129e778ccab6d987cdac8d67c625bc318cf0ef Mon Sep 17 00:00:00 2001 From: Jesse Geens Date: Fri, 24 Jan 2025 13:38:08 +0100 Subject: [PATCH] Resolving more PR comments --- share/model.go | 21 +-- share/sql/migrate.go | 320 +++++++++++++++++++++++++++++++++++++++++++ share/sql/sql.go | 72 ++++------ 3 files changed, 362 insertions(+), 51 deletions(-) create mode 100644 share/sql/migrate.go diff --git a/share/model.go b/share/model.go index 2cc5933..ed81b83 100644 --- a/share/model.go +++ b/share/model.go @@ -38,11 +38,11 @@ type ProtoShare struct { ItemType ItemType // file | folder | reference | symlink InitialPath string Inode string - Permissions uint8 Instance string - Orphan bool Description string - Expiration datatypes.Null[datatypes.Date] + Permissions uint8 + Orphan bool + Expiration datatypes.NullTime } type Share struct { @@ -60,7 +60,7 @@ type PublicLink struct { NotifyUploadsExtraRecipients string Password string // Users can give a name to a share - ShareName string + LinkName string } type ShareState struct { @@ -75,9 +75,12 @@ type ShareState struct { } func (s *Share) AsCS3Share(granteeType userpb.UserType) *collaboration.Share { - ts := &typespb.Timestamp{ + creationTs := &typespb.Timestamp{ Seconds: uint64(s.CreatedAt.Unix()), } + updateTs := &typespb.Timestamp{ + Seconds: uint64(s.UpdatedAt.Unix()), + } return &collaboration.Share{ Id: &collaboration.ShareId{ OpaqueId: strconv.FormatUint(uint64(s.ID), 10), @@ -91,8 +94,8 @@ func (s *Share) AsCS3Share(granteeType userpb.UserType) *collaboration.Share { Grantee: extractGrantee(s.SharedWithIsGroup, s.ShareWith, granteeType), Owner: conversions.MakeUserID(s.UIDOwner), Creator: conversions.MakeUserID(s.UIDInitiator), - Ctime: ts, - Mtime: ts, + Ctime: creationTs, + Mtime: updateTs, } } @@ -123,7 +126,7 @@ func (p *PublicLink) AsCS3PublicShare() *link.PublicShare { } var expires *typespb.Timestamp if p.Expiration.Valid { - exp, err := p.Expiration.V.Value() + exp, err := p.Expiration.Value() if err == nil { expiration := exp.(time.Time) expires = &typespb.Timestamp{ @@ -144,7 +147,7 @@ func (p *PublicLink) AsCS3PublicShare() *link.PublicShare { Owner: conversions.MakeUserID(p.UIDOwner), Creator: conversions.MakeUserID(p.UIDInitiator), Token: p.Token, - DisplayName: p.ShareName, + DisplayName: p.LinkName, PasswordProtected: pwd, Expiration: expires, Ctime: ts, diff --git a/share/sql/migrate.go b/share/sql/migrate.go new file mode 100644 index 0000000..dde7bc3 --- /dev/null +++ b/share/sql/migrate.go @@ -0,0 +1,320 @@ +package sql + +import ( + "context" + "database/sql" + "fmt" + "os" + "time" + + model "github.com/cernbox/reva-plugins/share" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/pkg/errors" + "google.golang.org/grpc/metadata" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type Migrator struct { + NewDb *gorm.DB + OldDb *sql.DB + ShareMgr *mgr +} + +type ShareOrLink struct { + IsShare bool + Share *model.Share + Link *model.PublicLink +} + +type OldShareEntry struct { + ID int + UIDOwner string + UIDInitiator string + Prefix string + ItemSource string + ItemType string + ShareWith string + Token string + Expiration string + Permissions int + ShareType int + ShareName string + STime int + FileTarget string + State int + Quicklink bool + Description string + NotifyUploads bool + NotifyUploadsExtraRecipients sql.NullString + Orphan bool +} + +type OldShareState struct { + id int + recipient string + state int +} + +const ( + bufferSize = 10 + numWorkers = 10 +) + +func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) { + // Config + config := map[string]interface{}{ + "engine": "mysql", + "db_username": username, + "db_password": password, + "db_host": host, + "db_port": port, + "db_name": name, + "gatewaysvc": gatewaysvc, + "dry_run": dryRun, + } + // Authenticate to gateway service + tokenlessCtx, cancel := context.WithCancel(context.Background()) + ctx := appctx.ContextSetToken(tokenlessCtx, token) + ctx = metadata.AppendToOutgoingContext(ctx, appctx.TokenHeader, token) + defer cancel() + + // Set up migrator + shareManager, err := New(ctx, config) + if err != nil { + fmt.Println("Failed to create shareManager: " + err.Error()) + os.Exit(1) + } + sharemgr := shareManager.(*mgr) + oldDb, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, name)) + if err != nil { + fmt.Println("Failed to create db: " + err.Error()) + os.Exit(1) + } + migrator := Migrator{ + OldDb: oldDb, + NewDb: sharemgr.db, + ShareMgr: sharemgr, + } + + if dryRun { + migrator.NewDb = migrator.NewDb.Debug() + } + + migrateShares(ctx, migrator) + fmt.Println("---------------------------------") + migrateShareStatuses(ctx, migrator) + +} + +func migrateShares(ctx context.Context, migrator Migrator) { + // Check how many shares are to be migrated + count, err := getCount(migrator, "oc_share") + if err != nil { + fmt.Println("Error getting count: " + err.Error()) + return + } + fmt.Printf("Migrating %d shares\n", count) + + // Get all old shares + query := "select id, coalesce(uid_owner, '') as uid_owner, coalesce(uid_initiator, '') as uid_initiator, lower(coalesce(share_with, '')) as share_with, coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source, coalesce(item_type, '') as item_type, stime, permissions, share_type, orphan FROM oc_share order by id desc" // AND id=?" + params := []interface{}{} + + res, err := migrator.OldDb.Query(query, params...) + + if err != nil { + fmt.Printf("Fatal error: %s", err.Error()) + os.Exit(1) + } + + // Create channel for workers + ch := make(chan *OldShareEntry, bufferSize) + defer close(ch) + + // Start all workers + for range numWorkers { + go workerShare(ctx, migrator, ch) + } + + for res.Next() { + var s OldShareEntry + res.Scan(&s.ID, &s.UIDOwner, &s.UIDInitiator, &s.ShareWith, &s.Prefix, &s.ItemSource, &s.ItemType, &s.STime, &s.Permissions, &s.ShareType, &s.Orphan) + if err == nil { + ch <- &s + } else { + fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error()) + } + } +} + +func migrateShareStatuses(ctx context.Context, migrator Migrator) { + // Check how many shares are to be migrated + count, err := getCount(migrator, "oc_share_status") + if err != nil { + fmt.Println("Error getting count: " + err.Error()) + return + } + fmt.Printf("Migrating %d share statuses\n", count) + + // Get all old shares + query := "select id, coalesce(recipient, '') as recipient, state FROM oc_share_status order by id desc" + params := []interface{}{} + + res, err := migrator.OldDb.Query(query, params...) + + if err != nil { + fmt.Printf("Fatal error: %s", err.Error()) + os.Exit(1) + } + + // Create channel for workers + ch := make(chan *OldShareState, bufferSize) + defer close(ch) + + // Start all workers + for range numWorkers { + go workerState(ctx, migrator, ch) + } + + for res.Next() { + var s OldShareState + res.Scan(&s.id, &s.recipient, &s.state) + if err == nil { + ch <- &s + } else { + fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error()) + } + } +} + +func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) { + for share := range ch { + handleSingleShare(ctx, migrator, share) + } +} + +func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) { + for state := range ch { + handleSingleState(ctx, migrator, state) + } +} + +func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) { + share, err := oldShareToNewShare(ctx, migrator, s) + if err != nil { + return + } + // TODO error handling + if share.IsShare { + migrator.NewDb.Create(&share.Share) + } else { + migrator.NewDb.Create(&share.Link) + } +} + +func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) { + // case collaboration.ShareState_SHARE_STATE_REJECTED: + // state = -1 + // case collaboration.ShareState_SHARE_STATE_ACCEPTED: + // state = 1 + + newShareState := &model.ShareState{ + ShareID: uint(s.id), + Model: gorm.Model{ + ID: uint(s.id), + }, + User: s.recipient, + Hidden: s.state == -1, // Hidden if REJECTED + Synced: false, + } + migrator.NewDb.Create(&newShareState) +} + +func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry) (*ShareOrLink, error) { + expirationDate, expirationError := time.Parse("2006-01-02 15:04:05", s.Expiration) + + protoShare := model.ProtoShare{ + Model: gorm.Model{ + ID: uint(s.ID), + CreatedAt: time.Unix(int64(s.STime), 0), + UpdatedAt: time.Unix(int64(s.STime), 0), + }, + UIDOwner: s.UIDOwner, + UIDInitiator: s.UIDInitiator, + Description: s.Description, + Permissions: uint8(s.Permissions), + Orphan: s.Orphan, // will be re-checked later + Expiration: datatypes.Null[time.Time]{ + V: expirationDate, + Valid: expirationError == nil, + }, + ItemType: model.ItemType(s.ItemType), + InitialPath: "", // set later + Inode: s.ItemSource, + Instance: s.Prefix, + } + + // Getting InitialPath + if !protoShare.Orphan { + path, err := migrator.ShareMgr.getPath(ctx, &providerv1beta1.ResourceId{ + StorageId: protoShare.Instance, + OpaqueId: protoShare.Inode, + }) + if err == nil { + protoShare.InitialPath = path + } else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) { + protoShare.Orphan = true + } else { + // We do not set, because of a general error + fmt.Printf("An error occured for share %d while statting (%s, %s): %s\n", s.ID, protoShare.Instance, protoShare.Inode, err.Error()) + } + } + + // ShareTypeUser = 0 + // ShareTypeGroup = 1 + // ShareTypePublicLink = 3 + // ShareTypeFederatedCloudShare = 6 + // ShareTypeSpaceMembership = 7 + if s.ShareType == 0 || s.ShareType == 1 { + return &ShareOrLink{ + IsShare: true, + Share: &model.Share{ + ProtoShare: protoShare, + ShareWith: s.ShareWith, + SharedWithIsGroup: s.ShareType == 1, + }, + }, nil + } else if s.ShareType == 3 { + notifyUploadsExtraRecipients := "" + if s.NotifyUploadsExtraRecipients.Valid { + notifyUploadsExtraRecipients = s.NotifyUploadsExtraRecipients.String + } + return &ShareOrLink{ + IsShare: false, + Link: &model.PublicLink{ + ProtoShare: protoShare, + Token: s.Token, + Quicklink: s.Quicklink, + NotifyUploads: s.NotifyUploads, + NotifyUploadsExtraRecipients: notifyUploadsExtraRecipients, + Password: s.ShareWith, + LinkName: s.ShareName, + }, + }, nil + } else { + return nil, errors.New("Invalid share type") + } +} + +func getCount(migrator Migrator, table string) (int, error) { + res := 0 + query := "select count(*) from " + table + params := []interface{}{} + + if err := migrator.OldDb.QueryRow(query, params...).Scan(&res); err != nil { + return 0, err + } + return res, nil +} diff --git a/share/sql/sql.go b/share/sql/sql.go index 40f686d..57c302f 100644 --- a/share/sql/sql.go +++ b/share/sql/sql.go @@ -79,8 +79,7 @@ type config struct { } type mgr struct { - c *config - //db *sql.DB + c *config db *gorm.DB } @@ -267,15 +266,7 @@ func (m *mgr) Unshare(ctx context.Context, ref *collaboration.ShareReference) er var share *model.Share var err error if id := ref.GetId(); id != nil { - var intId int - intId, err = strconv.Atoi(id.OpaqueId) - share = &model.Share{ - ProtoShare: model.ProtoShare{ - Model: gorm.Model{ - ID: uint(intId), - }, - }, - } + share, err = emptyShareWithId(id.OpaqueId) } else { share, err = m.getShare(ctx, ref) } @@ -290,15 +281,7 @@ func (m *mgr) UpdateShare(ctx context.Context, ref *collaboration.ShareReference var share *model.Share var err error if id := ref.GetId(); id != nil { - var intId int - intId, err = strconv.Atoi(id.OpaqueId) - share = &model.Share{ - ProtoShare: model.ProtoShare{ - Model: gorm.Model{ - ID: uint(intId), - }, - }, - } + share, err = emptyShareWithId(id.OpaqueId) } else { share, err = m.getShare(ctx, ref) } @@ -432,7 +415,6 @@ func (m *mgr) ListReceivedShares(ctx context.Context, filters []*collaboration.F return receivedShares, nil } -// shareId *collaboration.ShareId func (m *mgr) getShareState(ctx context.Context, share *model.Share, user *userpb.User) (*model.ShareState, error) { var shareState model.ShareState query := m.db.Model(&shareState). @@ -449,14 +431,26 @@ func (m *mgr) getShareState(ctx context.Context, share *model.Share, user *userp Synced: false, User: user.Username, } - // Does not really matter if it fails, next time the user - // lists his shares this will just be called again - m.db.Save(&shareState) } return &shareState, nil } +func emptyShareWithId(id string) (*model.Share, error) { + intId, err := strconv.Atoi(id) + if err != nil { + return nil, err + } + share := &model.Share{ + ProtoShare: model.ProtoShare{ + Model: gorm.Model{ + ID: uint(intId), + }, + }, + } + return share, nil +} + func (m *mgr) getReceivedByID(ctx context.Context, id *collaboration.ShareId, gtype userpb.UserType) (*collaboration.ReceivedShare, error) { user := appctx.ContextMustGetUser(ctx) share, err := m.getByID(ctx, id) @@ -513,27 +507,21 @@ func (m *mgr) GetReceivedShare(ctx context.Context, ref *collaboration.ShareRefe return s, nil } -func (m *mgr) UpdateReceivedShare(ctx context.Context, share *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) { +func (m *mgr) UpdateReceivedShare(ctx context.Context, recvShare *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) { user := appctx.ContextMustGetUser(ctx) - rs, err := m.getReceivedByID(ctx, share.Share.Id, user.Id.Type) + rs, err := m.getReceivedByID(ctx, recvShare.Share.Id, user.Id.Type) if err != nil { return nil, err } - shareId, err := strconv.Atoi(share.Share.Id.OpaqueId) + share, err := emptyShareWithId(recvShare.Share.Id.OpaqueId) if err != nil { return nil, err } - shareState, err := m.getShareState(ctx, &model.Share{ - ProtoShare: model.ProtoShare{ - Model: gorm.Model{ - ID: uint(shareId), - }, - }, - }, user) + shareState, err := m.getShareState(ctx, share, user) if err != nil { return nil, err } @@ -542,21 +530,21 @@ func (m *mgr) UpdateReceivedShare(ctx context.Context, share *collaboration.Rece for _, path := range fieldMask.Paths { switch path { case "state": - rs.State = share.State + rs.State = recvShare.State + switch rs.State { + case collaboration.ShareState_SHARE_STATE_ACCEPTED: + shareState.Hidden = false + case collaboration.ShareState_SHARE_STATE_REJECTED: + shareState.Hidden = true + } case "hidden": - rs.Hidden = share.Hidden + rs.Hidden = recvShare.Hidden default: return nil, errtypes.NotSupported("updating " + path + " is not supported") } } // Now we do the actual update to the db model - switch rs.State { - case collaboration.ShareState_SHARE_STATE_ACCEPTED: - shareState.Hidden = false - case collaboration.ShareState_SHARE_STATE_REJECTED: - shareState.Hidden = true - } res := m.db.Save(&shareState) if res.Error != nil {