From 96707138629a2941a138361ff9f5b7bf76982bcb Mon Sep 17 00:00:00 2001
From: "NARUSE, Yui"
Date: Fri, 21 Oct 2016 18:22:12 +0900
Subject: [PATCH] Support multiple queues

---
 bin/stress | 63 +++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 22 deletions(-)

diff --git a/bin/stress b/bin/stress
index eb7dc7f..4a0b971 100755
--- a/bin/stress
+++ b/bin/stress
@@ -13,9 +13,11 @@ queue_config =
   :table => 'test_stress',
   :disable_resource_limit => true, # TODO backend-specific test cases
   :cleanup_interval => 200,
+  alive_time: 300,
   insert_processes: 0,
   worker_processes: 0,
 }
+multiple_queues = nil
 
 opt = OptionParser.new
 opt.on('--url URL', 'database url') {|v| queue_config[:url] = v }
@@ -24,30 +26,33 @@ opt.on('--disable_resource_limit=true', TrueClass, 'use resource limit or not')
 opt.on('--cleanup_interval SECOND', Integer, 'cleanup interval') {|v| queue_config[:cleanup_interval] = v }
 opt.on('--insert_processes NUM', Integer, 'inserts') {|v| queue_config[:insert_processes] = v }
 opt.on('--worker_processes NUM', Integer, 'workers') {|v| queue_config[:worker_processes] = v }
+opt.on('--alive-time SECOND', Integer, 'alive time') {|v| queue_config[:alive_time] = v }
 opt.on('--retention-time SECOND', Integer, 'retention time') {|v| queue_config[:retention_time] = v }
+opt.on('--task-prefetch COUNT', Integer, 'task prefetch') {|v| queue_config[:task_prefetch] = v }
+opt.on('--multi-queues COUNT', Integer, 'multiple queues; spawn NUM processes for each queue') {|v| multiple_queues = v }
 opt.parse!(ARGV)
 
 module PerfectQueue
   class Queue
-    def submit1000(data)
-      @client.submit1000(data)
+    def submit10000(data)
+      @client.submit10000(data)
     end
   end
   class Client
-    def submit1000(data)
-      @backend.submit1000(data)
+    def submit10000(data)
+      @backend.submit10000(data)
     end
   end
   module Backend
     class RDBCompatBackend
-      def submit1000(h)
+      def submit10000(h)
         rd = Random.new
         i = 0
         connect {
           begin
             begin
               n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
-              submit0("import.1/main.action_#{n}_#{rd.hex(80)}", 'user02', h, now: n)
+              submit0("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: n)
             end while (i+=1) < 10000
           end
         }
@@ -91,8 +96,9 @@ module PerfectQueue
 end
 
 
-def insert1000(queue)
+def insert10000(queue)
   Process.setproctitle'bin/stress/insert'
+  rd = Random.new
   while 1
     t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
     h = {"path":"in/1/main.action.e9f070b5bfea96442af13ce6acc36699_0f7ad8aee859867aae303190e372ec1e.msgpack.gz",
@@ -113,13 +119,13 @@ def insert1000(queue)
         "params":{}},
       "params":{}}
     begin
-      queue.submit1000(h)
+      queue.submit10000(h)
     rescue Sequel::DatabaseError
       p $!
      sleep 5
     end
     t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
-    puts "#{__method__}#{Process.pid}: #{t-t0}sec for 1000 inserts\n"
+    puts "#{__method__}#{Process.pid}: #{t-t0}sec for 10000 inserts\n"
   end
 rescue Interrupt
   exit
@@ -149,7 +155,7 @@ def insert(queue)
       "params":{}}
     begin
       n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
-      queue.submit("import.1/main.action_#{n}_#{rd.hex(20)}", 'user02', h, now: t)
+      queue.submit("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: t)
     rescue
       p $!
       sleep 1
@@ -164,14 +170,15 @@ def worker(queue)
   i = 0
   t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
   begin
-    ary = queue.poll_multi(max_acquire: 11, now: t)
-    sleep 1
+    ary = queue.poll_multi(max_acquire: queue.config[:task_prefetch].to_i+1)
     ary.each do |x|
+      x.heartbeat!
+      sleep 2+rand
       x.finish!({})
     end if ary
     t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
   rescue
-    p $!.class
+    p $!
     sleep 1
   end while (i+=1) < 100
   puts "#{__method__}#{Process.pid}: #{t-t0}sec for 100 acquires\n"
@@ -180,17 +187,29 @@ rescue Interrupt
   exit
 end
 
-pids = []
-queue = PerfectQueue.open(queue_config)
-#queue.client.init_database(:force => true)
-queue.config[:insert_processes].times do
-  pids << fork { insert1000(queue) }
+def fork_processes(pids, queue_config)
+  queue = PerfectQueue.open(queue_config)
+  #queue.client.init_database(:force => true)
+  queue.config[:insert_processes].times do
+    pids << fork { insert10000(queue) }
+  end
+  queue.config[:worker_processes].times do
+    #queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
+    pids << fork { worker(queue) }
+  end
+  queue.close
 end
-queue.config[:worker_processes].times do
-  #queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
-  pids << fork { worker(queue) }
+
+pids = []
+if multiple_queues
+  multiple_queues.times do |i|
+    config = queue_config.dup
+    config[:table] += i.to_s
+    fork_processes(pids, config)
+  end
+else
+  fork_processes(pids, queue_config)
 end
-queue.close
 
 trap (:INT) do
   pids.each do |pid|