Skip to content

Commit 5bdb9e8

Browse files
author
KJ Tsanaktsidis
committed
Implement #watch and #multi specially for cluster-client
This PR makes watch & multi work more or less the same way for clustering as they do for normal redis. Since it's supposed to be valid to perform your multi call on the original redis object, like this: ``` redis.watch('key') do redis.multi do |tx| # tx is performed on the same connection as the watch end end ``` we need to keeps some state in an ivar @active_watcher so we know to call MULTI on the same actual connection as WATCH (and appropriately fail if the keys got redirected or the node went down). This is technically threadsafe, because the watch/multi implementation is wrapped in the `synchronize` monitor; however, for good performance in multithreaded environments, you will most likely want to use a connection pool of Redis::Cluster instances.
1 parent 7cc45e5 commit 5bdb9e8

File tree

4 files changed

+131
-4
lines changed

4 files changed

+131
-4
lines changed

cluster/lib/redis/cluster.rb

+56
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,62 @@ def cluster(subcommand, *args)
9696
send_command([:cluster, subcommand] + args, &block)
9797
end
9898

99+
# Transactions need different implementations in cluster mode, using purpose-built
100+
# primitives available in redis-cluster-client. These methods (watch and multii
101+
# implement the same interface as the methods in ::Redis::Commands::Transactions.
102+
103+
def watch(*keys)
104+
synchronize do |client|
105+
# client is a ::Redis::Cluster::Client instance, which is a subclass of
106+
# ::RedisClient::Cluster
107+
108+
if @active_watcher
109+
# We're already within a #watch block, just add keys to the existing watch
110+
@active_watcher.watch(keys)
111+
else
112+
unless block_given?
113+
raise ArgumentError, "#{self.class.name} requires that the initial #watch call of a transaction " \
114+
"passes a block"
115+
end
116+
117+
client.watch(keys) do |watcher|
118+
@active_watcher = watcher
119+
yield self
120+
ensure
121+
@active_watcher = nil
122+
end
123+
124+
end
125+
end
126+
end
127+
128+
def multi
129+
synchronize do |client|
130+
if @active_watcher
131+
# If we're inside a #watch block, use that to execute the transaction
132+
@active_watcher.multi do |tx|
133+
yield MultiConnection.new(tx)
134+
end
135+
else
136+
# Make a new transaction from whole cloth.
137+
client.multi do |tx|
138+
yield MultiConnection.new(tx)
139+
end
140+
end
141+
end
142+
end
143+
144+
def unwatch
145+
synchronize do
146+
if @active_watcher
147+
@active_watcher.unwatch
148+
else
149+
# This will raise an AmbiguiousNodeError
150+
super
151+
end
152+
end
153+
end
154+
99155
private
100156

101157
def initialize_client(options)

cluster/lib/redis/cluster/client.rb

+4
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def multi(watch: nil, &block)
9898
handle_errors { super(watch: watch, &block) }
9999
end
100100

101+
def watch(keys, &block)
102+
handle_errors { super(keys, &block) }
103+
end
104+
101105
private
102106

103107
def handle_errors

cluster/test/client_transactions_test.rb

+69
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,73 @@ def test_cluster_client_does_not_support_transaction_by_multiple_keys
4848
assert_nil(redis.get("key#{i}"))
4949
end
5050
end
51+
52+
def test_cluster_client_does_support_transaction_with_optimistic_locking
53+
redis.mset('{key}1', '1', '{key}2', '2')
54+
55+
another = Fiber.new do
56+
cli = build_another_client
57+
cli.mset('{key}1', '3', '{key}2', '4')
58+
cli.close
59+
end
60+
61+
redis.watch('{key}1', '{key}2') do
62+
another.resume
63+
v1 = redis.get('{key}1')
64+
v2 = redis.get('{key}2')
65+
redis.multi do |tx|
66+
tx.set('{key}1', v2)
67+
tx.set('{key}2', v1)
68+
end
69+
end
70+
71+
assert_equal %w[3 4], redis.mget('{key}1', '{key}2')
72+
end
73+
74+
def test_cluster_client_can_unwatch_transaction
75+
redis.set('key1', 'initial_value')
76+
77+
another = Fiber.new do
78+
cli = build_another_client
79+
cli.set('key1', 'another_value')
80+
end
81+
82+
redis.watch('key1') do
83+
another.resume
84+
redis.unwatch
85+
end
86+
# After calling unwatch, the same connection can be used to open a transaction which
87+
# isn't conditional and so will commit
88+
got = redis.multi do |tx|
89+
tx.set('key1', 'final_value')
90+
end
91+
92+
assert_equal ['OK'], got
93+
assert_equal 'final_value', redis.get('key1')
94+
end
95+
96+
def test_cluster_client_unwatches_on_exception
97+
redis.set('key1', 'initial_value')
98+
99+
another = Fiber.new do
100+
cli = build_another_client
101+
cli.set('key1', 'another_value')
102+
end
103+
104+
assert_raises(RuntimeError) do
105+
redis.watch('key1') do
106+
another.resume
107+
raise 'bang'
108+
end
109+
end
110+
# After catching the exception, the same connection can be used to open a transaction which
111+
# isn't conditional and so will commit
112+
# n.b. the actual behaviour which ensures this is actually in redis-cluster-client
113+
got = redis.multi do |tx|
114+
tx.set('key1', 'final_value')
115+
end
116+
117+
assert_equal ['OK'], got
118+
assert_equal 'final_value', redis.get('key1')
119+
end
51120
end

cluster/test/commands_on_transactions_test.rb

+2-4
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@ def test_unwatch
3838
end
3939

4040
def test_watch
41-
assert_raises(Redis::CommandError, "CROSSSLOT Keys in request don't hash to the same slot") do
42-
redis.watch('key1', 'key2')
41+
assert_raises(Redis::Cluster::TransactionConsistencyError) do
42+
redis.watch('key1', 'key2') {}
4343
end
44-
45-
assert_equal 'OK', redis.watch('{key}1', '{key}2')
4644
end
4745
end

0 commit comments

Comments
 (0)