Skip to content

Commit e3eace8

Browse files
committed
Support multiple queues
1 parent 3bedea7 commit e3eace8

File tree

1 file changed

+39
-21
lines changed

1 file changed

+39
-21
lines changed

bin/stress

+39-21
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ queue_config =
1313
:table => 'test_stress',
1414
:disable_resource_limit => true, # TODO backend-specific test cases
1515
:cleanup_interval => 200,
16+
alive_time: 60,
1617
insert_processes: 0,
1718
worker_processes: 0,
1819
}
20+
multiple_queues = nil
1921
opt = OptionParser.new
2022

2123
opt.on('--url URL', 'database url') {|v| queue_config[:url] = v }
@@ -25,29 +27,31 @@ opt.on('--cleanup_interval SECOND', Integer, 'cleanup interval') {|v| queue_conf
2527
opt.on('--insert_processes NUM', Integer, 'inserts') {|v| queue_config[:insert_processes] = v }
2628
opt.on('--worker_processes NUM', Integer, 'workers') {|v| queue_config[:worker_processes] = v }
2729
opt.on('--retention-time SECOND', Integer, 'retention time') {|v| queue_config[:retention_time] = v }
30+
opt.on('--task-prefetch COUNT', Integer, 'task prefetch') {|v| queue_config[:task_prefetch] = v }
31+
opt.on('--multi-queues COUNT', Integer, 'multiple queues; spwan NUM processes for each queue') {|v| multiple_queues = v }
2832
opt.parse!(ARGV)
2933

3034
module PerfectQueue
3135
class Queue
32-
def submit1000(data)
33-
@client.submit1000(data)
36+
def submit10000(data)
37+
@client.submit10000(data)
3438
end
3539
end
3640
class Client
37-
def submit1000(data)
38-
@backend.submit1000(data)
41+
def submit10000(data)
42+
@backend.submit10000(data)
3943
end
4044
end
4145
module Backend
4246
class RDBCompatBackend
43-
def submit1000(h)
47+
def submit10000(h)
4448
rd = Random.new
4549
i = 0
4650
connect {
4751
begin
4852
begin
4953
n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
50-
submit0("import.1/main.action_#{n}_#{rd.hex(80)}", 'user02', h, now: n)
54+
submit0("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: n)
5155
end while (i+=1) < 10000
5256
end
5357
}
@@ -91,8 +95,9 @@ module PerfectQueue
9195
end
9296

9397

94-
def insert1000(queue)
98+
def insert10000(queue)
9599
Process.setproctitle'bin/stress/insert'
100+
rd = Random.new
96101
while 1
97102
t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
98103
h = {"path":"in/1/main.action.e9f070b5bfea96442af13ce6acc36699_0f7ad8aee859867aae303190e372ec1e.msgpack.gz",
@@ -113,13 +118,13 @@ def insert1000(queue)
113118
"params":{}},
114119
"params":{}}
115120
begin
116-
queue.submit1000(h)
121+
queue.submit10000(h)
117122
rescue Sequel::DatabaseError
118123
p $!
119124
sleep 5
120125
end
121126
t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
122-
puts "#{__method__}#{Process.pid}: #{t-t0}sec for 1000 inserts\n"
127+
puts "#{__method__}#{Process.pid}: #{t-t0}sec for 10000 inserts\n"
123128
end
124129
rescue Interrupt
125130
exit
@@ -149,7 +154,7 @@ def insert(queue)
149154
"params":{}}
150155
begin
151156
n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
152-
queue.submit("import.1/main.action_#{n}_#{rd.hex(20)}", 'user02', h, now: t)
157+
queue.submit("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: t)
153158
rescue
154159
p $!
155160
sleep 1
@@ -164,9 +169,10 @@ def worker(queue)
164169
i = 0
165170
t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
166171
begin
167-
ary = queue.poll_multi(max_acquire: 11, now: t)
168-
sleep 1
172+
ary = queue.poll_multi(max_acquire: queue.config[:task_prefetch].to_i+1, now: t)
169173
ary.each do |x|
174+
x.heartbeat!
175+
sleep 2+rand
170176
x.finish!({})
171177
end if ary
172178
t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
@@ -180,17 +186,29 @@ rescue Interrupt
180186
exit
181187
end
182188

183-
pids = []
184-
queue = PerfectQueue.open(queue_config)
185-
#queue.client.init_database(:force => true)
186-
queue.config[:insert_processes].times do
187-
pids << fork { insert1000(queue) }
189+
def fork_processes(pids, queue_config)
190+
queue = PerfectQueue.open(queue_config)
191+
#queue.client.init_database(:force => true)
192+
queue.config[:insert_processes].times do
193+
pids << fork { insert10000(queue) }
194+
end
195+
queue.config[:worker_processes].times do
196+
#queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
197+
pids << fork { worker(queue) }
198+
end
199+
queue.close
188200
end
189-
queue.config[:worker_processes].times do
190-
#queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
191-
pids << fork { worker(queue) }
201+
202+
pids = []
203+
if multiple_queues
204+
multiple_queues.times do |i|
205+
config = queue_config.dup
206+
config[:table] += i.to_s
207+
fork_processes(pids, config)
208+
end
209+
else
210+
fork_processes(pids, queue_config)
192211
end
193-
queue.close
194212

195213
trap (:INT) do
196214
pids.each do |pid|

0 commit comments

Comments
 (0)