From 9446f4ef9d702571ff66c7a9b68be58e23de8556 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Sat, 31 Mar 2018 19:41:43 +1100 Subject: [PATCH] Stop() for Ticker to enable leak-free code Kubernetes-commit: 1f393cdef96fe6e4ddcbf93825d65a9980463406 --- util/workqueue/delaying_queue.go | 10 ++++------ util/workqueue/rate_limitting_queue_test.go | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index c62ed32efa..a37177425d 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -45,7 +45,7 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ Interface: NewNamed(name), clock: clock, - heartbeat: clock.Tick(maxWait), + heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), @@ -67,10 +67,7 @@ type delayingType struct { stopCh chan struct{} // heartbeat ensures we wait no more than maxWait before firing - // - // TODO: replace with Ticker (and add to clock) so this can be cleaned up. - // clock.Tick will leak. - heartbeat <-chan time.Time + heartbeat clock.Ticker // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan *waitFor @@ -138,6 +135,7 @@ func (pq waitForPriorityQueue) Peek() interface{} { func (q *delayingType) ShutDown() { q.Interface.ShutDown() close(q.stopCh) + q.heartbeat.Stop() } // AddAfter adds the given item to the work queue after the given delay @@ -209,7 +207,7 @@ func (q *delayingType) waitingLoop() { case <-q.stopCh: return - case <-q.heartbeat: + case <-q.heartbeat.C(): // continue the loop, which will add ready items case <-nextReadyAt: diff --git a/util/workqueue/rate_limitting_queue_test.go b/util/workqueue/rate_limitting_queue_test.go index 32d7fc9068..3fbe07d0d8 100644 --- a/util/workqueue/rate_limitting_queue_test.go +++ b/util/workqueue/rate_limitting_queue_test.go @@ -30,7 +30,7 @@ func TestRateLimitingQueue(t *testing.T) { delayingQueue := &delayingType{ Interface: New(), clock: fakeClock, - heartbeat: fakeClock.Tick(maxWait), + heartbeat: fakeClock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(""),