
Commit dc212d7

FogDong and leejanee authored
Feat: add workqueue for list watcher (#30)
* workqueue

* Feat: add workqueue for list watcher

  Signed-off-by: FogDong <[email protected]>

* fix the header

  Signed-off-by: FogDong <[email protected]>

* fix the lint

  Signed-off-by: FogDong <[email protected]>

---------

Signed-off-by: FogDong <[email protected]>
Co-authored-by: Jian.Li <[email protected]>
1 parent 3125442 commit dc212d7

11 files changed (+1942, -2 lines)

go.mod

+1-1
@@ -20,6 +20,7 @@ require (
     k8s.io/api v0.25.3
     k8s.io/apimachinery v0.25.3
     k8s.io/client-go v0.25.3
+    k8s.io/klog/v2 v2.70.1
     k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
     sigs.k8s.io/controller-runtime v0.12.3
 )
@@ -117,7 +118,6 @@ require (
     k8s.io/apiserver v0.25.3 // indirect
     k8s.io/component-base v0.25.3 // indirect
     k8s.io/klog v1.0.0 // indirect
-    k8s.io/klog/v2 v2.70.1 // indirect
     k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
     open-cluster-management.io/api v0.7.0 // indirect
     sigs.k8s.io/apiserver-network-proxy v0.0.30 // indirect

pkg/source/builtin/k8sresourcewatcher/controller/controller.go

+1-1
@@ -35,12 +35,12 @@ import (
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/tools/cache"
-    "k8s.io/client-go/util/workqueue"

     "github.com/kubevela/kube-trigger/api/v1alpha1"
     "github.com/kubevela/kube-trigger/pkg/eventhandler"
     "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/types"
     "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/utils"
+    "github.com/kubevela/kube-trigger/pkg/workqueue"
 )

 const maxRetries = 5
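The controller diff above swaps client-go's util/workqueue import for the project's new pkg/workqueue package (added below). The controller's actual requeue loop is not part of this diff, so the following is only a rough sketch of how retries can be gated by the maxRetries constant and the RateLimiter interface introduced in this commit; the handleWithRetry helper and the key value are hypothetical, not code from this repository.

package main

import (
    "fmt"
    "time"

    "github.com/kubevela/kube-trigger/pkg/workqueue"
)

const maxRetries = 5

// handleWithRetry is an illustrative sketch only: it shows how
// NumRequeues/When/Forget from the new RateLimiter can gate retries.
func handleWithRetry(limiter workqueue.RateLimiter, key interface{}, handle func(interface{}) error) {
    if err := handle(key); err == nil {
        // Success: stop tracking this key so its backoff resets.
        limiter.Forget(key)
        return
    }
    if limiter.NumRequeues(key) < maxRetries {
        // Wait for the limiter-suggested backoff, then try again.
        time.Sleep(limiter.When(key))
        handleWithRetry(limiter, key, handle)
        return
    }
    // Too many failures: give up and forget the key.
    limiter.Forget(key)
    fmt.Printf("dropping %v after %d retries\n", key, maxRetries)
}

func main() {
    limiter := workqueue.DefaultControllerRateLimiter()
    attempts := 0
    handleWithRetry(limiter, "default/my-app", func(interface{}) error {
        attempts++
        if attempts < 3 {
            return fmt.Errorf("transient error")
        }
        return nil
    })
}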
+261 (new file)
@@ -0,0 +1,261 @@
/*
Copyright 2023 The KubeVela Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
    "math"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

// RateLimiter .
type RateLimiter interface {
    // When gets an item and gets to decide how long that item should wait
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
    return NewMaxOfRateLimiter(
        NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
        &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )
}

// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
    *rate.Limiter
}

// RateLimiter .
var _ RateLimiter = &BucketRateLimiter{}

// When .
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
    return r.Limiter.Reserve().Delay()
}

// NumRequeues .
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
    return 0
}

// Forget .
func (r *BucketRateLimiter) Forget(item interface{}) {
}

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    baseDelay time.Duration
    maxDelay  time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

// NewItemExponentialFailureRateLimiter creates a new ItemExponentialFailureRateLimiter
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
    return &ItemExponentialFailureRateLimiter{
        failures:  map[interface{}]int{},
        baseDelay: baseDelay,
        maxDelay:  maxDelay,
    }
}

// DefaultItemBasedRateLimiter is a no-arg constructor for a default rate limiter for a workqueue
func DefaultItemBasedRateLimiter() RateLimiter {
    return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}

// When .
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item]++

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    return calculated
}

// NumRequeues .
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    return r.failures[item]
}

// Forget .
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    delete(r.failures, item)
}

// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    maxFastAttempts int
    fastDelay       time.Duration
    slowDelay       time.Duration
}

// RateLimiter .
var _ RateLimiter = &ItemFastSlowRateLimiter{}

// NewItemFastSlowRateLimiter creates a new ItemFastSlowRateLimiter
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
    return &ItemFastSlowRateLimiter{
        failures:        map[interface{}]int{},
        fastDelay:       fastDelay,
        slowDelay:       slowDelay,
        maxFastAttempts: maxFastAttempts,
    }
}

// When .
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    r.failures[item]++

    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }

    return r.slowDelay
}

// NumRequeues .
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    return r.failures[item]
}

// Forget .
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    delete(r.failures, item)
}

// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
    limiters []RateLimiter
}

// When .
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

// NewMaxOfRateLimiter creates a new MaxOfRateLimiter
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
    return &MaxOfRateLimiter{limiters: limiters}
}

// NumRequeues .
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
    ret := 0
    for _, limiter := range r.limiters {
        curr := limiter.NumRequeues(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

// Forget .
func (r *MaxOfRateLimiter) Forget(item interface{}) {
    for _, limiter := range r.limiters {
        limiter.Forget(item)
    }
}

// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type WithMaxWaitRateLimiter struct {
    limiter  RateLimiter
    maxDelay time.Duration
}

// NewWithMaxWaitRateLimiter creates a new WithMaxWaitRateLimiter
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
    return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
}

// When .
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
    delay := w.limiter.When(item)
    if delay > w.maxDelay {
        return w.maxDelay
    }

    return delay
}

// Forget .
func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
    w.limiter.Forget(item)
}

// NumRequeues .
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
    return w.limiter.NumRequeues(item)
}
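For reference, here is a minimal usage sketch of the rate limiters defined in the new file above. It uses only the constructors shown in the diff; the per-item exponential limiter yields 5ms, 10ms, 20ms, ... for repeated failures of the same item, capped at 1000s, while the delay returned by the combined default limiter also depends on the token bucket's state at run time.

package main

import (
    "fmt"
    "time"

    "github.com/kubevela/kube-trigger/pkg/workqueue"
)

func main() {
    // Per-item exponential backoff only: 5ms, 10ms, 20ms, ... capped at 1000s.
    expo := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
    for i := 0; i < 4; i++ {
        fmt.Println(expo.When("item-a")) // 5ms, 10ms, 20ms, 40ms
    }
    expo.Forget("item-a") // resets the failure count for item-a

    // The default limiter returns the larger of the exponential delay and the
    // delay from a 10 qps / burst-100 token bucket shared across all items.
    def := workqueue.DefaultControllerRateLimiter()
    fmt.Println(def.When("item-b"))

    // Optionally cap any limiter's delay, e.g. at 30s.
    capped := workqueue.NewWithMaxWaitRateLimiter(expo, 30*time.Second)
    fmt.Println(capped.When("item-a"))
}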
