Skip to content
76 changes: 54 additions & 22 deletions bin/haproxyctl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,32 @@ end

display_usage! if argument =~ /help/ || ARGV.length < 1


process ||= argument.scan(/-p\s*([^\[\s\]]+)/).flatten

if !process.empty?
process = process[0].to_i
elsif process.empty?
if nbproc > 1
# Default to old behavior to use the first socket.
process = 1
else
# Default to unix socket not bound to a process id.
process = 0
end
end
# Strip of the -p <process> argument as argument is passed to unix socket if not defined below
argument = argument.gsub(/-p\s*([^\[\s\]]*)/, '')

# For all data fetching logic below, we need to strip ^\d+:\s if dsh_output
dsh_output ||= begin
if process == 0
1
else
0
end
end

begin
case argument
when 'start'
Expand Down Expand Up @@ -87,7 +113,7 @@ begin
# # removes the listener
# conn = conn - 1
# puts "metric connections int #{conn}"
# status = unixsock('show stat')
# status = unixsock(process, 'show stat')
# status.each do |line|
# line = line.split(',')
# if line[0] !~ /^#/
Expand All @@ -102,55 +128,60 @@ begin
# puts 'status err haproxy is not running!'
# end
when 'show health'
status = unixsock('show stat')
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
printf "%-30s %-30s %-7s %3s\n", data[0], data[1], data[17], data[18]
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
printf "%-30s %-30s %-7s %3s\n", dsh_data0, data[1], data[17], data[18]
end
when /show backend(s?)/
status = unixsock('show stat').grep(/BACKEND/)
status = unixsock(process, 'show stat').grep(/BACKEND/)
status.each do |line|
data = line.split(',')
printf "%-30s %-30s %-7s %3s\n", data[0], data[1], data[17], data[18]
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
printf "%-30s %-30s %-7s %3s\n", dsh_data0, data[1], data[17], data[18]
end
when /disable all EXCEPT (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
backend = status.grep(/#{servername}/)
backend.each do |line|
backend_group = line.split(',')
status.each do |pool|
data = pool.split(',')
if (data[0] == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] == 'UP')
unixsock("disable server #{data[0]}/#{data[1]}")
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
if (dsh_data0 == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] == 'UP')
unixsock(process, "disable server #{dsh_data0}/#{data[1]}")
end
end
end
when /disable all (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
if ( data[1] == servername) && ( data[17] == 'UP')
unixsock("disable server #{data[0]}/#{servername}")
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
unixsock(process, "disable server #{dsh_data0}/#{servername}")
end
end
when /enable all EXCEPT (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
backend = status.grep(/#{servername}/)
backend.each do |line|
backend_group = line.split(',')
status.each do |pool|
data = pool.split(',')
if (data[0] == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] =~ /Down|MAINT/i)
unixsock("enable server #{data[0]}/#{data[1]}")
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
if (dsh_data0 == backend_group[0]) && ( data[1] !~ /#{servername}|BACKEND|FRONTEND/) && ( data[17] =~ /Down|MAINT/i)
unixsock(process, "enable server #{dsh_data0}/#{data[1]}")
end
end
end
when /show stat (.+)/
fieldnames = Regexp.last_match[ 1]
status = unixsock('show stat')
fieldnames = Regexp.last_match(1)
status = unixsock(process, 'show stat')
indices = fieldnames.split(' ').map do |name|
status.first.split(',').index(name) || begin
$stderr.puts("no such field: #{name}")
Expand All @@ -164,18 +195,19 @@ begin
puts (row[0...2] + filtered).compact.join(',')
end
when /enable all (.+)/
servername = Regexp.last_match[ 1]
status = unixsock('show stat')
servername = Regexp.last_match(1)
status = unixsock(process, 'show stat')
status.each do |line|
data = line.split(',')
if ( data[1] == servername) && ( data[17] =~ /Down|MAINT/i)
unixsock("enable server #{data[0]}/#{servername}")
dsh_data0 ||= data[0].sub(/\d+:\s/, '') if dsh_output || data[0]
unixsock(process, "enable server #{dsh_data0}/#{servername}")
end
end
when 'version'
version
else
puts unixsock(argument)
puts unixsock(process, argument)
end
rescue Errno::ENOENT => e
STDERR.puts e
Expand Down
65 changes: 42 additions & 23 deletions lib/haproxyctl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,55 @@ def reload(pids)
end
end

def unixsock(command)
output = []
runs = 0
def unixsock(process, command)

begin
ctl = UNIXSocket.open(socket)
if ctl
ctl.write "#{command}\r\n"
else
puts "cannot talk to #{socket}"
def execute(socket, command)
output = []
runs = 0

begin
ctl = UNIXSocket.open(socket)
if ctl
ctl.write "#{command}\r\n"
else
puts "cannot talk to #{socket}"
end
rescue Errno::EPIPE
ctl.close
sleep 0.5
runs += 1
if runs < 4
retry
else
puts "the unix socket at #{socket} closed before we could complete this request"
exit
end
end
while (line = ctl.gets)
unless line =~ /Unknown command/
output << line
end
end
rescue Errno::EPIPE
ctl.close
sleep 0.5
runs += 1
if runs < 4
retry

output
end

if process == 0
if nbproc > 1
# Only multiple socket execution prefixes lines with process id
# - inspired from dsh.
sockets().each.sort.map{|k,v| execute(v, command).map { |line| "#{k}: #{line}" }}.flatten
else
puts "the unix socket at #{socket} closed before we could complete this request"
exit
execute(sockets()[0], command)
end
end
while (line = ctl.gets)
unless line =~ /Unknown command/
output << line
else
if !sockets().has_key?(process)
fail(RuntimeError.new "Could not find a stats socket with process #{process} in #{config_path}")
else
execute(sockets()[process], command)
end
end
ctl.close

output
end

def display_usage!
Expand Down
23 changes: 8 additions & 15 deletions lib/haproxyctl/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,21 @@ def exec
def nbproc
@nbproc ||= begin
config.match /nbproc \s*(\d*)\s*/
Regexp.last_match[1].to_i || 1
Regexp.last_match(1).to_i || 1
end
end

def socket
@socket ||= begin
# If the haproxy config is using nbproc > 1, we assume that all cores
# except for 1 do not need commands sent to their sockets (if they exist).
# This is a poor assumption, so TODO: improve CLI to accept argument for
# processes to target.
if nbproc > 1
config.match /stats\s+socket \s*([^\s]*) \s*.*process \s*1[\d^]?/
else
config.match /stats\s+socket \s*([^\s]*)/
end
Regexp.last_match[1] || fail("Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
end
def sockets
# Always capture socket path, and include process id if it exists.
# Note: whoever runs haproxy with nbprocs > 1 and has a socket listener without process id .. can blame themself.
@sockets = Hash[config.scan(/stats\s+socket\s+([^\[\s\]]*)(?:(?:.*process)?(?:.*process\s+([^\[\s\]]*)))?/).collect { |v| [v[1].to_i,v[0]] }]
@sockets.empty? && fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
@sockets
end

def pidfile
if config.match(/pidfile \s*([^\s]*)/)
@pidfile = Regexp.last_match[1]
@pidfile = Regexp.last_match(1)
else
std_pid = '/var/run/haproxy.pid'
if File.exists?(std_pid)
Expand Down
22 changes: 12 additions & 10 deletions rhapr/lib/rhapr/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Rhapr
module Environment
attr_reader :haproxy_pid, :config_path, :config, :exec, :socket_path
attr_reader :haproxy_pid, :config_path, :config, :exec, :socket_paths

# @return [String, nil] The path to the HAProxy configuration file, or nil if not found. Set the ENV variable $HAPROXY_CONFIG to override defaults.
def config_path
Expand Down Expand Up @@ -55,24 +55,26 @@ def exec
(@exec)
end

# @param [int] process id for looking up correct socket path
# @return [UNIXSocket] A connection to the HAProxy Socket
# @raise [RuntimeError] Raised if a socket connection could not be established
def socket
def socket(process)
begin
UNIXSocket.open(socket_path)
UNIXSocket.open(socket_paths[process])
rescue Errno::EACCES => e
raise RuntimeError.new("Could not open a socket with HAProxy. Error message: #{e.message}")
end
end

# @return [String] The path to the HAProxy stats socket.
# @return [Array] Entries of [ProcessNumber, Path] for HAProxy stats sockets. ProcessNumber can be 0 if not bound to any.
# @raise [RuntimeError] Raised if no stats socket has been specified, in the HAProxy configuration.
# @todo: Should there be an ENV var for this? Perhaps allow config-less runs of rhapr?
def socket_path
@socket_path ||= begin
config.match /stats\s+socket\s+([^\s]*)/
Regexp.last_match[1] || fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
end
def socket_paths
# Always capture socket path, and include process id if it exists.
# Note: whoever runs haproxy with nbprocs > 1 and has a socket listener without process id .. can blame themself.
@socket_paths = Hash[config.scan(/stats\s+socket\s+([^[\s]]*)(?:(?:.*process)?(?:.*process\s+([^[\s]]*)))?/).collect { |v| [v[1].to_i,v[0]] }]
@socket_paths.empty? && fail(RuntimeError.new "Expecting 'stats socket <UNIX_socket_path>' in #{config_path}")
@socket_paths
end

# @return [String] Returns the path to the pidfile, specified in the HAProxy configuration. Returns an assumption, if not found.
Expand All @@ -81,7 +83,7 @@ def socket_path
def pid
@pid ||= begin
config.match /pidfile ([^\s]*)/
Regexp.last_match[1] || '/var/run/haproxy.pid'
Regexp.last_match(1) || '/var/run/haproxy.pid'
end
end

Expand Down
7 changes: 4 additions & 3 deletions rhapr/lib/rhapr/interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ class Interface
EMPTY = "\n"

# @param [String, #to_s] message The message to be sent to HAProxy
# @param [int] process id for retrieving correct stats socket
# return [Array<String>] All of the output from HAProxy, read in.
# @see Rhapr::Interface#write, Rhapr::Interface#read_full
def send(message)
sock = socket
def send(message, process=1)
sock = socket(process)

write(sock, message)
read_full(sock)
Expand Down Expand Up @@ -68,7 +69,7 @@ def get_weight(backend, server)
resp = send "get weight #{backend}/#{server}"

resp.match /([[:digit:]]+) \(initial ([[:digit:]]+)\)/
weight, initial = Regexp.last_match[1], Regexp.last_match[2]
weight, initial = Regexp.last_match(1), Regexp.last_match(2)

return [weight.to_i, initial.to_i] if weight and initial

Expand Down
36 changes: 36 additions & 0 deletions rhapr/spec/config_fixtures/nbprocs_haproxy.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
global
daemon
maxconn 1024
# quiet
pidfile /var/run/haproxy.pid
nbproc 2
stats socket /tmp/haproxy-1 level admin uid 501 process 4 group staff mode 0660
stats socket /tmp/haproxy-2 level admin uid 501 group staff mode 0660 process 5

defaults
log global
mode http
option httplog
option dontlognull
stats enable
stats uri /proxystats # and this guy for statistics
stats auth webreport:areallysecretsupersecurepassword
stats refresh 5s

listen thrift :9090
mode tcp
balance roundrobin
option tcplog
option redispatch
retries 3

contimeout 5000
clitimeout 40000
srvtimeout 7000

server thrift1 localhost:9091 maxconn 20 check inter 20000
server thrift2 localhost:9092 maxconn 20 check inter 20000
server thrift3 localhost:9093 maxconn 20 check inter 20000
server thrift4 localhost:9094 maxconn 20 check inter 20000
server thrift5 localhost:9095 maxconn 20 check inter 20000
server thrift6 localhost:9096 maxconn 20 check inter 20000
Loading