@@ -114,116 +114,153 @@ func (s *V1Instance) Close() error {
114
114
// rate limit `Name` and `UniqueKey` is not owned by this instance then we forward the request to the
115
115
// peer that does.
116
116
func (s * V1Instance ) GetRateLimits (ctx context.Context , r * GetRateLimitsReq ) (* GetRateLimitsResp , error ) {
117
- var resp GetRateLimitsResp
118
117
if len (r .Requests ) > maxBatchSize {
119
118
return nil , status .Errorf (codes .OutOfRange ,
120
119
"Requests.RateLimits list too large; max size is '%d'" , maxBatchSize )
121
120
}
122
121
123
- type InOut struct {
124
- In * RateLimitReq
125
- Idx int
126
- Out * RateLimitResp
122
+ resp := GetRateLimitsResp {
123
+ Responses : make ([]* RateLimitResp , len (r .Requests )),
127
124
}
128
125
129
- // Asynchronously fetch rate limits
130
- out := make (chan InOut )
131
- go func () {
132
- fan := syncutil .NewFanOut (1000 )
133
- // For each item in the request body
134
- for i , item := range r .Requests {
135
- fan .Run (func (data interface {}) error {
136
- inOut := data .(InOut )
137
-
138
- globalKey := inOut .In .Name + "_" + inOut .In .UniqueKey
139
- var peer * PeerClient
140
- var err error
141
-
142
- if len (inOut .In .UniqueKey ) == 0 {
143
- inOut .Out = & RateLimitResp {Error : "field 'unique_key' cannot be empty" }
144
- out <- inOut
145
- return nil
146
- }
126
+ var wg sync.WaitGroup
127
+ asyncCh := make (chan AsyncResp , len (r .Requests ))
147
128
148
- if len ( inOut . In . Name ) == 0 {
149
- inOut . Out = & RateLimitResp { Error : "field 'namespace' cannot be empty" }
150
- out <- inOut
151
- return nil
152
- }
129
+ // For each item in the request body
130
+ for i , req := range r . Requests {
131
+ key := req . Name + "_" + req . UniqueKey
132
+ var peer * PeerClient
133
+ var err error
153
134
154
- var attempts int
155
- getPeer:
156
- if attempts > 5 {
157
- inOut .Out = & RateLimitResp {
158
- Error : fmt .Sprintf ("GetPeer() keeps returning peers that are not connected for '%s' - '%s'" , globalKey , err ),
159
- }
160
- out <- inOut
161
- return nil
162
- }
135
+ if len (req .UniqueKey ) == 0 {
136
+ resp .Responses [i ] = & RateLimitResp {Error : "field 'unique_key' cannot be empty" }
137
+ continue
138
+ }
139
+
140
+ if len (req .Name ) == 0 {
141
+ resp .Responses [i ] = & RateLimitResp {Error : "field 'namespace' cannot be empty" }
142
+ continue
143
+ }
144
+
145
+ peer , err = s .GetPeer (key )
146
+ if err != nil {
147
+ resp .Responses [i ] = & RateLimitResp {
148
+ Error : fmt .Sprintf ("while finding peer that owns rate limit '%s' - '%s'" , key , err ),
149
+ }
150
+ continue
151
+ }
163
152
164
- peer , err = s .GetPeer (globalKey )
153
+ // If our server instance is the owner of this rate limit
154
+ if peer .Info ().IsOwner {
155
+ // Apply our rate limit algorithm to the request
156
+ resp .Responses [i ], err = s .getRateLimit (req )
157
+ if err != nil {
158
+ resp .Responses [i ] = & RateLimitResp {
159
+ Error : fmt .Sprintf ("while applying rate limit for '%s' - '%s'" , key , err ),
160
+ }
161
+ }
162
+ } else {
163
+ if HasBehavior (req .Behavior , Behavior_GLOBAL ) {
164
+ resp .Responses [i ], err = s .getGlobalRateLimit (req )
165
165
if err != nil {
166
- inOut .Out = & RateLimitResp {
167
- Error : fmt .Sprintf ("while finding peer that owns rate limit '%s' - '%s'" , globalKey , err ),
168
- }
169
- out <- inOut
170
- return nil
166
+ resp .Responses [i ] = & RateLimitResp {Error : err .Error ()}
171
167
}
172
168
173
- // If our server instance is the owner of this rate limit
174
- if peer .Info ().IsOwner {
175
- // Apply our rate limit algorithm to the request
176
- inOut .Out , err = s .getRateLimit (inOut .In )
177
- if err != nil {
178
- inOut .Out = & RateLimitResp {
179
- Error : fmt .Sprintf ("while applying rate limit for '%s' - '%s'" , globalKey , err ),
180
- }
181
- }
182
- } else {
183
- if HasBehavior (inOut .In .Behavior , Behavior_GLOBAL ) {
184
- inOut .Out , err = s .getGlobalRateLimit (inOut .In )
185
- if err != nil {
186
- inOut .Out = & RateLimitResp {Error : err .Error ()}
187
- }
188
-
189
- // Inform the client of the owner key of the key
190
- inOut .Out .Metadata = map [string ]string {"owner" : peer .Info ().GRPCAddress }
191
-
192
- out <- inOut
193
- return nil
194
- }
169
+ // Inform the client of the owner key of the key
170
+ resp .Responses [i ].Metadata = map [string ]string {"owner" : peer .Info ().GRPCAddress }
171
+ continue
172
+ }
195
173
196
- // Make an RPC call to the peer that owns this rate limit
197
- inOut .Out , err = peer .GetPeerRateLimit (ctx , inOut .In )
198
- if err != nil {
199
- if IsNotReady (err ) {
200
- attempts ++
201
- goto getPeer
202
- }
203
- inOut .Out = & RateLimitResp {
204
- Error : fmt .Sprintf ("while fetching rate limit '%s' from peer - '%s'" , globalKey , err ),
205
- }
206
- }
174
+ wg .Add (1 )
175
+ go s .asyncRequests (ctx , & AsyncReq {
176
+ AsyncCh : asyncCh ,
177
+ Peer : peer ,
178
+ Req : req ,
179
+ WG : & wg ,
180
+ Key : key ,
181
+ })
182
+ }
183
+ }
207
184
208
- // Inform the client of the owner key of the key
209
- inOut .Out .Metadata = map [string ]string {"owner" : peer .Info ().GRPCAddress }
210
- }
185
+ // Wait for any async responses if any
186
+ wg .Wait ()
187
+ close (asyncCh )
188
+ for a := range asyncCh {
189
+ resp .Responses [a .Idx ] = a .Resp
190
+ }
191
+
192
+ return & resp , nil
193
+ }
194
+
195
+ type AsyncResp struct {
196
+ Idx int
197
+ Resp * RateLimitResp
198
+ }
199
+
200
+ type AsyncReq struct {
201
+ WG * sync.WaitGroup
202
+ AsyncCh chan AsyncResp
203
+ Req * RateLimitReq
204
+ Peer * PeerClient
205
+ Key string
206
+ Idx int
207
+ }
208
+
209
+ func (s * V1Instance ) asyncRequests (ctx context.Context , req * AsyncReq ) {
210
+ var attempts int
211
+ var err error
212
+
213
+ resp := AsyncResp {
214
+ Idx : req .Idx ,
215
+ }
211
216
212
- out <- inOut
213
- return nil
214
- }, InOut {In : item , Idx : i })
217
+ for {
218
+ if attempts > 5 {
219
+ resp .Resp = & RateLimitResp {
220
+ Error : fmt .Sprintf ("GetPeer() keeps returning peers that are not connected for '%s' - '%s'" , req .Key , err ),
221
+ }
222
+ break
215
223
}
216
- fan .Wait ()
217
- close (out )
218
- }()
219
224
220
- resp .Responses = make ([]* RateLimitResp , len (r .Requests ))
221
- // Collect the async responses as they return
222
- for i := range out {
223
- resp .Responses [i .Idx ] = i .Out
225
+ // If we are attempting again, the owner of the this rate limit might have changed to us!
226
+ if attempts != 0 {
227
+ if req .Peer .Info ().IsOwner {
228
+ resp .Resp , err = s .getRateLimit (req .Req )
229
+ if err != nil {
230
+ resp .Resp = & RateLimitResp {
231
+ Error : fmt .Sprintf ("while applying rate limit for '%s' - '%s'" , req .Key , err ),
232
+ }
233
+ }
234
+ break
235
+ }
236
+ }
237
+
238
+ // Make an RPC call to the peer that owns this rate limit
239
+ r , err := req .Peer .GetPeerRateLimit (ctx , req .Req )
240
+ if err != nil {
241
+ if IsNotReady (err ) {
242
+ attempts ++
243
+ req .Peer , err = s .GetPeer (req .Key )
244
+ if err != nil {
245
+ resp .Resp = & RateLimitResp {
246
+ Error : fmt .Sprintf ("while finding peer that owns rate limit '%s' - '%s'" , req .Key , err ),
247
+ }
248
+ break
249
+ }
250
+ continue
251
+ }
252
+ resp .Resp = & RateLimitResp {
253
+ Error : fmt .Sprintf ("while fetching rate limit '%s' from peer - '%s'" , req .Key , err ),
254
+ }
255
+ }
256
+ // Inform the client of the owner key of the key
257
+ resp .Resp = r
258
+ resp .Resp .Metadata = map [string ]string {"owner" : req .Peer .Info ().GRPCAddress }
259
+ break
224
260
}
225
261
226
- return & resp , nil
262
+ req .AsyncCh <- resp
263
+ req .WG .Done ()
227
264
}
228
265
229
266
// getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses
0 commit comments