Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solution: Storage Adapter #313

Merged
merged 1 commit into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Experimental Storage API

Diff for [unreleased]

## 2.2.7 - 2018-03-22
Expand Down
3 changes: 3 additions & 0 deletions lib/quantum.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Quantum do
alias Quantum.Normalizer
alias Quantum.Job
alias Quantum.RunStrategy.Random
alias Quantum.Storage.Noop

@defaults [
global: false,
Expand Down Expand Up @@ -46,13 +47,15 @@ defmodule Quantum do
task_supervisor = Module.concat(quantum, Task.Supervisor)

config
|> Keyword.put_new(:quantum, quantum)
|> update_in([:schedule], &Normalizer.normalize_schedule/1)
|> Keyword.put_new(:task_stages_supervisor, task_stages_supervisor)
|> Keyword.put_new(:job_broadcaster, job_broadcaster)
|> Keyword.put_new(:execution_broadcaster, execution_broadcaster)
|> Keyword.put_new(:executor_supervisor, executor_supervisor)
|> Keyword.put_new(:task_registry, task_registry)
|> Keyword.put_new(:task_supervisor, task_supervisor)
|> Keyword.put_new(:storage, Noop)
end

@doc """
Expand Down
69 changes: 58 additions & 11 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ defmodule Quantum.ExecutionBroadcaster do
require Logger

alias Quantum.{Job, Util, DateLibrary}
alias Crontab.{Scheduler, CronExpression}
alias Quantum.DateLibrary.{InvalidDateTimeForTimezoneError, InvalidTimezoneError}
alias Crontab.Scheduler, as: CrontabScheduler
alias Crontab.CronExpression
alias Quantum.Storage.Adapter
alias Quantum.Scheduler

defmodule JobInPastError do
defexception message:
Expand All @@ -25,22 +28,57 @@ defmodule Quantum.ExecutionBroadcaster do
* `job_broadcaster` - The name of the stage to listen to

"""
@spec start_link(GenServer.server(), GenServer.server(), boolean()) :: GenServer.on_start()
def start_link(name, job_broadcaster, debug_logging) do
@spec start_link(GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()) ::
GenServer.on_start()
def start_link(name, job_broadcaster, storage, scheduler, debug_logging) do
__MODULE__
|> GenStage.start_link({job_broadcaster, debug_logging}, name: name)
|> GenStage.start_link({job_broadcaster, storage, scheduler, debug_logging}, name: name)
|> Util.start_or_link()
end

@doc false
@spec child_spec({GenServer.server(), GenServer.server(), boolean()}) :: Supervisor.child_spec()
def child_spec({name, job_broadcaster, debug_logging}) do
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, debug_logging]}}
@spec child_spec({GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()}) ::
Supervisor.child_spec()
def child_spec({name, job_broadcaster, storage, scheduler, debug_logging}) do
%{
super([])
| start:
{__MODULE__, :start_link, [name, job_broadcaster, storage, scheduler, debug_logging]}
}
end

@doc false
def init({job_broadcaster, debug_logging}) do
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil, debug_logging: debug_logging}
def init({job_broadcaster, storage, scheduler, debug_logging}) do
last_execution_date =
case storage.last_execution_date(scheduler) do
%NaiveDateTime{} = date ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Using last known execution time #{
NaiveDateTime.to_iso8601(date)
}"
end)

date

:unknown ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Unknown last execution time, using now"
end)

NaiveDateTime.utc_now()
end

state = %{
jobs: [],
time: last_execution_date,
timer: nil,
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
}

{:producer_consumer, state, subscribe_to: [job_broadcaster]}
end

Expand Down Expand Up @@ -71,8 +109,15 @@ defmodule Quantum.ExecutionBroadcaster do

def handle_info(
:execute,
%{jobs: [{time_to_execute, jobs_to_execute} | tail], debug_logging: debug_logging} = state
%{
jobs: [{time_to_execute, jobs_to_execute} | tail],
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
} = state
) do
:ok = storage.update_last_execution_date(scheduler, time_to_execute)

state =
state
|> Map.put(:timer, nil)
Expand Down Expand Up @@ -126,6 +171,8 @@ defmodule Quantum.ExecutionBroadcaster do
end)

%{state | jobs: jobs}
|> sort_state
|> reset_timer
end

defp add_job_to_state(
Expand Down Expand Up @@ -162,7 +209,7 @@ defmodule Quantum.ExecutionBroadcaster do
time
) do
schedule
|> Scheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
|> case do
{:ok, date} ->
{:ok, DateLibrary.to_utc!(date, timezone)}
Expand Down
3 changes: 2 additions & 1 deletion lib/quantum/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ defmodule Quantum.Job do
state: :active
]

@type name :: atom | reference()
@type state :: :active | :inactive
@type task :: {atom, atom, [any]} | (() -> any)
@type timezone :: :utc | :local | String.t()
@type schedule :: Crontab.CronExpression.t()

@type t :: %__MODULE__{
name: atom | Reference,
name: name,
schedule: schedule | nil,
task: task | nil,
state: state,
Expand Down
82 changes: 67 additions & 15 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ defmodule Quantum.JobBroadcaster do

require Logger

alias Quantum.{Job, Util}
alias Quantum.{Job, Util, Scheduler}
alias Quantum.Storage.Adapter

@doc """
Start Job Broadcaster
Expand All @@ -18,24 +19,52 @@ defmodule Quantum.JobBroadcaster do
* `jobs` - Array of `Quantum.Job`

"""
@spec start_link(GenServer.server(), [Job.t()], boolean()) :: GenServer.on_start()
def start_link(name, jobs, debug_logging) do
@spec start_link(GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()) ::
GenServer.on_start()
def start_link(name, jobs, storage, scheduler, debug_logging) do
__MODULE__
|> GenStage.start_link({jobs, debug_logging}, name: name)
|> GenStage.start_link({jobs, storage, scheduler, debug_logging}, name: name)
|> Util.start_or_link()
end

@doc false
@spec child_spec({GenServer.server(), [Job.t()], boolean()}) :: Supervisor.child_spec()
def child_spec({name, jobs, debug_logging}) do
%{super([]) | start: {__MODULE__, :start_link, [name, jobs, debug_logging]}}
@spec child_spec({GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()}) ::
Supervisor.child_spec()
def child_spec({name, jobs, storage, scheduler, debug_logging}) do
%{
super([])
| start: {__MODULE__, :start_link, [name, jobs, storage, scheduler, debug_logging]}
}
end

@doc false
def init({jobs, debug_logging}) do
def init({jobs, storage, scheduler, debug_logging}) do
effective_jobs =
scheduler
|> storage.jobs()
|> case do
:not_applicable ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
end)

jobs

storage_jobs when is_list(storage_jobs) ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
end)

storage_jobs
end

state = %{
jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- jobs, do: {:add, job}),
jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}),
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
}

Expand All @@ -50,39 +79,53 @@ defmodule Quantum.JobBroadcaster do

def handle_cast(
{:add, %Job{state: :active, name: job_name} = job},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

:ok = storage.add_job(scheduler, job)

{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
end

def handle_cast(
{:add, %Job{state: :inactive, name: job_name} = job},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

:ok = storage.add_job(scheduler, job)

{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
end

def handle_cast({:delete, name}, %{jobs: jobs, debug_logging: debug_logging} = state) do
def handle_cast(
{:delete, name},
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}"
end)

case Map.fetch(jobs, name) do
{:ok, %{state: :active}} ->
:ok = storage.delete_job(scheduler, name)

{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}

{:ok, %{state: :inactive}} ->
:ok = storage.delete_job(scheduler, name)

{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}

:error ->
Expand All @@ -92,7 +135,8 @@ defmodule Quantum.JobBroadcaster do

def handle_cast(
{:change_state, name, new_state},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
Expand All @@ -109,6 +153,8 @@ defmodule Quantum.JobBroadcaster do
{:ok, job} ->
jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))

:ok = storage.update_job_state(scheduler, job.name, new_state)

case new_state do
:active ->
{:noreply, [{:add, %{job | state: new_state}}], %{state | jobs: jobs}}
Expand All @@ -119,14 +165,20 @@ defmodule Quantum.JobBroadcaster do
end
end

def handle_cast(:delete_all, %{jobs: jobs, debug_logging: debug_logging} = state) do
def handle_cast(
:delete_all,
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
end)

messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}

:ok = storage.purge(scheduler)

{:noreply, messages, %{state | jobs: %{}}}
end

Expand Down
64 changes: 64 additions & 0 deletions lib/quantum/storage/adapter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Quantum.Storage.Adapter do
@moduledoc """
Bahaviour to be implemented by all Storage Adapters.

**WARNING: This Adapter is experimental and will therefore not adhere to semantic versioning.
It could undergo massive changes even in patch releases.**
"""

alias Quantum.Job

@typedoc """
The calling scheduler Module
"""
@type scheduler_module :: atom

@typedoc """
The expected return is `:ok`, every other result will terminate the scheduler.
"""
@type ok :: :ok

@doc """
Load saved jobs from storage

Returns `:not_applicable` if the storage has never received an `add_job` call or after it has been purged.
In this case the jobs from the configuration weill be loaded.
"""
@callback jobs(scheduler_module) :: :not_applicable | [Job.t()]

@doc """
Save new job in storage.
"""
@callback add_job(scheduler_module, job :: Job.t()) :: ok

@doc """
Delete new job in storage.
"""
@callback delete_job(scheduler_module, job :: Job.name()) :: ok

@doc """
Change Job State from given job.
"""
@callback update_job_state(scheduler_module, job :: Job.name(), state :: Job.state()) :: ok

@doc """
Load last execution time from storage

Returns `:unknown` if the storage does not know the last execution time.
In this case all jobs will be run at the next applicable date.
"""
@callback last_execution_date(scheduler_module) :: :unknown | NaiveDateTime.t()

@doc """
Update last execution time to given date.
"""
@callback update_last_execution_date(
scheduler_module,
last_execution_date :: NaiveDateTime.t()
) :: ok

@doc """
Purge all date from storage and go back to initial state.
"""
@callback purge(scheduler_module) :: ok
end
Loading