-
Notifications
You must be signed in to change notification settings - Fork 12
Enable DNS communication between redis #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 8 commits
8a685d2
6b52b39
fa6dcc5
e2db1db
5aff807
8af906d
1d1f024
e848855
8d316df
cd4e5cb
2af1e1f
0dd60b7
5586c8a
af0a0f3
ac50cff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ spec: | |
| memory: 100Mi | ||
| redis: | ||
| replicas: 3 | ||
| headless: true | ||
| resources: | ||
| requests: | ||
| cpu: 100m | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| appsv1 "k8s.io/api/apps/v1" | ||
|
|
@@ -40,6 +41,7 @@ type RedisFailoverCheck interface { | |
| IsRedisRunning(rFailover *redisfailoverv1.RedisFailover) bool | ||
| IsSentinelRunning(rFailover *redisfailoverv1.RedisFailover) bool | ||
| IsClusterRunning(rFailover *redisfailoverv1.RedisFailover) bool | ||
| GetRedisesPods(rFailover *redisfailoverv1.RedisFailover) (*corev1.PodList, error) | ||
| } | ||
|
|
||
| // RedisFailoverChecker is our implementation of RedisFailoverCheck interface | ||
|
|
@@ -156,7 +158,8 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis | |
|
|
||
| rport := getRedisPort(rf.Spec.Redis.Port) | ||
| for _, rp := range rps.Items { | ||
| if rp.Status.PodIP == master { | ||
| podAddress := GetPodAddress(&rp, rf) | ||
| if podAddress == master { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have this is a enum or static variable rather than a dynamic string?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| err = r.setMasterLabelIfNecessary(rf.Namespace, rp) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -176,13 +179,27 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis | |
| } | ||
| } | ||
|
|
||
| slave, err := r.redisClient.GetSlaveOf(rp.Status.PodIP, rport, password) | ||
| slave, err := r.redisClient.GetSlaveOf(podAddress, rport, password) | ||
| if err != nil { | ||
| r.logger.Errorf("Get slave of master failed, maybe this node is not ready, pod ip: %s", rp.Status.PodIP) | ||
| r.logger.Errorf("Get slave of master failed, maybe this node is not ready, pod address: %s", podAddress) | ||
|
prashanna-frsh marked this conversation as resolved.
Outdated
|
||
| return err | ||
| } | ||
| // Compare master with what Redis returns | ||
| // Redis may return either DNS name or IP depending on how it was configured | ||
| if slave != "" && slave != master { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: same as above comment |
||
| return fmt.Errorf("slave %s don't have the master %s, has %s", rp.Status.PodIP, master, slave) | ||
| // If they don't match directly, try resolving both to IPs for comparison | ||
| // (in case one is DNS and the other is IP, but they refer to the same pod) | ||
| masterIP := GetPodIPFromAddress(master, rf, rps) | ||
| slaveIP := GetPodIPFromAddress(slave, rf, rps) | ||
| if slaveIP != masterIP { | ||
| return fmt.Errorf("slave %s don't have the master %s, has %s", podAddress, master, slave) | ||
| } | ||
| // They resolve to the same IP, but formats differ | ||
| // If headless is enabled and master is DNS name but slave is IP, prefer DNS name | ||
| if rf.Spec.Redis.Headless && strings.Contains(master, ".svc.cluster.local") && !strings.Contains(slave, ".svc.cluster.local") { | ||
| // Master is DNS name, slave is IP - reconfigure to use DNS name | ||
| return fmt.Errorf("slave %s configured with IP %s but should use DNS name %s for headless mode stability", podAddress, slave, master) | ||
| } | ||
| } | ||
| } | ||
| return nil | ||
|
|
@@ -221,12 +238,13 @@ func (r *RedisFailoverChecker) CheckIfMasterLocalhost(rFailover *redisfailoverv1 | |
| for _, sip := range redisIps { | ||
| master, err := r.redisClient.GetSlaveOf(sip, rport, password) | ||
| if err != nil { | ||
| r.logger.Warningf("CheckIfMasterLocalhost -- GetSlaveOf Failed") | ||
| r.logger.Warningf("CheckIfMasterLocalhost -- GetSlaveOf Failed for address %s", sip) | ||
| return false, err | ||
| } else if master == "" { | ||
| r.logger.Warningf("CheckIfMasterLocalhost -- Master already available ?? check manually") | ||
| return false, errors.New("unexpected master state, fix manually") | ||
| } else { | ||
| // Check if master is localhost (127.0.0.1) - this check works for both IP and DNS | ||
| if master == "127.0.0.1" { | ||
| lhmaster++ | ||
| } | ||
|
|
@@ -312,12 +330,22 @@ func (r *RedisFailoverChecker) CheckSentinelMonitor(sentinel, masterName string, | |
| } | ||
|
|
||
| // GetMasterIP connects to all redis and returns the master of the redis failover | ||
| // When headless is enabled, returns DNS name if the master pod is Ready, otherwise returns IP | ||
| func (r *RedisFailoverChecker) GetMasterIP(rf *redisfailoverv1.RedisFailover) (string, error) { | ||
| rips, err := r.GetRedisesIPs(rf) | ||
| // Get pods first to avoid duplicate calls (GetRedisesIPs also calls GetStatefulSetPods) | ||
| pods, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf)) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| // Build list of Redis addresses from pods (same logic as GetRedisesIPs) | ||
| rips := []string{} | ||
| for _, rp := range pods.Items { | ||
| if rp.Status.Phase == corev1.PodRunning && rp.DeletionTimestamp == nil { // Only work with running pods | ||
| rips = append(rips, GetPodAddress(&rp, rf)) | ||
| } | ||
| } | ||
|
|
||
| password, err := k8s.GetRedisPassword(r.k8sService, rf) | ||
| if err != nil { | ||
| return "", err | ||
|
|
@@ -328,11 +356,30 @@ func (r *RedisFailoverChecker) GetMasterIP(rf *redisfailoverv1.RedisFailover) (s | |
| for _, rip := range rips { | ||
| master, err := r.redisClient.IsMaster(rip, rport, password) | ||
| if err != nil { | ||
| r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip) | ||
| r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod address: %s", rip) | ||
| continue | ||
| } | ||
| if master { | ||
| masters = append(masters, rip) | ||
| // If rip is already a DNS name, use it directly | ||
| // Otherwise, find the pod and get its DNS name if headless is enabled | ||
| if strings.Contains(rip, ".svc.cluster.local") { | ||
| masters = append(masters, rip) | ||
| } else { | ||
| // rip is an IP, find the matching pod and get DNS name if headless is enabled | ||
| foundPod := false | ||
| for _, pod := range pods.Items { | ||
| if pod.Status.PodIP == rip { | ||
| masterAddress := GetPodAddress(&pod, rf) | ||
| masters = append(masters, masterAddress) | ||
| foundPod = true | ||
| break | ||
| } | ||
| } | ||
| // If we didn't find a matching pod, use the IP as fallback | ||
| if !foundPod { | ||
| masters = append(masters, rip) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -361,7 +408,7 @@ func (r *RedisFailoverChecker) GetNumberMasters(rf *redisfailoverv1.RedisFailove | |
| for _, rip := range rips { | ||
| master, err := r.redisClient.IsMaster(rip, rport, password) | ||
| if err != nil { | ||
| r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip) | ||
| r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod address: %s", rip) | ||
| continue | ||
| } | ||
| if master { | ||
|
|
@@ -371,7 +418,7 @@ func (r *RedisFailoverChecker) GetNumberMasters(rf *redisfailoverv1.RedisFailove | |
| return nMasters, nil | ||
| } | ||
|
|
||
| // GetRedisesIPs returns the IPs of the Redis nodes | ||
| // GetRedisesIPs returns the addresses (IPs or DNS names) of the Redis nodes | ||
| func (r *RedisFailoverChecker) GetRedisesIPs(rf *redisfailoverv1.RedisFailover) ([]string, error) { | ||
| redises := []string{} | ||
| rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf)) | ||
|
|
@@ -380,7 +427,7 @@ func (r *RedisFailoverChecker) GetRedisesIPs(rf *redisfailoverv1.RedisFailover) | |
| } | ||
| for _, rp := range rps.Items { | ||
| if rp.Status.Phase == corev1.PodRunning && rp.DeletionTimestamp == nil { // Only work with running pods | ||
| redises = append(redises, rp.Status.PodIP) | ||
| redises = append(redises, GetPodAddress(&rp, rf)) | ||
| } | ||
| } | ||
| return redises, nil | ||
|
|
@@ -414,7 +461,8 @@ func (r *RedisFailoverChecker) GetMaxRedisPodTime(rf *redisfailoverv1.RedisFailo | |
| } | ||
| start := redisNode.Status.StartTime.Round(time.Second) | ||
| alive := time.Since(start) | ||
| r.logger.Debugf("Pod %s has been alive for %.f seconds", redisNode.Status.PodIP, alive.Seconds()) | ||
| podAddress := GetPodAddress(&redisNode, rf) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we are doing getPodAddress() call at many places and doing computation each and every time. Is it possible to do it in a base funtion and pass it around? |
||
| r.logger.Debugf("Pod %s (address: %s) has been alive for %.f seconds", redisNode.Name, podAddress, alive.Seconds()) | ||
| if alive > maxTime { | ||
| maxTime = alive | ||
| } | ||
|
|
@@ -438,7 +486,8 @@ func (r *RedisFailoverChecker) GetRedisesSlavesPods(rf *redisfailoverv1.RedisFai | |
| rport := getRedisPort(rf.Spec.Redis.Port) | ||
| for _, rp := range rps.Items { | ||
| if rp.Status.Phase == corev1.PodRunning && rp.DeletionTimestamp == nil { // Only work with running | ||
| master, err := r.redisClient.IsMaster(rp.Status.PodIP, rport, password) | ||
| podAddress := GetPodAddress(&rp, rf) | ||
| master, err := r.redisClient.IsMaster(podAddress, rport, password) | ||
| if err != nil { | ||
| return []string{}, err | ||
| } | ||
|
|
@@ -465,7 +514,8 @@ func (r *RedisFailoverChecker) GetRedisesMasterPod(rFailover *redisfailoverv1.Re | |
| rport := getRedisPort(rFailover.Spec.Redis.Port) | ||
| for _, rp := range rps.Items { | ||
| if rp.Status.Phase == corev1.PodRunning && rp.DeletionTimestamp == nil { // Only work with running | ||
| master, err := r.redisClient.IsMaster(rp.Status.PodIP, rport, password) | ||
| podAddress := GetPodAddress(&rp, rFailover) | ||
|
prashanna-frsh marked this conversation as resolved.
|
||
| master, err := r.redisClient.IsMaster(podAddress, rport, password) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
@@ -540,6 +590,11 @@ func (r *RedisFailoverChecker) IsClusterRunning(rFailover *redisfailoverv1.Redis | |
| return r.IsSentinelRunning(rFailover) && r.IsRedisRunning(rFailover) | ||
| } | ||
|
|
||
| // GetRedisesPods returns the PodList of Redis pods | ||
| func (r *RedisFailoverChecker) GetRedisesPods(rFailover *redisfailoverv1.RedisFailover) (*corev1.PodList, error) { | ||
| return r.k8sService.GetStatefulSetPods(rFailover.Namespace, GetRedisName(rFailover)) | ||
| } | ||
|
|
||
| func getRedisPort(p int32) string { | ||
| return strconv.Itoa(int(p)) | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.