Skip to content

Commit 64d6e48

Browse files
authored
issue-740: expand oneTimeJob to support multiple times (#741)
1 parent 7c391d4 commit 64d6e48

File tree

3 files changed

+256
-15
lines changed

3 files changed

+256
-15
lines changed

example_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,17 @@ func ExampleOneTimeJob() {
354354
func() {},
355355
),
356356
)
357+
// run job twice - once in 10 seconds and once in 55 minutes
358+
n := time.Now()
359+
_, _ = s.NewJob(
360+
OneTimeJob(
361+
OneTimeJobStartDateTimes(
362+
n.Add(10*time.Second),
363+
n.Add(55*time.Minute),
364+
),
365+
),
366+
NewTask(func() {}),
367+
)
357368

358369
s.Start()
359370
}

job.go

+63-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"math/rand"
9+
"sort"
910
"strings"
1011
"time"
1112

@@ -446,35 +447,47 @@ type oneTimeJobDefinition struct {
446447
}
447448

448449
func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
449-
j.jobSchedule = oneTimeJob{}
450-
if err := o.startAt(j); err != nil {
451-
return err
450+
sortedTimes := o.startAt(j)
451+
sort.Slice(sortedTimes, func(i, j int) bool {
452+
return sortedTimes[i].Before(sortedTimes[j])
453+
})
454+
// keep only schedules that are in the future
455+
idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp())
456+
if found {
457+
idx++
452458
}
453-
// in case we are not in the `startImmediately` case, our start-date must be in
454-
// the future according to the scheduler clock
455-
if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) {
459+
sortedTimes = sortedTimes[idx:]
460+
if !j.startImmediately && len(sortedTimes) == 0 {
456461
return ErrOneTimeJobStartDateTimePast
457462
}
463+
j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes}
458464
return nil
459465
}
460466

461467
// OneTimeJobStartAtOption defines when the one time job is run
462-
type OneTimeJobStartAtOption func(*internalJob) error
468+
type OneTimeJobStartAtOption func(*internalJob) []time.Time
463469

464470
// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately.
465471
func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
466-
return func(j *internalJob) error {
472+
return func(j *internalJob) []time.Time {
467473
j.startImmediately = true
468-
return nil
474+
return []time.Time{}
469475
}
470476
}
471477

472478
// OneTimeJobStartDateTime sets the date & time at which the job should run.
473479
// This datetime must be in the future (according to the scheduler clock).
474480
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
475-
return func(j *internalJob) error {
476-
j.startTime = start
477-
return nil
481+
return func(j *internalJob) []time.Time {
482+
return []time.Time{start}
483+
}
484+
}
485+
486+
// OneTimeJobStartDateTimes sets the date & times at which the job should run.
487+
// At least one of the date/times must be in the future (according to the scheduler clock).
488+
func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption {
489+
return func(j *internalJob) []time.Time {
490+
return times
478491
}
479492
}
480493

@@ -486,6 +499,18 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
486499
}
487500
}
488501

502+
func timeCmp() func(element time.Time, target time.Time) int {
503+
return func(element time.Time, target time.Time) int {
504+
if element.Equal(target) {
505+
return 0
506+
}
507+
if element.Before(target) {
508+
return -1
509+
}
510+
return 1
511+
}
512+
}
513+
489514
// -----------------------------------------------
490515
// -----------------------------------------------
491516
// ----------------- Job Options -----------------
@@ -876,10 +901,33 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
876901

877902
var _ jobSchedule = (*oneTimeJob)(nil)
878903

879-
type oneTimeJob struct{}
904+
type oneTimeJob struct {
905+
sortedTimes []time.Time
906+
}
880907

881-
func (o oneTimeJob) next(_ time.Time) time.Time {
882-
return time.Time{}
908+
// next finds the next item in a sorted list of times using binary-search.
909+
//
910+
// example: sortedTimes: [2, 4, 6, 8]
911+
//
912+
// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0
913+
// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1
914+
// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1
915+
// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2
916+
// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3
917+
// lastRun: 8 => [idx=3,found=found] => next is none
918+
// lastRun: 9 => [idx=3,found=found] => next is none
919+
func (o oneTimeJob) next(lastRun time.Time) time.Time {
920+
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp())
921+
// if found, the next run is the following index
922+
if found {
923+
idx++
924+
}
925+
// exhausted runs
926+
if idx >= len(o.sortedTimes) {
927+
return time.Time{}
928+
}
929+
930+
return o.sortedTimes[idx]
883931
}
884932

885933
// -----------------------------------------------

scheduler_test.go

+182
Original file line numberDiff line numberDiff line change
@@ -2089,6 +2089,188 @@ func TestScheduler_OneTimeJob(t *testing.T) {
20892089
}
20902090
}
20912091

2092+
func TestScheduler_AtTimesJob(t *testing.T) {
2093+
defer verifyNoGoroutineLeaks(t)
2094+
2095+
n := time.Now().UTC()
2096+
2097+
tests := []struct {
2098+
name string
2099+
atTimes []time.Time
2100+
fakeClock clockwork.FakeClock
2101+
assertErr require.ErrorAssertionFunc
2102+
// asserts things about schedules, advance time and perform new assertions
2103+
advanceAndAsserts []func(
2104+
t *testing.T,
2105+
j Job,
2106+
clock clockwork.FakeClock,
2107+
runs *atomic.Uint32,
2108+
)
2109+
}{
2110+
{
2111+
name: "no at times",
2112+
atTimes: []time.Time{},
2113+
fakeClock: clockwork.NewFakeClock(),
2114+
assertErr: func(t require.TestingT, err error, i ...interface{}) {
2115+
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
2116+
},
2117+
},
2118+
{
2119+
name: "all in the past",
2120+
atTimes: []time.Time{n.Add(-1 * time.Second)},
2121+
fakeClock: clockwork.NewFakeClockAt(n),
2122+
assertErr: func(t require.TestingT, err error, i ...interface{}) {
2123+
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
2124+
},
2125+
},
2126+
{
2127+
name: "one run 1 millisecond in the future",
2128+
atTimes: []time.Time{n.Add(1 * time.Millisecond)},
2129+
fakeClock: clockwork.NewFakeClockAt(n),
2130+
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
2131+
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
2132+
require.Equal(t, uint32(0), runs.Load())
2133+
2134+
// last not initialized
2135+
lastRunAt, err := j.LastRun()
2136+
require.NoError(t, err)
2137+
require.Equal(t, time.Time{}, lastRunAt)
2138+
2139+
// next is now
2140+
nextRunAt, err := j.NextRun()
2141+
require.NoError(t, err)
2142+
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
2143+
2144+
// advance and eventually run
2145+
clock.Advance(2 * time.Millisecond)
2146+
require.Eventually(t, func() bool {
2147+
return assert.Equal(t, uint32(1), runs.Load())
2148+
}, 3*time.Second, 100*time.Millisecond)
2149+
2150+
// last was run
2151+
lastRunAt, err = j.LastRun()
2152+
require.NoError(t, err)
2153+
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
2154+
2155+
nextRunAt, err = j.NextRun()
2156+
require.NoError(t, err)
2157+
require.Equal(t, time.Time{}, nextRunAt)
2158+
},
2159+
},
2160+
},
2161+
{
2162+
name: "one run in the past and one in the future",
2163+
atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)},
2164+
fakeClock: clockwork.NewFakeClockAt(n),
2165+
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
2166+
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
2167+
require.Equal(t, uint32(0), runs.Load())
2168+
2169+
// last not initialized
2170+
lastRunAt, err := j.LastRun()
2171+
require.NoError(t, err)
2172+
require.Equal(t, time.Time{}, lastRunAt)
2173+
2174+
// next is now
2175+
nextRunAt, err := j.NextRun()
2176+
require.NoError(t, err)
2177+
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
2178+
2179+
// advance and eventually run
2180+
clock.Advance(2 * time.Millisecond)
2181+
require.Eventually(t, func() bool {
2182+
return assert.Equal(t, uint32(1), runs.Load())
2183+
}, 3*time.Second, 100*time.Millisecond)
2184+
2185+
// last was run
2186+
lastRunAt, err = j.LastRun()
2187+
require.NoError(t, err)
2188+
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
2189+
},
2190+
},
2191+
},
2192+
{
2193+
name: "two runs in the future",
2194+
atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
2195+
fakeClock: clockwork.NewFakeClockAt(n),
2196+
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
2197+
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
2198+
require.Equal(t, uint32(0), runs.Load())
2199+
2200+
// last not initialized
2201+
lastRunAt, err := j.LastRun()
2202+
require.NoError(t, err)
2203+
require.Equal(t, time.Time{}, lastRunAt)
2204+
2205+
// next is now
2206+
nextRunAt, err := j.NextRun()
2207+
require.NoError(t, err)
2208+
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
2209+
2210+
// advance and eventually run
2211+
clock.Advance(2 * time.Millisecond)
2212+
require.Eventually(t, func() bool {
2213+
return assert.Equal(t, uint32(1), runs.Load())
2214+
}, 3*time.Second, 100*time.Millisecond)
2215+
2216+
// last was run
2217+
lastRunAt, err = j.LastRun()
2218+
require.NoError(t, err)
2219+
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
2220+
2221+
nextRunAt, err = j.NextRun()
2222+
require.NoError(t, err)
2223+
require.Equal(t, n.Add(3*time.Millisecond), nextRunAt)
2224+
},
2225+
2226+
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
2227+
// advance and eventually run
2228+
clock.Advance(2 * time.Millisecond)
2229+
require.Eventually(t, func() bool {
2230+
return assert.Equal(t, uint32(2), runs.Load())
2231+
}, 3*time.Second, 100*time.Millisecond)
2232+
2233+
// last was run
2234+
lastRunAt, err := j.LastRun()
2235+
require.NoError(t, err)
2236+
require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond)
2237+
2238+
nextRunAt, err := j.NextRun()
2239+
require.NoError(t, err)
2240+
require.Equal(t, time.Time{}, nextRunAt)
2241+
},
2242+
},
2243+
},
2244+
}
2245+
2246+
for _, tt := range tests {
2247+
t.Run(tt.name, func(t *testing.T) {
2248+
s := newTestScheduler(t, WithClock(tt.fakeClock))
2249+
t.Cleanup(func() {
2250+
require.NoError(t, s.Shutdown())
2251+
})
2252+
2253+
runs := atomic.Uint32{}
2254+
j, err := s.NewJob(
2255+
OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)),
2256+
NewTask(func() {
2257+
runs.Add(1)
2258+
}),
2259+
)
2260+
if tt.assertErr != nil {
2261+
tt.assertErr(t, err)
2262+
} else {
2263+
require.NoError(t, err)
2264+
s.Start()
2265+
2266+
for _, advanceAndAssert := range tt.advanceAndAsserts {
2267+
advanceAndAssert(t, j, tt.fakeClock, &runs)
2268+
}
2269+
}
2270+
})
2271+
}
2272+
}
2273+
20922274
func TestScheduler_WithLimitedRuns(t *testing.T) {
20932275
defer verifyNoGoroutineLeaks(t)
20942276

0 commit comments

Comments
 (0)