diff --git a/app/jobs/order_cycle_opened_job.rb b/app/jobs/order_cycle_opened_job.rb index 67904764f2d..e100d216444 100644 --- a/app/jobs/order_cycle_opened_job.rb +++ b/app/jobs/order_cycle_opened_job.rb @@ -3,9 +3,13 @@ # Trigger jobs for any order cycles that recently opened class OrderCycleOpenedJob < ApplicationJob def perform - recently_opened_order_cycles.each do |oc_id| - SubscriptionPlacementJob.perform_later(oc_id) - OrderCycleWebhookJob.perform_later(oc_id, 'order_cycle.opened') + ActiveRecord::Base.transaction do + recently_opened_order_cycles.find_each do |order_cycle| + SubscriptionPlacementJob.perform_later(order_cycle.id) + OrderCycleWebhookJob.perform_later(order_cycle.id, 'order_cycle.opened') + + mark_as_notified(order_cycle) + end end end @@ -13,7 +17,13 @@ def perform def recently_opened_order_cycles OrderCycle + .where(opened_notification_at: nil) .where(orders_open_at: 1.hour.ago..Time.zone.now) - .pluck(:id) + .lock.order(:id) + end + + def mark_as_notified(order_cycle) + now = Time.zone.now + order_cycle.update_columns(opened_notification_at: now, updated_at: now) end end diff --git a/app/models/order_cycle.rb b/app/models/order_cycle.rb index ce1846cf540..bdb1d9beb40 100644 --- a/app/models/order_cycle.rb +++ b/app/models/order_cycle.rb @@ -34,6 +34,7 @@ class OrderCycle < ApplicationRecord attr_accessor :incoming_exchanges, :outgoing_exchanges + before_update :reset_opened_notification_at, if: :will_save_change_to_orders_open_at? before_update :reset_processed_at, if: :will_save_change_to_orders_close_at? after_save :sync_subscriptions, if: :opening? @@ -333,6 +334,13 @@ def orders_close_at_after_orders_open_at? errors.add(:orders_close_at, :after_orders_open_at) end + def reset_opened_notification_at + return unless orders_open_at.present? && orders_open_at_was.present? + return unless orders_open_at > orders_open_at_was + + self.opened_notification_at = nil + end + def reset_processed_at return unless orders_close_at.present? && orders_close_at_was.present? return unless orders_close_at > orders_close_at_was diff --git a/db/migrate/20220919063831_add_opened_notification_at_to_order_cycle.rb b/db/migrate/20220919063831_add_opened_notification_at_to_order_cycle.rb new file mode 100644 index 00000000000..b56e0a87d02 --- /dev/null +++ b/db/migrate/20220919063831_add_opened_notification_at_to_order_cycle.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class AddOpenedNotificationAtToOrderCycle < ActiveRecord::Migration[6.1] + def change + add_column :order_cycles, :opened_notification_at, :timestamp + end +end diff --git a/db/schema.rb b/db/schema.rb index ec58ba107fd..a689f5afced 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -310,6 +310,7 @@ t.datetime "processed_at" t.boolean "automatic_notifications", default: false t.boolean "mails_sent", default: false + t.datetime "opened_notification_at" end create_table "order_cycles_distributor_payment_methods", id: false, force: :cascade do |t| diff --git a/spec/jobs/order_cycle_opened_job_spec.rb b/spec/jobs/order_cycle_opened_job_spec.rb index bee76640419..fddb6e3167d 100644 --- a/spec/jobs/order_cycle_opened_job_spec.rb +++ b/spec/jobs/order_cycle_opened_job_spec.rb @@ -29,4 +29,38 @@ .to enqueue_job(SubscriptionPlacementJob).with(oc_opened_now.id) .and enqueue_job(OrderCycleWebhookJob).with(oc_opened_now.id, 'order_cycle.opened') end + + describe "concurrency", concurrency: true do + let(:breakpoint) { Mutex.new } + + it "doesn't place duplicate orders when run concurrently" do + oc_opened_now + + # Pause jobs when placing new job: + breakpoint.lock + allow(OrderCycleWebhookJob).to( + receive(:new).and_wrap_original do |method, *args| + breakpoint.synchronize {} + method.call(*args) + end + ) + + expect { + # Start two jobs in parallel: + threads = [ + Thread.new { OrderCycleOpenedJob.perform_now }, + Thread.new { OrderCycleOpenedJob.perform_now }, + ] + + # Wait for both to jobs to pause. + # This can reveal a race condition. + sleep 0.1 + + # Resume and complete both jobs: + breakpoint.unlock + threads.each(&:join) + }.to enqueue_job(OrderCycleWebhookJob) + .with(oc_opened_now.id, 'order_cycle.opened').once + end + end end diff --git a/spec/jobs/order_cycle_webhook_job_spec.rb b/spec/jobs/order_cycle_webhook_job_spec.rb index b3eb548d0a6..3ef3664e2bc 100644 --- a/spec/jobs/order_cycle_webhook_job_spec.rb +++ b/spec/jobs/order_cycle_webhook_job_spec.rb @@ -11,7 +11,6 @@ orders_close_at: "2022-09-19 17:00:00".to_time, ) } - let(:endpoint) { order_cycle.webhook_endpoints.create url: "http://url", event_name: "order_cycle.event" } context "enterprise owner has a webhook endpoint" do before do @@ -54,7 +53,6 @@ job = OrderCycleWebhookJob.new(order_cycle.id, 'order_cycle.event') order_cycle.destroy! - expect(WebhookDeliveryJob).to_not receive(:perform_later) expect(JobLogger.logger).to receive(:info).with(/Couldn't find OrderCycle/) expect{ job.perform_now }.not_to raise_error # so that job runner doesn't attempt to re-run @@ -67,6 +65,4 @@ .not_to enqueue_job(WebhookDeliveryJob) end end - - pending "doesn't create duplicate webhook jobs" end diff --git a/spec/models/order_cycle_spec.rb b/spec/models/order_cycle_spec.rb index 818b900e196..3abcff1d9c6 100644 --- a/spec/models/order_cycle_spec.rb +++ b/spec/models/order_cycle_spec.rb @@ -551,6 +551,30 @@ end end + describe "opened_notification_at " do + let!(:oc) { + create(:simple_order_cycle, orders_open_at: 2.days.ago, orders_close_at: 1.day.ago, opened_notification_at: 1.week.ago) + } + + it "reset opened_notification_at if open date change in future" do + expect(oc.opened_notification_at).to_not be_nil + oc.update!(orders_open_at: 1.week.from_now, orders_close_at: 2.weeks.from_now) + expect(oc.opened_notification_at).to be_nil + end + + it "it does not reset opened_notification_at if open date is changed to be earlier" do + expect(oc.opened_notification_at).to_not be_nil + oc.update!(orders_open_at: 3.days.ago) + expect(oc.opened_notification_at).to_not be_nil + end + + it "it does not reset opened_notification_at if open date does not change" do + expect(oc.opened_notification_at).to_not be_nil + oc.update!(orders_close_at: 1.day.from_now) + expect(oc.opened_notification_at).to_not be_nil + end + end + describe "processed_at " do let!(:oc) { create(:simple_order_cycle, orders_open_at: 1.week.ago, orders_close_at: 1.day.ago, processed_at: 1.hour.ago) @@ -562,13 +586,13 @@ expect(oc.processed_at).to be_nil end - it "it does not reset processed_at if close date change in the past" do + it "it does not reset processed_at if close date is changed to be earlier" do expect(oc.processed_at).to_not be_nil oc.update!(orders_close_at: 2.days.ago) expect(oc.processed_at).to_not be_nil end - it "it does not reset processed_at if close date do not change" do + it "it does not reset processed_at if close date does not change" do expect(oc.processed_at).to_not be_nil oc.update!(orders_open_at: 2.weeks.ago) expect(oc.processed_at).to_not be_nil