This repository has been archived by the owner on Feb 19, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 51
/
scheduler.go
214 lines (183 loc) · 6.12 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// Package scheduler is a small library that you can use within your application that enables you to execute callbacks (goroutines) after a pre-defined amount of time.
// GTS also provides task storage which is used to invoke callbacks for tasks which couldn’t be executed during down-time as well
// as maintaining a history of the callbacks that got executed.
package scheduler
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/rakanalh/scheduler/storage"
"github.com/rakanalh/scheduler/task"
)
// Scheduler is used to schedule tasks. It holds information about those tasks
// including metadata such as argument types and schedule times
type Scheduler struct {
funcRegistry *task.FuncRegistry
stopChan chan bool
tasks map[task.ID]*task.Task
taskStore storeBridge
}
// New will return a new instance of the Scheduler struct.
func New(store storage.TaskStore) Scheduler {
funcRegistry := task.NewFuncRegistry()
return Scheduler{
funcRegistry: funcRegistry,
stopChan: make(chan bool),
tasks: make(map[task.ID]*task.Task),
taskStore: storeBridge{
store: store,
funcRegistry: funcRegistry,
},
}
}
// RunAt will schedule function to be executed once at the given time.
func (scheduler *Scheduler) RunAt(time time.Time, function task.Function, params ...task.Param) (task.ID, error) {
funcMeta, err := scheduler.funcRegistry.Add(function)
if err != nil {
return "", err
}
task := task.New(funcMeta, params)
task.NextRun = time
scheduler.registerTask(task)
return task.Hash(), nil
}
// RunAfter executes function once after a specific duration has elapsed.
func (scheduler *Scheduler) RunAfter(duration time.Duration, function task.Function, params ...task.Param) (task.ID, error) {
return scheduler.RunAt(time.Now().Add(duration), function, params...)
}
// RunEvery will schedule function to be executed every time the duration has elapsed.
func (scheduler *Scheduler) RunEvery(duration time.Duration, function task.Function, params ...task.Param) (task.ID, error) {
funcMeta, err := scheduler.funcRegistry.Add(function)
if err != nil {
return "", err
}
task := task.New(funcMeta, params)
task.IsRecurring = true
task.Duration = duration
task.NextRun = time.Now().Add(duration)
scheduler.registerTask(task)
return task.Hash(), nil
}
// Start will run the scheduler's timer and will trigger the execution
// of tasks depending on their schedule.
func (scheduler *Scheduler) Start() error {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Populate tasks from storage
if err := scheduler.populateTasks(); err != nil {
return err
}
if err := scheduler.persistRegisteredTasks(); err != nil {
return err
}
scheduler.runPending()
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
scheduler.runPending()
case <-sigChan:
scheduler.stopChan <- true
case <-scheduler.stopChan:
close(scheduler.stopChan)
}
}
}()
return nil
}
// Stop will put the scheduler to halt
func (scheduler *Scheduler) Stop() {
scheduler.taskStore.store.Close()
scheduler.stopChan <- true
}
// Wait is a convenience function for blocking until the scheduler is stopped.
func (scheduler *Scheduler) Wait() {
<-scheduler.stopChan
}
// Cancel is used to cancel the planned execution of a specific task using it's ID.
// The ID is returned when the task was scheduled using RunAt, RunAfter or RunEvery
func (scheduler *Scheduler) Cancel(taskID task.ID) error {
task, found := scheduler.tasks[taskID]
if !found {
return fmt.Errorf("Task not found")
}
_ = scheduler.taskStore.Remove(task)
delete(scheduler.tasks, taskID)
return nil
}
// Clear will cancel the execution and clear all registered tasks.
func (scheduler *Scheduler) Clear() {
for taskID, currentTask := range scheduler.tasks {
_ = scheduler.taskStore.Remove(currentTask)
delete(scheduler.tasks, taskID)
}
scheduler.funcRegistry = task.NewFuncRegistry()
}
func (scheduler *Scheduler) populateTasks() error {
tasks, err := scheduler.taskStore.Fetch()
if err != nil {
return err
}
for _, dbTask := range tasks {
// If we can't find the function, it's been changed/removed by user
exists := scheduler.funcRegistry.Exists(dbTask.Func.Name)
if !exists {
log.Printf("%s was not found, it will be removed\n", dbTask.Func.Name)
_ = scheduler.taskStore.Remove(dbTask)
continue
}
// If the task instance is still registered with the same computed hash then move on.
// Otherwise, one of the attributes changed and therefore, the task instance should
// be added to the list of tasks to be executed with the stored params
registeredTask, ok := scheduler.tasks[dbTask.Hash()]
if !ok {
log.Printf("Detected a change in attributes of one of the instances of task %s, \n",
dbTask.Func.Name)
dbTask.Func, _ = scheduler.funcRegistry.Get(dbTask.Func.Name)
registeredTask = dbTask
scheduler.tasks[dbTask.Hash()] = registeredTask
}
// Skip task which is not a recurring one and the NextRun has already passed
if !dbTask.IsRecurring && dbTask.NextRun.Before(time.Now()) {
// We might have a task instance which was executed already.
// In this case, delete it.
_ = scheduler.taskStore.Remove(dbTask)
delete(scheduler.tasks, dbTask.Hash())
continue
}
// Duration may have changed for recurring tasks
if dbTask.IsRecurring && registeredTask.Duration != dbTask.Duration {
// Reschedule NextRun based on dbTask.LastRun + registeredTask.Duration
registeredTask.NextRun = dbTask.LastRun.Add(registeredTask.Duration)
}
}
return nil
}
func (scheduler *Scheduler) persistRegisteredTasks() error {
for _, task := range scheduler.tasks {
err := scheduler.taskStore.Add(task)
if err != nil {
return err
}
}
return nil
}
func (scheduler *Scheduler) runPending() {
for _, task := range scheduler.tasks {
if task.IsDue() {
go task.Run()
if !task.IsRecurring {
_ = scheduler.taskStore.Remove(task)
delete(scheduler.tasks, task.Hash())
}
}
}
}
func (scheduler *Scheduler) registerTask(task *task.Task) {
_, _ = scheduler.funcRegistry.Add(task.Func)
scheduler.tasks[task.Hash()] = task
}