Skip to content

Commit 6e7f456

Browse files
authored
Initial rework of sentinels client. (#51)
- Use docker for running integration tests.
1 parent c87f30b commit 6e7f456

11 files changed

+402
-164
lines changed

.github/workflows/test-coverage.yaml

+34-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,41 @@ jobs:
4949
name: coverage-${{matrix.os}}-${{matrix.ruby}}
5050
path: .covered.db
5151

52+
test-sentinel:
53+
name: ${{matrix.ruby}} on ${{matrix.os}} (Sentinel)
54+
runs-on: ${{matrix.os}}-latest
55+
56+
strategy:
57+
matrix:
58+
os:
59+
- ubuntu
60+
61+
ruby:
62+
- "3.3"
63+
64+
steps:
65+
- uses: actions/checkout@v4
66+
67+
- name: Install Docker Compose
68+
run: |
69+
sudo apt-get update
70+
sudo apt-get install -y docker-compose
71+
72+
- name: Run tests
73+
timeout-minutes: 10
74+
env:
75+
RUBY_VERSION: ${{matrix.ruby}}
76+
run: docker-compose -f sentinel/docker-compose.yaml up --exit-code-from tests
77+
78+
- uses: actions/upload-artifact@v3
79+
with:
80+
name: coverage-${{matrix.os}}-${{matrix.ruby}}
81+
path: .covered.db
82+
5283
validate:
53-
needs: test
84+
needs:
85+
- test
86+
- test-sentinel
5487
runs-on: ubuntu-latest
5588

5689
steps:

.github/workflows/test-sentinel.yaml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
name: Test Sentinel
2+
3+
on: [push, pull_request]
4+
5+
permissions:
6+
contents: read
7+
8+
env:
9+
CONSOLE_OUTPUT: XTerm
10+
11+
jobs:
12+
test:
13+
name: ${{matrix.ruby}} on ${{matrix.os}}
14+
runs-on: ${{matrix.os}}-latest
15+
continue-on-error: ${{matrix.experimental}}
16+
17+
strategy:
18+
matrix:
19+
os:
20+
- ubuntu
21+
22+
ruby:
23+
- "3.1"
24+
- "3.2"
25+
- "3.3"
26+
27+
experimental: [false]
28+
29+
steps:
30+
- uses: actions/checkout@v4
31+
32+
- name: Install Docker Compose
33+
run: |
34+
sudo apt-get update
35+
sudo apt-get install -y docker-compose
36+
37+
- name: Run tests
38+
timeout-minutes: 10
39+
env:
40+
RUBY_VERSION: ${{matrix.ruby}}
41+
run: docker-compose -f sentinel/docker-compose.yaml up --exit-code-from tests

lib/async/redis.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66

77
require_relative 'redis/version'
88
require_relative 'redis/client'
9-
require_relative 'redis/sentinels'
9+
require_relative 'redis/sentinel_client'

lib/async/redis/client.rb

+61-57
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,65 @@ module Redis
2626
class Client
2727
include ::Protocol::Redis::Methods
2828

29+
module Methods
30+
def subscribe(*channels)
31+
context = Context::Subscribe.new(@pool, channels)
32+
33+
return context unless block_given?
34+
35+
begin
36+
yield context
37+
ensure
38+
context.close
39+
end
40+
end
41+
42+
def transaction(&block)
43+
context = Context::Transaction.new(@pool)
44+
45+
return context unless block_given?
46+
47+
begin
48+
yield context
49+
ensure
50+
context.close
51+
end
52+
end
53+
54+
alias multi transaction
55+
56+
def pipeline(&block)
57+
context = Context::Pipeline.new(@pool)
58+
59+
return context unless block_given?
60+
61+
begin
62+
yield context
63+
ensure
64+
context.close
65+
end
66+
end
67+
68+
# Deprecated.
69+
alias nested pipeline
70+
71+
def call(*arguments)
72+
@pool.acquire do |connection|
73+
connection.write_request(arguments)
74+
75+
connection.flush
76+
77+
return connection.read_response
78+
end
79+
end
80+
81+
def close
82+
@pool.close
83+
end
84+
end
85+
86+
include Methods
87+
2988
def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options)
3089
@endpoint = endpoint
3190
@protocol = protocol
@@ -38,8 +97,8 @@ def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options
3897

3998
# @return [client] if no block provided.
4099
# @yield [client, task] yield the client in an async task.
41-
def self.open(*arguments, &block)
42-
client = self.new(*arguments)
100+
def self.open(*arguments, **options, &block)
101+
client = self.new(*arguments, **options)
43102

44103
return client unless block_given?
45104

@@ -52,61 +111,6 @@ def self.open(*arguments, &block)
52111
end.wait
53112
end
54113

55-
def close
56-
@pool.close
57-
end
58-
59-
def subscribe(*channels)
60-
context = Context::Subscribe.new(@pool, channels)
61-
62-
return context unless block_given?
63-
64-
begin
65-
yield context
66-
ensure
67-
context.close
68-
end
69-
end
70-
71-
def transaction(&block)
72-
context = Context::Transaction.new(@pool)
73-
74-
return context unless block_given?
75-
76-
begin
77-
yield context
78-
ensure
79-
context.close
80-
end
81-
end
82-
83-
alias multi transaction
84-
85-
def pipeline(&block)
86-
context = Context::Pipeline.new(@pool)
87-
88-
return context unless block_given?
89-
90-
begin
91-
yield context
92-
ensure
93-
context.close
94-
end
95-
end
96-
97-
# Deprecated.
98-
alias nested pipeline
99-
100-
def call(*arguments)
101-
@pool.acquire do |connection|
102-
connection.write_request(arguments)
103-
104-
connection.flush
105-
106-
return connection.read_response
107-
end
108-
end
109-
110114
protected
111115

112116
def connect(**options)

lib/async/redis/endpoint.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ class Endpoint < ::IO::Endpoint::Generic
2323

2424
def self.local(**options)
2525
self.new(LOCALHOST, **options)
26-
end
26+
end
27+
28+
def self.remote(host, port = 6379, **options)
29+
self.new(URI.parse("redis://#{host}:#{port}"), **options)
30+
end
2731

2832
SCHEMES = {
2933
'redis' => URI::Generic,

lib/async/redis/sentinel_client.rb

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2020, by David Ortiz.
5+
# Copyright, 2023-2024, by Samuel Williams.
6+
7+
require_relative 'client'
8+
require 'io/stream'
9+
10+
module Async
11+
module Redis
12+
class SentinelClient
13+
DEFAULT_MASTER_NAME = 'mymaster'
14+
15+
include ::Protocol::Redis::Methods
16+
include Client::Methods
17+
18+
# Create a new instance of the SentinelClient.
19+
#
20+
# @property endpoints [Array(Endpoint)] The list of sentinel endpoints.
21+
# @property master_name [String] The name of the master instance, defaults to 'mymaster'.
22+
# @property role [Symbol] The role of the instance that you want to connect to, either `:master` or `:slave`.
23+
# @property protocol [Protocol] The protocol to use when connecting to the actual Redis server, defaults to {Protocol::RESP2}.
24+
def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, role: :master, protocol: Protocol::RESP2, **options)
25+
@endpoints = endpoints
26+
@master_name = master_name
27+
@role = role
28+
@protocol = protocol
29+
30+
# A cache of sentinel connections.
31+
@sentinels = {}
32+
33+
@pool = connect(**options)
34+
end
35+
36+
# @attribute [String] The name of the master instance.
37+
attr :master_name
38+
39+
# @attribute [Symbol] The role of the instance that you want to connect to.
40+
attr :role
41+
42+
def resolve_address(role = @role)
43+
case role
44+
when :master
45+
resolve_master
46+
when :slave
47+
resolve_slave
48+
else
49+
raise ArgumentError, "Unknown instance role #{role}"
50+
end => address
51+
52+
Console.debug(self, "Resolved #{@role} address: #{address}")
53+
54+
address or raise RuntimeError, "Unable to fetch #{role} via Sentinel."
55+
end
56+
57+
def close
58+
super
59+
60+
@sentinels.each do |_, client|
61+
client.close
62+
end
63+
end
64+
65+
def failover(name = @master_name)
66+
sentinels do |client|
67+
return client.call('SENTINEL', 'FAILOVER', name)
68+
end
69+
end
70+
71+
def masters
72+
sentinels do |client|
73+
return client.call('SENTINEL', 'MASTERS').map{|fields| fields.each_slice(2).to_h}
74+
end
75+
end
76+
77+
def master(name = @master_name)
78+
sentinels do |client|
79+
return client.call('SENTINEL', 'MASTER', name).each_slice(2).to_h
80+
end
81+
end
82+
83+
def resolve_master
84+
sentinels do |client|
85+
begin
86+
address = client.call('SENTINEL', 'GET-MASTER-ADDR-BY-NAME', @master_name)
87+
rescue Errno::ECONNREFUSED
88+
next
89+
end
90+
91+
return Endpoint.remote(address[0], address[1]) if address
92+
end
93+
94+
return nil
95+
end
96+
97+
def resolve_slave
98+
sentinels do |client|
99+
begin
100+
reply = client.call('SENTINEL', 'SLAVES', @master_name)
101+
rescue Errno::ECONNREFUSED
102+
next
103+
end
104+
105+
slaves = available_slaves(reply)
106+
next if slaves.empty?
107+
108+
slave = select_slave(slaves)
109+
return Endpoint.remote(slave['ip'], slave['port'])
110+
end
111+
112+
return nil
113+
end
114+
115+
protected
116+
117+
# Override the parent method. The only difference is that this one needs to resolve the master/slave address.
118+
def connect(**options)
119+
Async::Pool::Controller.wrap(**options) do
120+
endpoint = resolve_address
121+
peer = endpoint.connect
122+
stream = ::IO::Stream(peer)
123+
124+
@protocol.client(stream)
125+
end
126+
end
127+
128+
def sentinels
129+
@endpoints.map do |endpoint|
130+
@sentinels[endpoint] ||= Client.new(endpoint)
131+
132+
yield @sentinels[endpoint]
133+
end
134+
end
135+
136+
def available_slaves(reply)
137+
# The reply is an array with the format: [field1, value1, field2,
138+
# value2, etc.].
139+
# When a slave is marked as down by the sentinel, the "flags" field
140+
# (comma-separated array) contains the "s_down" value.
141+
slaves = reply.map{|fields| fields.each_slice(2).to_h}
142+
143+
slaves.reject do |slave|
144+
slave['flags'].split(',').include?('s_down')
145+
end
146+
end
147+
148+
def select_slave(available_slaves)
149+
available_slaves.sample
150+
end
151+
end
152+
end
153+
end

0 commit comments

Comments
 (0)