Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,23 @@ GO ?= go

.PHONY: verify
verify: check-go-version tidy-check lint test build validate-actions patch-coverage
@$(MAKE) --no-print-directory write-verify-sentinel
@echo ""
@echo "==> verify: all checks passed"

# Record that verify passed against the current working-tree diff. The
# pre-commit review gate (~/.claude/hooks/review-gate.sh) reads this
# sentinel and must find the same diff hash it computes, otherwise it
# blocks the commit. The hash must match the gate's:
# { git diff --cached HEAD; git diff; } | shasum -a 256 | cut -c1-16
# .claude is gitignored, so writing the sentinel does not alter the diff.
.PHONY: write-verify-sentinel
write-verify-sentinel:
@if git rev-parse --git-dir >/dev/null 2>&1; then \
mkdir -p .claude; \
{ git diff --cached HEAD; git diff; } | shasum -a 256 | cut -c1-16 > .claude/.last-verify-passed; \
fi

.PHONY: check-go-version
check-go-version:
@have=$$($(GO) env GOVERSION | sed 's/^go//'); \
Expand Down
46 changes: 37 additions & 9 deletions pkg/fwdservice/fwdservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,15 @@ func (svcFwd *ServiceFWD) podStillEligible(podName string, k8sPods []v1.Pod) boo
return false
}

// findKeyToKeep finds a forward key for a pod that is still eligible
func (svcFwd *ServiceFWD) findKeyToKeep(k8sPods []v1.Pod) string {
// findPodNameToKeep finds the name of a currently-forwarded pod that is still
// eligible. For a normal service we forward exactly one pod (all of its ports),
// so this identifies which pod we should keep forwarding to. Returns "" if none
// of the currently-forwarded pods are still eligible.
func (svcFwd *ServiceFWD) findPodNameToKeep(k8sPods []v1.Pod) string {
forwards := svcFwd.getForwardInfos()
for _, fwd := range forwards {
if svcFwd.podStillEligible(fwd.podName, k8sPods) {
return fwd.key
return fwd.podName
}
}
return ""
Expand All @@ -392,21 +395,46 @@ func (svcFwd *ServiceFWD) syncHeadlessService(k8sPods []v1.Pod) {
svcFwd.LoopPodsToForward(k8sPods, true)
}

// syncNormalService syncs forwards for a normal (non-headless) service
// syncNormalService syncs forwards for a normal (non-headless) service.
//
// A normal service forwards a single pod, but that pod may expose multiple
// ports, each tracked as a separate entry in PortForwards (key:
// "service.podname.localport"). We therefore reason in terms of the pod NAME to
// keep, not a single forward key: all forwards belonging to the kept pod must be
// preserved, and forwards belonging to any other pod removed.
//
// Crucially, we always (re)invoke LoopPodsToForward for the kept pod so that any
// of its ports that are not currently forwarded get re-established.
// LoopPodsToForward skips ports that already exist, so this is a no-op for a
// fully-healthy pod, but it recovers a multi-port service when a single port's
// connection was reset and torn down independently (e.g. the TCP RST behavior of
// kubernetes/kubernetes#111825). Without this, the surviving port's map entry
// caused the dead port (and the service's /etc/hosts entries) to never be
// restored, leaving the service in an unrecoverable zombie state (issue #509).
func (svcFwd *ServiceFWD) syncNormalService(k8sPods []v1.Pod) {
keyToKeep := svcFwd.findKeyToKeep(k8sPods)
podNameToKeep := svcFwd.findPodNameToKeep(k8sPods)

// Remove forwards for pods we're not keeping
// Remove forwards belonging to any pod other than the one we're keeping.
forwards := svcFwd.getForwardInfos()
for _, fwd := range forwards {
if fwd.key != keyToKeep {
if fwd.podName != podNameToKeep {
svcFwd.RemoveServicePod(fwd.key)
}
}

// Start new forward if needed
if keyToKeep == "" {
if podNameToKeep == "" {
// No good pod is currently forwarded - start forwarding the first eligible pod.
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
return
}

// Already forwarding a good pod. Ensure ALL of its ports are forwarded,
// re-establishing any that were torn down independently.
for i := range k8sPods {
if k8sPods[i].Name == podNameToKeep {
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[i]}, false)
return
}
}
}

Expand Down
165 changes: 165 additions & 0 deletions pkg/fwdservice/fwdservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,171 @@ func TestSyncPodForwards_RemovesStoppedPods_FullKeyFormat(t *testing.T) {
}
}

// TestSyncPodForwards_MultiPort_RestoresDeadPort is a regression test for issue #509.
//
// A multi-port normal service can lose one port's forward independently (e.g. the
// TCP RST behavior of kubernetes/kubernetes#111825 tears down a single port's
// listener). Auto-reconnect re-runs SyncPodForwards, which must re-establish the
// missing port while keeping the surviving one. Previously syncNormalService kept
// only a single forward KEY for the pod and skipped LoopPodsToForward entirely
// when any forward existed, so the dead port was never recreated and the service
// was stuck in an unrecoverable zombie state.
//
//goland:noinspection DuplicatedCode
func TestSyncPodForwards_MultiPort_RestoresDeadPort(t *testing.T) {
cleanup := setupMockInterface()
defer cleanup()

restClient, restCleanup := setupMockRESTClient()
defer restCleanup()

namespace := "default"
labels := map[string]string{"app": "test"}

runningPod := createTestPod("running-pod", namespace, v1.PodRunning, labels)
clientset := fake.NewClientset(runningPod)

// Two-port service: port A (80) and port B (9100).
svc := createTestService("test-svc", namespace, []v1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: v1.ProtocolTCP},
{Port: 9100, TargetPort: intstr.FromInt32(9100), Protocol: v1.ProtocolTCP},
}, false)

hosts, err := txeh.NewHosts(&txeh.HostsConfig{})
if err != nil {
t.Fatalf("Failed to create txeh.Hosts: %v", err)
}
hostFile := &fwdport.HostFileWithLock{Hosts: hosts}

debouncer := &mockDebouncer{immediate: true}

svcFwd := &ServiceFWD{
ClientSet: clientset,
Svc: svc,
PodLabelSelector: "app=test",
Headless: false,
Context: "test-context",
Namespace: namespace,
PortForwards: make(map[string]*fwdport.PortForwardOpts),
NamespaceServiceLock: &sync.Mutex{},
Hostfile: hostFile,
SyncDebouncer: debouncer.debounce,
LastSyncedAt: time.Now().Add(-10 * time.Minute),
RESTClient: restClient,
}

// Simulate the state after port A (80) was reset and cleaned up: only port B
// (9100) remains in the map, pointing at a still-eligible pod.
survivingPort := &fwdport.PortForwardOpts{
PodName: "running-pod",
Service: "test-svc",
LocalPort: "9100",
Namespace: namespace,
Context: "test-context",
ManualStopChan: make(chan struct{}),
DoneChan: make(chan struct{}),
}
svcFwd.PortForwards["test-svc.running-pod.9100"] = survivingPort

// Auto-reconnect re-runs the sync.
svcFwd.SyncPodForwards(true)

// Give goroutines time to register the recreated forward.
time.Sleep(200 * time.Millisecond)

svcFwd.NamespaceServiceLock.Lock()
_, hasPortA := svcFwd.PortForwards["test-svc.running-pod.80"]
_, hasPortB := svcFwd.PortForwards["test-svc.running-pod.9100"]
count := len(svcFwd.PortForwards)
svcFwd.NamespaceServiceLock.Unlock()

if !hasPortB {
t.Error("Surviving port 9100 should have been kept, but it was removed")
}
if !hasPortA {
t.Error("Dead port 80 should have been re-established by SyncPodForwards, but it is missing. " +
"This is a regression of issue #509 (multi-port service stuck in zombie state after a single port's RST).")
}
if count != 2 {
t.Errorf("Expected 2 forwards (both ports of the pod), got %d", count)
}
}

// TestSyncPodForwards_MultiPort_HealthyResyncKeepsAllPorts verifies that a routine
// resync of a healthy multi-port service does not drop ports. Regression test for
// the broader bug behind issue #509: syncNormalService used to keep only a single
// forward key and remove the rest, so a multi-port service would degrade to one
// port on the first forced resync.
//
//goland:noinspection DuplicatedCode
func TestSyncPodForwards_MultiPort_HealthyResyncKeepsAllPorts(t *testing.T) {
cleanup := setupMockInterface()
defer cleanup()

restClient, restCleanup := setupMockRESTClient()
defer restCleanup()

namespace := "default"
labels := map[string]string{"app": "test"}

runningPod := createTestPod("running-pod", namespace, v1.PodRunning, labels)
clientset := fake.NewClientset(runningPod)

svc := createTestService("test-svc", namespace, []v1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: v1.ProtocolTCP},
{Port: 9100, TargetPort: intstr.FromInt32(9100), Protocol: v1.ProtocolTCP},
}, false)

hosts, err := txeh.NewHosts(&txeh.HostsConfig{})
if err != nil {
t.Fatalf("Failed to create txeh.Hosts: %v", err)
}
hostFile := &fwdport.HostFileWithLock{Hosts: hosts}

debouncer := &mockDebouncer{immediate: true}

svcFwd := &ServiceFWD{
ClientSet: clientset,
Svc: svc,
PodLabelSelector: "app=test",
Headless: false,
Context: "test-context",
Namespace: namespace,
PortForwards: make(map[string]*fwdport.PortForwardOpts),
NamespaceServiceLock: &sync.Mutex{},
Hostfile: hostFile,
SyncDebouncer: debouncer.debounce,
LastSyncedAt: time.Now().Add(-10 * time.Minute),
RESTClient: restClient,
}

// Both ports are currently healthy and forwarded.
for _, lp := range []string{"80", "9100"} {
svcFwd.PortForwards["test-svc.running-pod."+lp] = &fwdport.PortForwardOpts{
PodName: "running-pod",
Service: "test-svc",
LocalPort: lp,
Namespace: namespace,
Context: "test-context",
ManualStopChan: make(chan struct{}),
DoneChan: make(chan struct{}),
}
}

// A forced resync (or the 5-minute periodic resync) must not drop a port.
svcFwd.SyncPodForwards(true)

time.Sleep(200 * time.Millisecond)

svcFwd.NamespaceServiceLock.Lock()
count := len(svcFwd.PortForwards)
svcFwd.NamespaceServiceLock.Unlock()

if count != 2 {
t.Errorf("Healthy multi-port service should retain both ports after resync, got %d", count)
}
}

// TestAddServicePod tests adding a pod to the PortForwards map
func TestAddServicePod(t *testing.T) {
svcFwd := &ServiceFWD{
Expand Down
Loading