diff --git a/CHANGELOG.md b/CHANGELOG.md index feed9d9..eb01589 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +### Added +- Experimental Storage API + Diff for [unreleased] ## 2.2.7 - 2018-03-22 diff --git a/lib/quantum.ex b/lib/quantum.ex index a1dfd81..7e2a667 100644 --- a/lib/quantum.ex +++ b/lib/quantum.ex @@ -8,6 +8,7 @@ defmodule Quantum do alias Quantum.Normalizer alias Quantum.Job alias Quantum.RunStrategy.Random + alias Quantum.Storage.Noop @defaults [ global: false, @@ -46,6 +47,7 @@ defmodule Quantum do task_supervisor = Module.concat(quantum, Task.Supervisor) config + |> Keyword.put_new(:quantum, quantum) |> update_in([:schedule], &Normalizer.normalize_schedule/1) |> Keyword.put_new(:task_stages_supervisor, task_stages_supervisor) |> Keyword.put_new(:job_broadcaster, job_broadcaster) @@ -53,6 +55,7 @@ defmodule Quantum do |> Keyword.put_new(:executor_supervisor, executor_supervisor) |> Keyword.put_new(:task_registry, task_registry) |> Keyword.put_new(:task_supervisor, task_supervisor) + |> Keyword.put_new(:storage, Noop) end @doc """ diff --git a/lib/quantum/execution_broadcaster.ex b/lib/quantum/execution_broadcaster.ex index 85cfec8..442d4a0 100644 --- a/lib/quantum/execution_broadcaster.ex +++ b/lib/quantum/execution_broadcaster.ex @@ -8,8 +8,11 @@ defmodule Quantum.ExecutionBroadcaster do require Logger alias Quantum.{Job, Util, DateLibrary} - alias Crontab.{Scheduler, CronExpression} alias Quantum.DateLibrary.{InvalidDateTimeForTimezoneError, InvalidTimezoneError} + alias Crontab.Scheduler, as: CrontabScheduler + alias Crontab.CronExpression + alias Quantum.Storage.Adapter + alias Quantum.Scheduler defmodule JobInPastError do defexception message: @@ -25,22 +28,57 @@ defmodule Quantum.ExecutionBroadcaster do * `job_broadcaster` - The name of the stage to listen to """ - @spec start_link(GenServer.server(), GenServer.server(), boolean()) :: GenServer.on_start() - def start_link(name, job_broadcaster, debug_logging) do + @spec start_link(GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()) :: + GenServer.on_start() + def start_link(name, job_broadcaster, storage, scheduler, debug_logging) do __MODULE__ - |> GenStage.start_link({job_broadcaster, debug_logging}, name: name) + |> GenStage.start_link({job_broadcaster, storage, scheduler, debug_logging}, name: name) |> Util.start_or_link() end @doc false - @spec child_spec({GenServer.server(), GenServer.server(), boolean()}) :: Supervisor.child_spec() - def child_spec({name, job_broadcaster, debug_logging}) do - %{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, debug_logging]}} + @spec child_spec({GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()}) :: + Supervisor.child_spec() + def child_spec({name, job_broadcaster, storage, scheduler, debug_logging}) do + %{ + super([]) + | start: + {__MODULE__, :start_link, [name, job_broadcaster, storage, scheduler, debug_logging]} + } end @doc false - def init({job_broadcaster, debug_logging}) do - state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil, debug_logging: debug_logging} + def init({job_broadcaster, storage, scheduler, debug_logging}) do + last_execution_date = + case storage.last_execution_date(scheduler) do + %NaiveDateTime{} = date -> + debug_logging && + Logger.debug(fn -> + "[#{inspect(Node.self())}][#{__MODULE__}] Using last known execution time #{ + NaiveDateTime.to_iso8601(date) + }" + end) + + date + + :unknown -> + debug_logging && + Logger.debug(fn -> + "[#{inspect(Node.self())}][#{__MODULE__}] Unknown last execution time, using now" + end) + + NaiveDateTime.utc_now() + end + + state = %{ + jobs: [], + time: last_execution_date, + timer: nil, + storage: storage, + scheduler: scheduler, + debug_logging: debug_logging + } + {:producer_consumer, state, subscribe_to: [job_broadcaster]} end @@ -71,8 +109,15 @@ defmodule Quantum.ExecutionBroadcaster do def handle_info( :execute, - %{jobs: [{time_to_execute, jobs_to_execute} | tail], debug_logging: debug_logging} = state + %{ + jobs: [{time_to_execute, jobs_to_execute} | tail], + storage: storage, + scheduler: scheduler, + debug_logging: debug_logging + } = state ) do + :ok = storage.update_last_execution_date(scheduler, time_to_execute) + state = state |> Map.put(:timer, nil) @@ -126,6 +171,8 @@ defmodule Quantum.ExecutionBroadcaster do end) %{state | jobs: jobs} + |> sort_state + |> reset_timer end defp add_job_to_state( @@ -162,7 +209,7 @@ defmodule Quantum.ExecutionBroadcaster do time ) do schedule - |> Scheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone)) + |> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone)) |> case do {:ok, date} -> {:ok, DateLibrary.to_utc!(date, timezone)} diff --git a/lib/quantum/job.ex b/lib/quantum/job.ex index 4c26a22..628d395 100644 --- a/lib/quantum/job.ex +++ b/lib/quantum/job.ex @@ -25,13 +25,14 @@ defmodule Quantum.Job do state: :active ] + @type name :: atom | reference() @type state :: :active | :inactive @type task :: {atom, atom, [any]} | (() -> any) @type timezone :: :utc | :local | String.t() @type schedule :: Crontab.CronExpression.t() @type t :: %__MODULE__{ - name: atom | Reference, + name: name, schedule: schedule | nil, task: task | nil, state: state, diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index c8c716b..3528ac0 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -7,7 +7,8 @@ defmodule Quantum.JobBroadcaster do require Logger - alias Quantum.{Job, Util} + alias Quantum.{Job, Util, Scheduler} + alias Quantum.Storage.Adapter @doc """ Start Job Broadcaster @@ -18,24 +19,52 @@ defmodule Quantum.JobBroadcaster do * `jobs` - Array of `Quantum.Job` """ - @spec start_link(GenServer.server(), [Job.t()], boolean()) :: GenServer.on_start() - def start_link(name, jobs, debug_logging) do + @spec start_link(GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()) :: + GenServer.on_start() + def start_link(name, jobs, storage, scheduler, debug_logging) do __MODULE__ - |> GenStage.start_link({jobs, debug_logging}, name: name) + |> GenStage.start_link({jobs, storage, scheduler, debug_logging}, name: name) |> Util.start_or_link() end @doc false - @spec child_spec({GenServer.server(), [Job.t()], boolean()}) :: Supervisor.child_spec() - def child_spec({name, jobs, debug_logging}) do - %{super([]) | start: {__MODULE__, :start_link, [name, jobs, debug_logging]}} + @spec child_spec({GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()}) :: + Supervisor.child_spec() + def child_spec({name, jobs, storage, scheduler, debug_logging}) do + %{ + super([]) + | start: {__MODULE__, :start_link, [name, jobs, storage, scheduler, debug_logging]} + } end @doc false - def init({jobs, debug_logging}) do + def init({jobs, storage, scheduler, debug_logging}) do + effective_jobs = + scheduler + |> storage.jobs() + |> case do + :not_applicable -> + debug_logging && + Logger.debug(fn -> + "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config" + end) + + jobs + + storage_jobs when is_list(storage_jobs) -> + debug_logging && + Logger.debug(fn -> + "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config" + end) + + storage_jobs + end + state = %{ - jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end), - buffer: for(%{state: :active} = job <- jobs, do: {:add, job}), + jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end), + buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}), + storage: storage, + scheduler: scheduler, debug_logging: debug_logging } @@ -50,29 +79,39 @@ defmodule Quantum.JobBroadcaster do def handle_cast( {:add, %Job{state: :active, name: job_name} = job}, - %{jobs: jobs, debug_logging: debug_logging} = state + %{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} = + state ) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + :ok = storage.add_job(scheduler, job) + {:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}} end def handle_cast( {:add, %Job{state: :inactive, name: job_name} = job}, - %{jobs: jobs, debug_logging: debug_logging} = state + %{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} = + state ) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + :ok = storage.add_job(scheduler, job) + {:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}} end - def handle_cast({:delete, name}, %{jobs: jobs, debug_logging: debug_logging} = state) do + def handle_cast( + {:delete, name}, + %{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} = + state + ) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}" @@ -80,9 +119,13 @@ defmodule Quantum.JobBroadcaster do case Map.fetch(jobs, name) do {:ok, %{state: :active}} -> + :ok = storage.delete_job(scheduler, name) + {:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}} {:ok, %{state: :inactive}} -> + :ok = storage.delete_job(scheduler, name) + {:noreply, [], %{state | jobs: Map.delete(jobs, name)}} :error -> @@ -92,7 +135,8 @@ defmodule Quantum.JobBroadcaster do def handle_cast( {:change_state, name, new_state}, - %{jobs: jobs, debug_logging: debug_logging} = state + %{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} = + state ) do debug_logging && Logger.debug(fn -> @@ -109,6 +153,8 @@ defmodule Quantum.JobBroadcaster do {:ok, job} -> jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state)) + :ok = storage.update_job_state(scheduler, job.name, new_state) + case new_state do :active -> {:noreply, [{:add, %{job | state: new_state}}], %{state | jobs: jobs}} @@ -119,7 +165,11 @@ defmodule Quantum.JobBroadcaster do end end - def handle_cast(:delete_all, %{jobs: jobs, debug_logging: debug_logging} = state) do + def handle_cast( + :delete_all, + %{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} = + state + ) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs" @@ -127,6 +177,8 @@ defmodule Quantum.JobBroadcaster do messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name} + :ok = storage.purge(scheduler) + {:noreply, messages, %{state | jobs: %{}}} end diff --git a/lib/quantum/storage/adapter.ex b/lib/quantum/storage/adapter.ex new file mode 100644 index 0000000..e538f8b --- /dev/null +++ b/lib/quantum/storage/adapter.ex @@ -0,0 +1,64 @@ +defmodule Quantum.Storage.Adapter do + @moduledoc """ + Bahaviour to be implemented by all Storage Adapters. + + **WARNING: This Adapter is experimental and will therefore not adhere to semantic versioning. + It could undergo massive changes even in patch releases.** + """ + + alias Quantum.Job + + @typedoc """ + The calling scheduler Module + """ + @type scheduler_module :: atom + + @typedoc """ + The expected return is `:ok`, every other result will terminate the scheduler. + """ + @type ok :: :ok + + @doc """ + Load saved jobs from storage + + Returns `:not_applicable` if the storage has never received an `add_job` call or after it has been purged. + In this case the jobs from the configuration weill be loaded. + """ + @callback jobs(scheduler_module) :: :not_applicable | [Job.t()] + + @doc """ + Save new job in storage. + """ + @callback add_job(scheduler_module, job :: Job.t()) :: ok + + @doc """ + Delete new job in storage. + """ + @callback delete_job(scheduler_module, job :: Job.name()) :: ok + + @doc """ + Change Job State from given job. + """ + @callback update_job_state(scheduler_module, job :: Job.name(), state :: Job.state()) :: ok + + @doc """ + Load last execution time from storage + + Returns `:unknown` if the storage does not know the last execution time. + In this case all jobs will be run at the next applicable date. + """ + @callback last_execution_date(scheduler_module) :: :unknown | NaiveDateTime.t() + + @doc """ + Update last execution time to given date. + """ + @callback update_last_execution_date( + scheduler_module, + last_execution_date :: NaiveDateTime.t() + ) :: ok + + @doc """ + Purge all date from storage and go back to initial state. + """ + @callback purge(scheduler_module) :: ok +end diff --git a/lib/quantum/storage/noop.ex b/lib/quantum/storage/noop.ex new file mode 100644 index 0000000..a87df3b --- /dev/null +++ b/lib/quantum/storage/noop.ex @@ -0,0 +1,15 @@ +defmodule Quantum.Storage.Noop do + @moduledoc """ + Empty implementation of a `Quantum.Storage.Adapter`. + """ + + @behaviour Quantum.Storage.Adapter + + def jobs(_scheduler_module), do: :not_applicable + def add_job(_scheduler_module, _job), do: :ok + def delete_job(_scheduler_module, _job_name), do: :ok + def update_job_state(_scheduler_module, _job_name, _state), do: :ok + def last_execution_date(_scheduler_module), do: :unknown + def update_last_execution_date(_scheduler_module, _last_execution_date), do: :ok + def purge(_scheduler_module), do: :ok +end diff --git a/lib/quantum/task_stages_supervisor.ex b/lib/quantum/task_stages_supervisor.ex index bedd144..80d519a 100644 --- a/lib/quantum/task_stages_supervisor.ex +++ b/lib/quantum/task_stages_supervisor.ex @@ -33,6 +33,8 @@ defmodule Quantum.TaskStagesSupervisor do { Keyword.fetch!(opts, :job_broadcaster), Keyword.fetch!(opts, :jobs), + Keyword.fetch!(opts, :storage), + Keyword.fetch!(opts, :quantum), Keyword.fetch!(opts, :debug_logging) } }, @@ -41,6 +43,8 @@ defmodule Quantum.TaskStagesSupervisor do { Keyword.fetch!(opts, :execution_broadcaster), Keyword.fetch!(opts, :job_broadcaster), + Keyword.fetch!(opts, :storage), + Keyword.fetch!(opts, :quantum), Keyword.fetch!(opts, :debug_logging) } }, diff --git a/pages/configuration.md b/pages/configuration.md index 075016e..269efb6 100644 --- a/pages/configuration.md +++ b/pages/configuration.md @@ -19,6 +19,25 @@ config :your_app, YourApp.Scheduler, ] ``` +## Persistent Storage + +Persistent storage can be used to track jobs and last execution times over restarts. + +**Note: If a storage is present, the jobs from the configuration will not be loaded to prevent conflicts.** + +```elixir +config :your_app, YourApp.Scheduler, + storage: Quantum.Storage.Adapter.Implementation +``` + +### Storage Adapters + +Storage implementations must implement the `Quantum.Storage.Adapter` behaviour. + +The following adapters are supported: + +* TODO: Add Adapters + ## Release managers ( [conform](https://github.com/bitwalker/conform) / diff --git a/test/quantum/execution_broadcaster_test.exs b/test/quantum/execution_broadcaster_test.exs index dcd6f37..d118492 100644 --- a/test/quantum/execution_broadcaster_test.exs +++ b/test/quantum/execution_broadcaster_test.exs @@ -5,10 +5,12 @@ defmodule Quantum.ExecutionBroadcasterTest do import Crontab.CronExpression import ExUnit.CaptureLog + import Quantum.CaptureLogExtend alias Quantum.ExecutionBroadcaster alias Quantum.{TestConsumer, TestProducer} alias Quantum.Job + alias Quantum.Storage.Test, as: TestStorage # Allow max 10% Latency @max_timeout 1_100 @@ -21,12 +23,27 @@ defmodule Quantum.ExecutionBroadcasterTest do use Quantum.Scheduler, otp_app: :execution_broadcaster_test end - setup do - {:ok, producer} = start_supervised({TestProducer, []}) - {:ok, broadcaster} = start_supervised({ExecutionBroadcaster, {__MODULE__, producer, true}}) - {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + setup tags do + if tags[:listen_storage] do + Process.put(:test_pid, self()) + end + + if tags[:manual_dispatch] do + :ok + else + {:ok, producer} = start_supervised({TestProducer, []}) + + {{:ok, broadcaster}, _} = + capture_log_with_return(fn -> + start_supervised( + {ExecutionBroadcaster, {__MODULE__, producer, TestStorage, TestScheduler, true}} + ) + end) - {:ok, %{producer: producer, broadcaster: broadcaster, debug_logging: true}} + {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + + {:ok, %{producer: producer, broadcaster: broadcaster, debug_logging: true}} + end end describe "add" do @@ -62,6 +79,71 @@ defmodule Quantum.ExecutionBroadcasterTest do end) end + @tag manual_dispatch: true, listen_storage: true + test "loads last execution time from storage" do + defmodule TestStorageWithLastExecutionTime do + @moduledoc false + use Quantum.Storage.Test + + def last_execution_date(_), + do: NaiveDateTime.add(NaiveDateTime.utc_now(), -3_600, :second) + end + + capture_log(fn -> + {:ok, producer} = start_supervised({TestProducer, []}) + + {:ok, broadcaster} = + start_supervised( + {ExecutionBroadcaster, + {__MODULE__, producer, TestStorageWithLastExecutionTime, TestScheduler, true}} + ) + + {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + + job = + TestScheduler.new_job() + |> Job.set_schedule(~e[*]e) + + TestProducer.send(producer, {:add, job}) + + assert_receive {:update_last_execution_date, {TestScheduler, date}, _}, @max_timeout + + diff_seconds = NaiveDateTime.diff(NaiveDateTime.utc_now(), date, :second) + + assert diff_seconds >= 3_600 - 1 + + assert_receive {:received, {:execute, ^job}}, @max_timeout + # Quickly executes until reached current time + for _ <- 0..diff_seconds do + assert_receive {:received, {:execute, ^job}}, 100 + end + + # Maybe a little time elapsed in the test? + for _ <- 0..2 do + assert_receive {:received, {:execute, ^job}}, 1010 + end + + # Goes back to normal pace + refute_receive {:received, {:execute, ^job}}, 100 + end) + end + + @tag listen_storage: true + test "saves new last execution time in storage", %{producer: producer} do + job = + TestScheduler.new_job() + |> Job.set_schedule(~e[*]e) + + capture_log(fn -> + TestProducer.send(producer, {:add, job}) + + assert_receive {:update_last_execution_date, {TestScheduler, %NaiveDateTime{}}, _}, + @max_timeout + + assert_receive {:received, {:execute, ^job}}, @max_timeout + end) + end + test "normal schedule in other timezone triggers once per second", %{producer: producer} do job = TestScheduler.new_job() diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index c1c0d55..18caa1a 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -6,8 +6,10 @@ defmodule Quantum.JobBroadcasterTest do alias Quantum.JobBroadcaster alias Quantum.TestConsumer alias Quantum.Job + alias Quantum.Storage.Test, as: TestStorage import ExUnit.CaptureLog + import Quantum.CaptureLogExtend doctest JobBroadcaster @@ -18,6 +20,10 @@ defmodule Quantum.JobBroadcasterTest do end setup tags do + if tags[:listen_storage] do + Process.put(:test_pid, self()) + end + active_job = TestScheduler.new_job() inactive_job = Job.set_state(TestScheduler.new_job(), :inactive) @@ -36,8 +42,21 @@ defmodule Quantum.JobBroadcasterTest do [] end - {:ok, broadcaster} = start_supervised({JobBroadcaster, {__MODULE__, init_jobs, true}}) - {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + broadcaster = + if tags[:manual_dispatch] do + nil + else + {{:ok, broadcaster}, _} = + capture_log_with_return(fn -> + start_supervised( + {JobBroadcaster, {__MODULE__, init_jobs, TestStorage, TestScheduler, true}} + ) + end) + + {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + + broadcaster + end { :ok, @@ -56,14 +75,42 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:received, {:add, ^inactive_job}} assert_receive {:received, {:add, ^active_job}} end + + @tag manual_dispatch: true + test "storage jobs", %{active_job: active_job, inactive_job: inactive_job} do + capture_log(fn -> + defmodule FullStorage do + @moduledoc false + + use Quantum.Storage.Test + + def jobs(_), + do: [ + TestScheduler.new_job(), + Job.set_state(TestScheduler.new_job(), :inactive) + ] + end + + {:ok, broadcaster} = + start_supervised({JobBroadcaster, {__MODULE__, [], FullStorage, TestScheduler, true}}) + + {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) + + assert_receive {:received, {:add, _}} + refute_receive {:received, {:add, _}} + end) + end end describe "add" do + @tag listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do assert capture_log(fn -> TestScheduler.add_job(broadcaster, active_job) assert_receive {:received, {:add, ^active_job}} + + assert_receive {:add_job, {TestScheduler, ^active_job}, _} end) =~ "Adding job #Reference" end @@ -74,7 +121,9 @@ defmodule Quantum.JobBroadcasterTest do :ok = stop_supervised(TestConsumer) {:ok, broadcaster} = - start_supervised({JobBroadcaster, {__MODULE__, init_jobs, false}}) + start_supervised( + {JobBroadcaster, {__MODULE__, init_jobs, TestStorage, TestScheduler, false}} + ) {:ok, _consumer} = start_supervised({TestConsumer, [broadcaster, self()]}) @@ -84,17 +133,20 @@ defmodule Quantum.JobBroadcasterTest do end) =~ "Adding job #Reference" end + @tag listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do capture_log(fn -> TestScheduler.add_job(broadcaster, inactive_job) refute_receive {:received, {:add, _}} + + assert_receive {:add_job, {TestScheduler, ^inactive_job}, _} end) end end describe "delete" do - @tag jobs: :active + @tag jobs: :active, listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name @@ -103,27 +155,36 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:received, {:remove, ^active_job_name}} + assert_receive {:delete_job, {TestScheduler, ^active_job_name}, _} + refute Enum.any?(TestScheduler.jobs(broadcaster), fn {key, _} -> key == active_job_name end) end) end + @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do capture_log(fn -> TestScheduler.delete_job(broadcaster, make_ref()) refute_receive {:received, {:remove, _}} + + refute_receive {:delete_job, {TestScheduler, _}, _} end) end - @tag jobs: :inactive + @tag jobs: :inactive, listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do capture_log(fn -> + inactive_job_name = inactive_job.name + TestScheduler.delete_job(broadcaster, inactive_job.name) refute_receive {:received, {:remove, _}} + assert_receive {:delete_job, {TestScheduler, ^inactive_job_name}, _} + refute Enum.any?(TestScheduler.jobs(broadcaster), fn {key, _} -> key == inactive_job.name end) @@ -132,7 +193,7 @@ defmodule Quantum.JobBroadcasterTest do end describe "change_state" do - @tag jobs: :active + @tag jobs: :active, listen_storage: true test "active => inactive", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name @@ -140,10 +201,12 @@ defmodule Quantum.JobBroadcasterTest do TestScheduler.deactivate_job(broadcaster, active_job.name) assert_receive {:received, {:remove, ^active_job_name}} + + assert_receive {:update_job_state, {TestScheduler, _, _}, _} end) end - @tag jobs: :inactive + @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do capture_log(fn -> TestScheduler.activate_job(broadcaster, inactive_job.name) @@ -151,10 +214,12 @@ defmodule Quantum.JobBroadcasterTest do active_job = Job.set_state(inactive_job, :active) assert_receive {:received, {:add, ^active_job}} + + assert_receive {:update_job_state, {TestScheduler, _, _}, _} end) end - @tag jobs: :active + @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do # Initial assert_receive {:received, {:add, ^active_job}} @@ -163,10 +228,12 @@ defmodule Quantum.JobBroadcasterTest do TestScheduler.activate_job(broadcaster, active_job.name) refute_receive {:received, {:add, ^active_job}} + + refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) end - @tag jobs: :inactive + @tag jobs: :inactive, listen_storage: true test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do inactive_job_name = inactive_job.name @@ -174,9 +241,12 @@ defmodule Quantum.JobBroadcasterTest do TestScheduler.deactivate_job(broadcaster, inactive_job.name) refute_receive {:received, {:remove, ^inactive_job_name}} + + refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) end + @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do capture_log(fn -> TestScheduler.deactivate_job(broadcaster, make_ref()) @@ -184,12 +254,13 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:received, {:remove, _}} refute_receive {:received, {:add, _}} + refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) end end describe "delete_all" do - @tag jobs: :both + @tag jobs: :both, listen_storage: true test "only active jobs", %{ broadcaster: broadcaster, active_job: active_job, @@ -203,6 +274,8 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:received, {:remove, ^inactive_job_name}} assert_receive {:received, {:remove, ^active_job_name}} + + assert_receive {:purge, TestScheduler, _} end) end end diff --git a/test/support/capture_log_extend.ex b/test/support/capture_log_extend.ex new file mode 100644 index 0000000..b0becef --- /dev/null +++ b/test/support/capture_log_extend.ex @@ -0,0 +1,20 @@ +defmodule Quantum.CaptureLogExtend do + @moduledoc false + + import ExUnit.CaptureLog + + def capture_log_with_return(fun) do + ref = make_ref() + + logs = + capture_log(fn -> + return = fun.() + send(self(), {:return, ref, return}) + end) + + receive do + {:return, ^ref, return} -> + {return, logs} + end + end +end diff --git a/test/support/test_storage.ex b/test/support/test_storage.ex new file mode 100644 index 0000000..a1a1a25 --- /dev/null +++ b/test/support/test_storage.ex @@ -0,0 +1,91 @@ +defmodule Quantum.Storage.Test do + @moduledoc """ + Test implementation of a `Quantum.Storage.Adapter`. + """ + + @behaviour Quantum.Storage.Adapter + + def jobs(scheduler_module), do: send_and_wait(:jobs, scheduler_module, :not_applicable) + def add_job(scheduler_module, job), do: send_and_wait(:add_job, {scheduler_module, job}) + + def delete_job(scheduler_module, job_name), + do: send_and_wait(:delete_job, {scheduler_module, job_name}) + + def update_job_state(scheduler_module, job_name, state), + do: send_and_wait(:update_job_state, {scheduler_module, job_name, state}) + + def last_execution_date(scheduler_module), + do: send_and_wait(:last_execution_date, scheduler_module, :unknown) + + def update_last_execution_date(scheduler_module, last_execution_date), + do: send_and_wait(:update_last_execution_date, {scheduler_module, last_execution_date}) + + def purge(scheduler_module), do: send_and_wait(:purge, scheduler_module) + + @doc false + # Used for Small Test Storages + defmacro __using__(_) do + quote do + @behaviour Quantum.Storage.Adapter + + alias Quantum.Storage.Test + + def jobs(scheduler_module), do: Test.send_and_wait(:jobs, scheduler_module, :not_applicable) + + def add_job(scheduler_module, job), + do: Test.send_and_wait(:add_job, {scheduler_module, job}) + + def delete_job(scheduler_module, job_name), + do: Test.send_and_wait(:delete_job, {scheduler_module, job_name}) + + def update_job_state(scheduler_module, job_name, state), + do: Test.send_and_wait(:update_job_state, {scheduler_module, job_name, state}) + + def last_execution_date(scheduler_module), + do: Test.send_and_wait(:last_execution_date, scheduler_module, :unknown) + + def update_last_execution_date(scheduler_module, last_execution_date), + do: + Test.send_and_wait(:update_last_execution_date, {scheduler_module, last_execution_date}) + + def purge(scheduler_module), do: Test.send_and_wait(:purge, scheduler_module) + + defoverridable Quantum.Storage.Adapter + end + end + + def send_and_wait(fun, args, default \\ :ok) do + test_pid = find_test_pid(self()) + + if !is_nil(test_pid) do + ref = make_ref() + + send(test_pid, {fun, args, {self(), ref}}) + end + + default + end + + defp find_test_pid(pid) do + pid + |> Process.info() + |> case do + nil -> [] + other -> other + end + |> Keyword.get(:dictionary, []) + |> Enum.into(%{}) + |> case do + %{test_pid: pid} -> + pid + + %{"$ancestors": ancestors} -> + Enum.find_value(ancestors, fn ancestor_pid -> + find_test_pid(ancestor_pid) + end) + + _ -> + nil + end + end +end