Skip to content

Commit

Permalink
Prevent creating duplicate webhook notifications [migration]
Browse files Browse the repository at this point in the history
Using the clever concurrency testing borrowed from SubscriptionPlacementJob, but I thought a shorter pause time (just 100ms) would be sufficient.
  • Loading branch information
dacook committed Feb 7, 2023
1 parent 9cc0b19 commit c56b078
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 10 deletions.
18 changes: 14 additions & 4 deletions app/jobs/order_cycle_opened_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@
# Trigger jobs for any order cycles that recently opened
class OrderCycleOpenedJob < ApplicationJob
def perform
recently_opened_order_cycles.each do |oc_id|
SubscriptionPlacementJob.perform_later(oc_id)
OrderCycleWebhookJob.perform_later(oc_id, 'order_cycle.opened')
ActiveRecord::Base.transaction do
recently_opened_order_cycles.find_each do |order_cycle|
SubscriptionPlacementJob.perform_later(order_cycle.id)
OrderCycleWebhookJob.perform_later(order_cycle.id, 'order_cycle.opened')

mark_as_notified(order_cycle)
end
end
end

private

def recently_opened_order_cycles
OrderCycle
.where(opened_notification_at: nil)
.where(orders_open_at: 1.hour.ago..Time.zone.now)
.pluck(:id)
.lock.order(:id)
end

def mark_as_notified(order_cycle)
now = Time.zone.now
order_cycle.update_columns(opened_notification_at: now, updated_at: now)
end
end
8 changes: 8 additions & 0 deletions app/models/order_cycle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class OrderCycle < ApplicationRecord

attr_accessor :incoming_exchanges, :outgoing_exchanges

before_update :reset_opened_notification_at, if: :will_save_change_to_orders_open_at?
before_update :reset_processed_at, if: :will_save_change_to_orders_close_at?
after_save :sync_subscriptions, if: :opening?

Expand Down Expand Up @@ -333,6 +334,13 @@ def orders_close_at_after_orders_open_at?
errors.add(:orders_close_at, :after_orders_open_at)
end

def reset_opened_notification_at
return unless orders_open_at.present? && orders_open_at_was.present?
return unless orders_open_at > orders_open_at_was

self.opened_notification_at = nil
end

def reset_processed_at
return unless orders_close_at.present? && orders_close_at_was.present?
return unless orders_close_at > orders_close_at_was
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddOpenedNotificationAtToOrderCycle < ActiveRecord::Migration[6.1]
def change
add_column :order_cycles, :opened_notification_at, :timestamp
end
end
1 change: 1 addition & 0 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@
t.datetime "processed_at"
t.boolean "automatic_notifications", default: false
t.boolean "mails_sent", default: false
t.datetime "opened_notification_at"
end

create_table "order_cycles_distributor_payment_methods", id: false, force: :cascade do |t|
Expand Down
34 changes: 34 additions & 0 deletions spec/jobs/order_cycle_opened_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,38 @@
.to enqueue_job(SubscriptionPlacementJob).with(oc_opened_now.id)
.and enqueue_job(OrderCycleWebhookJob).with(oc_opened_now.id, 'order_cycle.opened')
end

describe "concurrency", concurrency: true do
let(:breakpoint) { Mutex.new }

it "doesn't place duplicate orders when run concurrently" do
oc_opened_now

# Pause jobs when placing new job:
breakpoint.lock
allow(OrderCycleWebhookJob).to(
receive(:new).and_wrap_original do |method, *args|
breakpoint.synchronize {}
method.call(*args)
end
)

expect {
# Start two jobs in parallel:
threads = [
Thread.new { OrderCycleOpenedJob.perform_now },
Thread.new { OrderCycleOpenedJob.perform_now },
]

# Wait for both to jobs to pause.
# This can reveal a race condition.
sleep 0.1

# Resume and complete both jobs:
breakpoint.unlock
threads.each(&:join)
}.to enqueue_job(OrderCycleWebhookJob)
.with(oc_opened_now.id, 'order_cycle.opened').once
end
end
end
4 changes: 0 additions & 4 deletions spec/jobs/order_cycle_webhook_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
orders_close_at: "2022-09-19 17:00:00".to_time,
)
}
let(:endpoint) { order_cycle.webhook_endpoints.create url: "http://url", event_name: "order_cycle.event" }

context "enterprise owner has a webhook endpoint" do
before do
Expand Down Expand Up @@ -54,7 +53,6 @@
job = OrderCycleWebhookJob.new(order_cycle.id, 'order_cycle.event')
order_cycle.destroy!

expect(WebhookDeliveryJob).to_not receive(:perform_later)
expect(JobLogger.logger).to receive(:info).with(/Couldn't find OrderCycle/)

expect{ job.perform_now }.not_to raise_error # so that job runner doesn't attempt to re-run
Expand All @@ -67,6 +65,4 @@
.not_to enqueue_job(WebhookDeliveryJob)
end
end

pending "doesn't create duplicate webhook jobs"
end
28 changes: 26 additions & 2 deletions spec/models/order_cycle_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,30 @@
end
end

describe "opened_notification_at " do
let!(:oc) {
create(:simple_order_cycle, orders_open_at: 2.days.ago, orders_close_at: 1.day.ago, opened_notification_at: 1.week.ago)
}

it "reset opened_notification_at if open date change in future" do
expect(oc.opened_notification_at).to_not be_nil
oc.update!(orders_open_at: 1.week.from_now, orders_close_at: 2.weeks.from_now)
expect(oc.opened_notification_at).to be_nil
end

it "it does not reset opened_notification_at if open date is changed to be earlier" do
expect(oc.opened_notification_at).to_not be_nil
oc.update!(orders_open_at: 3.days.ago)
expect(oc.opened_notification_at).to_not be_nil
end

it "it does not reset opened_notification_at if open date does not change" do
expect(oc.opened_notification_at).to_not be_nil
oc.update!(orders_close_at: 1.day.from_now)
expect(oc.opened_notification_at).to_not be_nil
end
end

describe "processed_at " do
let!(:oc) {
create(:simple_order_cycle, orders_open_at: 1.week.ago, orders_close_at: 1.day.ago, processed_at: 1.hour.ago)
Expand All @@ -562,13 +586,13 @@
expect(oc.processed_at).to be_nil
end

it "it does not reset processed_at if close date change in the past" do
it "it does not reset processed_at if close date is changed to be earlier" do
expect(oc.processed_at).to_not be_nil
oc.update!(orders_close_at: 2.days.ago)
expect(oc.processed_at).to_not be_nil
end

it "it does not reset processed_at if close date do not change" do
it "it does not reset processed_at if close date does not change" do
expect(oc.processed_at).to_not be_nil
oc.update!(orders_open_at: 2.weeks.ago)
expect(oc.processed_at).to_not be_nil
Expand Down

0 comments on commit c56b078

Please sign in to comment.