diff --git a/Makefile b/Makefile index 580c859b..7a73f2c1 100644 --- a/Makefile +++ b/Makefile @@ -22,9 +22,23 @@ GO ?= go .PHONY: verify verify: check-go-version tidy-check lint test build validate-actions patch-coverage + @$(MAKE) --no-print-directory write-verify-sentinel @echo "" @echo "==> verify: all checks passed" +# Record that verify passed against the current working-tree diff. The +# pre-commit review gate (~/.claude/hooks/review-gate.sh) reads this +# sentinel and must find the same diff hash it computes, otherwise it +# blocks the commit. The hash must match the gate's: +# { git diff --cached HEAD; git diff; } | shasum -a 256 | cut -c1-16 +# .claude is gitignored, so writing the sentinel does not alter the diff. +.PHONY: write-verify-sentinel +write-verify-sentinel: + @if git rev-parse --git-dir >/dev/null 2>&1; then \ + mkdir -p .claude; \ + { git diff --cached HEAD; git diff; } | shasum -a 256 | cut -c1-16 > .claude/.last-verify-passed; \ + fi + .PHONY: check-go-version check-go-version: @have=$$($(GO) env GOVERSION | sed 's/^go//'); \ diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 44210901..1699533e 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -375,12 +375,15 @@ func (svcFwd *ServiceFWD) podStillEligible(podName string, k8sPods []v1.Pod) boo return false } -// findKeyToKeep finds a forward key for a pod that is still eligible -func (svcFwd *ServiceFWD) findKeyToKeep(k8sPods []v1.Pod) string { +// findPodNameToKeep finds the name of a currently-forwarded pod that is still +// eligible. For a normal service we forward exactly one pod (all of its ports), +// so this identifies which pod we should keep forwarding to. Returns "" if none +// of the currently-forwarded pods are still eligible. +func (svcFwd *ServiceFWD) findPodNameToKeep(k8sPods []v1.Pod) string { forwards := svcFwd.getForwardInfos() for _, fwd := range forwards { if svcFwd.podStillEligible(fwd.podName, k8sPods) { - return fwd.key + return fwd.podName } } return "" @@ -392,21 +395,46 @@ func (svcFwd *ServiceFWD) syncHeadlessService(k8sPods []v1.Pod) { svcFwd.LoopPodsToForward(k8sPods, true) } -// syncNormalService syncs forwards for a normal (non-headless) service +// syncNormalService syncs forwards for a normal (non-headless) service. +// +// A normal service forwards a single pod, but that pod may expose multiple +// ports, each tracked as a separate entry in PortForwards (key: +// "service.podname.localport"). We therefore reason in terms of the pod NAME to +// keep, not a single forward key: all forwards belonging to the kept pod must be +// preserved, and forwards belonging to any other pod removed. +// +// Crucially, we always (re)invoke LoopPodsToForward for the kept pod so that any +// of its ports that are not currently forwarded get re-established. +// LoopPodsToForward skips ports that already exist, so this is a no-op for a +// fully-healthy pod, but it recovers a multi-port service when a single port's +// connection was reset and torn down independently (e.g. the TCP RST behavior of +// kubernetes/kubernetes#111825). Without this, the surviving port's map entry +// caused the dead port (and the service's /etc/hosts entries) to never be +// restored, leaving the service in an unrecoverable zombie state (issue #509). func (svcFwd *ServiceFWD) syncNormalService(k8sPods []v1.Pod) { - keyToKeep := svcFwd.findKeyToKeep(k8sPods) + podNameToKeep := svcFwd.findPodNameToKeep(k8sPods) - // Remove forwards for pods we're not keeping + // Remove forwards belonging to any pod other than the one we're keeping. forwards := svcFwd.getForwardInfos() for _, fwd := range forwards { - if fwd.key != keyToKeep { + if fwd.podName != podNameToKeep { svcFwd.RemoveServicePod(fwd.key) } } - // Start new forward if needed - if keyToKeep == "" { + if podNameToKeep == "" { + // No good pod is currently forwarded - start forwarding the first eligible pod. svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) + return + } + + // Already forwarding a good pod. Ensure ALL of its ports are forwarded, + // re-establishing any that were torn down independently. + for i := range k8sPods { + if k8sPods[i].Name == podNameToKeep { + svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[i]}, false) + return + } } } diff --git a/pkg/fwdservice/fwdservice_test.go b/pkg/fwdservice/fwdservice_test.go index 95e8fe8f..ac442180 100644 --- a/pkg/fwdservice/fwdservice_test.go +++ b/pkg/fwdservice/fwdservice_test.go @@ -722,6 +722,171 @@ func TestSyncPodForwards_RemovesStoppedPods_FullKeyFormat(t *testing.T) { } } +// TestSyncPodForwards_MultiPort_RestoresDeadPort is a regression test for issue #509. +// +// A multi-port normal service can lose one port's forward independently (e.g. the +// TCP RST behavior of kubernetes/kubernetes#111825 tears down a single port's +// listener). Auto-reconnect re-runs SyncPodForwards, which must re-establish the +// missing port while keeping the surviving one. Previously syncNormalService kept +// only a single forward KEY for the pod and skipped LoopPodsToForward entirely +// when any forward existed, so the dead port was never recreated and the service +// was stuck in an unrecoverable zombie state. +// +//goland:noinspection DuplicatedCode +func TestSyncPodForwards_MultiPort_RestoresDeadPort(t *testing.T) { + cleanup := setupMockInterface() + defer cleanup() + + restClient, restCleanup := setupMockRESTClient() + defer restCleanup() + + namespace := "default" + labels := map[string]string{"app": "test"} + + runningPod := createTestPod("running-pod", namespace, v1.PodRunning, labels) + clientset := fake.NewClientset(runningPod) + + // Two-port service: port A (80) and port B (9100). + svc := createTestService("test-svc", namespace, []v1.ServicePort{ + {Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: v1.ProtocolTCP}, + {Port: 9100, TargetPort: intstr.FromInt32(9100), Protocol: v1.ProtocolTCP}, + }, false) + + hosts, err := txeh.NewHosts(&txeh.HostsConfig{}) + if err != nil { + t.Fatalf("Failed to create txeh.Hosts: %v", err) + } + hostFile := &fwdport.HostFileWithLock{Hosts: hosts} + + debouncer := &mockDebouncer{immediate: true} + + svcFwd := &ServiceFWD{ + ClientSet: clientset, + Svc: svc, + PodLabelSelector: "app=test", + Headless: false, + Context: "test-context", + Namespace: namespace, + PortForwards: make(map[string]*fwdport.PortForwardOpts), + NamespaceServiceLock: &sync.Mutex{}, + Hostfile: hostFile, + SyncDebouncer: debouncer.debounce, + LastSyncedAt: time.Now().Add(-10 * time.Minute), + RESTClient: restClient, + } + + // Simulate the state after port A (80) was reset and cleaned up: only port B + // (9100) remains in the map, pointing at a still-eligible pod. + survivingPort := &fwdport.PortForwardOpts{ + PodName: "running-pod", + Service: "test-svc", + LocalPort: "9100", + Namespace: namespace, + Context: "test-context", + ManualStopChan: make(chan struct{}), + DoneChan: make(chan struct{}), + } + svcFwd.PortForwards["test-svc.running-pod.9100"] = survivingPort + + // Auto-reconnect re-runs the sync. + svcFwd.SyncPodForwards(true) + + // Give goroutines time to register the recreated forward. + time.Sleep(200 * time.Millisecond) + + svcFwd.NamespaceServiceLock.Lock() + _, hasPortA := svcFwd.PortForwards["test-svc.running-pod.80"] + _, hasPortB := svcFwd.PortForwards["test-svc.running-pod.9100"] + count := len(svcFwd.PortForwards) + svcFwd.NamespaceServiceLock.Unlock() + + if !hasPortB { + t.Error("Surviving port 9100 should have been kept, but it was removed") + } + if !hasPortA { + t.Error("Dead port 80 should have been re-established by SyncPodForwards, but it is missing. " + + "This is a regression of issue #509 (multi-port service stuck in zombie state after a single port's RST).") + } + if count != 2 { + t.Errorf("Expected 2 forwards (both ports of the pod), got %d", count) + } +} + +// TestSyncPodForwards_MultiPort_HealthyResyncKeepsAllPorts verifies that a routine +// resync of a healthy multi-port service does not drop ports. Regression test for +// the broader bug behind issue #509: syncNormalService used to keep only a single +// forward key and remove the rest, so a multi-port service would degrade to one +// port on the first forced resync. +// +//goland:noinspection DuplicatedCode +func TestSyncPodForwards_MultiPort_HealthyResyncKeepsAllPorts(t *testing.T) { + cleanup := setupMockInterface() + defer cleanup() + + restClient, restCleanup := setupMockRESTClient() + defer restCleanup() + + namespace := "default" + labels := map[string]string{"app": "test"} + + runningPod := createTestPod("running-pod", namespace, v1.PodRunning, labels) + clientset := fake.NewClientset(runningPod) + + svc := createTestService("test-svc", namespace, []v1.ServicePort{ + {Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: v1.ProtocolTCP}, + {Port: 9100, TargetPort: intstr.FromInt32(9100), Protocol: v1.ProtocolTCP}, + }, false) + + hosts, err := txeh.NewHosts(&txeh.HostsConfig{}) + if err != nil { + t.Fatalf("Failed to create txeh.Hosts: %v", err) + } + hostFile := &fwdport.HostFileWithLock{Hosts: hosts} + + debouncer := &mockDebouncer{immediate: true} + + svcFwd := &ServiceFWD{ + ClientSet: clientset, + Svc: svc, + PodLabelSelector: "app=test", + Headless: false, + Context: "test-context", + Namespace: namespace, + PortForwards: make(map[string]*fwdport.PortForwardOpts), + NamespaceServiceLock: &sync.Mutex{}, + Hostfile: hostFile, + SyncDebouncer: debouncer.debounce, + LastSyncedAt: time.Now().Add(-10 * time.Minute), + RESTClient: restClient, + } + + // Both ports are currently healthy and forwarded. + for _, lp := range []string{"80", "9100"} { + svcFwd.PortForwards["test-svc.running-pod."+lp] = &fwdport.PortForwardOpts{ + PodName: "running-pod", + Service: "test-svc", + LocalPort: lp, + Namespace: namespace, + Context: "test-context", + ManualStopChan: make(chan struct{}), + DoneChan: make(chan struct{}), + } + } + + // A forced resync (or the 5-minute periodic resync) must not drop a port. + svcFwd.SyncPodForwards(true) + + time.Sleep(200 * time.Millisecond) + + svcFwd.NamespaceServiceLock.Lock() + count := len(svcFwd.PortForwards) + svcFwd.NamespaceServiceLock.Unlock() + + if count != 2 { + t.Errorf("Healthy multi-port service should retain both ports after resync, got %d", count) + } +} + // TestAddServicePod tests adding a pod to the PortForwards map func TestAddServicePod(t *testing.T) { svcFwd := &ServiceFWD{ diff --git a/test/integration/multiport_reconnect_test.go b/test/integration/multiport_reconnect_test.go new file mode 100644 index 00000000..e6f6bb0e --- /dev/null +++ b/test/integration/multiport_reconnect_test.go @@ -0,0 +1,148 @@ +//go:build integration +// +build integration + +package integration + +import ( + "fmt" + "io" + "net/http" + "regexp" + "strings" + "testing" + "time" +) + +// apiBase is the kubefwd REST API base URL. The API server binds a +// dedicated loopback IP (see pkg/fwdapi/manager.go: APIIP/APIPort) and is +// enabled with --api. +const apiBase = "http://127.2.27.1/api" + +// TestMultiPortReconnect is the end-to-end regression test for issue #509. +// +// A multi-port normal service must keep ALL of its ports forwarded across a +// resync. Previously syncNormalService kept a single forward key and dropped +// the rest, so a resync (the path auto-reconnect drives after a single port's +// connection is reset) left the service unable to recover the dropped port and +// removed its shared /etc/hosts entry. +// +// Reproducing the exact upstream RST trigger (kubernetes/kubernetes#111825) is +// not deterministic, so this test drives the same code path deterministically: +// it forces a resync via the REST API (POST /v1/services/:key/sync?force=true, +// which calls SyncPodForwards(true)) and asserts every port still serves +// afterwards. On the pre-fix code this forced sync stopped one port and removed +// the `multiport` hostname, breaking both ports. +// +//goland:noinspection DuplicatedCode +func TestMultiPortReconnect(t *testing.T) { + requiresSudo(t) + requiresKindCluster(t) + + // Use a known API key so the test can authenticate. startKubefwd copies + // os.Environ() into the child process, so set it before starting. + const apiKey = "kubefwd-integration-test-key" + t.Setenv("KUBEFWD_API_KEY", apiKey) + + // Start kubefwd with auto-reconnect and the REST API enabled. + cmd := startKubefwd(t, "svc", "-n", "test-multiport", "-a", "--api", "-v") + defer stopKubefwd(t, cmd) + + t.Log("Waiting for initial forwarding to stabilize...") + time.Sleep(10 * time.Second) + + // Both ports must serve before we do anything. + assertPortServes(t, "http://multiport:80/", "ok-80", "before resync") + assertPortServes(t, "http://multiport:8080/", "ok-8080", "before resync") + t.Log("✓ Both ports serving before resync") + + // Discover the registry key for the multiport service via the API. + key := discoverServiceKey(t, apiKey, "multiport.test-multiport.") + t.Logf("Service key: %s", key) + + // Force a resync — the exact code path auto-reconnect drives. On pre-fix + // code this drops one port and removes the shared hostname. + t.Log("Forcing resync via API...") + forceSync(t, apiKey, key) + + // Give the async SyncPodForwards time to run. + time.Sleep(8 * time.Second) + + // The crux of #509: after the resync BOTH ports must still serve. + assertPortServes(t, "http://multiport:80/", "ok-80", "after resync (#509)") + assertPortServes(t, "http://multiport:8080/", "ok-8080", "after resync (#509)") + t.Log("✓ Both ports still serving after resync — #509 fixed") +} + +// assertPortServes fails the test if the URL does not return HTTP 200 with the +// expected body substring. A removed /etc/hosts entry surfaces here as a DNS +// lookup failure. +func assertPortServes(t *testing.T, url, wantBody, phase string) { + t.Helper() + + resp, err := httpGet(url, 5, 1*time.Second) + if err != nil { + t.Fatalf("[%s] %s did not respond: %v", phase, url, err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("[%s] %s returned HTTP %d (expected 200)", phase, url, resp.StatusCode) + } + if !strings.Contains(string(body), wantBody) { + t.Fatalf("[%s] %s body %q does not contain %q", phase, url, string(body), wantBody) + } +} + +// discoverServiceKey queries the API service list and returns the first +// registry key with the given prefix. +func discoverServiceKey(t *testing.T, apiKey, prefix string) string { + t.Helper() + + req, err := http.NewRequest(http.MethodGet, apiBase+"/v1/services", nil) + if err != nil { + t.Fatalf("failed to build services request: %v", err) + } + req.Header.Set("Authorization", "Bearer "+apiKey) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to list services via API: %v", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("services list returned HTTP %d: %s", resp.StatusCode, string(body)) + } + + re := regexp.MustCompile(regexp.QuoteMeta(prefix) + `[A-Za-z0-9._-]+`) + key := re.FindString(string(body)) + if key == "" { + t.Fatalf("no service key with prefix %q found in API response: %s", prefix, string(body)) + } + return key +} + +// forceSync issues a forced resync for the given service key. +func forceSync(t *testing.T, apiKey, key string) { + t.Helper() + + url := fmt.Sprintf("%s/v1/services/%s/sync?force=true", apiBase, key) + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + t.Fatalf("failed to build sync request: %v", err) + } + req.Header.Set("Authorization", "Bearer "+apiKey) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to call sync endpoint: %v", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + t.Fatalf("sync endpoint returned HTTP %d: %s", resp.StatusCode, string(body)) + } +} diff --git a/test/manifests/multiport-service.yaml b/test/manifests/multiport-service.yaml new file mode 100644 index 00000000..95e74d6a --- /dev/null +++ b/test/manifests/multiport-service.yaml @@ -0,0 +1,80 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: test-multiport +--- +# Single nginx serving two ports (80 and 8080) so one pod backs a +# multi-port service. Used by the #509 reconnect regression test, which +# verifies a forced resync keeps every port of the service forwarded. +apiVersion: v1 +kind: ConfigMap +metadata: + name: multiport-nginx + namespace: test-multiport +data: + nginx.conf: | + events {} + http { + server { + listen 80; + location / { return 200 'ok-80\n'; } + } + server { + listen 8080; + location / { return 200 'ok-8080\n'; } + } + } +--- +apiVersion: v1 +kind: Service +metadata: + name: multiport + namespace: test-multiport +spec: + selector: + app: multiport + ports: + - name: http + port: 80 + targetPort: 80 + - name: http-alt + port: 8080 + targetPort: 8080 + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: multiport + namespace: test-multiport +spec: + replicas: 1 + selector: + matchLabels: + app: multiport + template: + metadata: + labels: + app: multiport + spec: + containers: + - name: nginx + image: nginx:alpine + ports: + - containerPort: 80 + - containerPort: 8080 + volumeMounts: + - name: conf + mountPath: /etc/nginx/nginx.conf + subPath: nginx.conf + resources: + requests: + cpu: 50m + memory: 16Mi + limits: + cpu: 100m + memory: 32Mi + volumes: + - name: conf + configMap: + name: multiport-nginx