Skip to content

Commit 4226940

Browse files
committed
Add weight to proxy backend for janitor
1 parent a848575 commit 4226940

13 files changed

+329
-286
lines changed

cluster/prx.go

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type Proxy struct {
6464
Variables map[string]string `json:"-"`
6565
ServiceName string `json:"serviceName"`
6666
Agent string `json:"agent"`
67+
Weight string `json:"weight"`
6768
}
6869

6970
type DatabaseProxy interface {
@@ -110,6 +111,7 @@ type DatabaseProxy interface {
110111
IsFilterInTags(filter string) bool
111112
IsDown() bool
112113
GetProxyConfig() string
114+
GetJanitorWeight() string
113115
// GetInitContainer(collector opensvc.Collector) string
114116
GetBindAddress() string
115117
GetBindAddressExtraIPV6() string

cluster/prx_consul.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@ import (
1616
"github.com/micro/go-micro/registry"
1717
"github.com/signal18/replication-manager/config"
1818
"github.com/signal18/replication-manager/utils/misc"
19+
"github.com/spf13/pflag"
1920
)
2021

2122
type ConsulProxy struct {
2223
Proxy
2324
}
2425

26+
func (proxy *ConsulProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
27+
flags.BoolVar(&conf.RegistryConsul, "registry-consul", false, "Register write and read SRV DNS to consul")
28+
flags.StringVar(&conf.RegistryConsulCredential, "registry-consul-credential", ":", "Consul credential user:password")
29+
flags.StringVar(&conf.RegistryConsulToken, "registry-consul-token", "", "Consul Token")
30+
flags.StringVar(&conf.RegistryConsulHosts, "registry-servers", "127.0.0.1", "Comma-separated list of registry addresses")
31+
flags.StringVar(&conf.RegistryConsulJanitorWeights, "registry-consul-weights", "100", "Weight of each proxysql host inside janitor proxy")
32+
}
33+
2534
func NewConsulProxy(placement int, cluster *Cluster, proxyHost string) *ConsulProxy {
2635
conf := cluster.Conf
2736
prx := new(ConsulProxy)
@@ -36,7 +45,7 @@ func NewConsulProxy(placement int, cluster *Cluster, proxyHost string) *ConsulPr
3645
prx.WritePort, _ = strconv.Atoi(conf.ProxysqlPort)
3746
prx.ReadPort, _ = strconv.Atoi(conf.ProxysqlPort)
3847

39-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6)
48+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6, conf.RegistryConsulJanitorWeights)
4049

4150
if conf.ProvNetCNI {
4251
if conf.ClusterHead == "" {
@@ -58,7 +67,7 @@ func (proxy *ConsulProxy) Init() {
5867
if cluster.Conf.RegistryConsul == false || cluster.IsActive() == false {
5968
return
6069
}
61-
opt.Addrs = strings.Split(cluster.Conf.RegistryHosts, ",")
70+
opt.Addrs = strings.Split(cluster.Conf.RegistryConsulHosts, ",")
6271
//DefaultRegistry()
6372
//opt := registry.DefaultRegistry
6473
reg := registry.NewRegistry()

cluster/prx_get.go

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ func (prx *Proxy) GetClusterConnection() (*sqlx.DB, error) {
7272

7373
}
7474

75+
func (proxy *Proxy) GetJanitorWeight() string {
76+
return proxy.Weight
77+
}
78+
7579
func (proxy *Proxy) GetProxyConfig() string {
7680
proxy.ClusterGroup.LogPrintf(LvlInfo, "Proxy Config generation "+proxy.Datadir+"/config.tar.gz")
7781
err := proxy.ClusterGroup.Configurator.GenerateProxyConfig(proxy.Datadir, proxy.ClusterGroup.Conf.WorkingDir+"/"+proxy.ClusterGroup.Name, proxy.GetEnv())

cluster/prx_haproxy.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type HaproxyProxy struct {
3131
func NewHaproxyProxy(placement int, cluster *Cluster, proxyHost string) *HaproxyProxy {
3232
conf := cluster.Conf
3333
prx := new(HaproxyProxy)
34-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSHaProxyPartitions, conf.HaproxyHostsIPV6)
34+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSHaProxyPartitions, conf.HaproxyHostsIPV6, conf.HaproxyJanitorWeights)
3535
prx.Type = config.ConstProxyHaproxy
3636
prx.Port = strconv.Itoa(conf.HaproxyAPIPort)
3737
prx.ReadPort = conf.HaproxyReadPort
@@ -55,6 +55,7 @@ func (proxy *HaproxyProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
5555
flags.StringVar(&conf.HaproxyUser, "haproxy-user", "admin", "Haproxy API user")
5656
flags.StringVar(&conf.HaproxyPassword, "haproxy-password", "admin", "Haproxy API password")
5757
flags.StringVar(&conf.HaproxyHosts, "haproxy-servers", "127.0.0.1", "HaProxy hosts")
58+
flags.StringVar(&conf.HaproxyJanitorWeights, "haproxy-janitor-weights", "100", "Weight of each HaProxy host inside janitor proxy")
5859
flags.IntVar(&conf.HaproxyAPIPort, "haproxy-api-port", 1999, "HaProxy runtime api port")
5960
flags.IntVar(&conf.HaproxyWritePort, "haproxy-write-port", 3306, "HaProxy read-write port to leader")
6061
flags.IntVar(&conf.HaproxyReadPort, "haproxy-read-port", 3307, "HaProxy load balance read port to all nodes")

cluster/prx_janitor.go

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func (proxy *ProxyJanitor) Connect() (proxysql.ProxySQL, error) {
6565
Host: proxy.Host,
6666
Port: proxy.Port,
6767
WriterHG: "0",
68+
Weight: "1",
6869
}
6970
var err error
7071
err = psql.Connect()
@@ -137,6 +138,7 @@ func (proxy *ProxyJanitor) Init() {
137138
}
138139
} else {
139140
//weight string, max_replication_lag string, max_connections string, compression string
141+
psql.Weight = s.GetJanitorWeight()
140142
err = psql.AddServerAsWriter(misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), proxy.UseSSL())
141143

142144
if cluster.Conf.LogLevel > 2 || cluster.Conf.ProxysqlDebug {
@@ -220,6 +222,7 @@ func (proxy *ProxyJanitor) Refresh() error {
220222
}
221223

222224
if err != nil {
225+
// cluster.LogPrintf(LvlErr, "Backend %s:%s not found error:%s ", misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), err)
223226
isFoundBackendWrite = false
224227
} else {
225228
proxy.BackendsWrite = append(proxy.BackendsWrite, bke)
@@ -243,6 +246,7 @@ func (proxy *ProxyJanitor) Refresh() error {
243246
}
244247
} else {
245248
//scenario restart with failed leader
249+
psql.Weight = s.GetJanitorWeight()
246250
err = psql.AddServerAsWriter(misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), proxy.UseSSL())
247251
}
248252
updated = true

cluster/prx_mariadbshardproxy.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type MariadbShardProxy struct {
3535
func NewMariadbShardProxy(placement int, cluster *Cluster, proxyHost string) *MariadbShardProxy {
3636
conf := cluster.Conf
3737
prx := new(MariadbShardProxy)
38-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSShardProxyPartitions, conf.MdbsHostsIPV6)
38+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSShardProxyPartitions, conf.MdbsHostsIPV6, conf.MdbsJanitorWeights)
3939
prx.Type = config.ConstProxySpider
4040
prx.Host, prx.Port = misc.SplitHostPort(proxyHost)
4141
prx.User, prx.Pass = misc.SplitPair(cluster.Conf.GetDecryptedValue("shardproxy-credential"))
@@ -70,6 +70,7 @@ func NewMariadbShardProxy(placement int, cluster *Cluster, proxyHost string) *Ma
7070
func (proxy *MariadbShardProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
7171
flags.BoolVar(&conf.MdbsProxyOn, "shardproxy", false, "MariaDB Spider proxy")
7272
flags.StringVar(&conf.MdbsProxyHosts, "shardproxy-servers", "127.0.0.1:3307", "MariaDB spider proxy hosts IP:Port,IP:Port")
73+
flags.StringVar(&conf.MdbsJanitorWeights, "shardproxy-janitor-weights", "100", "Weight of each MariaDB spider host inside janitor proxy")
7374
flags.StringVar(&conf.MdbsProxyCredential, "shardproxy-credential", "root:mariadb", "MariaDB spider proxy credential")
7475
flags.BoolVar(&conf.MdbsProxyCopyGrants, "shardproxy-copy-grants", true, "Copy grants from shards master")
7576
flags.BoolVar(&conf.MdbsProxyLoadSystem, "shardproxy-load-system", true, "Load Spider system tables")

cluster/prx_maxscale.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func NewMaxscaleProxy(placement int, cluster *Cluster, proxyHost string) *Maxsca
3232
conf := cluster.Conf
3333
prx := new(MaxscaleProxy)
3434
prx.Type = config.ConstProxyMaxscale
35-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSMaxscalePartitions, conf.MxsHostsIPV6)
35+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSMaxscalePartitions, conf.MxsHostsIPV6, conf.MxsJanitorWeights)
3636
prx.Port = conf.MxsPort
3737
prx.User = conf.MxsUser
3838
prx.Pass = conf.MxsPass
@@ -58,6 +58,7 @@ func (proxy *MaxscaleProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config)
5858
flags.BoolVar(&conf.MxsDisableMonitor, "maxscale-disable-monitor", false, "Disable maxscale monitoring and fully drive server state")
5959
flags.StringVar(&conf.MxsGetInfoMethod, "maxscale-get-info-method", "maxadmin", "How to get infos from Maxscale maxinfo|maxadmin")
6060
flags.StringVar(&conf.MxsHost, "maxscale-servers", "", "MaxScale hosts ")
61+
flags.StringVar(&conf.MxsJanitorWeights, "maxscale-janitor-weights", "100", "Weight of each MariaDB maxscale inside janitor proxy")
6162
flags.StringVar(&conf.MxsPort, "maxscale-port", "6603", "MaxScale admin port")
6263
flags.StringVar(&conf.MxsUser, "maxscale-user", "admin", "MaxScale admin user")
6364
flags.StringVar(&conf.MxsPass, "maxscale-pass", "mariadb", "MaxScale admin password")

cluster/prx_proxysql.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func NewProxySQLProxy(placement int, cluster *Cluster, proxyHost string) *ProxyS
3333
prx.WritePort, _ = strconv.Atoi(conf.ProxysqlPort)
3434
prx.ReadPort, _ = strconv.Atoi(conf.ProxysqlPort)
3535

36-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6)
36+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6, conf.ProxysqlJanitorWeights)
3737

3838
if conf.ProvNetCNI {
3939
if conf.ClusterHead == "" {
@@ -54,6 +54,7 @@ func (proxy *ProxySQLProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config)
5454
flags.BoolVar(&conf.ProxysqlSaveToDisk, "proxysql-save-to-disk", false, "Save proxysql change to sqllight")
5555
flags.StringVar(&conf.ProxysqlHosts, "proxysql-servers", "", "ProxySQL hosts")
5656
flags.StringVar(&conf.ProxysqlHostsIPV6, "proxysql-servers-ipv6", "", "ProxySQL extra IPV6 bind for interfaces")
57+
flags.StringVar(&conf.ProxysqlJanitorWeights, "proxysql-janitor-weights", "100", "Weight of each proxysql host inside janitor proxy")
5758
flags.StringVar(&conf.ProxysqlPort, "proxysql-port", "3306", "ProxySQL read/write proxy port")
5859
flags.StringVar(&conf.ProxysqlAdminPort, "proxysql-admin-port", "6032", "ProxySQL admin interface port")
5960
flags.StringVar(&conf.ProxysqlReaderHostgroup, "proxysql-reader-hostgroup", "1", "ProxySQL reader hostgroup")
@@ -77,6 +78,7 @@ func (proxy *ProxySQLProxy) Connect() (proxysql.ProxySQL, error) {
7778
Port: proxy.Port,
7879
WriterHG: fmt.Sprintf("%d", proxy.WriterHostgroup),
7980
ReaderHG: fmt.Sprintf("%d", proxy.ReaderHostgroup),
81+
Weight: proxy.Weight,
8082
}
8183

8284
var err error

cluster/prx_set.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,21 @@ func (proxy *Proxy) SetServiceName(namespace string) {
3131
proxy.ServiceName = namespace + "/svc/" + proxy.Name
3232
}
3333

34-
func (proxy *Proxy) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, ProxysqlHostsIPV6 string) {
34+
func (proxy *Proxy) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, ProxysqlHostsIPV6 string, Weights string) {
3535
slapospartitions := strings.Split(SlapOSDBPartitions, ",")
3636
agents := strings.Split(ProvAgents, ",")
3737
ipv6hosts := strings.Split(ProxysqlHostsIPV6, ",")
38+
weights := strings.Split(Weights, ",")
3839
if k < len(slapospartitions) {
3940
proxy.SlapOSDatadir = slapospartitions[k]
4041
}
4142
if ProvAgents != "" {
4243
proxy.Agent = agents[k%len(agents)]
4344
}
45+
if Weights != "" {
46+
proxy.Weight = weights[k%len(weights)]
47+
}
48+
4449
if k < len(ipv6hosts) {
4550
proxy.HostIPV6 = ipv6hosts[k]
4651
}

cluster/prx_sphinx.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type SphinxProxy struct {
2828
func NewSphinxProxy(placement int, cluster *Cluster, proxyHost string) *SphinxProxy {
2929
conf := cluster.Conf
3030
prx := new(SphinxProxy)
31-
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSSphinxPartitions, conf.SphinxHostsIPV6)
31+
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSSphinxPartitions, conf.SphinxHostsIPV6, conf.SphinxJanitorWeights)
3232
prx.Type = config.ConstProxySphinx
3333

3434
prx.Port = conf.SphinxQLPort
@@ -49,6 +49,7 @@ func NewSphinxProxy(placement int, cluster *Cluster, proxyHost string) *SphinxPr
4949
func (proxy *SphinxProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
5050
flags.BoolVar(&conf.SphinxOn, "sphinx", false, "Turn on SphinxSearch detection")
5151
flags.StringVar(&conf.SphinxHosts, "sphinx-servers", "127.0.0.1", "SphinxSearch hosts")
52+
flags.StringVar(&conf.SphinxJanitorWeights, "sphinx-janitor-weights", "100", "Weight of each Sphinx host inside janitor proxy")
5253
flags.StringVar(&conf.SphinxPort, "sphinx-port", "9312", "SphinxSearch API port")
5354
flags.StringVar(&conf.SphinxQLPort, "sphinx-sql-port", "9306", "SphinxSearch SQL port")
5455
if runtime.GOOS == "linux" {

0 commit comments

Comments
 (0)