diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da9e044..58954f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,12 +11,12 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.24" - - uses: dominikh/staticcheck-action@v1.3.1 + - uses: dominikh/staticcheck-action@v1.4.0 with: install-go: false - version: "2024.1" + version: "2025.1" test: runs-on: ${{ matrix.os }} @@ -29,7 +29,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.24" - name: test run: sudo --preserve-env make test @@ -47,7 +47,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.24" - name: Install protoc-gen-go run: | @@ -84,7 +84,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.24" - name: e2e run: make test-e2e diff --git a/README.md b/README.md index 39357a8..ae92e69 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ the last TCP connection. While in scaled down state, it will listen on the same port the application inside the container was listening on and will restore the container on the first incoming connection. Depending on the memory size of the checkpointed program this happens in tens to a few hundred milliseconds, -virtually unnoticable to the user. As all the memory contents are stored to disk +virtually unnoticeable to the user. As all the memory contents are stored to disk during checkpointing, all state of the application is restored. [It adjusts resource requests](#in-place-resource-scaling) in scaled down state in-place if the cluster supports it. To prevent huge resource usage spikes when draining a @@ -283,7 +283,7 @@ zeropod.ctrox.dev/ports-map: "nginx=80,81;sidecar=8080" ### `zeropod.ctrox.dev/scaledown-duration` Configures how long to wait before scaling down again after the last -connnection. 
The duration is reset whenever a connection happens. Setting it to +0 disables scaling down. If unset it defaults to 1 minute. ```yaml @@ -354,6 +354,17 @@ to be enabled in the host kernel (`CONFIG_USERFAULTFD`). zeropod.ctrox.dev/live-migrate: "nginx" ``` +### `zeropod.ctrox.dev/disable-migrate-data` + +When migrating a pod (regardless of live or scaled down), data that has been +written to the container's file system will be copied over. This is done by +copying the upper layer of the container overlayfs. Enabled by default but can +be disabled with this annotation. + +```yaml +zeropod.ctrox.dev/disable-migrate-data: "true" +``` + ### `io.containerd.runc.v2.group` It's possible to reduce the resource usage further by grouping multiple pods diff --git a/api/node/v1/meta.go b/api/node/v1/meta.go index ef5c97a..7b49928 100644 --- a/api/node/v1/meta.go +++ b/api/node/v1/meta.go @@ -8,6 +8,7 @@ const ( SocketPath = runPath + "node.sock" imagesPath = varPath + "i/" SnapshotSuffix = "snapshot" + UpperSuffix = "upper" WorkDirSuffix = "work" MigrateAnnotationKey = "zeropod.ctrox.dev/migrate" LiveMigrateAnnotationKey = "zeropod.ctrox.dev/live-migrate" @@ -28,6 +29,10 @@ func SnapshotPath(id string) string { return filepath.Join(ImagePath(id), SnapshotSuffix) } +func UpperPath(id string) string { + return filepath.Join(ImagePath(id), SnapshotSuffix, UpperSuffix) +} + func LazyPagesSocket(id string) string { return filepath.Join(runPath, id+".sock") } diff --git a/cmd/freezer/Dockerfile b/cmd/freezer/Dockerfile index 46b1453..f4c0d46 100644 --- a/cmd/freezer/Dockerfile +++ b/cmd/freezer/Dockerfile @@ -10,6 +10,6 @@ COPY cmd/freezer cmd/freezer ARG TARGETARCH RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH GO111MODULE=on go build -ldflags "-s -w" -a -o freezer cmd/freezer/main.go -FROM gcr.io/distroless/static-debian12 +FROM gcr.io/distroless/static-debian12:debug COPY --from=builder 
/workspace/freezer / ENTRYPOINT ["/freezer"] diff --git a/cmd/freezer/main.go b/cmd/freezer/main.go index fb54d27..330eb2c 100644 --- a/cmd/freezer/main.go +++ b/cmd/freezer/main.go @@ -1,13 +1,16 @@ package main import ( + "context" "encoding/json" "flag" "fmt" "io" "log/slog" "math/rand/v2" + "net" "net/http" + "os" "time" ) @@ -68,7 +71,16 @@ func main() { fmt.Fprintf(w, "%s", freezeJSON) }) - go http.ListenAndServe(":8080", nil) + lc := net.ListenConfig{} + // disable multipath TCP for now since it's not supported by CRIU + lc.SetMultipathTCP(false) + + ln, err := lc.Listen(context.Background(), "tcp", ":8080") + if err != nil { + slog.Error("tcp listen", "err", err) + os.Exit(1) + } + go http.Serve(ln, nil) for { since := time.Since(f.LastObservation) if since > time.Millisecond*50 { diff --git a/cmd/installer/main.go b/cmd/installer/main.go index bba81de..20ae07b 100644 --- a/cmd/installer/main.go +++ b/cmd/installer/main.go @@ -86,6 +86,7 @@ network-lock skip "zeropod.ctrox.dev/live-migrate", "zeropod.ctrox.dev/disable-probe-detection", "zeropod.ctrox.dev/probe-buffer-size", + "zeropod.ctrox.dev/disable-migrate-data", "io.containerd.runc.v2.group" ] @@ -108,6 +109,7 @@ network-lock skip "zeropod.ctrox.dev/live-migrate", "zeropod.ctrox.dev/disable-probe-detection", "zeropod.ctrox.dev/probe-buffer-size", + "zeropod.ctrox.dev/disable-migrate-data", "io.containerd.runc.v2.group" ] diff --git a/cmd/manager/Dockerfile b/cmd/manager/Dockerfile index 3a64e37..e60041a 100644 --- a/cmd/manager/Dockerfile +++ b/cmd/manager/Dockerfile @@ -1,5 +1,5 @@ ARG CRIU_IMAGE_NAME=ghcr.io/ctrox/zeropod-criu -ARG CRIU_VERSION=v4.0 +ARG CRIU_VERSION=v4.1 FROM --platform=$BUILDPLATFORM golang:1.24 AS builder diff --git a/e2e/Dockerfile b/e2e/Dockerfile index 0fcc6b4..06efe34 100644 --- a/e2e/Dockerfile +++ b/e2e/Dockerfile @@ -1,4 +1,4 @@ -FROM docker:28.0-cli +FROM docker:28.3.3-cli RUN apk add --update go make iptables WORKDIR /app diff --git a/e2e/migration_test.go 
b/e2e/migration_test.go index f241e41..d0356cb 100644 --- a/e2e/migration_test.go +++ b/e2e/migration_test.go @@ -2,7 +2,9 @@ package e2e import ( "context" + "fmt" "path" + "strings" "testing" "time" @@ -15,22 +17,23 @@ import ( corev1 "k8s.io/api/core/v1" ) +type testCase struct { + deploy *appsv1.Deployment + svc *corev1.Service + sameNode bool + migrationCount int + liveMigration bool + expectDataNotMigrated bool + beforeMigration func(t *testing.T) + afterMigration func(t *testing.T, tc testCase) +} + func TestMigration(t *testing.T) { if testing.Short() { t.Skip("skipping e2e test") } e2e := setupOnce(t) ctx := context.Background() - - type testCase struct { - deploy *appsv1.Deployment - svc *corev1.Service - sameNode bool - migrationCount int - liveMigration bool - beforeMigration func(t *testing.T) - afterMigration func(t *testing.T) - } cases := map[string]testCase{ "same-node live migration": { deploy: freezerDeployment("same-node-live-migration", "default", 256, liveMigrateAnnotation("freezer")), @@ -54,27 +57,33 @@ func TestMigration(t *testing.T) { migrationCount: 1, }, "same-node non-live migration": { - deploy: freezerDeployment("same-node-migration", "default", 1, migrateAnnotation("freezer"), scaleDownAfter(time.Second*5)), - svc: testService(8080), - sameNode: true, - liveMigration: false, - migrationCount: 1, - beforeMigration: nonLiveBeforeMigration, - afterMigration: nonLiveAfterMigration, + deploy: freezerDeployment("same-node-migration", "default", 1, migrateAnnotation("freezer"), scaleDownAfter(time.Second)), + svc: testService(8080), + sameNode: true, + liveMigration: false, + migrationCount: 1, + afterMigration: nonLiveAfterMigration, }, "cross-node non-live migration": { - deploy: freezerDeployment("same-node-migration", "default", 1, migrateAnnotation("freezer"), scaleDownAfter(time.Second*5)), - svc: testService(8080), - sameNode: false, - liveMigration: false, - migrationCount: 1, - beforeMigration: nonLiveBeforeMigration, - 
afterMigration: nonLiveAfterMigration, + deploy: freezerDeployment("cross-node-migration", "default", 1, migrateAnnotation("freezer"), scaleDownAfter(time.Second)), + svc: testService(8080), + sameNode: false, + liveMigration: false, + migrationCount: 1, + afterMigration: nonLiveAfterMigration, + }, + "data migration disabled": { + deploy: freezerDeployment("data-migration-disabled", "default", 1, liveMigrateAnnotation("freezer"), disableDataMigration()), + svc: testService(8080), + sameNode: true, + liveMigration: true, + expectDataNotMigrated: true, + migrationCount: 1, }, } migrate := func(t *testing.T, ctx context.Context, e2e *e2eConfig, tc testCase) { - pods := podsOfDeployment(t, ctx, e2e.client, tc.deploy) + pods := podsOfDeployment(t, e2e.client, tc.deploy) if len(pods) < 1 { t.Fatal("expected at least one pod in the deployment") } @@ -87,10 +96,12 @@ func TestMigration(t *testing.T) { uncordon := cordonNode(t, ctx, e2e.client, pod.Spec.NodeName) defer uncordon() } - - assert.NoError(t, e2e.client.Delete(ctx, &pod)) + if !tc.liveMigration { + waitUntilScaledDown(t, ctx, e2e.client, &pod) + } + require.NoError(t, e2e.client.Delete(ctx, &pod)) assert.Eventually(t, func() bool { - pods := podsOfDeployment(t, ctx, e2e.client, tc.deploy) + pods := podsOfDeployment(t, e2e.client, tc.deploy) if len(pods) != 1 { return false } @@ -133,6 +144,7 @@ func TestMigration(t *testing.T) { for range tc.migrationCount { tc.beforeMigration(t) checkCtx, cancel := context.WithCancel(ctx) + writePodData(t, migrationPod(t, tc.deploy)) defer cancel() if tc.liveMigration { go func() { @@ -142,12 +154,52 @@ func TestMigration(t *testing.T) { } migrate(t, ctx, e2e, tc) cancel() - tc.afterMigration(t) + tc.afterMigration(t, tc) + if tc.expectDataNotMigrated { + _, err := readPodData(t, migrationPod(t, tc.deploy)) + assert.Error(t, err) + } else { + data, err := readPodDataEventually(t, migrationPod(t, tc.deploy)) + assert.NoError(t, err) + assert.Equal(t, t.Name(), 
strings.TrimSpace(data)) + } } }) } } +func migrationPod(t testing.TB, deploy *appsv1.Deployment) *corev1.Pod { + pods := podsOfDeployment(t, e2e.client, deploy) + if len(pods) != 1 { + t.Errorf("pod of deployment %s not found", deploy.Name) + } + return &pods[0] +} + +func writePodData(t testing.TB, pod *corev1.Pod) { + assert.Eventually(t, func() bool { + _, _, err := podExec(e2e.cfg, pod, fmt.Sprintf("echo %s > /containerdata", t.Name())) + return err == nil + }, time.Second*10, time.Second) +} + +func readPodData(t testing.TB, pod *corev1.Pod) (string, error) { + data, _, err := podExec(e2e.cfg, pod, "cat /containerdata") + return data, err +} + +func readPodDataEventually(t testing.TB, pod *corev1.Pod) (string, error) { + var data string + var err error + if !assert.Eventually(t, func() bool { + data, err = readPodData(t, pod) + return err == nil + }, time.Second*10, time.Second) { + return "", err + } + return data, nil +} + func defaultBeforeMigration(t *testing.T) { assert.Eventually(t, func() bool { if err := freezerWrite(t.Name(), e2e.port); err != nil { @@ -161,7 +213,7 @@ func defaultBeforeMigration(t *testing.T) { }, time.Second*10, time.Second) } -func defaultAfterMigration(t *testing.T) { +func defaultAfterMigration(t *testing.T, _ testCase) { f, err := freezerRead(e2e.port) require.NoError(t, err) t.Logf("freeze duration: %s", f.LastFreezeDuration) @@ -169,20 +221,9 @@ func defaultAfterMigration(t *testing.T) { assert.Less(t, f.LastFreezeDuration, time.Second, "freeze duration") } -func nonLiveBeforeMigration(t *testing.T) { - defaultBeforeMigration(t) - require.Eventually(t, func() bool { - pods := podsOfDeployment(t, context.Background(), e2e.client, freezerDeployment("same-node-migration", "default", 1)) - if len(pods) == 0 { - return false - } - return pods[0].Labels[path.Join(manager.StatusLabelKeyPrefix, "freezer")] == shimv1.ContainerPhase_SCALED_DOWN.String() - }, time.Second*30, time.Second, "container is scaled down before migration") -} - 
-func nonLiveAfterMigration(t *testing.T) { +func nonLiveAfterMigration(t *testing.T, tc testCase) { require.Never(t, func() bool { - pods := podsOfDeployment(t, context.Background(), e2e.client, freezerDeployment("same-node-migration", "default", 1)) + pods := podsOfDeployment(t, e2e.client, tc.deploy) if len(pods) == 0 { return true } @@ -192,5 +233,5 @@ func nonLiveAfterMigration(t *testing.T) { } return status != shimv1.ContainerPhase_SCALED_DOWN.String() }, time.Second*30, time.Second, "container is scaled down after migration") - defaultAfterMigration(t) + defaultAfterMigration(t, tc) } diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 50a13b9..0cb5a30 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -438,6 +438,12 @@ func livenessProbe(probe *corev1.Probe) podOption { } } +func disableDataMigration() podOption { + return annotations(map[string]string{ + shim.DisableMigrateDataAnnotationKey: "true", + }) +} + const agnHostImage = "registry.k8s.io/e2e-test-images/agnhost:2.39" func agnContainer(name string, port int) podOption { @@ -542,12 +548,18 @@ func freezerDeployment(name, namespace string, memoryGiB int, opts ...podOption) }, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "zeropod-e2e"}, + MatchLabels: map[string]string{ + "app": "zeropod-e2e", + "name": name, + }, }, Replicas: ptr.To(int32(1)), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": "zeropod-e2e"}, + Labels: map[string]string{ + "app": "zeropod-e2e", + "name": name, + }, }, Spec: corev1.PodSpec{ RuntimeClassName: ptr.To(v1.RuntimeClassName), @@ -605,9 +617,9 @@ func createDeployAndWait(t testing.TB, ctx context.Context, c client.Client, dep } } -func podsOfDeployment(t testing.TB, ctx context.Context, c client.Client, deploy *appsv1.Deployment) []corev1.Pod { +func podsOfDeployment(t testing.TB, c client.Client, deploy *appsv1.Deployment) []corev1.Pod { podList := 
&corev1.PodList{} - if err := c.List(ctx, podList, client.MatchingLabels(deploy.Spec.Selector.MatchLabels)); err != nil { + if err := c.List(t.Context(), podList, client.MatchingLabels(deploy.Spec.Selector.MatchLabels)); err != nil { t.Error(err) } @@ -719,7 +731,7 @@ func podExec(cfg *rest.Config, pod *corev1.Pod, command string) (string, string, Name(pod.Name). SubResource("exec"). VersionedParams(&corev1.PodExecOptions{ - Command: []string{"/bin/sh", "-c", command}, + Command: []string{"sh", "-c", command}, Stdin: false, Stdout: true, Stderr: true, diff --git a/go.mod b/go.mod index 499abc1..840fc5d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ctrox/zeropod -go 1.23.0 +go 1.24.0 require ( github.com/checkpoint-restore/go-criu/v7 v7.2.0 diff --git a/shim/config.go b/shim/config.go index f58609a..794666e 100644 --- a/shim/config.go +++ b/shim/config.go @@ -25,6 +25,7 @@ const ( LiveMigrateAnnotationKey = "zeropod.ctrox.dev/live-migrate" DisableProbeDetectAnnotationKey = "zeropod.ctrox.dev/disable-probe-detection" ProbeBufferSizeAnnotationKey = "zeropod.ctrox.dev/probe-buffer-size" + DisableMigrateDataAnnotationKey = "zeropod.ctrox.dev/disable-migrate-data" CRIContainerNameAnnotation = "io.kubernetes.cri.container-name" CRIContainerTypeAnnotation = "io.kubernetes.cri.container-type" CRIPodNameAnnotation = "io.kubernetes.cri.sandbox-name" @@ -55,6 +56,7 @@ type Config struct { ContainerdNamespace string DisableProbeDetection bool ProbeBufferSize int + DisableMigrateData bool spec *specs.Spec } @@ -157,6 +159,15 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) { } } + disableMigrateDataValue := spec.Annotations[DisableMigrateDataAnnotationKey] + disableMigrateData := false + if disableMigrateDataValue != "" { + disableMigrateData, err = strconv.ParseBool(disableMigrateDataValue) + if err != nil { + return nil, err + } + } + return &Config{ Ports: containerPorts, ScaleDownDuration: dur, @@ -173,6 +184,7 @@ func 
NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) { ContainerdNamespace: ns, DisableProbeDetection: disableProbeDetection, ProbeBufferSize: probeBufferSize, + DisableMigrateData: disableMigrateData, spec: spec, }, nil } diff --git a/shim/evac.go b/shim/evac.go index 2875c40..47b22ec 100644 --- a/shim/evac.go +++ b/shim/evac.go @@ -2,11 +2,13 @@ package shim import ( "context" + "errors" "fmt" "io" "net" "os" "path/filepath" + "strings" "time" "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/process" @@ -15,6 +17,7 @@ import ( "github.com/containerd/ttrpc" nodev1 "github.com/ctrox/zeropod/api/node/v1" v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/prometheus/procfs" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -140,6 +143,8 @@ func (c *Container) evac(ctx context.Context) error { return fmt.Errorf("failed checkpointing: %w", err) case <-lazyStarted: log.G(ctx).Info("successful started lazy checkpointing") + c.prepareMigrateData(ctx) + log.G(ctx).Infof("making evac request with image ID: %s", c.ID()) evacReq.MigrationInfo.PausedAt = timestamppb.New(pausedAt) if _, err := nodeClient.Evac(ctx, evacReq); err != nil { @@ -205,9 +210,89 @@ func (c *Container) evacScaledDown(ctx context.Context) error { if _, err := nodeClient.PrepareEvac(ctx, evacReq); err != nil { return fmt.Errorf("requesting evac: %w", err) } + c.prepareMigrateData(ctx) if _, err := nodeClient.Evac(ctx, evacReq); err != nil { return fmt.Errorf("requesting evac: %w", err) } return nil } + +func (c *Container) prepareMigrateData(ctx context.Context) { + if c.cfg.DisableMigrateData { + return + } + // for now we allow this to fail and continue with the migration since + // it could be unreliable and is not strictly required to migrate + // (although this depends on the application). 
+ if err := moveUpperDirToImage(c.ID()); err != nil { + log.G(ctx).Errorf("adding container data to image: %s", err) + } +} + +// moveUpperDirToImage finds the overlayfs upper directory of the container and +// moves it to the snapshot upper dir to be transferred by the evac. Only call +// this once the container is either stopped or frozen. +func moveUpperDirToImage(containerID string) error { + upper, err := findUpperDir(containerID) + if err != nil { + return fmt.Errorf("finding upper storage dir: %w", err) + } + to := nodev1.UpperPath(containerID) + if err := os.MkdirAll(to, 0644); err != nil { + return err + } + + return renameAllSubDirs(upper, to) +} + +// MoveImageToUpperDir does the same as moveUpperDirToImage but in reverse +func MoveImageToUpperDir(containerID, imageDir string) error { + upper, err := findUpperDir(containerID) + if err != nil { + return fmt.Errorf("finding upper storage dir: %w", err) + } + + return renameAllSubDirs(filepath.Join(imageDir, nodev1.UpperSuffix), upper) +} + +func renameAllSubDirs(from, to string) error { + dirs, err := os.ReadDir(from) + if err != nil { + return err + } + + var errs []error + for _, dir := range dirs { + errs = append(errs, os.Rename(filepath.Join(from, dir.Name()), filepath.Join(to, dir.Name()))) + } + + return errors.Join(errs...) 
+} + +func findUpperDir(containerID string) (string, error) { + p, err := procfs.NewFS("/proc") + if err != nil { + return "", fmt.Errorf("new procfs: %w", err) + } + + proc, err := p.Self() + if err != nil { + return "", fmt.Errorf("getting process info: %w", err) + } + + mountInfo, err := proc.MountInfo() + if err != nil { + return "", fmt.Errorf("getting mount info: %w", err) + } + + for _, mount := range mountInfo { + if strings.Contains(mount.MountPoint, containerID) { + if v, ok := mount.SuperOptions["upperdir"]; ok { + return v, nil + } + } + } + + return "", fmt.Errorf("upper dir not found for container %s", containerID) +} diff --git a/shim/task/service_zeropod.go b/shim/task/service_zeropod.go index 8eb9dee..25ae2ba 100644 --- a/shim/task/service_zeropod.go +++ b/shim/task/service_zeropod.go @@ -170,7 +170,18 @@ func (w *wrapper) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * zeropodContainer.SetSkipStart(skipStart) } - return w.service.Create(ctx, r) + resp, err := w.service.Create(ctx, r) + if err != nil { + return nil, err + } + + if cfg.AnyMigrationEnabled() && !cfg.DisableMigrateData { + if err := zshim.MoveImageToUpperDir(r.ID, r.Checkpoint); err != nil { + log.G(ctx).Errorf("restoring container data: %s", err) + } + } + + return resp, nil } func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { diff --git a/socket/Dockerfile b/socket/Dockerfile index 4e49522..07add51 100644 --- a/socket/Dockerfile +++ b/socket/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.23 as gomod +FROM golang:1.24 as gomod WORKDIR /app ADD go.* /app