Skip to content

Commit 9670713

Browse files
committedOct 24, 2016
Support multiple queues
1 parent 3bedea7 commit 9670713

File tree

1 file changed

+41
-22
lines changed

1 file changed

+41
-22
lines changed
 

‎bin/stress

+41-22
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ queue_config =
1313
:table => 'test_stress',
1414
:disable_resource_limit => true, # TODO backend-specific test cases
1515
:cleanup_interval => 200,
16+
alive_time: 300,
1617
insert_processes: 0,
1718
worker_processes: 0,
1819
}
20+
multiple_queues = nil
1921
opt = OptionParser.new
2022

2123
opt.on('--url URL', 'database url') {|v| queue_config[:url] = v }
@@ -24,30 +26,33 @@ opt.on('--disable_resource_limit=true', TrueClass, 'use resource limit or not')
2426
opt.on('--cleanup_interval SECOND', Integer, 'cleanup interval') {|v| queue_config[:cleanup_interval] = v }
2527
opt.on('--insert_processes NUM', Integer, 'inserts') {|v| queue_config[:insert_processes] = v }
2628
opt.on('--worker_processes NUM', Integer, 'workers') {|v| queue_config[:worker_processes] = v }
29+
opt.on('--alive-time SECOND', Integer, 'alive time') {|v| queue_config[:alive_time] = v }
2730
opt.on('--retention-time SECOND', Integer, 'retention time') {|v| queue_config[:retention_time] = v }
31+
opt.on('--task-prefetch COUNT', Integer, 'task prefetch') {|v| queue_config[:task_prefetch] = v }
32+
opt.on('--multi-queues COUNT', Integer, 'multiple queues; spwan NUM processes for each queue') {|v| multiple_queues = v }
2833
opt.parse!(ARGV)
2934

3035
module PerfectQueue
3136
class Queue
32-
def submit1000(data)
33-
@client.submit1000(data)
37+
def submit10000(data)
38+
@client.submit10000(data)
3439
end
3540
end
3641
class Client
37-
def submit1000(data)
38-
@backend.submit1000(data)
42+
def submit10000(data)
43+
@backend.submit10000(data)
3944
end
4045
end
4146
module Backend
4247
class RDBCompatBackend
43-
def submit1000(h)
48+
def submit10000(h)
4449
rd = Random.new
4550
i = 0
4651
connect {
4752
begin
4853
begin
4954
n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
50-
submit0("import.1/main.action_#{n}_#{rd.hex(80)}", 'user02', h, now: n)
55+
submit0("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: n)
5156
end while (i+=1) < 10000
5257
end
5358
}
@@ -91,8 +96,9 @@ module PerfectQueue
9196
end
9297

9398

94-
def insert1000(queue)
99+
def insert10000(queue)
95100
Process.setproctitle'bin/stress/insert'
101+
rd = Random.new
96102
while 1
97103
t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
98104
h = {"path":"in/1/main.action.e9f070b5bfea96442af13ce6acc36699_0f7ad8aee859867aae303190e372ec1e.msgpack.gz",
@@ -113,13 +119,13 @@ def insert1000(queue)
113119
"params":{}},
114120
"params":{}}
115121
begin
116-
queue.submit1000(h)
122+
queue.submit10000(h)
117123
rescue Sequel::DatabaseError
118124
p $!
119125
sleep 5
120126
end
121127
t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
122-
puts "#{__method__}#{Process.pid}: #{t-t0}sec for 1000 inserts\n"
128+
puts "#{__method__}#{Process.pid}: #{t-t0}sec for 10000 inserts\n"
123129
end
124130
rescue Interrupt
125131
exit
@@ -149,7 +155,7 @@ def insert(queue)
149155
"params":{}}
150156
begin
151157
n = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
152-
queue.submit("import.1/main.action_#{n}_#{rd.hex(20)}", 'user02', h, now: t)
158+
queue.submit("import.1/main.action_#{rd.hex(20)}", 'user02', h, now: t)
153159
rescue
154160
p $!
155161
sleep 1
@@ -164,14 +170,15 @@ def worker(queue)
164170
i = 0
165171
t0 = t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
166172
begin
167-
ary = queue.poll_multi(max_acquire: 11, now: t)
168-
sleep 1
173+
ary = queue.poll_multi(max_acquire: queue.config[:task_prefetch].to_i+1)
169174
ary.each do |x|
175+
x.heartbeat!
176+
sleep 2+rand
170177
x.finish!({})
171178
end if ary
172179
t = Process.clock_gettime(Process::CLOCK_REALTIME, :second)
173180
rescue
174-
p $!.class
181+
p $!
175182
sleep 1
176183
end while (i+=1) < 100
177184
puts "#{__method__}#{Process.pid}: #{t-t0}sec for 100 acquires\n"
@@ -180,17 +187,29 @@ rescue Interrupt
180187
exit
181188
end
182189

183-
pids = []
184-
queue = PerfectQueue.open(queue_config)
185-
#queue.client.init_database(:force => true)
186-
queue.config[:insert_processes].times do
187-
pids << fork { insert1000(queue) }
190+
def fork_processes(pids, queue_config)
191+
queue = PerfectQueue.open(queue_config)
192+
#queue.client.init_database(:force => true)
193+
queue.config[:insert_processes].times do
194+
pids << fork { insert10000(queue) }
195+
end
196+
queue.config[:worker_processes].times do
197+
#queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
198+
pids << fork { worker(queue) }
199+
end
200+
queue.close
188201
end
189-
queue.config[:worker_processes].times do
190-
#queue.client.backend.instance_variable_set(:@cleanup_interval_count, rand(200))
191-
pids << fork { worker(queue) }
202+
203+
pids = []
204+
if multiple_queues
205+
multiple_queues.times do |i|
206+
config = queue_config.dup
207+
config[:table] += i.to_s
208+
fork_processes(pids, config)
209+
end
210+
else
211+
fork_processes(pids, queue_config)
192212
end
193-
queue.close
194213

195214
trap (:INT) do
196215
pids.each do |pid|

0 commit comments

Comments
 (0)