Skip to content

Commit 16be0b4

Browse files
committed
TNTP-2109: Implement slow mode for single calls
1 parent 195515c commit 16be0b4

File tree

3 files changed

+63
-15
lines changed

3 files changed

+63
-15
lines changed

crud/common/call.lua

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,49 @@ local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)
1818

1919
local call = {}
2020

21-
local function call_on_storage(run_as_user, func_name, ...)
22-
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
21+
local bucket_ref_many
22+
local bucket_unref_many
23+
24+
bucket_ref_many = function(bucket_ids, mode)
25+
local reffed = {}
26+
for _, bucket_id in pairs(bucket_ids) do
27+
local ok, err = vshard.storage.bucket_ref(bucket_id, mode)
28+
if not ok then
29+
bucket_unref_many(reffed, mode)
30+
return nil, err
31+
end
32+
table.insert(reffed, bucket_id)
33+
end
34+
return true, nil
35+
end
36+
37+
bucket_unref_many = function(bucket_ids, mode)
38+
local all_ok = true
39+
local last_err = nil
40+
for _, bucket_id in pairs(bucket_ids) do
41+
local ok, err = vshard.storage.bucket_unref(bucket_id, mode)
42+
if not ok then
43+
all_ok = nil
44+
last_err = err
45+
end
46+
end
47+
return all_ok, last_err
48+
end
49+
50+
local function call_on_storage(run_as_user, bucket_ids, mode, func_name, ...)
51+
local ok, ref_err = bucket_ref_many(bucket_ids, mode)
52+
if not ok then
53+
return nil, ref_err
54+
end
55+
56+
local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)}
57+
58+
ok, ref_err = bucket_unref_many(bucket_ids, mode)
59+
if not ok then
60+
return nil, ref_err
61+
end
62+
63+
return unpack(res, 1, table.maxn(res))
2364
end
2465

2566
call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}
@@ -82,8 +123,8 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
82123
))
83124
end
84125

85-
local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts)
86-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
126+
local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts, mode, bucket_ids)
127+
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
87128

88129
-- In case cluster was just bootstrapped with auto master discovery,
89130
-- replicaset may miss master.
@@ -148,7 +189,7 @@ function call.map(vshard_router, func_name, func_args, opts)
148189
local args, replicaset, replicaset_id = iter:get()
149190

150191
local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
151-
func_name, args, call_opts)
192+
func_name, args, call_opts, opts.mode, {}) -- TODO: provide bucket_ids
152193

153194
if err ~= nil then
154195
local result_info = {
@@ -222,8 +263,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
222263
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil
223264

224265
local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
225-
func_name, func_args, {timeout = timeout,
226-
request_timeout = request_timeout})
266+
func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
267+
opts.mode, {bucket_id})
227268
if err ~= nil then
228269
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
229270
end
@@ -249,7 +290,8 @@ function call.any(vshard_router, func_name, func_args, opts)
249290
local replicaset_id, replicaset = next(replicasets)
250291

251292
local res, err = retry_call_with_master_discovery(replicaset, 'call',
252-
func_name, func_args, {timeout = timeout})
293+
func_name, func_args, {timeout = timeout},
294+
'read', {})
253295
if err ~= nil then
254296
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
255297
end

crud/select/merger.lua

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ local function fetch_chunk(context, state)
171171

172172
-- change context.func_args too, but it does not matter
173173
next_func_args[4].after_tuple = cursor.after_tuple
174-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, next_func_args)
174+
local mode = "read"
175+
local bucket_ids = {}
176+
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, next_func_args)
175177

176178
if context.readview then
177179
next_state = {future = context.future_replica.conn:call("_crud.call_on_storage",
@@ -203,7 +205,8 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_
203205
local buf = buffer.ibuf()
204206
local net_box_opts = {is_async = true, buffer = buf,
205207
skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
206-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
208+
local bucket_ids = {}
209+
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
207210
local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage",
208211
func_args_ext, net_box_opts)
209212

@@ -279,8 +282,11 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in
279282
local net_box_opts = {is_async = true, buffer = buf,
280283
skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
281284
func_args[4].readview_id = replicaset_info.id
282-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
283-
local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts)
285+
local mode = "read"
286+
local bucket_ids = {}
287+
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
288+
local future = replica.conn:call("_crud.call_on_storage",
289+
func_args_ext, net_box_opts)
284290

285291
-- Create a source.
286292
local context = {

test/unit/privileges_test.lua

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ g.before_all(function()
1717
end)
1818

1919
g.test_prepend_current_user_smoke = function()
20-
local res = call.storage_api.call_on_storage(box.session.effective_user(), "unittestfunc", {"too", "foo"})
20+
local res = call.storage_api.call_on_storage(box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"})
2121
t.assert_equals(res, {"too", "foo"})
2222
end
2323

2424
g.test_non_existent_user = function()
2525
t.assert_error_msg_contains("User 'non_existent_user' is not found",
26-
call.storage_api.call_on_storage, "non_existent_user", "unittestfunc")
26+
call.storage_api.call_on_storage, "non_existent_user", {}, "read", "unittestfunc")
2727
end
2828

2929
g.test_that_the_session_switches_back = function()
@@ -34,7 +34,7 @@ g.test_that_the_session_switches_back = function()
3434
local reference_user = box.session.effective_user()
3535
t.assert_not_equals(reference_user, "unittestuser")
3636

37-
local res = call.storage_api.call_on_storage("unittestuser", "unittestfunc2")
37+
local res = call.storage_api.call_on_storage("unittestuser", {}, "read", "unittestfunc2")
3838
t.assert_equals(res, "unittestuser")
3939
t.assert_equals(box.session.effective_user(), reference_user)
4040
end

0 commit comments

Comments
 (0)