Skip to content

Commit bab91cb

Browse files
maennchenc-rack
authored andcommitted
Solution: Storage Adapter (#313)
1 parent 5c2eb20 commit bab91cb

13 files changed

+516
-42
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).
44

55
## Unreleased
66

7+
### Added
8+
- Experimental Storage API
9+
710
Diff for [unreleased]
811

912
## 2.2.7 - 2018-03-22

lib/quantum.ex

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Quantum do
88
alias Quantum.Normalizer
99
alias Quantum.Job
1010
alias Quantum.RunStrategy.Random
11+
alias Quantum.Storage.Noop
1112

1213
@defaults [
1314
global: false,
@@ -46,13 +47,15 @@ defmodule Quantum do
4647
task_supervisor = Module.concat(quantum, Task.Supervisor)
4748

4849
config
50+
|> Keyword.put_new(:quantum, quantum)
4951
|> update_in([:schedule], &Normalizer.normalize_schedule/1)
5052
|> Keyword.put_new(:task_stages_supervisor, task_stages_supervisor)
5153
|> Keyword.put_new(:job_broadcaster, job_broadcaster)
5254
|> Keyword.put_new(:execution_broadcaster, execution_broadcaster)
5355
|> Keyword.put_new(:executor_supervisor, executor_supervisor)
5456
|> Keyword.put_new(:task_registry, task_registry)
5557
|> Keyword.put_new(:task_supervisor, task_supervisor)
58+
|> Keyword.put_new(:storage, Noop)
5659
end
5760

5861
@doc """

lib/quantum/execution_broadcaster.ex

+58-11
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ defmodule Quantum.ExecutionBroadcaster do
88
require Logger
99

1010
alias Quantum.{Job, Util, DateLibrary}
11-
alias Crontab.{Scheduler, CronExpression}
1211
alias Quantum.DateLibrary.{InvalidDateTimeForTimezoneError, InvalidTimezoneError}
12+
alias Crontab.Scheduler, as: CrontabScheduler
13+
alias Crontab.CronExpression
14+
alias Quantum.Storage.Adapter
15+
alias Quantum.Scheduler
1316

1417
defmodule JobInPastError do
1518
defexception message:
@@ -25,22 +28,57 @@ defmodule Quantum.ExecutionBroadcaster do
2528
* `job_broadcaster` - The name of the stage to listen to
2629
2730
"""
28-
@spec start_link(GenServer.server(), GenServer.server(), boolean()) :: GenServer.on_start()
29-
def start_link(name, job_broadcaster, debug_logging) do
31+
@spec start_link(GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()) ::
32+
GenServer.on_start()
33+
def start_link(name, job_broadcaster, storage, scheduler, debug_logging) do
3034
__MODULE__
31-
|> GenStage.start_link({job_broadcaster, debug_logging}, name: name)
35+
|> GenStage.start_link({job_broadcaster, storage, scheduler, debug_logging}, name: name)
3236
|> Util.start_or_link()
3337
end
3438

3539
@doc false
36-
@spec child_spec({GenServer.server(), GenServer.server(), boolean()}) :: Supervisor.child_spec()
37-
def child_spec({name, job_broadcaster, debug_logging}) do
38-
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, debug_logging]}}
40+
@spec child_spec({GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()}) ::
41+
Supervisor.child_spec()
42+
def child_spec({name, job_broadcaster, storage, scheduler, debug_logging}) do
43+
%{
44+
super([])
45+
| start:
46+
{__MODULE__, :start_link, [name, job_broadcaster, storage, scheduler, debug_logging]}
47+
}
3948
end
4049

4150
@doc false
42-
def init({job_broadcaster, debug_logging}) do
43-
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil, debug_logging: debug_logging}
51+
def init({job_broadcaster, storage, scheduler, debug_logging}) do
52+
last_execution_date =
53+
case storage.last_execution_date(scheduler) do
54+
%NaiveDateTime{} = date ->
55+
debug_logging &&
56+
Logger.debug(fn ->
57+
"[#{inspect(Node.self())}][#{__MODULE__}] Using last known execution time #{
58+
NaiveDateTime.to_iso8601(date)
59+
}"
60+
end)
61+
62+
date
63+
64+
:unknown ->
65+
debug_logging &&
66+
Logger.debug(fn ->
67+
"[#{inspect(Node.self())}][#{__MODULE__}] Unknown last execution time, using now"
68+
end)
69+
70+
NaiveDateTime.utc_now()
71+
end
72+
73+
state = %{
74+
jobs: [],
75+
time: last_execution_date,
76+
timer: nil,
77+
storage: storage,
78+
scheduler: scheduler,
79+
debug_logging: debug_logging
80+
}
81+
4482
{:producer_consumer, state, subscribe_to: [job_broadcaster]}
4583
end
4684

@@ -71,8 +109,15 @@ defmodule Quantum.ExecutionBroadcaster do
71109

72110
def handle_info(
73111
:execute,
74-
%{jobs: [{time_to_execute, jobs_to_execute} | tail], debug_logging: debug_logging} = state
112+
%{
113+
jobs: [{time_to_execute, jobs_to_execute} | tail],
114+
storage: storage,
115+
scheduler: scheduler,
116+
debug_logging: debug_logging
117+
} = state
75118
) do
119+
:ok = storage.update_last_execution_date(scheduler, time_to_execute)
120+
76121
state =
77122
state
78123
|> Map.put(:timer, nil)
@@ -126,6 +171,8 @@ defmodule Quantum.ExecutionBroadcaster do
126171
end)
127172

128173
%{state | jobs: jobs}
174+
|> sort_state
175+
|> reset_timer
129176
end
130177

131178
defp add_job_to_state(
@@ -162,7 +209,7 @@ defmodule Quantum.ExecutionBroadcaster do
162209
time
163210
) do
164211
schedule
165-
|> Scheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
212+
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
166213
|> case do
167214
{:ok, date} ->
168215
{:ok, DateLibrary.to_utc!(date, timezone)}

lib/quantum/job.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ defmodule Quantum.Job do
2525
state: :active
2626
]
2727

28+
@type name :: atom | reference()
2829
@type state :: :active | :inactive
2930
@type task :: {atom, atom, [any]} | (() -> any)
3031
@type timezone :: :utc | :local | String.t()
3132
@type schedule :: Crontab.CronExpression.t()
3233

3334
@type t :: %__MODULE__{
34-
name: atom | Reference,
35+
name: name,
3536
schedule: schedule | nil,
3637
task: task | nil,
3738
state: state,

lib/quantum/job_broadcaster.ex

+67-15
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ defmodule Quantum.JobBroadcaster do
77

88
require Logger
99

10-
alias Quantum.{Job, Util}
10+
alias Quantum.{Job, Util, Scheduler}
11+
alias Quantum.Storage.Adapter
1112

1213
@doc """
1314
Start Job Broadcaster
@@ -18,24 +19,52 @@ defmodule Quantum.JobBroadcaster do
1819
* `jobs` - Array of `Quantum.Job`
1920
2021
"""
21-
@spec start_link(GenServer.server(), [Job.t()], boolean()) :: GenServer.on_start()
22-
def start_link(name, jobs, debug_logging) do
22+
@spec start_link(GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()) ::
23+
GenServer.on_start()
24+
def start_link(name, jobs, storage, scheduler, debug_logging) do
2325
__MODULE__
24-
|> GenStage.start_link({jobs, debug_logging}, name: name)
26+
|> GenStage.start_link({jobs, storage, scheduler, debug_logging}, name: name)
2527
|> Util.start_or_link()
2628
end
2729

2830
@doc false
29-
@spec child_spec({GenServer.server(), [Job.t()], boolean()}) :: Supervisor.child_spec()
30-
def child_spec({name, jobs, debug_logging}) do
31-
%{super([]) | start: {__MODULE__, :start_link, [name, jobs, debug_logging]}}
31+
@spec child_spec({GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()}) ::
32+
Supervisor.child_spec()
33+
def child_spec({name, jobs, storage, scheduler, debug_logging}) do
34+
%{
35+
super([])
36+
| start: {__MODULE__, :start_link, [name, jobs, storage, scheduler, debug_logging]}
37+
}
3238
end
3339

3440
@doc false
35-
def init({jobs, debug_logging}) do
41+
def init({jobs, storage, scheduler, debug_logging}) do
42+
effective_jobs =
43+
scheduler
44+
|> storage.jobs()
45+
|> case do
46+
:not_applicable ->
47+
debug_logging &&
48+
Logger.debug(fn ->
49+
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
50+
end)
51+
52+
jobs
53+
54+
storage_jobs when is_list(storage_jobs) ->
55+
debug_logging &&
56+
Logger.debug(fn ->
57+
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
58+
end)
59+
60+
storage_jobs
61+
end
62+
3663
state = %{
37-
jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end),
38-
buffer: for(%{state: :active} = job <- jobs, do: {:add, job}),
64+
jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end),
65+
buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}),
66+
storage: storage,
67+
scheduler: scheduler,
3968
debug_logging: debug_logging
4069
}
4170

@@ -50,39 +79,53 @@ defmodule Quantum.JobBroadcaster do
5079

5180
def handle_cast(
5281
{:add, %Job{state: :active, name: job_name} = job},
53-
%{jobs: jobs, debug_logging: debug_logging} = state
82+
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
83+
state
5484
) do
5585
debug_logging &&
5686
Logger.debug(fn ->
5787
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
5888
end)
5989

90+
:ok = storage.add_job(scheduler, job)
91+
6092
{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
6193
end
6294

6395
def handle_cast(
6496
{:add, %Job{state: :inactive, name: job_name} = job},
65-
%{jobs: jobs, debug_logging: debug_logging} = state
97+
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
98+
state
6699
) do
67100
debug_logging &&
68101
Logger.debug(fn ->
69102
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
70103
end)
71104

105+
:ok = storage.add_job(scheduler, job)
106+
72107
{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
73108
end
74109

75-
def handle_cast({:delete, name}, %{jobs: jobs, debug_logging: debug_logging} = state) do
110+
def handle_cast(
111+
{:delete, name},
112+
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
113+
state
114+
) do
76115
debug_logging &&
77116
Logger.debug(fn ->
78117
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}"
79118
end)
80119

81120
case Map.fetch(jobs, name) do
82121
{:ok, %{state: :active}} ->
122+
:ok = storage.delete_job(scheduler, name)
123+
83124
{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}
84125

85126
{:ok, %{state: :inactive}} ->
127+
:ok = storage.delete_job(scheduler, name)
128+
86129
{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}
87130

88131
:error ->
@@ -92,7 +135,8 @@ defmodule Quantum.JobBroadcaster do
92135

93136
def handle_cast(
94137
{:change_state, name, new_state},
95-
%{jobs: jobs, debug_logging: debug_logging} = state
138+
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
139+
state
96140
) do
97141
debug_logging &&
98142
Logger.debug(fn ->
@@ -109,6 +153,8 @@ defmodule Quantum.JobBroadcaster do
109153
{:ok, job} ->
110154
jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))
111155

156+
:ok = storage.update_job_state(scheduler, job.name, new_state)
157+
112158
case new_state do
113159
:active ->
114160
{:noreply, [{:add, %{job | state: new_state}}], %{state | jobs: jobs}}
@@ -119,14 +165,20 @@ defmodule Quantum.JobBroadcaster do
119165
end
120166
end
121167

122-
def handle_cast(:delete_all, %{jobs: jobs, debug_logging: debug_logging} = state) do
168+
def handle_cast(
169+
:delete_all,
170+
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
171+
state
172+
) do
123173
debug_logging &&
124174
Logger.debug(fn ->
125175
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
126176
end)
127177

128178
messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}
129179

180+
:ok = storage.purge(scheduler)
181+
130182
{:noreply, messages, %{state | jobs: %{}}}
131183
end
132184

lib/quantum/storage/adapter.ex

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
defmodule Quantum.Storage.Adapter do
2+
@moduledoc """
3+
Bahaviour to be implemented by all Storage Adapters.
4+
5+
**WARNING: This Adapter is experimental and will therefore not adhere to semantic versioning.
6+
It could undergo massive changes even in patch releases.**
7+
"""
8+
9+
alias Quantum.Job
10+
11+
@typedoc """
12+
The calling scheduler Module
13+
"""
14+
@type scheduler_module :: atom
15+
16+
@typedoc """
17+
The expected return is `:ok`, every other result will terminate the scheduler.
18+
"""
19+
@type ok :: :ok
20+
21+
@doc """
22+
Load saved jobs from storage
23+
24+
Returns `:not_applicable` if the storage has never received an `add_job` call or after it has been purged.
25+
In this case the jobs from the configuration weill be loaded.
26+
"""
27+
@callback jobs(scheduler_module) :: :not_applicable | [Job.t()]
28+
29+
@doc """
30+
Save new job in storage.
31+
"""
32+
@callback add_job(scheduler_module, job :: Job.t()) :: ok
33+
34+
@doc """
35+
Delete new job in storage.
36+
"""
37+
@callback delete_job(scheduler_module, job :: Job.name()) :: ok
38+
39+
@doc """
40+
Change Job State from given job.
41+
"""
42+
@callback update_job_state(scheduler_module, job :: Job.name(), state :: Job.state()) :: ok
43+
44+
@doc """
45+
Load last execution time from storage
46+
47+
Returns `:unknown` if the storage does not know the last execution time.
48+
In this case all jobs will be run at the next applicable date.
49+
"""
50+
@callback last_execution_date(scheduler_module) :: :unknown | NaiveDateTime.t()
51+
52+
@doc """
53+
Update last execution time to given date.
54+
"""
55+
@callback update_last_execution_date(
56+
scheduler_module,
57+
last_execution_date :: NaiveDateTime.t()
58+
) :: ok
59+
60+
@doc """
61+
Purge all date from storage and go back to initial state.
62+
"""
63+
@callback purge(scheduler_module) :: ok
64+
end

0 commit comments

Comments
 (0)