Skip to content

Commit 63f3701

Browse files
authored
Fix #835 and #837 (#836)
1 parent e1b7d52 commit 63f3701

File tree

2 files changed

+80
-76
lines changed

2 files changed

+80
-76
lines changed

executor.go

+79-75
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type executor struct {
3636

3737
// used by the executor to receive a stop signal from the scheduler
3838
stopCh chan struct{}
39+
// ensure that stop runs before the next call to start and only runs once
40+
stopOnce *sync.Once
3941
// the timeout value when stopping
4042
stopTimeout time.Duration
4143
// used to signal that the executor has completed shutdown
@@ -88,6 +90,7 @@ func (e *executor) start() {
8890
// any other uses within the executor should create a context
8991
// using the executor context as parent.
9092
e.ctx, e.cancel = context.WithCancel(context.Background())
93+
e.stopOnce = &sync.Once{}
9194

9295
// the standardJobsWg tracks
9396
standardJobsWg := &waitGroupWithMutex{}
@@ -131,7 +134,7 @@ func (e *executor) start() {
131134

132135
// spin off into a goroutine to unblock the executor and
133136
// allow for processing for more work
134-
go func() {
137+
go func(executorCtx context.Context) {
135138
// make sure to cancel the above context per the docs
136139
// // Canceling this context releases resources associated with it, so code should
137140
// // call cancel as soon as the operations running in this Context complete.
@@ -211,8 +214,7 @@ func (e *executor) start() {
211214
}
212215
} else {
213216
select {
214-
case <-e.stopCh:
215-
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
217+
case <-executorCtx.Done():
216218
return
217219
default:
218220
}
@@ -228,7 +230,7 @@ func (e *executor) start() {
228230
}(*j)
229231
}
230232
}
231-
}()
233+
}(e.ctx)
232234
case <-e.stopCh:
233235
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
234236
return
@@ -473,86 +475,88 @@ func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
473475
}
474476

475477
func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
476-
e.logger.Debug("gocron: stopping executor")
477-
// we've been asked to stop. This is either because the scheduler has been told
478-
// to stop all jobs or the scheduler has been asked to completely shutdown.
479-
//
480-
// cancel tells all the functions to stop their work and send in a done response
481-
e.cancel()
482-
483-
// the wait for job channels are used to report back whether we successfully waited
484-
// for all jobs to complete or if we hit the configured timeout.
485-
waitForJobs := make(chan struct{}, 1)
486-
waitForSingletons := make(chan struct{}, 1)
487-
waitForLimitMode := make(chan struct{}, 1)
488-
489-
// the waiter context is used to cancel the functions waiting on jobs.
490-
// this is done to avoid goroutine leaks.
491-
waiterCtx, waiterCancel := context.WithCancel(context.Background())
492-
493-
// wait for standard jobs to complete
494-
go func() {
495-
e.logger.Debug("gocron: waiting for standard jobs to complete")
478+
e.stopOnce.Do(func() {
479+
e.logger.Debug("gocron: stopping executor")
480+
// we've been asked to stop. This is either because the scheduler has been told
481+
// to stop all jobs or the scheduler has been asked to completely shutdown.
482+
//
483+
// cancel tells all the functions to stop their work and send in a done response
484+
e.cancel()
485+
486+
// the wait for job channels are used to report back whether we successfully waited
487+
// for all jobs to complete or if we hit the configured timeout.
488+
waitForJobs := make(chan struct{}, 1)
489+
waitForSingletons := make(chan struct{}, 1)
490+
waitForLimitMode := make(chan struct{}, 1)
491+
492+
// the waiter context is used to cancel the functions waiting on jobs.
493+
// this is done to avoid goroutine leaks.
494+
waiterCtx, waiterCancel := context.WithCancel(context.Background())
495+
496+
// wait for standard jobs to complete
496497
go func() {
497-
// this is done in a separate goroutine, so we aren't
498-
// blocked by the WaitGroup's Wait call in the event
499-
// that the waiter context is cancelled.
500-
// This particular goroutine could leak in the event that
501-
// some long-running standard job doesn't complete.
502-
standardJobsWg.Wait()
503-
e.logger.Debug("gocron: standard jobs completed")
504-
waitForJobs <- struct{}{}
498+
e.logger.Debug("gocron: waiting for standard jobs to complete")
499+
go func() {
500+
// this is done in a separate goroutine, so we aren't
501+
// blocked by the WaitGroup's Wait call in the event
502+
// that the waiter context is cancelled.
503+
// This particular goroutine could leak in the event that
504+
// some long-running standard job doesn't complete.
505+
standardJobsWg.Wait()
506+
e.logger.Debug("gocron: standard jobs completed")
507+
waitForJobs <- struct{}{}
508+
}()
509+
<-waiterCtx.Done()
505510
}()
506-
<-waiterCtx.Done()
507-
}()
508511

509-
// wait for per job singleton limit mode runner jobs to complete
510-
go func() {
511-
e.logger.Debug("gocron: waiting for singleton jobs to complete")
512+
// wait for per job singleton limit mode runner jobs to complete
512513
go func() {
513-
singletonJobsWg.Wait()
514-
e.logger.Debug("gocron: singleton jobs completed")
515-
waitForSingletons <- struct{}{}
514+
e.logger.Debug("gocron: waiting for singleton jobs to complete")
515+
go func() {
516+
singletonJobsWg.Wait()
517+
e.logger.Debug("gocron: singleton jobs completed")
518+
waitForSingletons <- struct{}{}
519+
}()
520+
<-waiterCtx.Done()
516521
}()
517-
<-waiterCtx.Done()
518-
}()
519522

520-
// wait for limit mode runners to complete
521-
go func() {
522-
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
523+
// wait for limit mode runners to complete
523524
go func() {
524-
limitModeJobsWg.Wait()
525-
e.logger.Debug("gocron: limitMode jobs completed")
526-
waitForLimitMode <- struct{}{}
525+
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
526+
go func() {
527+
limitModeJobsWg.Wait()
528+
e.logger.Debug("gocron: limitMode jobs completed")
529+
waitForLimitMode <- struct{}{}
530+
}()
531+
<-waiterCtx.Done()
527532
}()
528-
<-waiterCtx.Done()
529-
}()
530533

531-
// now either wait for all the jobs to complete,
532-
// or hit the timeout.
533-
var count int
534-
timeout := time.Now().Add(e.stopTimeout)
535-
for time.Now().Before(timeout) && count < 3 {
536-
select {
537-
case <-waitForJobs:
538-
count++
539-
case <-waitForSingletons:
540-
count++
541-
case <-waitForLimitMode:
542-
count++
543-
default:
534+
// now either wait for all the jobs to complete,
535+
// or hit the timeout.
536+
var count int
537+
timeout := time.Now().Add(e.stopTimeout)
538+
for time.Now().Before(timeout) && count < 3 {
539+
select {
540+
case <-waitForJobs:
541+
count++
542+
case <-waitForSingletons:
543+
count++
544+
case <-waitForLimitMode:
545+
count++
546+
default:
547+
}
544548
}
545-
}
546-
if count < 3 {
547-
e.done <- ErrStopJobsTimedOut
548-
e.logger.Debug("gocron: executor stopped - timed out")
549-
} else {
550-
e.done <- nil
551-
e.logger.Debug("gocron: executor stopped")
552-
}
553-
waiterCancel()
549+
if count < 3 {
550+
e.done <- ErrStopJobsTimedOut
551+
e.logger.Debug("gocron: executor stopped - timed out")
552+
} else {
553+
e.done <- nil
554+
e.logger.Debug("gocron: executor stopped")
555+
}
556+
waiterCancel()
554557

555-
if e.limitMode != nil {
556-
e.limitMode.started = false
557-
}
558+
if e.limitMode != nil {
559+
e.limitMode.started = false
560+
}
561+
})
558562
}

scheduler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
141141
jobUpdateNextRuns: make(chan uuid.UUID),
142142
jobsOutCompleted: make(chan uuid.UUID),
143143
jobOutRequest: make(chan jobOutRequest, 1000),
144-
done: make(chan error),
144+
done: make(chan error, 1),
145145
}
146146

147147
s := &scheduler{

0 commit comments

Comments
 (0)