Skip to content

Commit 8f1395d

Browse files
committed
feat: implement migrating upper container layer
this is a very basic implementation of migrating the container's ephemeral storage during migration. It only supports the overlay snapshotter. It simply finds the upper layer by looking at /proc/*/mounts and then renames all paths in the upper layer into the snapshot directory before evac. On restore we do the reverse. This works well enough for a first iteration, but there may be many edge cases and potential performance improvements; in particular, the image pulling over TTRPC could be improved.
1 parent 5c5a968 commit 8f1395d

File tree

11 files changed

+175
-18
lines changed

11 files changed

+175
-18
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,17 @@ to be enabled in the host kernel (`CONFIG_USERFAULTFD`).
354354
zeropod.ctrox.dev/live-migrate: "nginx"
355355
```
356356

357+
### `zeropod.ctrox.dev/disable-migrate-data`
358+
359+
When migrating a pod (regardless of live or scaled down), data that has been
360+
written to the container's file system will be copied over. This is done by
361+
copying the upper layer of the container overlayfs. Enabled by default but can
362+
be disabled with this annotation.
363+
364+
```yaml
365+
zeropod.ctrox.dev/disable-migrate-data: "true"
366+
```
367+
357368
### `io.containerd.runc.v2.group`
358369

359370
It's possible to reduce the resource usage further by grouping multiple pods

api/node/v1/meta.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const (
88
SocketPath = runPath + "node.sock"
99
imagesPath = varPath + "i/"
1010
SnapshotSuffix = "snapshot"
11+
UpperSuffix = "upper"
1112
WorkDirSuffix = "work"
1213
MigrateAnnotationKey = "zeropod.ctrox.dev/migrate"
1314
LiveMigrateAnnotationKey = "zeropod.ctrox.dev/live-migrate"
@@ -28,6 +29,10 @@ func SnapshotPath(id string) string {
2829
return filepath.Join(ImagePath(id), SnapshotSuffix)
2930
}
3031

32+
func UpperPath(id string) string {
33+
return filepath.Join(ImagePath(id), SnapshotSuffix, UpperSuffix)
34+
}
35+
3136
func LazyPagesSocket(id string) string {
3237
return filepath.Join(runPath, id+".sock")
3338
}

cmd/freezer/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ COPY cmd/freezer cmd/freezer
1010
ARG TARGETARCH
1111
RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH GO111MODULE=on go build -ldflags "-s -w" -a -o freezer cmd/freezer/main.go
1212

13-
FROM gcr.io/distroless/static-debian12
13+
FROM gcr.io/distroless/static-debian12:debug
1414
COPY --from=builder /workspace/freezer /
1515
ENTRYPOINT ["/freezer"]

cmd/freezer/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"flag"
67
"fmt"
78
"io"
89
"log/slog"
910
"math/rand/v2"
11+
"net"
1012
"net/http"
13+
"os"
1114
"time"
1215
)
1316

@@ -68,7 +71,16 @@ func main() {
6871
fmt.Fprintf(w, "%s", freezeJSON)
6972
})
7073

71-
go http.ListenAndServe(":8080", nil)
74+
lc := net.ListenConfig{}
75+
// disable multipath TCP for now since it's not supported by CRIU
76+
lc.SetMultipathTCP(false)
77+
78+
ln, err := lc.Listen(context.Background(), "tcp", ":8080")
79+
if err != nil {
80+
slog.Error("tcp listen", "err", err)
81+
os.Exit(1)
82+
}
83+
go http.Serve(ln, nil)
7284
for {
7385
since := time.Since(f.LastObservation)
7486
if since > time.Millisecond*50 {

cmd/installer/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ network-lock skip
8686
"zeropod.ctrox.dev/live-migrate",
8787
"zeropod.ctrox.dev/disable-probe-detection",
8888
"zeropod.ctrox.dev/probe-buffer-size",
89+
"zeropod.ctrox.dev/disable-migrate-data",
8990
"io.containerd.runc.v2.group"
9091
]
9192
@@ -108,6 +109,7 @@ network-lock skip
108109
"zeropod.ctrox.dev/live-migrate",
109110
"zeropod.ctrox.dev/disable-probe-detection",
110111
"zeropod.ctrox.dev/probe-buffer-size",
112+
"zeropod.ctrox.dev/disable-migrate-data",
111113
"io.containerd.runc.v2.group"
112114
]
113115

e2e/migration_test.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package e2e
22

33
import (
44
"context"
5+
"fmt"
56
"path"
67
"testing"
78
"time"
@@ -23,13 +24,14 @@ func TestMigration(t *testing.T) {
2324
ctx := context.Background()
2425

2526
type testCase struct {
26-
deploy *appsv1.Deployment
27-
svc *corev1.Service
28-
sameNode bool
29-
migrationCount int
30-
liveMigration bool
31-
beforeMigration func(t *testing.T)
32-
afterMigration func(t *testing.T)
27+
deploy *appsv1.Deployment
28+
svc *corev1.Service
29+
sameNode bool
30+
migrationCount int
31+
liveMigration bool
32+
expectDataNotMigrated bool
33+
beforeMigration func(t *testing.T)
34+
afterMigration func(t *testing.T)
3335
}
3436
cases := map[string]testCase{
3537
"same-node live migration": {
@@ -74,7 +76,7 @@ func TestMigration(t *testing.T) {
7476
}
7577

7678
migrate := func(t *testing.T, ctx context.Context, e2e *e2eConfig, tc testCase) {
77-
pods := podsOfDeployment(t, ctx, e2e.client, tc.deploy)
79+
pods := podsOfDeployment(t, e2e.client, tc.deploy)
7880
if len(pods) < 1 {
7981
t.Fatal("expected at least one pod in the deployment")
8082
}
@@ -90,7 +92,7 @@ func TestMigration(t *testing.T) {
9092

9193
assert.NoError(t, e2e.client.Delete(ctx, &pod))
9294
assert.Eventually(t, func() bool {
93-
pods := podsOfDeployment(t, ctx, e2e.client, tc.deploy)
95+
pods := podsOfDeployment(t, e2e.client, tc.deploy)
9496
if len(pods) != 1 {
9597
return false
9698
}
@@ -132,6 +134,8 @@ func TestMigration(t *testing.T) {
132134

133135
for range tc.migrationCount {
134136
tc.beforeMigration(t)
137+
_, _, err := podExec(e2e.cfg, migrationPod(t, tc.deploy), fmt.Sprintf("echo %s > /containerdata", t.Name()))
138+
assert.NoError(t, err)
135139
checkCtx, cancel := context.WithCancel(ctx)
136140
defer cancel()
137141
if tc.liveMigration {
@@ -143,11 +147,26 @@ func TestMigration(t *testing.T) {
143147
migrate(t, ctx, e2e, tc)
144148
cancel()
145149
tc.afterMigration(t)
150+
data, _, err := podExec(e2e.cfg, migrationPod(t, tc.deploy), "cat /containerdata")
151+
if tc.expectDataNotMigrated {
152+
assert.Error(t, err)
153+
} else {
154+
assert.NoError(t, err)
155+
assert.Equal(t, t.Name(), data)
156+
}
146157
}
147158
})
148159
}
149160
}
150161

162+
func migrationPod(t testing.TB, deploy *appsv1.Deployment) *corev1.Pod {
163+
pods := podsOfDeployment(t, e2e.client, deploy)
164+
if len(pods) != 1 {
165+
t.Errorf("pod of deployment %s not found", deploy.Name)
166+
}
167+
return &pods[0]
168+
}
169+
151170
func defaultBeforeMigration(t *testing.T) {
152171
assert.Eventually(t, func() bool {
153172
if err := freezerWrite(t.Name(), e2e.port); err != nil {
@@ -172,7 +191,7 @@ func defaultAfterMigration(t *testing.T) {
172191
func nonLiveBeforeMigration(t *testing.T) {
173192
defaultBeforeMigration(t)
174193
require.Eventually(t, func() bool {
175-
pods := podsOfDeployment(t, context.Background(), e2e.client, freezerDeployment("same-node-migration", "default", 1))
194+
pods := podsOfDeployment(t, e2e.client, freezerDeployment("same-node-migration", "default", 1))
176195
if len(pods) == 0 {
177196
return false
178197
}
@@ -182,7 +201,7 @@ func nonLiveBeforeMigration(t *testing.T) {
182201

183202
func nonLiveAfterMigration(t *testing.T) {
184203
require.Never(t, func() bool {
185-
pods := podsOfDeployment(t, context.Background(), e2e.client, freezerDeployment("same-node-migration", "default", 1))
204+
pods := podsOfDeployment(t, e2e.client, freezerDeployment("same-node-migration", "default", 1))
186205
if len(pods) == 0 {
187206
return true
188207
}

e2e/setup_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -605,9 +605,9 @@ func createDeployAndWait(t testing.TB, ctx context.Context, c client.Client, dep
605605
}
606606
}
607607

608-
func podsOfDeployment(t testing.TB, ctx context.Context, c client.Client, deploy *appsv1.Deployment) []corev1.Pod {
608+
func podsOfDeployment(t testing.TB, c client.Client, deploy *appsv1.Deployment) []corev1.Pod {
609609
podList := &corev1.PodList{}
610-
if err := c.List(ctx, podList, client.MatchingLabels(deploy.Spec.Selector.MatchLabels)); err != nil {
610+
if err := c.List(t.Context(), podList, client.MatchingLabels(deploy.Spec.Selector.MatchLabels)); err != nil {
611611
t.Error(err)
612612
}
613613

@@ -719,7 +719,7 @@ func podExec(cfg *rest.Config, pod *corev1.Pod, command string) (string, string,
719719
Name(pod.Name).
720720
SubResource("exec").
721721
VersionedParams(&corev1.PodExecOptions{
722-
Command: []string{"/bin/sh", "-c", command},
722+
Command: []string{"sh", "-c", command},
723723
Stdin: false,
724724
Stdout: true,
725725
Stderr: true,

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/ctrox/zeropod
22

3-
go 1.23.0
3+
go 1.24.0
44

55
require (
66
github.com/checkpoint-restore/go-criu/v7 v7.2.0

shim/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const (
2525
LiveMigrateAnnotationKey = "zeropod.ctrox.dev/live-migrate"
2626
DisableProbeDetectAnnotationKey = "zeropod.ctrox.dev/disable-probe-detection"
2727
ProbeBufferSizeAnnotationKey = "zeropod.ctrox.dev/probe-buffer-size"
28+
DisableMigrateDataAnnotationKey = "zeropod.ctrox.dev/disable-migrate-data"
2829
CRIContainerNameAnnotation = "io.kubernetes.cri.container-name"
2930
CRIContainerTypeAnnotation = "io.kubernetes.cri.container-type"
3031
CRIPodNameAnnotation = "io.kubernetes.cri.sandbox-name"
@@ -55,6 +56,7 @@ type Config struct {
5556
ContainerdNamespace string
5657
DisableProbeDetection bool
5758
ProbeBufferSize int
59+
DisableMigrateData bool
5860
spec *specs.Spec
5961
}
6062

@@ -157,6 +159,15 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) {
157159
}
158160
}
159161

162+
disableMigrateDataValue := spec.Annotations[DisableMigrateDataAnnotationKey]
163+
disableMigrateData := false
164+
if disableMigrateDataValue != "" {
165+
disableMigrateData, err = strconv.ParseBool(disableMigrateDataValue)
166+
if err != nil {
167+
return nil, err
168+
}
169+
}
170+
160171
return &Config{
161172
Ports: containerPorts,
162173
ScaleDownDuration: dur,
@@ -173,6 +184,7 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) {
173184
ContainerdNamespace: ns,
174185
DisableProbeDetection: disableProbeDetection,
175186
ProbeBufferSize: probeBufferSize,
187+
DisableMigrateData: disableMigrateData,
176188
spec: spec,
177189
}, nil
178190
}

shim/evac.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@ package shim
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"net"
89
"os"
910
"path/filepath"
11+
"strings"
1012
"time"
1113

1214
"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/process"
1315
runcC "github.com/containerd/go-runc"
1416
"github.com/containerd/log"
1517
"github.com/containerd/ttrpc"
1618
nodev1 "github.com/ctrox/zeropod/api/node/v1"
19+
"github.com/prometheus/procfs"
1720
"google.golang.org/protobuf/types/known/timestamppb"
1821
)
1922

@@ -139,6 +142,8 @@ func (c *Container) evac(ctx context.Context) error {
139142
return fmt.Errorf("failed checkpointing: %w", err)
140143
case <-lazyStarted:
141144
log.G(ctx).Info("successful started lazy checkpointing")
145+
c.prepareMigrateData(ctx)
146+
142147
log.G(ctx).Infof("making evac request with image ID: %s", c.ID())
143148
evacReq.MigrationInfo.PausedAt = timestamppb.New(pausedAt)
144149
if _, err := nodeClient.Evac(ctx, evacReq); err != nil {
@@ -204,9 +209,89 @@ func (c *Container) evacScaledDown(ctx context.Context) error {
204209
if _, err := nodeClient.PrepareEvac(ctx, evacReq); err != nil {
205210
return fmt.Errorf("requesting evac: %w", err)
206211
}
212+
c.prepareMigrateData(ctx)
207213

208214
if _, err := nodeClient.Evac(ctx, evacReq); err != nil {
209215
return fmt.Errorf("requesting evac: %w", err)
210216
}
211217
return nil
212218
}
219+
220+
func (c *Container) prepareMigrateData(ctx context.Context) {
221+
if c.cfg.DisableMigrateData {
222+
return
223+
}
224+
// for now we allow this to fail and continue with the migration since
225+
// it could be unreliable and is not strictly required to migrate
226+
// (although this depends on the application).
227+
if err := moveUpperDirToImage(c.ID()); err != nil {
228+
log.G(ctx).Errorf("adding container data to image: %s", err)
229+
}
230+
}
231+
232+
// moveUpperDirToImage finds the overlayfs upper directory of the container and
233+
// moves it to the snapshot upper dir to be transferred by the evac. Only call
234+
// this once the container is either stopped or frozen.
235+
func moveUpperDirToImage(containerID string) error {
236+
upper, err := findUpperDir(containerID)
237+
if err != nil {
238+
return fmt.Errorf("finding upper storage dir: %w", err)
239+
}
240+
to := nodev1.UpperPath(containerID)
241+
if err := os.MkdirAll(to, 0644); err != nil {
242+
return err
243+
}
244+
245+
return renameAllSubDirs(upper, to)
246+
}
247+
248+
// MoveImageToUpperDir does the same as moveUpperDirToImage but in reverse
249+
func MoveImageToUpperDir(containerID, imageDir string) error {
250+
upper, err := findUpperDir(containerID)
251+
if err != nil {
252+
return fmt.Errorf("finding upper storage dir: %w", err)
253+
}
254+
255+
return renameAllSubDirs(filepath.Join(imageDir, nodev1.UpperSuffix), upper)
256+
}
257+
258+
func renameAllSubDirs(from, to string) error {
259+
dirs, err := os.ReadDir(from)
260+
if err != nil {
261+
return err
262+
}
263+
264+
var errs []error
265+
for _, dir := range dirs {
266+
errs = append(errs, os.Rename(filepath.Join(from, dir.Name()), filepath.Join(to, dir.Name())))
267+
}
268+
269+
return errors.Join(errs...)
270+
}
271+
272+
// findUpperDir returns the overlayfs "upperdir" super option of the
// container's rootfs mount, found by scanning this process' mount info.
// NOTE(review): the match is a substring test of containerID against the
// mount point — this assumes the container ID appears in the snapshot
// mount path and is unique among mounts; confirm for all snapshotters.
func findUpperDir(containerID string) (string, error) {
	p, err := procfs.NewFS("/proc")
	if err != nil {
		return "", fmt.Errorf("new procfs: %w", err)
	}

	// Inspect our own (the shim's) mounts; the container rootfs mount is
	// expected to be visible in this mount namespace.
	proc, err := p.Self()
	if err != nil {
		return "", fmt.Errorf("getting process info: %w", err)
	}

	mountInfo, err := proc.MountInfo()
	if err != nil {
		return "", fmt.Errorf("getting mount info: %w", err)
	}

	for _, mount := range mountInfo {
		if strings.Contains(mount.MountPoint, containerID) {
			// Only overlay mounts carry an upperdir option; matching
			// mounts without one are skipped rather than treated as hits.
			if v, ok := mount.SuperOptions["upperdir"]; ok {
				return v, nil
			}
		}
	}

	return "", fmt.Errorf("upper dir not found for container %s", containerID)
}

0 commit comments

Comments
 (0)