Skip to content

Commit 9090940

Browse files
authored
fix: delay transformation of reply data to prevent errors while converting (#190)
1 parent d3b21c8 commit 9090940

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

lib/redis_client/cluster/router.rb

+25-24
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class Cluster
1414
class Router
1515
ZERO_CURSOR_FOR_SCAN = '0'
1616
METHODS_FOR_BLOCKING_CMD = %i[blocking_call_v blocking_call].freeze
17+
TSF = ->(b, s) { b.nil? ? s : b.call(s) }.curry
1718

1819
attr_reader :node
1920

@@ -30,25 +31,25 @@ def initialize(config, pool: nil, **kwargs)
3031
def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
3132
cmd = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
3233
case cmd
33-
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
34-
@node.call_all(method, command, args, &block).first
35-
when 'flushall', 'flushdb'
36-
@node.call_primaries(method, command, args, &block).first
37-
when 'ping' then @node.send_ping(method, command, args, &block).first
34+
when 'ping' then @node.send_ping(method, command, args).first.then(&TSF.call(block))
3835
when 'wait' then send_wait_command(method, command, args, &block)
39-
when 'keys' then @node.call_replicas(method, command, args, &block).flatten.sort_by(&:to_s)
40-
when 'dbsize' then @node.call_replicas(method, command, args, &block).select { |e| e.is_a?(Integer) }.sum
36+
when 'keys' then @node.call_replicas(method, command, args).flatten.sort_by(&:to_s).then(&TSF.call(block))
37+
when 'dbsize' then @node.call_replicas(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block))
4138
when 'scan' then scan(command, seed: 1)
42-
when 'lastsave' then @node.call_all(method, command, args, &block).sort_by(&:to_i)
39+
when 'lastsave' then @node.call_all(method, command, args).sort_by(&:to_i).then(&TSF.call(block))
4340
when 'role' then @node.call_all(method, command, args, &block)
4441
when 'config' then send_config_command(method, command, args, &block)
4542
when 'client' then send_client_command(method, command, args, &block)
4643
when 'cluster' then send_cluster_command(method, command, args, &block)
47-
when 'readonly', 'readwrite', 'shutdown'
48-
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd
4944
when 'memory' then send_memory_command(method, command, args, &block)
5045
when 'script' then send_script_command(method, command, args, &block)
5146
when 'pubsub' then send_pubsub_command(method, command, args, &block)
47+
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
48+
@node.call_all(method, command, args).first.then(&TSF.call(block))
49+
when 'flushall', 'flushdb'
50+
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
51+
when 'readonly', 'readwrite', 'shutdown'
52+
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd
5253
when 'discard', 'exec', 'multi', 'unwatch'
5354
raise ::RedisClient::Cluster::AmbiguousNodeError, cmd
5455
else
@@ -208,7 +209,7 @@ def assign_asking_node(err_msg)
208209
private
209210

210211
def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize
211-
@node.call_primaries(method, command, args, &block).select { |r| r.is_a?(Integer) }.sum
212+
@node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block))
212213
rescue ::RedisClient::Cluster::ErrorCollection => e
213214
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
214215
raise if retry_count <= 0
@@ -224,24 +225,24 @@ def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:d
224225
def send_config_command(method, command, args, &block)
225226
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
226227
when 'resetstat', 'rewrite', 'set'
227-
@node.call_all(method, command, args, &block).first
228+
@node.call_all(method, command, args).first.then(&TSF.call(block))
228229
else assign_node(command).public_send(method, *args, command, &block)
229230
end
230231
end
231232

232233
def send_memory_command(method, command, args, &block)
233234
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
234235
when 'stats' then @node.call_all(method, command, args, &block)
235-
when 'purge' then @node.call_all(method, command, args, &block).first
236+
when 'purge' then @node.call_all(method, command, args).first.then(&TSF.call(block))
236237
else assign_node(command).public_send(method, *args, command, &block)
237238
end
238239
end
239240

240241
def send_client_command(method, command, args, &block)
241242
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
242-
when 'list' then @node.call_all(method, command, args, &block).flatten
243+
when 'list' then @node.call_all(method, command, args).flatten.then(&TSF.call(block))
243244
when 'pause', 'reply', 'setname'
244-
@node.call_all(method, command, args, &block).first
245+
@node.call_all(method, command, args).first.then(&TSF.call(block))
245246
else assign_node(command).public_send(method, *args, command, &block)
246247
end
247248
end
@@ -251,7 +252,7 @@ def send_cluster_command(method, command, args, &block)
251252
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
252253
'reset', 'set-config-epoch', 'setslot'
253254
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
254-
when 'saveconfig' then @node.call_all(method, command, args, &block).first
255+
when 'saveconfig' then @node.call_all(method, command, args).first.then(&TSF.call(block))
255256
when 'getkeysinslot'
256257
raise ArgumentError, command.join(' ') if command.size != 4
257258

@@ -260,25 +261,25 @@ def send_cluster_command(method, command, args, &block)
260261
end
261262
end
262263

263-
def send_script_command(method, command, args, &block)
264+
def send_script_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize
264265
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
265266
when 'debug', 'kill'
266-
@node.call_all(method, command, args, &block).first
267+
@node.call_all(method, command, args).first.then(&TSF.call(block))
267268
when 'flush', 'load'
268-
@node.call_primaries(method, command, args, &block).first
269+
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
269270
when 'exists'
270-
@node.call_all(method, command, args, &block).transpose.map { |arr| arr.any?(&:zero?) ? 0 : 1 }
271+
@node.call_all(method, command, args).transpose.map { |arr| arr.any?(&:zero?) ? 0 : 1 }.then(&TSF.call(block))
271272
else assign_node(command).public_send(method, *args, command, &block)
272273
end
273274
end
274275

275276
def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
276277
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
277-
when 'channels' then @node.call_all(method, command, args, &block).flatten.uniq.sort_by(&:to_s)
278+
when 'channels' then @node.call_all(method, command, args).flatten.uniq.sort_by(&:to_s).then(&TSF.call(block))
278279
when 'numsub'
279-
@node.call_all(method, command, args, &block).reject(&:empty?).map { |e| Hash[*e] }
280-
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }
281-
when 'numpat' then @node.call_all(method, command, args, &block).select { |e| e.is_a?(Integer) }.sum
280+
@node.call_all(method, command, args).reject(&:empty?).map { |e| Hash[*e] }
281+
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }.then(&TSF.call(block))
282+
when 'numpat' then @node.call_all(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block))
282283
else assign_node(command).public_send(method, *args, command, &block)
283284
end
284285
end

test/redis_client/test_cluster.rb

+11-1
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,16 @@ def test_dedicated_commands # rubocop:disable Metrics/CyclomaticComplexity, Metr
254254
{ command: %w[SCRIPT FLUSH], want: 'OK' },
255255
{ command: %w[SCRIPT EXISTS b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c], want: [0] },
256256
{ command: %w[SCRIPT EXISTS 5b9fb3410653a731f8ddfeff39a0c061 31b6de18e43fe980ed07d8b0f5a8cabe], want: [0, 0] },
257+
{
258+
command: %w[SCRIPT EXISTS b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c],
259+
blk: ->(reply) { reply.map { |r| !r.zero? } },
260+
want: [false]
261+
},
262+
{
263+
command: %w[SCRIPT EXISTS 5b9fb3410653a731f8ddfeff39a0c061 31b6de18e43fe980ed07d8b0f5a8cabe],
264+
blk: ->(reply) { reply.map { |r| !r.zero? } },
265+
want: [false, false]
266+
},
257267
{ command: %w[PUBSUB CHANNELS test-channel*], want: [] },
258268
{ command: %w[PUBSUB NUMSUB test-channel], want: { 'test-channel' => 0 } },
259269
{ command: %w[PUBSUB NUMPAT], want: 0 },
@@ -264,7 +274,7 @@ def test_dedicated_commands # rubocop:disable Metrics/CyclomaticComplexity, Metr
264274
next if c.key?(:supported_redis_version) && TEST_REDIS_MAJOR_VERSION < c[:supported_redis_version]
265275

266276
msg = "Case: #{c[:command].join(' ')}"
267-
got = -> { @client.call(*c[:command]) }
277+
got = -> { @client.call_v(c[:command], &c[:blk]) }
268278
if c.key?(:error)
269279
assert_raises(c[:error], msg, &got)
270280
elsif c.key?(:is_a)

0 commit comments

Comments
 (0)