1- local fiber = require (' fiber' )
21local vshard = require (' vshard' )
32local errors = require (' errors' )
43
54local dev_checks = require (' crud.common.dev_checks' )
65local utils = require (' crud.common.utils' )
6+ local fiber_clock = require (' fiber' ).clock
77
88local CallError = errors .new_class (' Call' )
99local NotInitializedError = errors .new_class (' NotInitialized' )
@@ -12,26 +12,6 @@ local call = {}
1212
1313local DEFAULT_VSHARD_CALL_TIMEOUT = 2
1414
15- local function call_on_replicaset (replicaset , channel , vshard_call , func_name , func_args , opts )
16- -- replicaset:<vshard_call>(func_name,...)
17- local func_ret , err = replicaset [vshard_call ](replicaset , func_name , func_args , opts )
18- if type (err ) == ' table' and err .type == ' ClientError' and type (err .message ) == ' string' then
19- if err .message == string.format (" Procedure '%s' is not defined" , func_name ) then
20- if func_name :startswith (' _crud.' ) then
21- err = NotInitializedError :new (" crud isn't initialized on replicaset" )
22- else
23- err = NotInitializedError :new (" Function %s is not registered" , func_name )
24- end
25- end
26- end
27-
28- channel :put ({
29- replicaset_uuid = replicaset .uuid ,
30- func_ret = func_ret ,
31- err = err ,
32- })
33- end
34-
3515local function call_impl (vshard_call , func_name , func_args , opts )
3616 dev_checks (' string' , ' string' , ' ?table' , {
3717 timeout = ' ?number' ,
@@ -52,39 +32,42 @@ local function call_impl(vshard_call, func_name, func_args, opts)
5232 end
5333 end
5434
55- local nodes_count = utils .table_count (replicasets )
56- local channel = fiber .channel (nodes_count )
57-
35+ local futures_by_replicasets = {}
36+ local call_opts = {is_async = true }
5837 for _ , replicaset in pairs (replicasets ) do
59- fiber .create (
60- call_on_replicaset , replicaset , channel , vshard_call , func_name , func_args , {
61- timeout = timeout ,
62- }
63- )
38+ local future = replicaset [vshard_call ](replicaset , func_name , func_args , call_opts )
39+ futures_by_replicasets [replicaset .uuid ] = future
6440 end
6541
6642 local results = {}
67-
68- for _ = 1 , channel :size () do
69- local res = channel :get ()
70-
71- if res == nil then
72- if channel :is_closed () then
73- return nil , CallError :new (" Channel is closed" )
74- end
75-
76- return nil , CallError :new (" Timeout was reached" )
43+ local deadline = fiber_clock () + timeout
44+ for replicaset_uuid , future in pairs (futures_by_replicasets ) do
45+ local wait_timeout = deadline - fiber_clock ()
46+ if wait_timeout < 0 then
47+ wait_timeout = 0
7748 end
7849
79- if res .err ~= nil then
80- res .err = errors .wrap (res .err )
50+ local result , err = future :wait_result (wait_timeout )
51+ if err == nil and result [1 ] == nil then
52+ err = result [2 ]
53+ end
8154
55+ if err ~= nil then
56+ if err .type == ' ClientError' and type (err .message ) == ' string' then
57+ if err .message == string.format (" Procedure '%s' is not defined" , func_name ) then
58+ if func_name :startswith (' _crud.' ) then
59+ err = NotInitializedError :new (" crud isn't initialized on replicaset: %q" , replicaset_uuid )
60+ else
61+ err = NotInitializedError :new (" Function %s is not registered" , func_name )
62+ end
63+ end
64+ end
65+ err = errors .wrap (err )
8266 return nil , CallError :new (utils .format_replicaset_error (
83- res . replicaset_uuid , " Function returned an error: %s" , res . err
67+ replicaset_uuid , " Function returned an error: %s" , err
8468 ))
8569 end
86-
87- results [res .replicaset_uuid ] = res .func_ret
70+ results [replicaset_uuid ] = result [1 ]
8871 end
8972
9073 return results
0 commit comments