Skip to content

Commit 27594b0

Browse files
LeonidVasTotktonada
authored andcommitted
Fix task releasing at a client disconnection
There are tarantool versions, where box.session.id() value may be clobbered after a yield in on_disconnect trigger, see [1]. This commit works the problem around. [1]: tarantool/tarantool#4627 Fixes #103 @Totktonada: style fixes, more comments.
1 parent e29dc72 commit 27594b0

File tree

2 files changed

+106
-4
lines changed

2 files changed

+106
-4
lines changed

queue/abstract.lua

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,22 @@ function tube.ack(self, id)
151151
return result
152152
end
153153

154-
function tube.release(self, id, opts)
154+
local function tube_release_internal(self, id, opts, session_id)
155155
opts = opts or {}
156-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
156+
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
157157
if _taken == nil then
158158
error("Task was not taken in the session")
159159
end
160160

161-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
161+
box.space._queue_taken:delete{session_id, self.tube_id, id}
162162
self:peek(id)
163163
return self.raw:normalize_task(self.raw:release(id, opts))
164164
end
165165

166+
function tube.release(self, id, opts)
167+
return tube_release_internal(self, id, opts, session.id())
168+
end
169+
166170
function tube.peek(self, id)
167171
local task = self.raw:peek(id)
168172
if task == nil then
@@ -371,7 +375,7 @@ function method._on_consumer_disconnect()
371375
log.warn("Consumer %s disconnected, release task %s(%s)",
372376
id, task[3], tube[1])
373377

374-
queue.tube[tube[1]]:release(task[3])
378+
tube_release_internal(queue.tube[tube[1]], task[3], nil, id)
375379
end
376380
end
377381
end

t/110-disconnect-trigger-check.t

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local tap = require('tap')
8+
local tnt = require('t.tnt')
9+
10+
local tube
11+
local test = tap.test('lost a session id after yield')
12+
13+
-- The test cases are in check_result().
14+
test:plan(2)
15+
16+
-- Verify that _queue_taken space is empty.
17+
local function check_result()
18+
if tube == nil then
19+
os.exit(1)
20+
end
21+
22+
-- tube:drop() is most simple way to check that _queue_taken
23+
-- is empty. It give an error if it is not so.
24+
local ok, res = pcall(tube.drop, tube)
25+
test:is(ok, true, 'drop empty queue')
26+
test:is(res, true, 'tube:drop() result is true')
27+
28+
tnt.finish()
29+
os.exit(test:check() and 0 or 1)
30+
end
31+
32+
-- Yield in queue's on_disconnect trigger (which handles a client
33+
-- disconnection) may lead to a situation when _queue_taken
34+
-- temporary space is not cleaned and becomes inconsistent with
35+
-- 'status' field in <tube_name> space. This appears only on
36+
-- tarantool versions affected by gh-4627.
37+
--
38+
-- See https://github.com/tarantool/queue/issues/103
39+
-- See https://github.com/tarantool/tarantool/issues/4627
40+
local function test_lost_session_id_after_yield()
41+
-- We must check the results of a test after
42+
-- the queue._on_consumer_disconnect trigger
43+
-- has been done.
44+
--
45+
-- Triggers are run in LIFO order.
46+
box.session.on_disconnect(check_result)
47+
48+
local listen = 'localhost:1918'
49+
tnt.cfg{listen = listen}
50+
51+
local driver = 'fifottl'
52+
tube = queue.create_tube('test_tube', driver, {if_not_exists = true})
53+
54+
rawset(_G, 'queue', require('queue'))
55+
tube:grant('guest', {call = true})
56+
57+
-- We need at least two tasks to trigger box.session.id()
58+
-- call after a yield in the queue._on_consumer_disconnect
59+
-- trigger (in the version of queue before the fix). See
60+
-- more below.
61+
queue.tube.test_tube:put('1')
62+
queue.tube.test_tube:put('2')
63+
local connection = netbox.connect(listen)
64+
connection:call('queue.tube.test_tube:take')
65+
connection:call('queue.tube.test_tube:take')
66+
67+
-- After disconnection of a client the _on_consumer_disconnect
68+
-- trigger is run. It changes 'status' field for tuples in
69+
-- <tube_name> space in a loop and removes the corresponding
70+
-- tuples from _queue_taken space. The version before the fix
71+
-- operates in this way:
72+
--
73+
-- | <_on_consumer_disconnect>
74+
-- | for task in tasks of the client:
75+
-- | call <task:release>
76+
-- |
77+
-- | <task:release>
78+
-- | delete _queue_taken tuple using box.session.id()
79+
-- | update <tube_name> space using task_id -- !! yield
80+
--
81+
-- So the deletion from _queue_taken may be unable to delete
82+
-- right tuples for second and following tasks, because
83+
-- box.session.id() may give a garbage.
84+
connection:close()
85+
86+
-- Wait for check_result() trigger, which will ensure that
87+
-- _queue_taken space is cleaned and will exit successfully
88+
-- in the case (or exit abnormally otherwise).
89+
fiber.sleep(5)
90+
91+
-- Wrong session id may lead to 'Task was not taken in the
92+
-- session' error in the _on_consumer_disconnect and so the
93+
-- second on_disconnect trigger (check_result) will not be
94+
-- fired.
95+
os.exit(1)
96+
end
97+
98+
test_lost_session_id_after_yield()

0 commit comments

Comments
 (0)