Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to identify read-replica nodes during connection creation #6

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ This is similar to 'Cluster Awareness' but uses those servers which are part of

### Connection Properties added for load balancing

- _load_balance_ - It expects **true/false** as its possible values. The 'load_balance' property needs to be set to 'true' to enable cluster-awareness.
- _load_balance_ - Starting with version 0.6, it expects one of **false, any (same as true), only-primary, only-rr, prefer-primary and prefer-rr** as its possible values. The default value for _load_balance_ property is `false`.
- _false_ - No connection load balancing. Behaviour is similar to vanilla ruby-pg driver
- _any_ - Same as value _true_. Distribute connections equally across all nodes in the cluster, irrespective of its type (`primary` or `read-replica`)
- _only-primary_ - Create connections equally across only the primary nodes of the cluster
- _only-rr_ - Create connections equally across only the read-replica nodes of the cluster
- _prefer-primary_ - Create connections equally across primary cluster nodes. If none available, on any available read replica node in the cluster
- _prefer-rr_ - Create connections equally across read replica nodes of the cluster. If none available, on any available primary cluster node
- _topology_keys_ - It takes a comma separated geo-location values. A single geo-location can be given as 'cloud.region.zone'. Multiple geo-locations too can be specified, separated by comma (`,`). Optionally, you can also register your preference for particular geo-locations by appending the preference value with prefix `:`. For example, `cloud.regionA.zoneA:1,cloud.regionA.zoneB:2`.
- _yb_servers_refresh_interval_ - Minimum time interval, in seconds, between two attempts to refresh the information about cluster nodes. This is checked only when a new connection is requested. Default is 300. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used.
- _fallback_to_topology_keys_only_ - When set to true, the driver does not attempt to connect to nodes outside of the geo-locations specified via _topology_keys_. Default value is false.
Expand Down
110 changes: 93 additions & 17 deletions lib/ysql/load_balance_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@

class YSQL::LoadBalanceService

LBProperties = Struct.new(:placements_info, :refresh_interval, :fallback_to_tk_only, :failed_host_reconnect_delay)
Node = Struct.new(:host, :port, :cloud, :region, :zone, :public_ip, :count, :is_down, :down_since)
class << self
attr_accessor :logger
end

# Set up a default logger
self.logger = Logger.new(STDOUT)
self.logger.level = Logger::WARN

LBProperties = Struct.new(:lb_value, :placements_info, :refresh_interval, :fallback_to_tk_only, :failed_host_reconnect_delay)
Node = Struct.new(:host, :port, :cloud, :region, :zone, :public_ip, :count, :is_down, :down_since, :node_type)
CloudPlacement = Struct.new(:cloud, :region, :zone)
@@mutex = Concurrent::ReentrantReadWriteLock.new
@@last_refresh_time = -1
Expand All @@ -27,6 +35,7 @@ def self.decrement_connection_count(host)
info = @@cluster_info[host]
unless info.nil?
info.count -= 1
logger.debug "decrement_connection_count(): count for #{host} updated to #{info.count}"
if info.count < 0
# Can go negative if we are here because of a connection that was created in a non-LB fashion
info.count = 0
Expand All @@ -40,6 +49,7 @@ def self.decrement_connection_count(host)
end

def self.connect_to_lb_hosts(lb_props, iopts)
logger.debug "connect_to_lb_hosts(): lb_props = #{lb_props}"
refresh_done = false
@@mutex.acquire_write_lock
begin
Expand All @@ -48,16 +58,21 @@ def self.connect_to_lb_hosts(lb_props, iopts)
if @@control_connection == nil
begin
@@control_connection = create_control_connection(iopts)
logger.debug "connect_to_lb_hosts(): created control connection to #{@@control_connection.host}"
rescue
return nil
end
end
begin
refresh_yb_servers(lb_props.failed_host_reconnect_delay, @@control_connection)
logger.debug "connect_to_lb_hosts(): refreshed yb_servers metadata"
refresh_done = true
rescue => err
if iopts[:host] == @@control_connection.host
if @@cluster_info[iopts[:host]]
if @@cluster_info[iopts[:host]].is_down
logger.debug "connect_to_lb_hosts(): Marking #{@@control_connection.host} as DOWN"
end
@@cluster_info[iopts[:host]].is_down = true
@@cluster_info[iopts[:host]].down_since = Time.now.to_i
end
Expand All @@ -73,6 +88,7 @@ def self.connect_to_lb_hosts(lb_props, iopts)
end
end
@@control_connection = create_control_connection(iopts)
logger.debug "connect_to_lb_hosts(): created control connection to #{@@control_connection.host} in rescue"
end
end
end
Expand All @@ -81,20 +97,43 @@ def self.connect_to_lb_hosts(lb_props, iopts)
end
success = false
new_request = true
strict_preference = true
placement_index = 1
until success
@@mutex.acquire_write_lock
begin
host_port = get_least_loaded_server(lb_props.placements_info, lb_props.fallback_to_tk_only, new_request, placement_index)
if strict_preference
host_port = get_least_loaded_server(lb_props.placements_info, lb_props.fallback_to_tk_only, new_request, placement_index, lb_props.lb_value, strict_preference)
else
host_port = get_least_loaded_server(nil, lb_props.fallback_to_tk_only, new_request, placement_index, lb_props.lb_value, strict_preference)
end
new_request = false
ensure
@@mutex.release_write_lock
end
unless host_port
break
if (lb_props.lb_value == "only-primary" || lb_props.lb_value == "only-rr" )
raise(YSQL::Error, "No node found for load_balance=#{lb_props.lb_value}")
elsif strict_preference && (lb_props.lb_value == "prefer-primary" || lb_props.lb_value == "prefer-rr")
@@mutex.acquire_write_lock
begin
host_port = get_least_loaded_server(nil, lb_props.fallback_to_tk_only, new_request, placement_index, lb_props.lb_value, strict_preference)
ensure
@@mutex.release_write_lock
end
unless host_port
strict_preference = false
placement_index = 1
next
end
else
logger.debug "connect_to_lb_hosts(): lb_host not found for load_balance=#{lb_props.lb_value}"
break
end
end
lb_host = host_port[0]
lb_port = host_port[1]
logger.debug "connect_to_lb_hosts(): lb_host #{lb_host}"
placement_index = host_port[2]
if lb_host.empty?
break
Expand All @@ -109,6 +148,9 @@ def self.connect_to_lb_hosts(lb_props, iopts)
rescue => e
@@mutex.acquire_write_lock
begin
if @@cluster_info[lb_host].is_down
logger.debug "connect_to_lb_hosts(): Marking #{lb_host} as DOWN"
end
@@cluster_info[lb_host].is_down = true
@@cluster_info[lb_host].down_since = Time.now.to_i
@@cluster_info[lb_host].count -= 1
Expand All @@ -133,6 +175,9 @@ def self.create_control_connection(iopts)
success = true
rescue => e
if @@cluster_info[iopts[:host]]
if @@cluster_info[iopts[:host]].is_down
logger.debug "create_control_connection(): Marking #{iopts[:host]} as DOWN"
end
@@cluster_info[iopts[:host]].is_down = true
@@cluster_info[iopts[:host]].down_since = Time.now.to_i
end
Expand Down Expand Up @@ -161,6 +206,7 @@ def self.refresh_yb_servers(failed_host_reconnect_delay_secs, conn)
region = row['region']
zone = row['zone']
public_ip = row['public_ip']
node_type = row['node_type']
public_ip = resolve_host(public_ip)[0][0] if public_ip
if not public_ip.nil? and not public_ip.empty?
found_public_ip = true
Expand All @@ -179,12 +225,15 @@ def self.refresh_yb_servers(failed_host_reconnect_delay_secs, conn)
if old
if old.is_down
if Time.now.to_i - old.down_since > failed_host_reconnect_delay_secs
unless old.is_down
logger.debug "refresh_yb_servers(): Marking #{host} as UP"
end
old.is_down = false
end
@@cluster_info[host] = old
end
else
node = Node.new(host, port, cloud, region, zone, public_ip, 0, false, 0)
node = Node.new(host, port, cloud, region, zone, public_ip, 0, false, 0, node_type)
@@cluster_info[host] = node
end
end
Expand All @@ -196,21 +245,37 @@ def self.refresh_yb_servers(failed_host_reconnect_delay_secs, conn)
@@last_refresh_time = Time.now.to_i
end

def self.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_request, placement_index)
def self.is_node_type_acceptable(node_type, lb_value, strict_preference)
case lb_value
when "true", "any"
true
when "only-primary"
node_type == "primary"
when "only-rr"
node_type == "read_replica"
when "prefer-primary"
node_type == "primary" || (!strict_preference && node_type == "read_replica")
when "prefer-rr"
node_type == "read_replica" || (!strict_preference && node_type == "primary")
else
false
end
end

def self.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_request, placement_index, lb_value, strict_preference)
current_index = 1
selected = Array.new
unless allowed_placements.nil? # topology-aware
eligible_hosts = Array.new
logger.debug "get_least_loaded_server(): topology_keys given = #{allowed_placements}"
(placement_index..10).each { |idx|
current_index = idx
selected.clear
min_connections = 1000000 # Using some really high value
@@cluster_info.each do |host, node_info|
unless node_info.is_down
unless allowed_placements[idx].nil?
if !node_info.is_down && !allowed_placements[idx].nil?
if is_node_type_acceptable(node_info.node_type, lb_value, strict_preference)
allowed_placements[idx].each do |cp|
if cp[0] == node_info.cloud && cp[1] == node_info.region && (cp[2] == node_info.zone || cp[2] == "*")
eligible_hosts << host
if node_info.count < min_connections
min_connections = node_info.count
selected.clear
Expand All @@ -231,12 +296,11 @@ def self.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_re
end

if allowed_placements.nil? || (selected.empty? && !fallback_to_tk_only) # cluster-aware || fallback_to_tk_only = false
unless allowed_placements.nil?
end
logger.debug "get_least_loaded_server(): topology_keys not given or no nodes found for given topology_keys"
min_connections = 1000000 # Using some really high value
selected = Array.new
@@cluster_info.each do |host, node_info|
unless node_info.is_down
if !node_info.is_down && is_node_type_acceptable(node_info.node_type, lb_value, strict_preference)
if node_info.count < min_connections
min_connections = node_info.count
selected.clear
Expand All @@ -254,14 +318,16 @@ def self.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_re
index = rand(selected.size)
selected_node = selected[index]
@@cluster_info[selected_node].count += 1
selected_port = @@cluster_info[selected_node].port
if !@@useHostColumn.nil? && !@@useHostColumn
selected_node = @@cluster_info[selected_node].public_ip
end
Array[selected_node, @@cluster_info[selected_node].port, current_index]
Array[selected_node, selected_port, current_index]
end
end

def self.parse_lb_args_from_url(conn_string)
logger.debug "parse_lb_args_from_url(): conn_string = #{conn_string}"
string_parts = conn_string.split('?', -1)
if string_parts.length != 2
return conn_string, nil
Expand Down Expand Up @@ -293,23 +359,33 @@ def self.parse_lb_args_from_url(conn_string)

base_string = base_string.chop if base_string[-1] == "&"
base_string = base_string.chop if base_string[-1] == "?"
if not lb_props.empty? and lb_props[:load_balance].to_s.downcase == "true"
if not lb_props.empty? and is_lb_enabled(lb_props[:load_balance].to_s.downcase)
return base_string, parse_connect_lb_args(lb_props)
else
return base_string, nil
end
end
end

def self.is_lb_enabled(lb)
case lb
when "true", "any", "only-primary", "prefer-primary", "only-rr", "prefer-rr"
true
else
false
end
end

def self.parse_connect_lb_args(hash_arg)
logger.debug "parse_connect_lb_args(): hash_arg = #{hash_arg}"
lb = hash_arg.delete(:load_balance)
tk = hash_arg.delete(:topology_keys)
ri = hash_arg.delete(:yb_servers_refresh_interval)
ttl = hash_arg.delete(:failed_host_reconnect_delay_secs)
fb = hash_arg.delete(:fallback_to_topology_keys_only)

if lb && lb.to_s.downcase == "true"
lb_properties = LBProperties.new(nil, 300, false, 5)
if is_lb_enabled(lb.to_s.downcase)
lb_properties = LBProperties.new(lb.to_s.downcase, nil, 300, false, 5)
if tk
lb_properties.placements_info = Hash.new
tk_parts = tk.split(',', -1)
Expand Down
2 changes: 1 addition & 1 deletion lib/ysql/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module YSQL
# Library version
PG_VERSION = '1.5.6'
VERSION = '0.5'
VERSION = '0.6'
end