Skip to content

Commit 24528fc

Browse files
author
willzhen
committed
Fixed cycle queue
1 parent 36b0ad2 commit 24528fc

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

task_scheduler.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,17 @@ func (s *TaskScheduler) checkProcessed(taskId string) bool {
170170
}
171171
s.lock.Lock()
172172
defer s.lock.Unlock()
173+
173174
if _, ok := s.processedTask[taskId]; !ok {
175+
if s.count >= s.bufflen {
176+
// 满了, 清除一个头部数据
177+
delete(s.processedTask, s.taskProcessedTime[s.head].taskId)
178+
s.head++
179+
s.count--
180+
if s.head >= s.bufflen {
181+
s.head = 0
182+
}
183+
}
174184
s.processedTask[taskId] = true
175185
s.taskProcessedTime[s.tail] = processTime{
176186
t: time.Now(),
@@ -180,24 +190,17 @@ func (s *TaskScheduler) checkProcessed(taskId string) bool {
180190
if s.tail >= s.bufflen {
181191
s.tail = 0
182192
}
183-
if s.count >= s.bufflen {
184-
// 满了, head 被覆盖
185-
s.head++
186-
if s.head >= s.bufflen {
187-
s.head = 0
188-
}
189-
} else {
190-
s.count++
191-
}
193+
s.count++
192194
return true
193195
} else {
194196
return false
195197
}
196198
}
197199

198200
func (s *TaskScheduler) cleanProcessTask() {
199-
ticker := time.NewTicker(3 * time.Second)
201+
ticker := time.NewTicker(time.Second)
200202
for {
203+
201204
select {
202205
case <-s.ctx.Done():
203206
return
@@ -212,12 +215,12 @@ func (s *TaskScheduler) cleanProcessTask() {
212215
}()
213216

214217
for s.count > 0 {
215-
if s.taskProcessedTime[s.head].t.After(time.Now().Add(-5 * time.Second)) {
218+
if s.taskProcessedTime[s.head].t.Before(time.Now().Add(-time.Second)) {
216219
delete(s.processedTask, s.taskProcessedTime[s.head].taskId)
217220
s.head++
218221
s.count--
219222
if s.head >= s.bufflen {
220-
s.bufflen = 0
223+
s.head = 0
221224
}
222225
} else {
223226
break

0 commit comments

Comments
 (0)