diff --git a/node/cron/cron.go b/node/cron/cron.go index 8ba6cb9..c7fae00 100644 --- a/node/cron/cron.go +++ b/node/cron/cron.go @@ -209,69 +209,57 @@ func (c *Cron) reIndex() { // access to the 'running' state variable. func (c *Cron) run() { // Figure out the next activation times for each entry. - now := time.Now().In(c.location) + now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) } - timer := time.NewTimer(time.Minute) for { // Determine the next entry to run. sort.Sort(byTime(c.entries)) - c.reIndex() - var effective time.Time + var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. - effective = now.AddDate(10, 0, 0) + timer = time.NewTimer(100000 * time.Hour) } else { - effective = c.entries[0].Next + timer = time.NewTimer(c.entries[0].Next.Sub(now)) } - timer.Reset(effective.Sub(now)) - select { - case now = <-timer.C: - now = now.In(c.location) - // Run every entry whose next time was this effective time. - for _, e := range c.entries { - if e.Next != effective { - break + for { + select { + case now = <-timer.C: + now = now.In(c.location) + // Run every entry whose next time was less than now + for _, e := range c.entries { + if e.Next.After(now) || e.Next.IsZero() { + break + } + go c.runWithRecovery(e.Job) + e.Prev = e.Next + e.Next = e.Schedule.Next(now) } - go c.runWithRecovery(e.Job) - e.Prev = e.Next - e.Next = e.Schedule.Next(now) - } - continue - case newEntry := <-c.add: - if index, ok := c.indexes[newEntry.ID]; ok { - c.entries[index] = newEntry - } else { - c.entries, c.indexes[newEntry.ID] = append(c.entries, newEntry), len(c.entries) - } - newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location)) + case newEntry := <-c.add: + timer.Stop() + now = c.now() + newEntry.Next = newEntry.Schedule.Next(now) + c.entries = append(c.entries, newEntry) - case id := <-c.del: - index, ok := c.indexes[id] - if !ok { + case <-c.snapshot: + c.snapshot <- c.entrySnapshot() continue - } - c.entries = append(c.entries[:index], c.entries[index+1:]...) - delete(c.indexes, id) - - case <-c.snapshot: - c.snapshot <- c.entrySnapshot() + case <-c.stop: + timer.Stop() + return + } - case <-c.stop: - timer.Stop() - return + break } - - // 'now' should be updated after newEntry and snapshot cases. - now = time.Now().In(c.location) } + } // Logs an error to stderr or to the configured error log @@ -305,3 +293,8 @@ func (c *Cron) entrySnapshot() []*Entry { } return entries } + +// now returns current time in c location +func (c *Cron) now() time.Time { + return time.Now().In(c.location) +}