Skip to content

Commit 3b2dcd8

Browse files
authored
issue-654: allow setting a stopTime for job. (#760)
1 parent 256265f commit 3b2dcd8

File tree

5 files changed

+80
-0
lines changed

5 files changed

+80
-0
lines changed

errors.go

+3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ var (
4545
ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil")
4646
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
4747
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
48+
ErrWithStopDateTimePast = fmt.Errorf("gocron: WithStopDateTime: end must not be in the past")
49+
ErrStartTimeLaterThanEndTime = fmt.Errorf("gocron: WithStartDateTime: start must not be later than end")
50+
ErrStopTimeEarlierThanStartTime = fmt.Errorf("gocron: WithStopDateTime: end must not be earlier than start")
4851
ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0")
4952
)
5053

executor.go

+4
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
358358
default:
359359
}
360360

361+
if j.stopTimeReached(e.clock.Now()) {
362+
return
363+
}
364+
361365
if e.elector != nil {
362366
if err := e.elector.IsLeader(j.ctx); err != nil {
363367
e.sendOutForRescheduling(&jIn)

job.go

+38
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type internalJob struct {
3838
limitRunsTo *limitRunsTo
3939
startTime time.Time
4040
startImmediately bool
41+
stopTime time.Time
4142
// event listeners
4243
afterJobRuns func(jobID uuid.UUID, jobName string)
4344
beforeJobRuns func(jobID uuid.UUID, jobName string)
@@ -60,6 +61,13 @@ func (j *internalJob) stop() {
6061
j.cancel()
6162
}
6263

64+
func (j *internalJob) stopTimeReached(now time.Time) bool {
65+
if j.stopTime.IsZero() {
66+
return false
67+
}
68+
return j.stopTime.Before(now)
69+
}
70+
6371
// task stores the function and parameters
6472
// that are actually run when the job is executed.
6573
type task struct {
@@ -594,11 +602,41 @@ func WithStartDateTime(start time.Time) StartAtOption {
594602
if start.IsZero() || start.Before(now) {
595603
return ErrWithStartDateTimePast
596604
}
605+
if !j.stopTime.IsZero() && j.stopTime.Before(start) {
606+
return ErrStartTimeLaterThanEndTime
607+
}
597608
j.startTime = start
598609
return nil
599610
}
600611
}
601612

613+
// WithStopAt sets the option for stopping the job from running
614+
// after the specified time.
615+
func WithStopAt(option StopAtOption) JobOption {
616+
return func(j *internalJob, now time.Time) error {
617+
return option(j, now)
618+
}
619+
}
620+
621+
// StopAtOption defines options for stopping the job
622+
type StopAtOption func(*internalJob, time.Time) error
623+
624+
// WithStopDateTime sets the final date & time after which the job should stop.
625+
// This must be in the future and should be after the startTime (if specified).
626+
// The job's final run may be at the stop time, but not after.
627+
func WithStopDateTime(end time.Time) StopAtOption {
628+
return func(j *internalJob, now time.Time) error {
629+
if end.IsZero() || end.Before(now) {
630+
return ErrWithStopDateTimePast
631+
}
632+
if end.Before(j.startTime) {
633+
return ErrStopTimeEarlierThanStartTime
634+
}
635+
j.stopTime = end
636+
return nil
637+
}
638+
}
639+
602640
// WithTags sets the tags for the job. Tags provide
603641
// a way to identify jobs by a set of tags and remove
604642
// multiple jobs by tag.

scheduler.go

+4
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
325325
return
326326
}
327327

328+
if j.stopTimeReached(s.now()) {
329+
return
330+
}
331+
328332
scheduleFrom := j.lastRun
329333
if len(j.nextScheduled) > 0 {
330334
// always grab the last element in the slice as that is the furthest

scheduler_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@ func TestScheduler_LongRunningJobs(t *testing.T) {
150150
options []SchedulerOption
151151
expectedRuns int
152152
}{
153+
{
154+
"duration with stop time between executions",
155+
durationCh,
156+
DurationJob(
157+
time.Millisecond * 500,
158+
),
159+
NewTask(
160+
func() {
161+
time.Sleep(1 * time.Second)
162+
durationCh <- struct{}{}
163+
}),
164+
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Millisecond * 1100)))},
165+
[]SchedulerOption{WithStopTimeout(time.Second * 2)},
166+
2,
167+
},
153168
{
154169
"duration",
155170
durationCh,
@@ -755,6 +770,22 @@ func TestScheduler_NewJobErrors(t *testing.T) {
755770
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))},
756771
ErrWithStartDateTimePast,
757772
},
773+
{
774+
"WithStartDateTime is later than the end",
775+
DurationJob(
776+
time.Second,
777+
),
778+
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Second))), WithStartAt(WithStartDateTime(time.Now().Add(time.Hour)))},
779+
ErrStartTimeLaterThanEndTime,
780+
},
781+
{
782+
"WithStopDateTime is earlier than the start",
783+
DurationJob(
784+
time.Second,
785+
),
786+
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(time.Hour))), WithStopAt(WithStopDateTime(time.Now().Add(time.Second)))},
787+
ErrStopTimeEarlierThanStartTime,
788+
},
758789
{
759790
"oneTimeJob start at is zero",
760791
OneTimeJob(OneTimeJobStartDateTime(time.Time{})),

0 commit comments

Comments
 (0)