Merge remote-tracking branch 'origin/develop' into alert
caffeinated92 committed Jul 16, 2024
2 parents 0fd6220 + 48ad128 commit 779e724
Showing 46 changed files with 301 additions and 71 deletions.
39 changes: 25 additions & 14 deletions cluster/cluster.go
@@ -712,17 +712,22 @@ func (cluster *Cluster) StateProcessing() {
 			if s.ErrKey == "WARN0074" {
 				cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master physical backup to reseed %s", s.ServerUrl)
 				if master != nil {
-					backupext := ".xbtream"
-					task := "reseed" + cluster.Conf.BackupPhysicalType
-
-					if cluster.Conf.CompressBackups {
-						backupext = backupext + ".gz"
-					}
-
-					if mybcksrv != nil {
-						go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
-					} else {
-						go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
+					if servertoreseed.IsReseeding {
+						cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Cancel backup reseeding, %s is already reseeding", s.ServerUrl)
+					} else {
+						servertoreseed.SetInReseedBackup(true)
+						backupext := ".xbtream"
+						task := "reseed" + cluster.Conf.BackupPhysicalType
+
+						if cluster.Conf.CompressBackups {
+							backupext = backupext + ".gz"
+						}
+
+						if mybcksrv != nil {
+							go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
+						} else {
+							go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
+						}
 					}
 				} else {
 					cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl)
@@ -746,12 +751,18 @@ func (cluster *Cluster) StateProcessing() {
 				// }
 			}
 			if s.ErrKey == "WARN0076" {
-				task := "flashback" + cluster.Conf.BackupPhysicalType
-				cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending server physical backup to flashback reseed %s", s.ServerUrl)
-				if mybcksrv != nil {
-					go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
+				if servertoreseed.IsReseeding {
+					cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Cancel backup reseeding, %s is already reseeding", s.ServerUrl)
 				} else {
-					go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
+					servertoreseed.SetInReseedBackup(true)
+					task := "flashback" + cluster.Conf.BackupPhysicalType
+					cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending server physical backup to flashback reseed %s", s.ServerUrl)
+
+					if mybcksrv != nil {
+						go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
+					} else {
+						go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
+					}
 				}
 			}
 			if s.ErrKey == "WARN0077" {
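Both hunks apply the same guard: if the target server is already reseeding, log and skip; otherwise flag it with SetInReseedBackup(true) before launching the SSTRunSender goroutine. A minimal standalone sketch of that check-then-flag pattern — the Server type, mutex, and helper names are illustrative, not replication-manager's API, and the sketch folds the check and the flag into one atomic helper where the commit does them as two steps:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Server is a stand-in for ServerMonitor: a reseed target with a busy flag.
type Server struct {
	mu          sync.Mutex
	URL         string
	IsReseeding bool
}

// TryStartReseed atomically checks and sets the busy flag, so two state
// events for the same server cannot both launch a sender goroutine.
func (s *Server) TryStartReseed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.IsReseeding {
		return false
	}
	s.IsReseeding = true
	return true
}

// FinishReseed clears the flag once the transfer is done.
func (s *Server) FinishReseed() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.IsReseeding = false
}

func main() {
	srv := &Server{URL: "db1:3306"}
	for i := 0; i < 2; i++ {
		if srv.TryStartReseed() {
			go func() {
				defer srv.FinishReseed()
				fmt.Println("sending backup to", srv.URL)
				time.Sleep(10 * time.Millisecond)
			}()
		} else {
			fmt.Println("cancel: already reseeding", srv.URL)
		}
	}
	time.Sleep(50 * time.Millisecond)
}
```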
5 changes: 5 additions & 0 deletions cluster/cluster_acl.go
@@ -482,6 +482,11 @@ func (cluster *Cluster) IsURLPassACL(strUser string, URL string) bool {
 			return true
 		}
 	}
+	if cluster.APIUsers[strUser].Grants[config.GrantClusterDropMonitor] {
+		if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/dropserver") {
+			return true
+		}
+	}
 	if cluster.APIUsers[strUser].Grants[config.GrantClusterSwitchover] {
 		if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/switchover") {
 			return true
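The new block wires the GrantClusterDropMonitor grant to the dropserver action URL, following the same grant-then-substring test as the switchover block next to it. A hedged generic sketch of that pattern — grant names and paths here are placeholders, not the project's config constants:

```go
package main

import (
	"fmt"
	"strings"
)

// isURLAllowed maps each grant a user may hold to the API path fragment it
// unlocks, and permits the URL if any held grant matches.
func isURLAllowed(grants map[string]bool, clusterName, url string) bool {
	actions := map[string]string{
		"cluster-drop-monitor": "/actions/dropserver",
		"cluster-switchover":   "/actions/switchover",
	}
	for grant, suffix := range actions {
		if grants[grant] && strings.Contains(url, "/api/clusters/"+clusterName+suffix) {
			return true
		}
	}
	return false
}

func main() {
	grants := map[string]bool{"cluster-drop-monitor": true}
	fmt.Println(isURLAllowed(grants, "cluster1", "/api/clusters/cluster1/actions/dropserver")) // true
	fmt.Println(isURLAllowed(grants, "cluster1", "/api/clusters/cluster1/actions/switchover")) // false
}
```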
3 changes: 2 additions & 1 deletion cluster/cluster_chk.go
@@ -148,7 +148,7 @@ func (cluster *Cluster) isMasterFailed() bool {
 func (cluster *Cluster) isMaxMasterFailedCountReached() bool {
 	// no illimited failed count
 
-	if cluster.GetMaster().FailCount >= cluster.Conf.MaxFail {
+	if cluster.GetMaster() != nil && cluster.GetMaster().FailCount >= cluster.Conf.MaxFail {
 		cluster.SetState("WARN0023", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0023"]), ErrFrom: "CHECK"})
 		return true
 	} else {
@@ -274,6 +274,7 @@ func (cluster *Cluster) isMaxscaleSupectRunning() bool {
 }
 
 func (cluster *Cluster) isFoundCandidateMaster() bool {
+
 	if cluster.GetTopology() == topoActivePassive {
 		return true
 	}
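The substantive hunk here is a nil guard: GetMaster() can legitimately return nil before a master is elected, and the added `!= nil` short-circuits the &&, so the FailCount dereference never runs on a nil pointer. A small self-contained illustration — the types are stand-ins, not the project's:

```go
package main

import "fmt"

type Master struct{ FailCount int }

type Cluster struct{ master *Master }

// GetMaster can return nil when no master is elected yet.
func (c *Cluster) GetMaster() *Master { return c.master }

// isMaxMasterFailedCountReached mirrors the guarded form: the nil check
// short-circuits &&, so FailCount is never read through a nil pointer.
func (c *Cluster) isMaxMasterFailedCountReached(maxFail int) bool {
	return c.GetMaster() != nil && c.GetMaster().FailCount >= maxFail
}

func main() {
	var c Cluster // no master elected: would have panicked before the guard
	fmt.Println(c.isMaxMasterFailedCountReached(5)) // false, no panic
	c.master = &Master{FailCount: 7}
	fmt.Println(c.isMaxMasterFailedCountReached(5)) // true
}
```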
67 changes: 67 additions & 0 deletions cluster/cluster_del.go
@@ -7,6 +7,8 @@
 package cluster
 
 import (
+	"errors"
+	"fmt"
 	"strings"
 
 	"github.com/signal18/replication-manager/config"
@@ -19,6 +21,33 @@ func (cluster *Cluster) RemoveServerFromIndex(index int) {
 	cluster.Servers = newServers
 }
 
+func (cluster *Cluster) RemoveServerMonitor(host string, port string) error {
+	newServers := make([]*ServerMonitor, 0)
+	index := -1
+	//Find the index
+	for i, srv := range cluster.Servers {
+		//Skip the server
+		if srv.Host == host && srv.Port == port {
+			index = i
+		}
+
+	}
+
+	if index >= 0 {
+		cluster.Conf.Hosts = strings.ReplaceAll(strings.Replace(cluster.Conf.Hosts, host+":"+port, "", 1), ",,", ",")
+		cluster.StateMachine.SetFailoverState()
+		cluster.Lock()
+		newServers = append(newServers, cluster.Servers[:index]...)
+		newServers = append(newServers, cluster.Servers[index+1:]...)
+		cluster.Servers = newServers
+		cluster.Unlock()
+		cluster.StateMachine.RemoveFailoverState()
+	} else {
+		return errors.New(fmt.Sprintf("Host with address %s:%s not found in cluster!", host, port))
+	}
+	return nil
+}
+
 func (cluster *Cluster) CancelRollingRestart() error {
 	cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "API receive cancel rolling restart")
 	for _, pr := range cluster.Proxies {
@@ -79,3 +108,41 @@ func (cluster *Cluster) DropProxyTag(dtag string) {
 	cluster.SetClusterCredentialsFromConfig()
 	cluster.SetProxiesRestartCookie()
 }
+
+func (cluster *Cluster) RemoveProxyMonitor(prx string, host string, port string) error {
+	newProxies := make([]DatabaseProxy, 0)
+	index := -1
+	for i, pr := range cluster.Proxies {
+		if pr.GetHost() == host && pr.GetPort() == port {
+			index = i
+		}
+	}
+	if index >= 0 {
+		cluster.StateMachine.SetFailoverState()
+		cluster.Lock()
+		if len(cluster.Proxies) == 1 {
+			cluster.Proxies = newProxies
+		} else {
+			newProxies = append(newProxies, cluster.Proxies[:index]...)
+			newProxies = append(newProxies, cluster.Proxies[index+1:]...)
+			cluster.Proxies = newProxies
+		}
+
+		switch prx {
+		case config.ConstProxyHaproxy:
+			cluster.Conf.HaproxyHosts = strings.ReplaceAll(strings.Replace(cluster.Conf.HaproxyHosts, host, "", 1), ",,", ",")
+		case config.ConstProxyMaxscale:
+			cluster.Conf.MxsHost = strings.ReplaceAll(strings.Replace(cluster.Conf.MxsHost, host, "", 1), ",,", ",")
+		case config.ConstProxySqlproxy:
+			cluster.Conf.ProxysqlHosts = strings.ReplaceAll(strings.Replace(cluster.Conf.ProxysqlHosts, host, "", 1), ",,", ",")
+		case config.ConstProxySpider:
+			cluster.Conf.MdbsProxyHosts = strings.ReplaceAll(strings.Replace(cluster.Conf.MdbsProxyHosts, host, "", 1), ",,", ",")
+		}
+		cluster.Unlock()
+		cluster.StateMachine.RemoveFailoverState()
+	} else {
+		return errors.New(fmt.Sprintf("Proxy host with address %s:%s not found in cluster!", host, port))
+	}
+
+	return nil
+}
6 changes: 6 additions & 0 deletions cluster/cluster_topo.go
@@ -259,6 +259,12 @@ func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
 				cluster.master = cluster.Servers[k]
 				cluster.master.SetMaster()
 			}
+
+			// Set master when master reconnect after become suspect
+			if cluster.master == cluster.Servers[k] && cluster.master.State == stateSuspect {
+				cluster.master.SetMaster()
+			}
+
 			if cluster.master.IsReadOnly() && !cluster.master.IsRelay {
 				cluster.master.SetReadWrite()
 				cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTopology, config.LvlInfo, "Server %s disable read only as last non slave", cluster.master.URL)
14 changes: 9 additions & 5 deletions cluster/prx.go
@@ -182,31 +182,35 @@ func (cluster *Cluster) newProxyList() error {
 		for k, proxyHost := range strings.Split(cluster.Conf.MxsHost, ",") {
 			prx := NewMaxscaleProxy(k, cluster, proxyHost)
 			cluster.AddProxy(prx)
+			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlDbg, "New Maxscale proxy created: %s %s", prx.GetHost(), prx.GetPort())
 		}
 	}
-	if cluster.Conf.HaproxyOn {
+	if cluster.Conf.HaproxyHosts != "" && cluster.Conf.HaproxyOn {
 		for k, proxyHost := range strings.Split(cluster.Conf.HaproxyHosts, ",") {
 			prx := NewHaproxyProxy(k, cluster, proxyHost)
 			cluster.AddProxy(prx)
+			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlDbg, "New HA Proxy created: %s %s", prx.GetHost(), prx.GetPort())
 		}
 	}
-	if cluster.Conf.ExtProxyOn {
+	if cluster.Conf.ExtProxyVIP != "" && cluster.Conf.ExtProxyOn {
 		for k, proxyHost := range strings.Split(cluster.Conf.ExtProxyVIP, ",") {
 			prx := NewExternalProxy(k, cluster, proxyHost)
 			cluster.AddProxy(prx)
+			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlDbg, "New external proxy created: %s %s", prx.GetHost(), prx.GetPort())
 		}
-
 	}
-	if cluster.Conf.ProxysqlOn {
+	if cluster.Conf.ProxysqlHosts != "" && cluster.Conf.ProxysqlOn {
 		for k, proxyHost := range strings.Split(cluster.Conf.ProxysqlHosts, ",") {
 			prx := NewProxySQLProxy(k, cluster, proxyHost)
 			cluster.AddProxy(prx)
+			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlDbg, "New ProxySQL proxy created: %s %s", prx.GetHost(), prx.GetPort())
 		}
 	}
 	if cluster.Conf.ProxyJanitorHosts != "" {
 		for k, proxyHost := range strings.Split(cluster.Conf.ProxyJanitorHosts, ",") {
 			prx := NewProxyJanitor(k, cluster, proxyHost)
 			cluster.AddProxy(prx)
+			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlDbg, "New ProxyJanitor proxy created: %s %s", prx.GetHost(), prx.GetPort())
 		}
 	}
 	if cluster.Conf.MdbsProxyHosts != "" && cluster.Conf.MdbsProxyOn {
@@ -301,7 +305,7 @@ func (cluster *Cluster) IsProxyEqualMaster() bool {
 			// if cluster.IsVerbose() {
 			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModProxy, config.LvlInfo, "Proxy compare master: %d %d", cluster.GetMaster().ServerID, uint(sid))
 			// }
-			if cluster.GetMaster().ServerID == uint64(sid) || pr.GetType() == config.ConstProxySpider {
+			if cluster.GetMaster() != nil && cluster.GetMaster().ServerID == uint64(sid) || pr.GetType() == config.ConstProxySpider {
 				return true
 			}
 		}
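The added `!= ""` checks on the host lists are not cosmetic: strings.Split never returns an empty slice for a non-empty separator, so splitting an empty host list yields one empty-string element and an unguarded loop would build a proxy with no host. A short demonstration:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Split never returns an empty slice for sep != "": an empty input
	// yields [""], so a range loop still runs once with an empty host.
	hosts := strings.Split("", ",")
	fmt.Println(len(hosts), hosts) // 1 [""]

	// Guarding on the raw string first, as the commit does, avoids that.
	raw := ""
	if raw != "" {
		for _, h := range strings.Split(raw, ",") {
			fmt.Println("would create proxy for", h)
		}
	}
}
```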
2 changes: 1 addition & 1 deletion cluster/prx_maxscale.go
@@ -168,7 +168,7 @@ func (proxy *MaxscaleProxy) Init() {
 	if master == nil {
 		return
 	}
-	if cluster.GetMaster().MxsServerName == "" {
+	if cluster.GetMaster() != nil && cluster.GetMaster().MxsServerName == "" {
 		return
 	}
 
17 changes: 9 additions & 8 deletions cluster/prx_set.go
@@ -57,14 +57,15 @@ func (proxy *Proxy) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions st
 }
 
 func (proxy *Proxy) SetDataDir() {
-
-	proxy.Datadir = proxy.ClusterGroup.Conf.WorkingDir + "/" + proxy.ClusterGroup.Name + "/" + proxy.Host + "_" + proxy.Port
-	if _, err := os.Stat(proxy.Datadir); os.IsNotExist(err) {
-		os.MkdirAll(proxy.Datadir, os.ModePerm)
-		os.MkdirAll(proxy.Datadir+"/log", os.ModePerm)
-		os.MkdirAll(proxy.Datadir+"/var", os.ModePerm)
-		os.MkdirAll(proxy.Datadir+"/init", os.ModePerm)
-		os.MkdirAll(proxy.Datadir+"/bck", os.ModePerm)
+	if proxy.Host != "" {
+		proxy.Datadir = proxy.ClusterGroup.Conf.WorkingDir + "/" + proxy.ClusterGroup.Name + "/" + proxy.Host + "_" + proxy.Port
+		if _, err := os.Stat(proxy.Datadir); os.IsNotExist(err) {
+			os.MkdirAll(proxy.Datadir, os.ModePerm)
+			os.MkdirAll(proxy.Datadir+"/log", os.ModePerm)
+			os.MkdirAll(proxy.Datadir+"/var", os.ModePerm)
+			os.MkdirAll(proxy.Datadir+"/init", os.ModePerm)
+			os.MkdirAll(proxy.Datadir+"/bck", os.ModePerm)
+		}
 	}
 }
 
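SetDataDir now creates the per-proxy directory tree only when the proxy has a host, which avoids a stray `<cluster>/_<port>` directory for hostless entries. A hedged sketch of the same guard with error handling added — the function and directory names are illustrative, not the project's exact layout:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// ensureDatadir mirrors the guarded SetDataDir: skip entirely when the
// host is empty, otherwise create the directory tree once.
func ensureDatadir(workdir, cluster, host, port string) (string, error) {
	if host == "" {
		return "", nil // no host: would otherwise build ".../<cluster>/_<port>"
	}
	datadir := filepath.Join(workdir, cluster, host+"_"+port)
	if _, err := os.Stat(datadir); os.IsNotExist(err) {
		for _, sub := range []string{"", "log", "var", "init", "bck"} {
			if err := os.MkdirAll(filepath.Join(datadir, sub), os.ModePerm); err != nil {
				return "", err
			}
		}
	}
	return datadir, nil
}

func main() {
	dir, err := ensureDatadir(os.TempDir(), "cluster1", "db1", "3306")
	fmt.Println(dir, err)
}
```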
3 changes: 2 additions & 1 deletion cluster/srv.go
@@ -186,6 +186,7 @@ type ServerMonitor struct {
 	WorkLoad               *config.WorkLoadsMap `json:"workLoad"`
 	DelayStat              *ServerDelayStat     `json:"delayStat"`
 	SlaveVariables         SlaveVariables       `json:"slaveVariables"`
+	IsReseeding            bool                 `json:"isReseeding"`
 	MDevIssues             ServerBug            `json:"mdevIssues"`
 	IsCheckedForMDevIssues bool                 `json:"isCheckedForMdevIssues"`
 	IsInSlowQueryCapture   bool
@@ -426,7 +427,7 @@ func (server *ServerMonitor) Ping(wg *sync.WaitGroup) {
 	}
 	if cluster.GetMaster() != nil && server.URL == cluster.GetMaster().URL && server.GetCluster().GetTopology() != topoUnknown {
 		server.FailSuspectHeartbeat = cluster.StateMachine.GetHeartbeats()
-		if cluster.GetMaster().FailCount <= cluster.Conf.MaxFail {
+		if cluster.GetMaster() != nil && cluster.GetMaster().FailCount <= cluster.Conf.MaxFail {
 			cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "INFO", "Master Failure detected! Retry %d/%d", cluster.GetMaster().FailCount, cluster.Conf.MaxFail)
 		}
 		if server.FailCount >= cluster.Conf.MaxFail {
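The new IsReseeding field carries a json tag, so it surfaces in the monitor's serialized payload as isReseeding. A minimal check of how the tag shapes the marshaled output — the cut-down struct below is a stand-in for ServerMonitor:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serverMonitor keeps only the new field and one neighbor for illustration.
type serverMonitor struct {
	IsReseeding bool   `json:"isReseeding"`
	URL         string `json:"url"`
}

func main() {
	b, err := json.Marshal(serverMonitor{IsReseeding: true, URL: "db1:3306"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"isReseeding":true,"url":"db1:3306"}
}
```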