Skip to content

Commit b1202e0

Browse files
committed
Solution: Storage Adapter
1 parent adf3b66 commit b1202e0

12 files changed

+495
-43
lines changed

Diff for: lib/quantum.ex

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Quantum do
88
alias Quantum.Normalizer
99
alias Quantum.Job
1010
alias Quantum.RunStrategy.Random
11+
alias Quantum.Storage.Noop
1112

1213
@defaults [
1314
global: false,
@@ -45,13 +46,15 @@ defmodule Quantum do
4546
task_supervisor = Module.concat(quantum, Task.Supervisor)
4647

4748
config
49+
|> Keyword.put_new(:quantum, quantum)
4850
|> update_in([:schedule], &Normalizer.normalize_schedule/1)
4951
|> Keyword.put_new(:task_stages_supervisor, task_stages_supervisor)
5052
|> Keyword.put_new(:job_broadcaster, job_broadcaster)
5153
|> Keyword.put_new(:execution_broadcaster, execution_broadcaster)
5254
|> Keyword.put_new(:executor_supervisor, executor_supervisor)
5355
|> Keyword.put_new(:task_registry, task_registry)
5456
|> Keyword.put_new(:task_supervisor, task_supervisor)
57+
|> Keyword.put_new(:storage, Noop)
5558
end
5659

5760
@doc """

Diff for: lib/quantum/execution_broadcaster.ex

+53-11
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ defmodule Quantum.ExecutionBroadcaster do
88
require Logger
99

1010
alias Quantum.{Job, Util, DateLibrary}
11-
alias Crontab.{Scheduler, CronExpression}
1211
alias Quantum.DateLibrary.{InvalidDateTimeForTimezoneError, InvalidTimezoneError}
12+
alias Crontab.Scheduler, as: CrontabScheduler
13+
alias Crontab.CronExpression
14+
alias Quantum.Storage.Adapter
15+
alias Quantum.Scheduler
1316

1417
defmodule JobInPastError do
1518
defexception message:
@@ -25,22 +28,50 @@ defmodule Quantum.ExecutionBroadcaster do
2528
* `job_broadcaster` - The name of the stage to listen to
2629
2730
"""
28-
@spec start_link(GenServer.server(), GenServer.server()) :: GenServer.on_start()
29-
def start_link(name, job_broadcaster) do
31+
@spec start_link(GenServer.server(), GenServer.server(), Adapter, Scheduler) ::
32+
GenServer.on_start()
33+
def start_link(name, job_broadcaster, storage, scheduler) do
3034
__MODULE__
31-
|> GenStage.start_link(job_broadcaster, name: name)
35+
|> GenStage.start_link({job_broadcaster, storage, scheduler}, name: name)
3236
|> Util.start_or_link()
3337
end
3438

3539
@doc false
36-
@spec child_spec({GenServer.server(), GenServer.server()}) :: Supervisor.child_spec()
37-
def child_spec({name, job_broadcaster}) do
38-
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster]}}
40+
@spec child_spec({GenServer.server(), GenServer.server(), Adapter, Scheduler}) ::
41+
Supervisor.child_spec()
42+
def child_spec({name, job_broadcaster, storage, scheduler}) do
43+
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, storage, scheduler]}}
3944
end
4045

4146
@doc false
42-
def init(job_broadcaster) do
43-
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil}
47+
def init({job_broadcaster, storage, scheduler}) do
48+
last_execution_date =
49+
case storage.last_execution_date(scheduler) do
50+
%NaiveDateTime{} = date ->
51+
Logger.debug(fn ->
52+
"[#{inspect(Node.self())}][#{__MODULE__}] Using last known execution time #{
53+
NaiveDateTime.to_iso8601(date)
54+
}"
55+
end)
56+
57+
date
58+
59+
:unknown ->
60+
Logger.debug(fn ->
61+
"[#{inspect(Node.self())}][#{__MODULE__}] Unknown last execution time, using now"
62+
end)
63+
64+
NaiveDateTime.utc_now()
65+
end
66+
67+
state = %{
68+
jobs: [],
69+
time: last_execution_date,
70+
timer: nil,
71+
storage: storage,
72+
scheduler: scheduler
73+
}
74+
4475
{:producer_consumer, state, subscribe_to: [job_broadcaster]}
4576
end
4677

@@ -68,7 +99,16 @@ defmodule Quantum.ExecutionBroadcaster do
6899
{:noreply, reboot_add_events, state}
69100
end
70101

71-
def handle_info(:execute, %{jobs: [{time_to_execute, jobs_to_execute} | tail]} = state) do
102+
def handle_info(
103+
:execute,
104+
%{
105+
jobs: [{time_to_execute, jobs_to_execute} | tail],
106+
storage: storage,
107+
scheduler: scheduler
108+
} = state
109+
) do
110+
:ok = storage.update_last_execution_date(scheduler, time_to_execute)
111+
72112
state =
73113
state
74114
|> Map.put(:timer, nil)
@@ -119,6 +159,8 @@ defmodule Quantum.ExecutionBroadcaster do
119159
end)
120160

121161
%{state | jobs: jobs}
162+
|> sort_state
163+
|> reset_timer
122164
end
123165

124166
defp add_job_to_state(
@@ -155,7 +197,7 @@ defmodule Quantum.ExecutionBroadcaster do
155197
time
156198
) do
157199
schedule
158-
|> Scheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
200+
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
159201
|> case do
160202
{:ok, date} ->
161203
{:ok, DateLibrary.to_utc!(date, timezone)}

Diff for: lib/quantum/job.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ defmodule Quantum.Job do
2525
state: :active
2626
]
2727

28+
@type name :: atom | reference()
2829
@type state :: :active | :inactive
2930
@type task :: {atom, atom, [any]} | (() -> any)
3031
@type timezone :: :utc | :local | String.t()
3132
@type schedule :: Crontab.CronExpression.t()
3233

3334
@type t :: %__MODULE__{
34-
name: atom | Reference,
35+
name: name,
3536
schedule: schedule | nil,
3637
task: task | nil,
3738
state: state,

Diff for: lib/quantum/job_broadcaster.ex

+58-15
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ defmodule Quantum.JobBroadcaster do
77

88
require Logger
99

10-
alias Quantum.{Job, Util}
10+
alias Quantum.{Job, Util, Scheduler}
11+
alias Quantum.Storage.Adapter
1112

1213
@doc """
1314
Start Job Broadcaster
@@ -18,24 +19,45 @@ defmodule Quantum.JobBroadcaster do
1819
* `jobs` - Array of `Quantum.Job`
1920
2021
"""
21-
@spec start_link(GenServer.server(), [Job.t()]) :: GenServer.on_start()
22-
def start_link(name, jobs) do
22+
@spec start_link(GenServer.server(), [Job.t()], Adapter, Scheduler) :: GenServer.on_start()
23+
def start_link(name, jobs, storage, scheduler) do
2324
__MODULE__
24-
|> GenStage.start_link(jobs, name: name)
25+
|> GenStage.start_link({jobs, storage, scheduler}, name: name)
2526
|> Util.start_or_link()
2627
end
2728

2829
@doc false
29-
@spec child_spec({GenServer.server(), [Job.t()]}) :: Supervisor.child_spec()
30-
def child_spec({name, jobs}) do
31-
%{super([]) | start: {__MODULE__, :start_link, [name, jobs]}}
30+
@spec child_spec({GenServer.server(), [Job.t()], Adapter, Scheduler}) :: Supervisor.child_spec()
31+
def child_spec({name, jobs, storage, scheduler}) do
32+
%{super([]) | start: {__MODULE__, :start_link, [name, jobs, storage, scheduler]}}
3233
end
3334

3435
@doc false
35-
def init(jobs) do
36+
def init({jobs, storage, scheduler}) do
37+
effective_jobs =
38+
scheduler
39+
|> storage.jobs()
40+
|> case do
41+
:not_applicable ->
42+
Logger.debug(fn ->
43+
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
44+
end)
45+
46+
jobs
47+
48+
storage_jobs when is_list(storage_jobs) ->
49+
Logger.debug(fn ->
50+
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
51+
end)
52+
53+
storage_jobs
54+
end
55+
3656
state = %{
37-
jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end),
38-
buffer: for(%{state: :active} = job <- jobs, do: {:add, job})
57+
jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end),
58+
buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}),
59+
storage: storage,
60+
scheduler: scheduler
3961
}
4062

4163
{:producer, state}
@@ -47,40 +69,57 @@ defmodule Quantum.JobBroadcaster do
4769
{:noreply, to_send, %{state | buffer: remaining}}
4870
end
4971

50-
def handle_cast({:add, %Job{state: :active, name: job_name} = job}, %{jobs: jobs} = state) do
72+
def handle_cast(
73+
{:add, %Job{state: :active, name: job_name} = job},
74+
%{jobs: jobs, storage: storage, scheduler: scheduler} = state
75+
) do
5176
Logger.debug(fn ->
5277
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
5378
end)
5479

80+
:ok = storage.add_job(scheduler, job)
81+
5582
{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
5683
end
5784

58-
def handle_cast({:add, %Job{state: :inactive, name: job_name} = job}, %{jobs: jobs} = state) do
85+
def handle_cast(
86+
{:add, %Job{state: :inactive, name: job_name} = job},
87+
%{jobs: jobs, storage: storage, scheduler: scheduler} = state
88+
) do
5989
Logger.debug(fn ->
6090
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
6191
end)
6292

93+
:ok = storage.add_job(scheduler, job)
94+
6395
{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
6496
end
6597

66-
def handle_cast({:delete, name}, %{jobs: jobs} = state) do
98+
def handle_cast({:delete, name}, %{jobs: jobs, storage: storage, scheduler: scheduler} = state) do
6799
Logger.debug(fn ->
68100
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}"
69101
end)
70102

71103
case Map.fetch(jobs, name) do
72104
{:ok, %{state: :active}} ->
105+
:ok = storage.delete_job(scheduler, name)
106+
73107
{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}
74108

75109
{:ok, %{state: :inactive}} ->
110+
:ok = storage.delete_job(scheduler, name)
111+
76112
{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}
77113

78114
:error ->
79115
{:noreply, [], state}
80116
end
81117
end
82118

83-
def handle_cast({:change_state, name, new_state}, %{jobs: jobs} = state) do
119+
def handle_cast(
120+
{:change_state, name, new_state},
121+
%{jobs: jobs, storage: storage, scheduler: scheduler} = state
122+
) do
84123
Logger.debug(fn ->
85124
"[#{inspect(Node.self())}][#{__MODULE__}] Change job state #{inspect(name)}"
86125
end)
@@ -95,6 +134,8 @@ defmodule Quantum.JobBroadcaster do
95134
{:ok, job} ->
96135
jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))
97136

137+
:ok = storage.update_job_state(scheduler, job.name, new_state)
138+
98139
case new_state do
99140
:active ->
100141
{:noreply, [{:add, %{job | state: new_state}}], %{state | jobs: jobs}}
@@ -105,13 +146,15 @@ defmodule Quantum.JobBroadcaster do
105146
end
106147
end
107148

108-
def handle_cast(:delete_all, %{jobs: jobs} = state) do
149+
def handle_cast(:delete_all, %{jobs: jobs, storage: storage, scheduler: scheduler} = state) do
109150
Logger.debug(fn ->
110151
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
111152
end)
112153

113154
messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}
114155

156+
:ok = storage.purge(scheduler)
157+
115158
{:noreply, messages, %{state | jobs: %{}}}
116159
end
117160

Diff for: lib/quantum/storage/adapter.ex

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
defmodule Quantum.Storage.Adapter do
2+
@moduledoc """
3+
Bahaviour to be implemented by all Storage Adapters.
4+
"""
5+
6+
alias Quantum.Job
7+
8+
@typedoc """
9+
The calling scheduler Module
10+
"""
11+
@type scheduler_module :: atom
12+
13+
@typedoc """
14+
The expected return is `:ok`, every other result will terminate the scheduler.
15+
"""
16+
@type ok :: :ok
17+
18+
@doc """
19+
Load saved jobs from storage
20+
21+
Returns `:not_applicable` if the storage has never received an `add_job` call or after it has been purged.
22+
In this case the jobs from the configuration weill be loaded.
23+
"""
24+
@callback jobs(scheduler_module) :: :not_applicable | [Job.t()]
25+
26+
@doc """
27+
Save new job in storage.
28+
"""
29+
@callback add_job(scheduler_module, job :: Job.t()) :: ok
30+
31+
@doc """
32+
Delete new job in storage.
33+
"""
34+
@callback delete_job(scheduler_module, job :: Job.name()) :: ok
35+
36+
@doc """
37+
Change Job State from given job.
38+
"""
39+
@callback update_job_state(scheduler_module, job :: Job.name(), state :: Job.state()) :: ok
40+
41+
@doc """
42+
Load last execution time from storage
43+
44+
Returns `:unknown` if the storage does not know the last execution time.
45+
In this case all jobs will be run at the next applicable date.
46+
"""
47+
@callback last_execution_date(scheduler_module) :: :unknown | NaiveDateTime.t()
48+
49+
@doc """
50+
Update last execution time to given date.
51+
"""
52+
@callback update_last_execution_date(
53+
scheduler_module,
54+
last_execution_date :: NaiveDateTime.t()
55+
) :: ok
56+
57+
@doc """
58+
Purge all date from storage and go back to initial state.
59+
"""
60+
@callback purge(scheduler_module) :: ok
61+
end

Diff for: lib/quantum/storage/noop.ex

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
defmodule Quantum.Storage.Noop do
2+
@moduledoc """
3+
Empty implementation of a `Quantum.Storage.Adapter`.
4+
"""
5+
6+
@behaviour Quantum.Storage.Adapter
7+
8+
def jobs(_scheduler_module), do: :not_applicable
9+
def add_job(_scheduler_module, _job), do: :ok
10+
def delete_job(_scheduler_module, _job_name), do: :ok
11+
def update_job_state(_scheduler_module, _job_name, _state), do: :ok
12+
def last_execution_date(_scheduler_module), do: :unknown
13+
def update_last_execution_date(_scheduler_module, _last_execution_date), do: :ok
14+
def purge(_scheduler_module), do: :ok
15+
end

Diff for: lib/quantum/task_stages_supervisor.ex

+6-2
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,18 @@ defmodule Quantum.TaskStagesSupervisor do
3232
Quantum.JobBroadcaster,
3333
{
3434
Keyword.fetch!(opts, :job_broadcaster),
35-
Keyword.fetch!(opts, :jobs)
35+
Keyword.fetch!(opts, :jobs),
36+
Keyword.fetch!(opts, :storage),
37+
Keyword.fetch!(opts, :quantum)
3638
}
3739
},
3840
{
3941
Quantum.ExecutionBroadcaster,
4042
{
4143
Keyword.fetch!(opts, :execution_broadcaster),
42-
Keyword.fetch!(opts, :job_broadcaster)
44+
Keyword.fetch!(opts, :job_broadcaster),
45+
Keyword.fetch!(opts, :storage),
46+
Keyword.fetch!(opts, :quantum)
4347
}
4448
},
4549
{

0 commit comments

Comments
 (0)