@@ -11,84 +11,91 @@ module Redis
11
11
class SentinelsClient < Client
12
12
def initialize ( master_name , sentinels , role = :master , protocol = Protocol ::RESP2 , **options )
13
13
@master_name = master_name
14
+
14
15
@sentinel_endpoints = sentinels . map do |sentinel |
15
16
::IO ::Endpoint . tcp ( sentinel [ :host ] , sentinel [ :port ] )
16
17
end
18
+
17
19
@role = role
18
-
19
20
@protocol = protocol
20
21
@pool = connect ( **options )
21
22
end
22
-
23
+
23
24
private
24
-
25
+
25
26
# Override the parent method. The only difference is that this one needs
26
27
# to resolve the master/slave address.
27
28
def connect ( **options )
28
29
Async ::Pool ::Controller . wrap ( **options ) do
29
30
endpoint = resolve_address
30
31
peer = endpoint . connect
31
32
stream = ::IO ::Stream ( peer )
32
-
33
+
33
34
@protocol . client ( stream )
34
35
end
35
36
end
36
-
37
+
37
38
def resolve_address
38
- address = case @role
39
- when :master then resolve_master
40
- when :slave then resolve_slave
41
- else raise ArgumentError , "Unknown instance role #{ @role } "
42
- end
43
-
39
+ case @role
40
+ when :master
41
+ resolve_master
42
+ when :slave
43
+ resolve_slave
44
+ else
45
+ raise ArgumentError , "Unknown instance role #{ @role } "
46
+ end => address
47
+
44
48
address or raise RuntimeError , "Unable to fetch #{ @role } via Sentinel."
45
49
end
46
-
50
+
47
51
def resolve_master
48
52
@sentinel_endpoints . each do |sentinel_endpoint |
49
53
client = Client . new ( sentinel_endpoint )
50
-
54
+
51
55
begin
52
56
address = client . call ( 'sentinel' , 'get-master-addr-by-name' , @master_name )
53
57
rescue Errno ::ECONNREFUSED
54
58
next
55
59
end
56
-
60
+
57
61
return ::IO ::Endpoint . tcp ( address [ 0 ] , address [ 1 ] ) if address
58
62
end
59
-
63
+
60
64
nil
61
65
end
62
-
66
+
63
67
def resolve_slave
64
68
@sentinel_endpoints . each do |sentinel_endpoint |
65
69
client = Client . new ( sentinel_endpoint )
66
-
70
+
67
71
begin
68
72
reply = client . call ( 'sentinel' , 'slaves' , @master_name )
69
73
rescue Errno ::ECONNREFUSED
70
74
next
71
75
end
72
-
76
+
73
77
slaves = available_slaves ( reply )
74
78
next if slaves . empty?
75
-
79
+
76
80
slave = select_slave ( slaves )
77
81
return ::IO ::Endpoint . tcp ( slave [ 'ip' ] , slave [ 'port' ] )
78
82
end
79
-
83
+
80
84
nil
81
85
end
82
-
83
- def available_slaves ( slaves_cmd_reply )
86
+
87
+ def available_slaves ( reply )
84
88
# The reply is an array with the format: [field1, value1, field2,
85
89
# value2, etc.].
86
90
# When a slave is marked as down by the sentinel, the "flags" field
87
91
# (comma-separated array) contains the "s_down" value.
88
- slaves_cmd_reply . map { |s | s . each_slice ( 2 ) . to_h }
89
- . reject { |s | s . fetch ( 'flags' ) . split ( ',' ) . include? ( 's_down' ) }
92
+ slaves = reply . map { |fields | fields . each_slice ( 2 ) . to_h }
93
+
94
+ slaves . reject do |slave |
95
+ slave [ 'flags' ] . split ( ',' ) . include? ( 's_down' )
96
+ end
90
97
end
91
-
98
+
92
99
def select_slave ( available_slaves )
93
100
available_slaves . sample
94
101
end
0 commit comments