Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions bin/haproxyctl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ end

display_usage! if argument =~ /help/ || ARGV.length < 1


process ||= argument.scan(/-p\s*([^\[\s\]]+)/).flatten

if !process.empty?
process = process[0].to_i
elsif process.empty?
if nbproc > 1
# Default to old behavior to use the first socket.
process = 1
else
# Default to unix socket not bound to a process id.
process = 0
end
end
# Strip of the -p <process> argument as argument is passed to unix socket if not defined below
argument = argument.gsub(/-p\s*([^\[\s\]]*)/, '')

begin
case argument
when 'start'
Expand Down Expand Up @@ -87,7 +104,7 @@ begin
# # removes the listener
# conn = conn - 1
# puts "metric connections int #{conn}"
# status = unixsock('show stat')
# status = unixsock(process, 'show stat')
# status.each do |line|
# line = line.split(',')
# if line[0] !~ /^#/
Expand All @@ -102,55 +119,55 @@ begin
# puts 'status err haproxy is not running!'
# end
when 'show health'
status = unixsock('show stat')
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
printf "%-30s %-30s %-7s %3s\n", data[0], data[1], data[17], data[18]
end
when /show backend(s?)/
status = unixsock('show stat').grep(/BACKEND/)
status = unixsock(process, 'show stat').grep(/BACKEND/)
status.each do |line|
data = line.split(',')
printf "%-30s %-30s %-7s %3s\n", data[0], data[1], data[17], data[18]
end
when /disable all EXCEPT (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
backend = status.grep(/#{servername}/)
backend.each do |line|
backend_group = line.split(',')
status.each do |pool|
data = pool.split(',')
if (data[0] == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] == 'UP')
unixsock("disable server #{data[0]}/#{data[1]}")
unixsock(process, "disable server #{data[0]}/#{data[1]}")
end
end
end
when /disable all (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
if ( data[1] == servername) && ( data[17] == 'UP')
unixsock("disable server #{data[0]}/#{servername}")
unixsock(process, "disable server #{data[0]}/#{servername}")
end
end
when /enable all EXCEPT (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
backend = status.grep(/#{servername}/)
backend.each do |line|
backend_group = line.split(',')
status.each do |pool|
data = pool.split(',')
if (data[0] == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] =~ /Down|MAINT/i)
unixsock("enable server #{data[0]}/#{data[1]}")
unixsock(process, "enable server #{data[0]}/#{data[1]}")
end
end
end
when /show stat (.+)/
fieldnames = Regexp.last_match[ 1]
status = unixsock('show stat')
fieldnames = Regexp.last_match(1)
status = unixsock(process, 'show stat')
indices = fieldnames.split(' ').map do |name|
status.first.split(',').index(name) || begin
$stderr.puts("no such field: #{name}")
Expand All @@ -164,18 +181,18 @@ begin
puts (row[0...2] + filtered).compact.join(',')
end
when /enable all (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
if ( data[1] == servername) && ( data[17] =~ /Down|MAINT/i)
unixsock("enable server #{data[0]}/#{servername}")
unixsock(process, "enable server #{data[0]}/#{servername}")
end
end
when 'version'
version
else
puts unixsock(argument)
puts unixsock(process, argument)
end
rescue Errno::ENOENT => e
STDERR.puts e
Expand Down
65 changes: 42 additions & 23 deletions lib/haproxyctl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,55 @@ def reload(pids)
end
end

def unixsock(command)
output = []
runs = 0
def unixsock(process, command)

begin
ctl = UNIXSocket.open(socket)
if ctl
ctl.write "#{command}\r\n"
else
puts "cannot talk to #{socket}"
def execute(socket, command)
output = []
runs = 0

begin
ctl = UNIXSocket.open(socket)
if ctl
ctl.write "#{command}\r\n"
else
puts "cannot talk to #{socket}"
end
rescue Errno::EPIPE
ctl.close
sleep 0.5
runs += 1
if runs < 4
retry
else
puts "the unix socket at #{socket} closed before we could complete this request"
exit
end
end
while (line = ctl.gets)
unless line =~ /Unknown command/
output << line
end
end
rescue Errno::EPIPE
ctl.close
sleep 0.5
runs += 1
if runs < 4
retry

output
end

if process == 0
if nbproc > 1
# Only multiple socket execution prefixes lines with process id
# - inspired from dsh.
sockets().each.sort.map{|k,v| execute(v, command).map { |line| "#{k}: #{line}" }}
else
puts "the unix socket at #{socket} closed before we could complete this request"
exit
execute(sockets()[0], command)
end
end
while (line = ctl.gets)
unless line =~ /Unknown command/
output << line
else
if !sockets().has_key?(process)
fail(RuntimeError.new "Could not find a stats socket with process #{process} in #{config_path}")
else
execute(sockets()[process], command)
end
end
ctl.close

output
end

def display_usage!
Expand Down
23 changes: 8 additions & 15 deletions lib/haproxyctl/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,21 @@ def exec
def nbproc
@nbproc ||= begin
config.match /nbproc \s*(\d*)\s*/
Regexp.last_match[1].to_i || 1
Regexp.last_match(1).to_i || 1
end
end

def socket
@socket ||= begin
# If the haproxy config is using nbproc > 1, we assume that all cores
# except for 1 do not need commands sent to their sockets (if they exist).
# This is a poor assumption, so TODO: improve CLI to accept argument for
# processes to target.
if nbproc > 1
config.match /stats\s+socket \s*([^\s]*) \s*.*process \s*1[\d^]?/
else
config.match /stats\s+socket \s*([^\s]*)/
end
Regexp.last_match[1] || fail("Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
end
def sockets
# Always capture socket path, and include process id if it exists.
# Note: whoever runs haproxy with nbprocs > 1 and has a socket listener without process id .. can blame themself.
@sockets = Hash[config.scan(/stats\s+socket\s+([^\[\s\]]*)(?:(?:.*process)?(?:.*process\s+([^\[\s\]]*)))?/).collect { |v| [v[1].to_i,v[0]] }]
@sockets.empty? && fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
@sockets
end

def pidfile
if config.match(/pidfile \s*([^\s]*)/)
@pidfile = Regexp.last_match[1]
@pidfile = Regexp.last_match(1)
else
std_pid = '/var/run/haproxy.pid'
if File.exists?(std_pid)
Expand Down
22 changes: 12 additions & 10 deletions rhapr/lib/rhapr/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Rhapr
module Environment
attr_reader :haproxy_pid, :config_path, :config, :exec, :socket_path
attr_reader :haproxy_pid, :config_path, :config, :exec, :socket_paths

# @return [String, nil] The path to the HAProxy configuration file, or nil if not found. Set the ENV variable $HAPROXY_CONFIG to override defaults.
def config_path
Expand Down Expand Up @@ -55,24 +55,26 @@ def exec
(@exec)
end

# @param [int] process id for looking up correct socket path
# @return [UNIXSocket] A connection to the HAProxy Socket
# @raise [RuntimeError] Raised if a socket connection could not be established
def socket
def socket(process)
begin
UNIXSocket.open(socket_path)
UNIXSocket.open(socket_paths[process])
rescue Errno::EACCES => e
raise RuntimeError.new("Could not open a socket with HAProxy. Error message: #{e.message}")
end
end

# @return [String] The path to the HAProxy stats socket.
# @return [Array] Entries of [ProcessNumber, Path] for HAProxy stats sockets. ProcessNumber can be 0 if not bound to any.
# @raise [RuntimeError] Raised if no stats socket has been specified, in the HAProxy configuration.
# @todo: Should there be an ENV var for this? Perhaps allow config-less runs of rhapr?
def socket_path
@socket_path ||= begin
config.match /stats\s+socket\s+([^\s]*)/
Regexp.last_match[1] || fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
end
def socket_paths
# Always capture socket path, and include process id if it exists.
# Note: whoever runs haproxy with nbprocs > 1 and has a socket listener without process id .. can blame themself.
@socket_paths = Hash[config.scan(/stats\s+socket\s+([^[\s]]*)(?:(?:.*process)?(?:.*process\s+([^[\s]]*)))?/).collect { |v| [v[1].to_i,v[0]] }]
@socket_paths.empty? && fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
@socket_paths
end

# @return [String] Returns the path to the pidfile, specified in the HAProxy configuration. Returns an assumption, if not found.
Expand All @@ -81,7 +83,7 @@ def socket_path
def pid
@pid ||= begin
config.match /pidfile ([^\s]*)/
Regexp.last_match[1] || '/var/run/haproxy.pid'
Regexp.last_match(1) || '/var/run/haproxy.pid'
end
end

Expand Down
7 changes: 4 additions & 3 deletions rhapr/lib/rhapr/interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ class Interface
EMPTY = "\n"

# @param [String, #to_s] message The message to be sent to HAProxy
# @param [int] process id for retrieving correct stats socket
# return [Array<String>] All of the output from HAProxy, read in.
# @see Rhapr::Interface#write, Rhapr::Interface#read_full
def send(message)
sock = socket
def send(message, process=1)
sock = socket(process)

write(sock, message)
read_full(sock)
Expand Down Expand Up @@ -68,7 +69,7 @@ def get_weight(backend, server)
resp = send "get weight #{backend}/#{server}"

resp.match /([[:digit:]]+) \(initial ([[:digit:]]+)\)/
weight, initial = Regexp.last_match[1], Regexp.last_match[2]
weight, initial = Regexp.last_match(1), Regexp.last_match(2)

return [weight.to_i, initial.to_i] if weight and initial

Expand Down
36 changes: 36 additions & 0 deletions rhapr/spec/config_fixtures/nbprocs_haproxy.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
global
daemon
maxconn 1024
# quiet
pidfile /var/run/haproxy.pid
nbproc 2
stats socket /tmp/haproxy-1 level admin uid 501 process 4 group staff mode 0660
stats socket /tmp/haproxy-2 level admin uid 501 group staff mode 0660 process 5

defaults
log global
mode http
option httplog
option dontlognull
stats enable
stats uri /proxystats # and this guy for statistics
stats auth webreport:areallysecretsupersecurepassword
stats refresh 5s

listen thrift :9090
mode tcp
balance roundrobin
option tcplog
option redispatch
retries 3

contimeout 5000
clitimeout 40000
srvtimeout 7000

server thrift1 localhost:9091 maxconn 20 check inter 20000
server thrift2 localhost:9092 maxconn 20 check inter 20000
server thrift3 localhost:9093 maxconn 20 check inter 20000
server thrift4 localhost:9094 maxconn 20 check inter 20000
server thrift5 localhost:9095 maxconn 20 check inter 20000
server thrift6 localhost:9096 maxconn 20 check inter 20000
Loading