diff --git a/cloudtasker.gemspec b/cloudtasker.gemspec index da6544ea..907d6467 100644 --- a/cloudtasker.gemspec +++ b/cloudtasker.gemspec @@ -33,6 +33,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'activesupport' spec.add_dependency 'connection_pool' spec.add_dependency 'fugit' + spec.add_dependency 'google-cloud-scheduler' spec.add_dependency 'google-cloud-tasks' spec.add_dependency 'jwt' spec.add_dependency 'redis' diff --git a/lib/cloudtasker.rb b/lib/cloudtasker.rb index 4a29ed6e..946bbc93 100644 --- a/lib/cloudtasker.rb +++ b/lib/cloudtasker.rb @@ -15,6 +15,7 @@ require 'cloudtasker/middleware/chain' require 'cloudtasker/authenticator' +require 'cloudtasker/cloud_scheduler' require 'cloudtasker/cloud_task' require 'cloudtasker/worker_logger' require 'cloudtasker/worker_handler' diff --git a/lib/cloudtasker/cloud_scheduler.rb b/lib/cloudtasker/cloud_scheduler.rb new file mode 100644 index 00000000..1310248f --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative 'cloud_scheduler/schedule' +require_relative 'cloud_scheduler/job' +require_relative 'cloud_scheduler/manager' + +module Cloudtasker + # Schedule jobs using GCP Cloud Scheduler + module CloudScheduler + end +end diff --git a/lib/cloudtasker/cloud_scheduler/job.rb b/lib/cloudtasker/cloud_scheduler/job.rb new file mode 100644 index 00000000..b4acaca4 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job.rb @@ -0,0 +1,141 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' +require 'cloudtasker/worker_handler' + +module Cloudtasker + module CloudScheduler + # Manage cron jobs + class Job + # + # Return all jobs from a hash. + # + # @param [Hash] hash The hash to load jobs from. + # + # @return [Array] The list of jobs. + # + def self.load_from_hash!(hash) + Schedule.load_from_hash!(hash).map do |schedule| + new(schedule) + end + end + + attr_reader :schedule + + # + # Build a new instance of the class. + # + # @param [Cloudtasker::CloudScheduler::Schedule] schedule The schedule to run. + # + def initialize(schedule) + @schedule = schedule + end + + # + # Prefix for all jobs that includes the parent path and the queue prefix. + # + # @return [String] The job prefix. + # + def prefix + "#{parent}/jobs/#{config.gcp_queue_prefix}--" + end + + # + # Return name of the job in the remote scheduler. + # + # @return [String] The job name. + # + def remote_name + "#{prefix}#{schedule.id}" + end + + # + # Return the job name. + # + # @return [String] The job name. + # + def name + schedule.id + end + + # + # Create the job in the remote scheduler. + # + # @return [Google::Cloud::Scheduler::V1::Job] The job instance. + # + def create! + client.create_job(parent: parent, job: payload) + end + + # + # Update the job in the remote scheduler. + # + # @return [Google::Cloud::Scheduler::V1::Job] The job instance. + # + def update! + client.update_job(job: payload) + end + + # + # Delete the job from the remote scheduler. + # + # @return [Google::Protobuf::Empty] The job instance. + # + def delete! + client.delete_job(name: remote_name) + end + + # + # Return a hash that can be used to create/update a job in the remote scheduler. + # + # @return [Hash] The job hash. + # + def payload + { + name: remote_name, + schedule: schedule.cron, + time_zone: schedule.time_zone, + http_target: { + http_method: 'POST', + uri: config.processor_url, + oidc_token: config.oidc, + body: schedule.job_payload.to_json, + headers: { + Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json', + Cloudtasker::Config::CT_AUTHORIZATION_HEADER => Authenticator.bearer_token + }.compact + }.compact + } + end + + private + + # + # Return the parent path for all jobs. + # + # @return [String] The parent path. + # + def parent + @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) + end + + # + # Return the Cloudtasker configuration. + # + # @return [Cloudtasker::Config] The configuration. + # + def config + @config ||= Cloudtasker.config + end + + # + # Return the Cloud Scheduler client. + # + # @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client. + # + def client + @client ||= Google::Cloud::Scheduler.cloud_scheduler + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb new file mode 100644 index 00000000..561b7344 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module Cloudtasker + module CloudScheduler + class Job + # Payload used to schedule ActiveJob jobs on Cloud Scheduler + class ActiveJobPayload + attr_reader :worker + + # + # Build a new instance of the class. + # + # @param [ActiveJob::Base] worker The ActiveJob instance. + # + def initialize(worker) + @worker = worker + end + + # + # Return the Hash representation of the job payload. + # + # @return [Hash] The job payload. + # + def to_h + { + 'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper', + 'job_queue' => worker.queue_name, + 'job_id' => worker.job_id, + 'job_meta' => {}, + 'job_args' => [worker.serialize] + } + end + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb new file mode 100644 index 00000000..f7499d6f --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/job/worker_payload.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'cloudtasker/worker_handler' + +module Cloudtasker + module CloudScheduler + class Job + # Payload used to schedule Cloudtasker Workers on Cloud Scheduler + class WorkerPayload + attr_reader :worker + + # + # Build a new instance of the class. + # + # @param [Cloudtasker::Worker] worker The Cloudtasker Worker instance. + # + def initialize(worker) + @worker = worker + end + + # + # Return the Hash representation of the job payload. + # + # @return [Hash] The job payload. + # + def to_h + JSON.parse(request_config[:body]) + end + + private + + # + # Return the HTTP request configuration for a Cloud Task. + # + # @return [Hash] The request configuration. + # + def request_config + worker_handler.task_payload[:http_request] + end + + # + # Return the worker handler. + # + # @return [Cloudtasker::WorkerHandler] The worker handler. + # + def worker_handler + @worker_handler ||= Cloudtasker::WorkerHandler.new(worker) + end + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/manager.rb b/lib/cloudtasker/cloud_scheduler/manager.rb new file mode 100644 index 00000000..d4a4da6b --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/manager.rb @@ -0,0 +1,132 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' +require 'google/cloud/scheduler' + +module Cloudtasker + module CloudScheduler + # Manage the synchronization of jobs between the + # local configuration and the remote scheduler. + class Manager + # + # Synchronize the local configuration with the remote scheduler. + # + # @param [String] file The path to the schedule configuration file. + # + def self.synchronize!(file) + config = YAML.load_file(file) + jobs = Job.load_from_hash!(config) + + new(jobs).synchronize! + end + + attr_reader :jobs + + # + # Build a new instance of the class. + # + # @param [Array] jobs The list of jobs to synchronize. + # + def initialize(jobs) + @jobs = jobs + end + + # + # Synchronize the local configuration with the remote scheduler. + # + # @return [nil] + # + def synchronize! + new_jobs.map(&:create!) + stale_jobs.map(&:update!) + deleted_jobs.map { |job| client.delete_job(name: job) } + + nil + end + + private + + # + # Return all jobs from the remote scheduler. + # + # @return [Array] The list of job names. + # + def remote_jobs + @remote_jobs ||= client.list_jobs(parent: parent) + .response + .jobs + .map(&:name) + .select do |job| + job.start_with?(job_prefix) + end + end + + # + # Return all jobs that are not yet created in the remote scheduler. + # + # @return [Array] The list of jobs. + # + def new_jobs + jobs.reject do |job| + remote_jobs.include?(job.remote_name) + end + end + + # + # Return all jobs that are present in both local config and remote scheduler. + # + # @return [Array] The list of jobs. + # + def stale_jobs + jobs.select do |job| + remote_jobs.include?(job.remote_name) + end + end + + # + # Return all jobs that are present in the remote scheduler but not in the local config. + # + # @return [Array] The list of job names. + # + def deleted_jobs + remote_jobs - jobs.map(&:remote_name) + end + + # + # Prefix for all jobs that includes the parent path and the queue prefix. + # + # @return [String] The job prefix. + # + def job_prefix + "#{parent}/jobs/#{config.gcp_queue_prefix}--" + end + + # + # Return the parent path for all jobs. + # + # @return [String] The parent path. + # + def parent + @parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id) + end + + # + # Return the Cloudtasker configuration. + # + # @return [Cloudtasker::Config] The configuration. + # + def config + @config ||= Cloudtasker.config + end + + # + # Return the Cloud Scheduler client. + # + # @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client. + # + def client + @client ||= Google::Cloud::Scheduler.cloud_scheduler + end + end + end +end diff --git a/lib/cloudtasker/cloud_scheduler/schedule.rb b/lib/cloudtasker/cloud_scheduler/schedule.rb new file mode 100644 index 00000000..d4b4ab85 --- /dev/null +++ b/lib/cloudtasker/cloud_scheduler/schedule.rb @@ -0,0 +1,151 @@ +# frozen_string_literal: true + +require 'fugit' +require 'cloudtasker/cloud_scheduler/job/active_job_payload' +require 'cloudtasker/cloud_scheduler/job/worker_payload' + +module Cloudtasker + module CloudScheduler + # Error raised when a schedule is invalid + class InvalidScheduleError < StandardError; end + + # Manage cron schedules + class Schedule + DEFAULT_TIME_ZONE = 'Etc/UTC' + + attr_accessor :id, :cron, :worker, :queue, :args, :time_zone + + # + # Return all valid schedules while raising an error if any schedule is invalid. + # + # @return [Array] The list of valid schedules. + # + # @raise [RuntimeError] If any schedule is invalid. + def self.load_from_hash!(hash) + return [] if hash.blank? + + hash.map do |id, config| + schedule = new( + id: id.to_s, + cron: config['cron'], + worker: config['worker'], + args: config['args'], + queue: config['queue'], + active_job: config['active_job'] || false, + time_zone: config['time_zone'] || DEFAULT_TIME_ZONE + ) + + raise InvalidScheduleError, "Invalid schedule: #{schedule.id}" unless schedule.valid? + + schedule + end + end + + # + # Build a new instance of the class. + # + # @param [String] id The schedule ID. + # @param [String] cron The cron expression. + # @param [Class] worker The worker class to run. + # @param [Array] args The worker arguments. + # @param [String] queue The queue to use for the cron job. + # @param [String] time_zone The time zone to use for the cron job. + # + def initialize(id:, cron:, worker:, **opts) + @id = id + @cron = cron + @worker = worker + @args = opts[:args] + @queue = opts[:queue] + @active_job = opts[:active_job] + @time_zone = opts[:time_zone] + end + + # + # Validate the schedule + # + # @return [Boolean] True if the schedule is valid, false otherwise. + # + def valid? + id.present? && cron_schedule.present? && worker.present? + end + + # + # Return the cron schedule to use for the job. + # + # @return [Fugit::Cron] The cron schedule. + # + def cron_schedule + @cron_schedule ||= Fugit::Cron.parse(cron) + end + + # + # Return if the specified worker is an ActiveJob. + # + # @return [Boolean] True if the worker is an ActiveJob, false otherwise. + # + def active_job? + @active_job + end + + # + # Return the job payload to make requests to the remote scheduler. + # + # @return [Hash] The job payload. + # + def job_payload + if active_job? + Job::ActiveJobPayload.new(active_job_instance).to_h + else + Job::WorkerPayload.new(worker_instance).to_h + end + end + + # + # Equality operator. + # + # @param [Any] other The object to compare. + # + # @return [Boolean] True if the object is equal. + # + def ==(other) + other.is_a?(self.class) && id == other.id + end + + private + + # + # Return an instance of the Cloudtasker Worker class. + # + # @return [Object] The worker instance. + # + def worker_instance + @worker_instance ||= worker_class.new(job_queue: queue, job_args: args) + end + + # + # Return an instance of the ActiveJob class. + # + # @return [Object] The ActiveJob instance. + # + def active_job_instance + @active_job_instance ||= begin + instance = worker_class.new(args) + instance.queue_name = queue if queue.present? + instance.timezone = time_zone if time_zone.present? + + instance + end + end + + # + # Return the worker class. + # + # @return [Class] The worker class. + # + def worker_class + worker.safe_constantize + end + end + end +end diff --git a/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb b/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb new file mode 100644 index 00000000..f53afa6d --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job/active_job_payload_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Job::ActiveJobPayload do + let(:worker) { TestActiveJob.new } + let(:payload) { described_class.new(worker) } + + before do + allow(worker).to receive(:job_id).and_return('123') + allow(worker).to receive(:queue_name).and_return('my_queue') + allow(worker).to receive(:serialize).and_return('serialized_job') + end + + describe '#to_h' do + subject(:hash) { payload.to_h } + + it { is_expected.to be_a(Hash) } + + it { + expect(hash).to include( + 'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper', + 'job_id' => '123', 'job_args' => ['serialized_job'], + 'job_queue' => 'my_queue', 'job_meta' => {} + ) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb b/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb new file mode 100644 index 00000000..f1170b04 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job/worker_payload_spec.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Job::WorkerPayload do + let(:worker) { TestWorker.new(job_args: ['foo'], job_queue: 'my_queue') } + let(:payload) { described_class.new(worker) } + + describe '#to_h' do + subject(:hash) { payload.to_h } + + it { is_expected.to be_a(Hash) } + + it { + expect(hash).to include( + 'worker' => 'TestWorker', 'job_id' => worker.job_id, + 'job_args' => ['foo'], 'job_queue' => 'my_queue' + ) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/job_spec.rb b/spec/cloudtasker/cloud_scheduler/job_spec.rb new file mode 100644 index 00000000..c6802493 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/job_spec.rb @@ -0,0 +1,128 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' + +RSpec.describe Cloudtasker::CloudScheduler::Job do + let(:job) { described_class.new(scheduler) } + let(:parent_path) { '/path/to/parent' } + let(:queue_prefix) { Cloudtasker.config.gcp_queue_prefix } + let(:client) do + instance_double( + Google::Cloud::Scheduler::V1::CloudScheduler::Client, + location_path: parent_path, + create_job: Google::Cloud::Scheduler::V1::Job.new, + update_job: Google::Cloud::Scheduler::V1::Job.new, + delete_job: Google::Protobuf::Empty.new + ) + end + let(:scheduler) do + Cloudtasker::CloudScheduler::Schedule.new( + id: 'test', + cron: '* * * * *', + worker: 'TestWorker', + args: 'foo', + queue: 'default', + time_zone: 'America/New_York' + ) + end + + before do + allow(Google::Cloud::Scheduler).to receive(:cloud_scheduler).and_return(client) + end + + describe '.load_from_hash!' do + subject(:jobs) { described_class.load_from_hash!(hash) } + + let(:hash) do + { + 'test' => { + 'worker' => 'DummyWorker', + 'cron' => '* * * * *', + 'args' => { 'foo' => 'bar' }, + 'queue' => 'default', + 'time_zone' => 'America/New_York', + 'active_job' => true + } + } + end + + context 'with an empty hash' do + let(:hash) { {} } + + it { is_expected.to eq([]) } + end + + context 'with a valid schedule' do + it { is_expected.to be_a(Array) } + it { expect(jobs.size).to eq(1) } + it { expect(jobs.first).to be_a(described_class) } + end + end + + describe '.new' do + it { expect(job.schedule).to eq(scheduler) } + end + + describe '#prefix' do + subject(:parent) { job.prefix } + + it { is_expected.to eq("#{parent_path}/jobs/#{queue_prefix}--") } + end + + describe '#remote_name' do + subject(:name) { job.remote_name } + + it { is_expected.to eq("#{parent_path}/jobs/#{queue_prefix}--#{scheduler.id}") } + end + + describe '#name' do + subject(:name) { job.name } + + it { is_expected.to eq(scheduler.id) } + end + + describe '#create!' do + subject(:create!) { job.create! } + + after { expect(client).to have_received(:create_job) } + + it { is_expected.to be_a(Google::Cloud::Scheduler::V1::Job) } + end + + describe '#update!' do + subject(:update!) { job.update! } + + after { expect(client).to have_received(:update_job) } + + it { is_expected.to be_a(Google::Cloud::Scheduler::V1::Job) } + end + + describe '#delete!' do + subject(:delete!) { job.delete! } + + after { expect(client).to have_received(:delete_job) } + + it { is_expected.to be_a(Google::Protobuf::Empty) } + end + + describe '#payload' do + subject(:payload) { job.payload } + + before { allow(Cloudtasker::Authenticator).to receive(:bearer_token).and_return('token') } + + it { expect(payload[:name]).to eq(job.remote_name) } + it { expect(payload[:schedule]).to eq(scheduler.cron) } + it { expect(payload[:time_zone]).to eq(scheduler.time_zone) } + it { expect(payload[:http_target][:http_method]).to eq('POST') } + it { expect(payload[:http_target][:uri]).to eq(Cloudtasker.config.processor_url) } + it { expect(payload[:http_target][:oidc_token]).to eq(Cloudtasker.config.oidc) } + it { expect(payload[:http_target][:body]).to eq(scheduler.job_payload.to_json) } + + it { + expect(payload[:http_target][:headers]).to eq({ + Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json', + Cloudtasker::Config::CT_AUTHORIZATION_HEADER => 'token' + }) + } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/manager_spec.rb b/spec/cloudtasker/cloud_scheduler/manager_spec.rb new file mode 100644 index 00000000..d6bf2b65 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/manager_spec.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +require 'google/cloud/scheduler/v1' + +RSpec.describe Cloudtasker::CloudScheduler::Manager do + let(:manager) { described_class.new(jobs) } + let(:parent_path) { '/path/to/parent' } + let(:queue_prefix) { Cloudtasker.config.gcp_queue_prefix } + let(:client) do + instance_double( + Google::Cloud::Scheduler::V1::CloudScheduler::Client, + location_path: parent_path, + create_job: Google::Cloud::Scheduler::V1::Job.new, + update_job: Google::Cloud::Scheduler::V1::Job.new, + delete_job: Google::Protobuf::Empty.new + ) + end + let(:jobs) do + [ + Cloudtasker::CloudScheduler::Job.new( + Cloudtasker::CloudScheduler::Schedule.new( + id: 'test', + cron: '* * * * *', + worker: 'TestWorker', + args: 'foo', + queue: 'default', + time_zone: 'America/New_York' + ) + ) + ] + end + + before do + allow(Google::Cloud::Scheduler).to receive(:cloud_scheduler).and_return(client) + end + + describe '.synchronize!' do + subject(:synchronize!) { described_class.synchronize!(file) } + + let(:file) { 'path/to/file' } + let(:config) { { 'test' => { 'worker' => 'TestWorker' } } } + let(:jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, create!: nil, update!: nil)] } + let(:manager) { instance_double(described_class, synchronize!: nil) } + + before do + allow(YAML).to receive(:load_file).with(file).and_return(config) + allow(Cloudtasker::CloudScheduler::Job).to receive(:load_from_hash!).with(config).and_return(jobs) + allow(described_class).to receive(:new).with(jobs).and_return(manager) + end + + after { expect(manager).to have_received(:synchronize!) } + + it { is_expected.to be_nil } + end + + describe '#synchronize!' do + subject(:synchronize!) { manager.synchronize! } + + let(:new_jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, create!: nil)] } + let(:stale_jobs) { [instance_double(Cloudtasker::CloudScheduler::Job, update!: nil)] } + let(:deleted_jobs) { ['path/to/deleted/job'] } + + before do + allow(manager).to receive(:new_jobs).and_return(new_jobs) + allow(manager).to receive(:stale_jobs).and_return(stale_jobs) + allow(manager).to receive(:deleted_jobs).and_return(deleted_jobs) + end + + after do + expect(new_jobs.first).to have_received(:create!) + expect(stale_jobs.first).to have_received(:update!) + expect(client).to have_received(:delete_job).with(name: deleted_jobs.first) + end + + it { is_expected.to be_nil } + end +end diff --git a/spec/cloudtasker/cloud_scheduler/schedule_spec.rb b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb new file mode 100644 index 00000000..1904fdd3 --- /dev/null +++ b/spec/cloudtasker/cloud_scheduler/schedule_spec.rb @@ -0,0 +1,172 @@ +# frozen_string_literal: true + +RSpec.describe Cloudtasker::CloudScheduler::Schedule do + describe '.load_from_hash!' do + subject(:schedules) { described_class.load_from_hash!(hash) } + + let(:hash) do + { + 'test' => { + 'worker' => 'TestWorker', + 'cron' => '* * * * *', + 'args' => { 'foo' => 'bar' }, + 'queue' => 'default', + 'time_zone' => 'America/New_York', + 'active_job' => true + } + } + end + + context 'with an empty hash' do + let(:hash) { {} } + + it { is_expected.to eq([]) } + end + + context 'with an invalid schedule' do + let(:hash) { { 'test' => { 'worker' => 'TestWorker' } } } + + it 'raises an error' do + expect { schedules }.to raise_error(Cloudtasker::CloudScheduler::InvalidScheduleError) + end + end + + context 'with a valid schedule' do + it { is_expected.to be_a(Array) } + it { expect(schedules.size).to eq(1) } + it { expect(schedules.first).to be_a(described_class) } + end + end + + describe '.new' do + subject(:schedule) do + described_class.new( + id: id, + cron: cron, + worker: worker, + args: args, + queue: queue, + time_zone: time_zone + ) + end + + let(:id) { 'test' } + let(:cron) { '* * * * *' } + let(:worker) { 'TestWorker' } + let(:args) { { foo: 'bar' } } + let(:queue) { 'default' } + let(:time_zone) { 'America/New_York' } + + it { expect(schedule.id).to eq(id) } + it { expect(schedule.cron).to eq(cron) } + it { expect(schedule.worker).to eq(worker) } + it { expect(schedule.args).to eq(args) } + it { expect(schedule.queue).to eq(queue) } + it { expect(schedule.time_zone).to eq(time_zone) } + end + + describe '#valid?' do + subject { described_class.new(id: id, cron: cron, worker: worker).valid? } + + let(:id) { 'test' } + let(:cron) { '* * * * *' } + let(:worker) { 'TestWorker' } + + context 'with blank id' do + let(:id) { '' } + + it { is_expected.not_to be_truthy } + end + + context 'with an invalid cron' do + let(:cron) { 'invalid' } + + it { is_expected.not_to be_truthy } + end + + context 'with a blank worker' do + let(:worker) { '' } + + it { is_expected.not_to be_truthy } + end + + context 'with a valid id, cron and worker' do + it { is_expected.to be_truthy } + end + end + + describe '#cron_schedule' do + subject { described_class.new(id: 'test', cron: cron, worker: 'TestWorker').cron_schedule } + + let(:cron) { '* * * * *' } + + it { is_expected.to be_a(Fugit::Cron) } + end + + describe '#active_job?' do + subject do + described_class.new(id: 'test', cron: '* * * * *', worker: 'TestWorker', active_job: active_job).active_job? + end + + let(:active_job) { true } + + context 'with active_job set to true' do + it { is_expected.to be_truthy } + end + + context 'with active_job set to false' do + let(:active_job) { false } + + it { is_expected.not_to be_truthy } + end + end + + describe '#job_payload' do + subject(:payload) do + described_class.new( + id: 'test', + cron: '* * * * *', + worker: worker, + args: args, + queue: queue, + active_job: active_job, + time_zone: time_zone + ).job_payload + end + + let(:worker) { 'TestWorker' } + let(:args) { { 'foo' => 'bar' } } + let(:queue) { 'default' } + let(:active_job) { false } + let(:time_zone) { 'America/New_York' } + + before do + allow(TestActiveJob).to receive(:new).and_return(TestActiveJob.new) + allow_any_instance_of(TestActiveJob).to receive(:queue_name=).and_return(queue) + allow_any_instance_of(TestActiveJob).to receive(:queue_name).and_return(queue) + allow_any_instance_of(TestActiveJob).to receive(:timezone=).and_return(time_zone) + allow_any_instance_of(TestActiveJob).to receive(:timezone).and_return(time_zone) + allow_any_instance_of(TestActiveJob).to receive(:job_id).and_return('1234') + allow_any_instance_of(TestActiveJob).to receive(:serialize).and_return('{}') + end + + context 'with a regular worker' do + it { is_expected.to be_a(Hash) } + it { expect(payload['worker']).to eq(worker) } + it { expect(payload['job_queue']).to eq(queue) } + it { expect(payload['job_args']).to eq(args) } + end + + context 'with an ActiveJob worker' do + let(:worker) { 'TestActiveJob' } + let(:active_job) { true } + + it { is_expected.to be_a(Hash) } + it { expect(payload['worker']).to eq('ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper') } + it { expect(payload['job_queue']).to eq(queue) } + it { expect(payload['job_args']).to eq(['{}']) } + it { expect(payload['job_id']).to eq('1234') } + it { expect(payload['job_meta']).to eq({}) } + end + end +end diff --git a/spec/support/test_active_job.rb b/spec/support/test_active_job.rb new file mode 100644 index 00000000..f7732278 --- /dev/null +++ b/spec/support/test_active_job.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +class TestActiveJob + def perform(args); end +end