@@ -25,6 +25,7 @@ import (
25
25
"strings"
26
26
27
27
"github.com/cs3org/reva/v2/pkg/storagespace"
28
+ "golang.org/x/sync/errgroup"
28
29
"google.golang.org/genproto/protobuf/field_mask"
29
30
"google.golang.org/grpc"
30
31
codes "google.golang.org/grpc/codes"
@@ -60,11 +61,13 @@ func init() {
60
61
type config struct {
61
62
GatewayAddr string `mapstructure:"gateway_addr"`
62
63
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
64
+ MaxConcurrency int `mapstructure:"max_concurrency"`
63
65
}
64
66
65
67
type service struct {
66
68
gatewaySelector pool.Selectable [gateway.GatewayAPIClient ]
67
69
sharingCollaborationSelector pool.Selectable [collaboration.CollaborationAPIClient ]
70
+ maxConcurrency int
68
71
}
69
72
70
73
func (s * service ) Close () error {
@@ -98,14 +101,19 @@ func NewDefault(m map[string]interface{}, _ *grpc.Server) (rgrpc.Service, error)
98
101
return nil , errors .Wrap (err , "sharesstorageprovider: error getting UserShareProvider client" )
99
102
}
100
103
101
- return New (gatewaySelector , sharingCollaborationSelector )
104
+ if c .MaxConcurrency <= 0 {
105
+ c .MaxConcurrency = 5
106
+ }
107
+
108
+ return New (gatewaySelector , sharingCollaborationSelector , c .MaxConcurrency )
102
109
}
103
110
104
111
// New returns a new instance of the SharesStorageProvider service
105
- func New (gatewaySelector pool.Selectable [gateway.GatewayAPIClient ], sharingCollaborationSelector pool.Selectable [collaboration.CollaborationAPIClient ]) (rgrpc.Service , error ) {
112
+ func New (gatewaySelector pool.Selectable [gateway.GatewayAPIClient ], sharingCollaborationSelector pool.Selectable [collaboration.CollaborationAPIClient ], maxConcurrency int ) (rgrpc.Service , error ) {
106
113
s := & service {
107
114
gatewaySelector : gatewaySelector ,
108
115
sharingCollaborationSelector : sharingCollaborationSelector ,
116
+ maxConcurrency : maxConcurrency ,
109
117
}
110
118
return s , nil
111
119
}
@@ -399,7 +407,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
399
407
var shareInfo map [string ]* provider.ResourceInfo
400
408
var err error
401
409
if fetchShares {
402
- receivedShares , shareInfo , err = s .fetchShares (ctx , req .Opaque , []string {}, & fieldmaskpb.FieldMask { /*TODO mtime and etag only?*/ })
410
+ receivedShares , shareInfo , err = s .fetchAcceptedShares (ctx , req .Opaque , []string {}, & fieldmaskpb.FieldMask { /*TODO mtime and etag only?*/ })
403
411
if err != nil {
404
412
return nil , errors .Wrap (err , "sharesstorageprovider: error calling ListReceivedSharesRequest" )
405
413
}
@@ -710,7 +718,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
710
718
if ! ok {
711
719
return nil , fmt .Errorf ("missing user in context" )
712
720
}
713
- receivedShares , shareMd , err := s .fetchShares (ctx , req .Opaque , req .ArbitraryMetadataKeys , req .FieldMask )
721
+ receivedShares , shareMd , err := s .fetchAcceptedShares (ctx , req .Opaque , req .ArbitraryMetadataKeys , req .FieldMask )
714
722
if err != nil {
715
723
return nil , err
716
724
}
@@ -806,7 +814,7 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
806
814
// The root is empty, it is filled by mountpoints
807
815
// so, when accessing the root via /dav/spaces, we need to list the accepted shares with their mountpoint
808
816
809
- receivedShares , shareMd , err := s .fetchShares (ctx , req .Opaque , req .ArbitraryMetadataKeys , req .FieldMask )
817
+ receivedShares , shareMd , err := s .fetchAcceptedShares (ctx , req .Opaque , req .ArbitraryMetadataKeys , req .FieldMask )
810
818
if err != nil {
811
819
return nil , errors .Wrap (err , "sharesstorageprovider: error calling ListReceivedSharesRequest" )
812
820
}
@@ -1143,14 +1151,21 @@ func (s *service) rejectReceivedShare(ctx context.Context, receivedShare *collab
1143
1151
return errtypes .NewErrtypeFromStatus (res .Status )
1144
1152
}
1145
1153
1146
- func (s * service ) fetchShares (ctx context.Context , opaque * typesv1beta1.Opaque , arbitraryMetadataKeys []string , fieldMask * field_mask.FieldMask ) ([]* collaboration.ReceivedShare , map [string ]* provider.ResourceInfo , error ) {
1154
+ func (s * service ) fetchAcceptedShares (ctx context.Context , opaque * typesv1beta1.Opaque , arbitraryMetadataKeys []string , fieldMask * field_mask.FieldMask ) ([]* collaboration.ReceivedShare , map [string ]* provider.ResourceInfo , error ) {
1147
1155
sharingCollaborationClient , err := s .sharingCollaborationSelector .Next ()
1148
1156
if err != nil {
1149
1157
return nil , nil , err
1150
1158
}
1151
1159
1152
1160
lsRes , err := sharingCollaborationClient .ListReceivedShares (ctx , & collaboration.ListReceivedSharesRequest {
1153
- // FIXME filter by received shares for resource id - listing all shares is tooo expensive!
1161
+ Filters : []* collaboration.Filter {
1162
+ {
1163
+ Type : collaboration .Filter_TYPE_STATE ,
1164
+ Term : & collaboration.Filter_State {
1165
+ State : collaboration .ShareState_SHARE_STATE_ACCEPTED ,
1166
+ },
1167
+ },
1168
+ },
1154
1169
})
1155
1170
if err != nil {
1156
1171
return nil , nil , errors .Wrap (err , "sharesstorageprovider: error calling ListReceivedSharesRequest" )
@@ -1159,42 +1174,98 @@ func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque,
1159
1174
return nil , nil , fmt .Errorf ("sharesstorageprovider: error calling ListReceivedSharesRequest" )
1160
1175
}
1161
1176
1162
- gatewayClient , err := s .gatewaySelector .Next ()
1163
- if err != nil {
1164
- return nil , nil , err
1177
+ numWorkers := s .maxConcurrency
1178
+ if len (lsRes .Shares ) < numWorkers {
1179
+ numWorkers = len (lsRes .Shares )
1180
+ }
1181
+ type res struct {
1182
+ shareid string
1183
+ info * provider.ResourceInfo
1165
1184
}
1185
+ work := make (chan * collaboration.ReceivedShare , len (lsRes .Shares ))
1186
+ results := make (chan res , len (lsRes .Shares ))
1166
1187
1167
- shareMetaData := make (map [string ]* provider.ResourceInfo , len (lsRes .Shares ))
1168
- for _ , rs := range lsRes .Shares {
1169
- // only stat accepted shares
1170
- if rs .State != collaboration .ShareState_SHARE_STATE_ACCEPTED {
1171
- continue
1172
- }
1173
- if rs .Share .ResourceId .SpaceId == "" {
1174
- // convert backwards compatible share id
1175
- rs .Share .ResourceId .StorageId , rs .Share .ResourceId .SpaceId = storagespace .SplitStorageID (rs .Share .ResourceId .StorageId )
1188
+ g , ctx := errgroup .WithContext (ctx )
1189
+
1190
+ // Distribute work
1191
+ g .Go (func () error {
1192
+ defer close (work )
1193
+ for _ , share := range lsRes .Shares {
1194
+ select {
1195
+ case work <- share :
1196
+ case <- ctx .Done ():
1197
+ return ctx .Err ()
1198
+ }
1176
1199
}
1177
- sRes , err := gatewayClient .Stat (ctx , & provider.StatRequest {
1178
- Opaque : opaque ,
1179
- Ref : & provider.Reference {ResourceId : rs .Share .ResourceId },
1180
- ArbitraryMetadataKeys : arbitraryMetadataKeys ,
1181
- FieldMask : fieldMask ,
1200
+ return nil
1201
+ })
1202
+
1203
+ // Spawn workers that'll concurrently work the queue
1204
+ for i := 0 ; i < numWorkers ; i ++ {
1205
+ g .Go (func () error {
1206
+ for rs := range work {
1207
+
1208
+ // only stat accepted shares
1209
+ if rs .State != collaboration .ShareState_SHARE_STATE_ACCEPTED {
1210
+ continue
1211
+ }
1212
+ if rs .Share .ResourceId .SpaceId == "" {
1213
+ // convert backwards compatible share id
1214
+ rs .Share .ResourceId .StorageId , rs .Share .ResourceId .SpaceId = storagespace .SplitStorageID (rs .Share .ResourceId .StorageId )
1215
+ }
1216
+
1217
+ gatewayClient , err := s .gatewaySelector .Next ()
1218
+ if err != nil {
1219
+ appctx .GetLogger (ctx ).Error ().
1220
+ Err (err ).
1221
+ Interface ("resourceID" , rs .Share .ResourceId ).
1222
+ Msg ("ListRecievedShares: failed to select next gateway client" )
1223
+ return err
1224
+ }
1225
+ sRes , err := gatewayClient .Stat (ctx , & provider.StatRequest {
1226
+ Opaque : opaque ,
1227
+ Ref : & provider.Reference {ResourceId : rs .Share .ResourceId },
1228
+ ArbitraryMetadataKeys : arbitraryMetadataKeys ,
1229
+ FieldMask : fieldMask ,
1230
+ })
1231
+ if err != nil {
1232
+ appctx .GetLogger (ctx ).Error ().
1233
+ Err (err ).
1234
+ Interface ("resourceID" , rs .Share .ResourceId ).
1235
+ Msg ("ListRecievedShares: failed to make stat call" )
1236
+ return err
1237
+ }
1238
+ if sRes .Status .Code != rpc .Code_CODE_OK {
1239
+ appctx .GetLogger (ctx ).Debug ().
1240
+ Interface ("resourceID" , rs .Share .ResourceId ).
1241
+ Interface ("status" , sRes .Status ).
1242
+ Msg ("ListRecievedShares: failed to stat the resource" )
1243
+ continue
1244
+ }
1245
+ select {
1246
+ case results <- res {shareid : rs .Share .Id .OpaqueId , info : sRes .Info }:
1247
+ case <- ctx .Done ():
1248
+ return ctx .Err ()
1249
+ }
1250
+ }
1251
+ return nil
1182
1252
})
1183
- if err != nil {
1184
- appctx .GetLogger (ctx ).Error ().
1185
- Err (err ).
1186
- Interface ("resourceID" , rs .Share .ResourceId ).
1187
- Msg ("ListRecievedShares: failed to make stat call" )
1188
- continue
1189
- }
1190
- if sRes .Status .Code != rpc .Code_CODE_OK {
1191
- appctx .GetLogger (ctx ).Debug ().
1192
- Interface ("resourceID" , rs .Share .ResourceId ).
1193
- Interface ("status" , sRes .Status ).
1194
- Msg ("ListRecievedShares: failed to stat the resource" )
1195
- continue
1196
- }
1197
- shareMetaData [rs .Share .Id .OpaqueId ] = sRes .Info
1253
+ }
1254
+
1255
+ // Wait for things to settle down, then close results chan
1256
+ go func () {
1257
+ _ = g .Wait () // error is checked later
1258
+ close (results )
1259
+ }()
1260
+
1261
+ // some results might have been skipped, so we cannot preallocate the map
1262
+ shareMetaData := make (map [string ]* provider.ResourceInfo )
1263
+ for r := range results {
1264
+ shareMetaData [r .shareid ] = r .info
1265
+ }
1266
+
1267
+ if err := g .Wait (); err != nil {
1268
+ return nil , nil , err
1198
1269
}
1199
1270
1200
1271
return lsRes .Shares , shareMetaData , nil
0 commit comments