Skip to content

Commit

Permalink
PERF: Don't allow a single user to monopolize the defer queue (discou…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielwaterworth authored Feb 7, 2024
1 parent 67229a7 commit 3092285
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/hijack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def hijack(info: nil, &blk)
Scheduler::Defer.later(
"hijack #{params["controller"]} #{params["action"]} #{info}",
force: false,
current_user: current_user&.id,
&scheduled.method(:resolve)
)
rescue WorkQueue::WorkQueueFull
Expand Down
16 changes: 12 additions & 4 deletions lib/scheduler/defer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ def initialize
@async = !Rails.env.test?
@queue =
WorkQueue::ThreadSafeWrapper.new(
WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(100) },
WorkQueue::FairQueue.new(:site, 500) do
WorkQueue::FairQueue.new(:user, 100) { WorkQueue::BoundedQueue.new(50) }
end,
)

@mutex = Mutex.new
Expand Down Expand Up @@ -48,15 +50,21 @@ def async=(val)
@async = val
end

def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk)
def later(
desc = nil,
db = RailsMultisite::ConnectionManagement.current_db,
force: true,
current_user: nil,
&blk
)
@stats_mutex.synchronize do
stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
stats[:queued] += 1
end

if @async
start_thread if !@thread&.alive? && !@paused
@queue.push({ key: db, task: [db, blk, desc] }, force: force)
@queue.push({ site: db, user: current_user, db: db, job: blk, desc: desc }, force: force)
else
blk.call
end
Expand Down Expand Up @@ -93,7 +101,7 @@ def start_thread

# using non_block to match Ruby #deq
def do_work(non_block = false)
db, job, desc = @queue.shift(block: !non_block)[:task]
db, job, desc = @queue.shift(block: !non_block).values_at(:db, :job, :desc)

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
db ||= RailsMultisite::ConnectionManagement::DEFAULT
Expand Down
9 changes: 4 additions & 5 deletions lib/work_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ def size
class FairQueue
attr_reader :size

def initialize(limit, &blk)
def initialize(key, limit, &blk)
@limit = limit
@size = 0
@key = key
@elements = Hash.new { |h, k| h[k] = blk.call }
end

def push(task, force:)
raise WorkQueueFull if !force && @size >= @limit
key, task = task.values_at(:key, :task)
key = task[@key]
@elements[key].push(task, force: force)
@size += 1
nil
Expand All @@ -72,10 +73,8 @@ def shift
task = queue.shift

@elements[key] = queue unless queue.empty?

@size -= 1

{ key: key, task: task }
task
end
end

Expand Down
36 changes: 36 additions & 0 deletions spec/lib/scheduler/defer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,40 @@ def wait_for(timeout, &blk)

expect(s).to eq("good")
end

describe "#later" do
let!(:ivar) { Concurrent::IVar.new }
let!(:responses) { Thread::Queue.new }

def later(db, current_user, request)
@defer.later(nil, db, current_user: current_user) do
ivar.value
responses.push([db, current_user, request])
end
end

it "runs jobs in a fair order" do
later("site1", 1, 1)
later("site1", 1, 2)
later("site1", 2, 3)
later("site2", 3, 4)
later("site2", 4, 5)
later("site2", 4, 6)

ivar.set(nil)

result = 6.times.map { responses.shift }

expect(result).to eq(
[
["site1", 1, 1],
["site2", 3, 4],
["site1", 2, 3],
["site2", 4, 5],
["site1", 1, 2],
["site2", 4, 6],
],
)
end
end
end
2 changes: 1 addition & 1 deletion spec/lib/work_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@

RSpec.describe WorkQueue::FairQueue do
subject(:queue) do
WorkQueue::FairQueue.new(global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
WorkQueue::FairQueue.new(:key, global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
end

let(:global_limit) { 5 }
Expand Down

0 comments on commit 3092285

Please sign in to comment.