diff --git a/e2e/dynamic_host_volumes/dynamic_host_volumes_test.go b/e2e/dynamic_host_volumes/dynamic_host_volumes_test.go
index 8e1b22d9a4e..e12f5ae4808 100644
--- a/e2e/dynamic_host_volumes/dynamic_host_volumes_test.go
+++ b/e2e/dynamic_host_volumes/dynamic_host_volumes_test.go
@@ -4,13 +4,16 @@ package dynamic_host_volumes
 
 import (
+	"context"
 	"fmt"
-	"strings"
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/api"
+	nomadapi "github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/e2e/e2eutil"
 	"github.com/hashicorp/nomad/e2e/v3/jobs3"
+	"github.com/hashicorp/nomad/e2e/v3/volumes3"
 	"github.com/shoenig/test/must"
 	"github.com/shoenig/test/wait"
 )
@@ -24,54 +27,13 @@ func TestDynamicHostVolumes_CreateWorkflow(t *testing.T) {
 	e2eutil.WaitForLeader(t, nomad)
 	e2eutil.WaitForNodesReady(t, nomad, 1)
 
-	out, err := e2eutil.Command("nomad", "volume", "create",
-		"-detach", "input/volume-create.nomad.hcl")
-	must.NoError(t, err)
-
-	split := strings.Split(out, " ")
-	volID := strings.TrimSpace(split[len(split)-1])
-	t.Logf("[%v] volume %q created", time.Since(start), volID)
-
-	t.Cleanup(func() {
-		_, err := e2eutil.Command("nomad", "volume", "delete", "-type", "host", volID)
-		must.NoError(t, err)
-	})
-
-	out, err = e2eutil.Command("nomad", "volume", "status", "-type", "host", volID)
-	must.NoError(t, err)
-
-	nodeID, err := e2eutil.GetField(out, "Node ID")
-	must.NoError(t, err)
-	must.NotEq(t, "", nodeID)
-	t.Logf("[%v] waiting for volume %q to be ready", time.Since(start), volID)
-
-	must.Wait(t, wait.InitialSuccess(
-		wait.ErrorFunc(func() error {
-			node, _, err := nomad.Nodes().Info(nodeID, nil)
-			if err != nil {
-				return err
-			}
-			_, ok := node.HostVolumes["created-volume"]
-			if !ok {
-				return fmt.Errorf("node %q did not fingerprint volume %q", nodeID, volID)
-			}
-			vol, _, err := nomad.HostVolumes().Get(volID, nil)
-			if err != nil {
-				return err
-			}
-			if vol.State != "ready" {
-				return fmt.Errorf("node fingerprinted volume but status was not updated")
-			}
-			t.Logf("[%v] volume %q is ready", time.Since(start), volID)
-			return nil
-		}),
-		wait.Timeout(10*time.Second),
-		wait.Gap(50*time.Millisecond),
-	))
+	_, cleanupVol := volumes3.Create(t, "input/volume-create.nomad.hcl",
+		volumes3.WithClient(nomad))
+	t.Cleanup(cleanupVol)
 
 	t.Logf("[%v] submitting mounter job", time.Since(start))
-	_, cleanup := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
-	t.Cleanup(cleanup)
+	_, cleanupJob := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
+	t.Cleanup(cleanupJob)
 
 	t.Logf("[%v] test complete, cleaning up", time.Since(start))
 }
@@ -184,3 +146,170 @@ func TestDynamicHostVolumes_RegisterWorkflow(t *testing.T) {
 	t.Cleanup(cleanup2)
 	t.Logf("[%v] test complete, cleaning up", time.Since(start))
 }
+
+// TestDynamicHostVolumes_StickyVolumes tests that when a job marks a volume as
+// sticky, its allocations keep strong associations with specific volumes as
+// they are replaced
+func TestDynamicHostVolumes_StickyVolumes(t *testing.T) {
+
+	start := time.Now()
+	nomad := e2eutil.NomadClient(t)
+	e2eutil.WaitForLeader(t, nomad)
+	e2eutil.WaitForNodesReady(t, nomad, 2)
+
+	// TODO: if we create # of volumes == # of nodes, we can make test flakes
+	// stand out more easily
+
+	_, cleanup1 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
+		volumes3.WithClient(nomad))
+	t.Cleanup(cleanup1)
+
+	_, cleanup2 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
+		volumes3.WithClient(nomad))
+	t.Cleanup(cleanup2)
+
+	t.Logf("[%v] submitting sticky volume mounter job", time.Since(start))
+	jobSub, cleanupJob := jobs3.Submit(t, "./input/sticky.nomad.hcl")
+	t.Cleanup(cleanupJob)
+
+	allocID1 := jobSub.Allocs()[0].ID
+	alloc, _, err := nomad.Allocations().Info(allocID1, nil)
+	must.NoError(t, err)
+
+	must.Len(t, 1, alloc.HostVolumeIDs)
+	selectedVolID := alloc.HostVolumeIDs[0]
+	selectedNodeID := alloc.NodeID
+	t.Logf("[%v] volume %q on node %q was selected",
+		time.Since(start), selectedVolID, selectedNodeID)
+
+	// Test: force reschedule
+
+	_, err = nomad.Allocations().Stop(alloc, nil)
+	must.NoError(t, err)
+
+	t.Logf("[%v] stopped allocation %q", time.Since(start), alloc.ID)
+
+	var allocID2 string
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.ErrorFunc(func() error {
+			allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
+			must.NoError(t, err)
+			if len(allocs) != 2 {
+				return fmt.Errorf("alloc not started")
+			}
+			for _, a := range allocs {
+				if a.ID != allocID1 {
+					allocID2 = a.ID
+					if a.ClientStatus != api.AllocClientStatusRunning {
+						return fmt.Errorf("replacement alloc not running")
+					}
+				}
+			}
+			return nil
+		}),
+		wait.Timeout(10*time.Second),
+		wait.Gap(50*time.Millisecond),
+	))
+
+	newAlloc, _, err := nomad.Allocations().Info(allocID2, nil)
+	must.NoError(t, err)
+	must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
+	must.Eq(t, selectedNodeID, newAlloc.NodeID)
+	t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)
+
+	// Test: drain node
+
+	t.Logf("[%v] draining node %q", time.Since(start), selectedNodeID)
+	cleanup, err := drainNode(nomad, selectedNodeID, time.Second*20)
+	t.Cleanup(cleanup)
+	must.NoError(t, err)
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.ErrorFunc(func() error {
+			evals, _, err := nomad.Jobs().Evaluations(jobSub.JobID(), nil)
+			if err != nil {
+				return err
+			}
+
+			got := map[string]string{}
+
+			for _, eval := range evals {
+				got[eval.ID[:8]] = fmt.Sprintf("status=%q trigger=%q create_index=%d",
+					eval.Status,
+					eval.TriggeredBy,
+					eval.CreateIndex,
+				)
+				if eval.Status == nomadapi.EvalStatusBlocked {
+					return nil
+				}
+			}
+
+			return fmt.Errorf("expected blocked eval, got evals => %#v", got)
+		}),
+		wait.Timeout(10*time.Second),
+		wait.Gap(50*time.Millisecond),
+	))
+
+	t.Logf("[%v] undraining node %q", time.Since(start), selectedNodeID)
+	cleanup()
+
+	var allocID3 string
+	must.Wait(t, wait.InitialSuccess(
+		wait.ErrorFunc(func() error {
+			allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
+			must.NoError(t, err)
+			if len(allocs) != 3 {
+				return fmt.Errorf("alloc not started")
+			}
+			for _, a := range allocs {
+				if a.ID != allocID1 && a.ID != allocID2 {
+					allocID3 = a.ID
+					if a.ClientStatus != api.AllocClientStatusRunning {
+						return fmt.Errorf("replacement alloc %q not running", allocID3)
+					}
+				}
+			}
+			return nil
+		}),
+		wait.Timeout(10*time.Second),
+		wait.Gap(50*time.Millisecond),
+	))
+
+	newAlloc, _, err = nomad.Allocations().Info(allocID3, nil)
+	must.NoError(t, err)
+	must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
+	must.Eq(t, selectedNodeID, newAlloc.NodeID)
+	t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)
+
+}
+
+func drainNode(nomad *nomadapi.Client, nodeID string, timeout time.Duration) (func(), error) {
+	resp, err := nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
+		DrainSpec:    &nomadapi.DrainSpec{},
+		MarkEligible: false,
+	}, nil)
+	if err != nil {
+		return func() {}, err
+	}
+
+	cleanup := func() {
+		nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
+			MarkEligible: true}, nil)
+	}
+
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+	drainCh := nomad.Nodes().MonitorDrain(ctx, nodeID, resp.EvalCreateIndex, false)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return cleanup, ctx.Err()
+		case msg := <-drainCh:
+			if msg == nil {
+				return cleanup, nil
+			}
+		}
+	}
+}
diff --git a/e2e/dynamic_host_volumes/input/mount-created.nomad.hcl b/e2e/dynamic_host_volumes/input/mount-created.nomad.hcl
index cd62a7ab3e3..edd712b0fc2 100644
--- a/e2e/dynamic_host_volumes/input/mount-created.nomad.hcl
+++ b/e2e/dynamic_host_volumes/input/mount-created.nomad.hcl
@@ -7,6 +7,11 @@ job "example" {
 
   group "web" {
 
+    constraint {
+      attribute = "${attr.kernel.name}"
+      value     = "linux"
+    }
+
     network {
       mode = "bridge"
       port "www" {
diff --git a/e2e/dynamic_host_volumes/input/mount-registered.nomad.hcl b/e2e/dynamic_host_volumes/input/mount-registered.nomad.hcl
index 905f6caee0f..21c3a15c67e 100644
--- a/e2e/dynamic_host_volumes/input/mount-registered.nomad.hcl
+++ b/e2e/dynamic_host_volumes/input/mount-registered.nomad.hcl
@@ -7,6 +7,11 @@ job "example" {
 
   group "web" {
 
+    constraint {
+      attribute = "${attr.kernel.name}"
+      value     = "linux"
+    }
+
     network {
       mode = "bridge"
       port "www" {
diff --git a/e2e/dynamic_host_volumes/input/sticky.nomad.hcl b/e2e/dynamic_host_volumes/input/sticky.nomad.hcl
new file mode 100644
index 00000000000..1f0ed151ad3
--- /dev/null
+++ b/e2e/dynamic_host_volumes/input/sticky.nomad.hcl
@@ -0,0 +1,62 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: BUSL-1.1
+
+job "example" {
+
+  # this job will get deployed and rescheduled a lot in this test, so make sure
+  # it happens as quickly as possible
+
+  update {
+    min_healthy_time = "1s"
+  }
+
+  reschedule {
+    delay          = "5s"
+    delay_function = "constant"
+    unlimited      = true
+  }
+
+  group "web" {
+
+    network {
+      mode = "bridge"
+      port "www" {
+        to = 8001
+      }
+    }
+
+    restart {
+      attempts = 0
+      mode     = "fail"
+    }
+
+    volume "data" {
+      type   = "host"
+      source = "sticky-volume"
+      sticky = true
+    }
+
+    task "http" {
+
+      driver = "docker"
+      config {
+        image   = "busybox:1"
+        command = "httpd"
+        args    = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
+        ports   = ["www"]
+      }
+
+      volume_mount {
+        volume      = "data"
+        destination = "/var/www"
+      }
+
+      resources {
+        cpu    = 128
+        memory = 128
+      }
+
+    }
+  }
+
+}
diff --git a/e2e/dynamic_host_volumes/input/volume-create.nomad.hcl b/e2e/dynamic_host_volumes/input/volume-create.nomad.hcl
index 862e1643650..60db92098d3 100644
--- a/e2e/dynamic_host_volumes/input/volume-create.nomad.hcl
+++ b/e2e/dynamic_host_volumes/input/volume-create.nomad.hcl
@@ -9,3 +9,8 @@ capability {
   access_mode     = "single-node-writer"
   attachment_mode = "file-system"
 }
+
+constraint {
+  attribute = "${attr.kernel.name}"
+  value     = "linux"
+}
diff --git a/e2e/dynamic_host_volumes/input/volume-sticky.nomad.hcl b/e2e/dynamic_host_volumes/input/volume-sticky.nomad.hcl
new file mode 100644
index 00000000000..7de9578260e
--- /dev/null
+++ b/e2e/dynamic_host_volumes/input/volume-sticky.nomad.hcl
@@ -0,0 +1,16 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: BUSL-1.1
+
+name      = "sticky-volume"
+type      = "host"
+plugin_id = "mkdir"
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
+}
+
+constraint {
+  attribute = "${attr.kernel.name}"
+  value     = "linux"
+}
diff --git a/e2e/v3/volumes3/host3.go b/e2e/v3/volumes3/host3.go
new file mode 100644
index 00000000000..b854961fae6
--- /dev/null
+++ b/e2e/v3/volumes3/host3.go
@@ -0,0 +1,225 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package volumes3
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/api"
+	nomadapi "github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/e2e/v3/util3"
+	"github.com/shoenig/test/must"
+	"github.com/shoenig/test/wait"
+)
+
+// VolumeSubmission holds state around creating and cleaning up a dynamic host
+// volume.
+type VolumeSubmission struct {
+	t *testing.T
+
+	nomadClient *nomadapi.Client
+
+	// inputs
+	namespace string
+	filename  string
+	waitState nomadapi.HostVolumeState
+
+	// behaviors
+	noCleanup bool
+	timeout   time.Duration
+	verbose   bool
+
+	// outputs
+	volID  string
+	nodeID string
+}
+
+type Option func(*VolumeSubmission)
+
+type Cleanup func()
+
+func Create(t *testing.T, filename string, opts ...Option) (*VolumeSubmission, Cleanup) {
+	t.Helper()
+
+	sub := &VolumeSubmission{
+		t:         t,
+		namespace: api.DefaultNamespace,
+		filename:  filename,
+		waitState: nomadapi.HostVolumeStateReady,
+		timeout:   10 * time.Second,
+	}
+
+	for _, opt := range opts {
+		opt(sub)
+	}
+
+	start := time.Now()
+	sub.setClient()  // setup API client if not configured by option
+	sub.run(start)   // create the volume via API
+	sub.waits(start) // wait on node fingerprint
+
+	return sub, sub.cleanup
+}
+
+// VolumeID returns the volume ID set by the server
+func (sub *VolumeSubmission) VolumeID() string {
+	return sub.volID
+}
+
+// NodeID returns the node ID, which may have been set by the server
+func (sub *VolumeSubmission) NodeID() string {
+	return sub.nodeID
+}
+
+// Get fetches the api.HostVolume from the server for further examination
+func (sub *VolumeSubmission) Get() *nomadapi.HostVolume {
+	vol, _, err := sub.nomadClient.HostVolumes().Get(sub.volID,
+		&api.QueryOptions{Namespace: sub.namespace})
+	must.NoError(sub.t, err)
+	return vol
+}
+
+func (sub *VolumeSubmission) setClient() {
+	if sub.nomadClient != nil {
+		return
+	}
+	nomadClient, err := nomadapi.NewClient(nomadapi.DefaultConfig())
+	must.NoError(sub.t, err, must.Sprint("failed to create nomad API client"))
+	sub.nomadClient = nomadClient
+}
+
+func (sub *VolumeSubmission) run(start time.Time) {
+	sub.t.Helper()
+	ctx, cancel := context.WithTimeout(context.Background(), sub.timeout)
+	defer cancel()
+
+	bytes, err := exec.CommandContext(ctx,
+		"nomad", "volume", "create",
+		"-namespace", sub.namespace,
+		"-detach", sub.filename).CombinedOutput()
+	must.NoError(sub.t, err, must.Sprint("error creating volume"))
+	out := string(bytes)
+	split := strings.Split(out, " ")
+	sub.volID = strings.TrimSpace(split[len(split)-1])
+
+	sub.logf("[%v] volume %q created", time.Since(start), sub.VolumeID())
+}
+
+func (sub *VolumeSubmission) waits(start time.Time) {
+	sub.t.Helper()
+	must.Wait(sub.t, wait.InitialSuccess(
+		wait.ErrorFunc(func() error {
+			vol, _, err := sub.nomadClient.HostVolumes().Get(sub.volID,
+				&api.QueryOptions{Namespace: sub.namespace})
+			if err != nil {
+				return err
+			}
+			sub.nodeID = vol.NodeID
+
+			if vol.State != sub.waitState {
+				return fmt.Errorf("volume is not yet in %q state: %q", sub.waitState, vol.State)
+			}
+
+			// if we're waiting for the volume to be ready, let's also verify
+			// that it's correctly fingerprinted on the node
+			switch sub.waitState {
+			case nomadapi.HostVolumeStateReady:
+				node, _, err := sub.nomadClient.Nodes().Info(sub.nodeID, nil)
+				if err != nil {
+					return err
+				}
+				_, ok := node.HostVolumes[vol.Name]
+				if !ok {
+					return fmt.Errorf("node %q did not fingerprint volume %q", sub.nodeID, sub.volID)
+				}
+			}
+
+			return nil
+		}),
+		wait.Timeout(sub.timeout),
+		wait.Gap(50*time.Millisecond),
+	))
+
+	sub.logf("[%v] volume %q is %q on node %q",
+		time.Since(start), sub.volID, sub.waitState, sub.nodeID)
+}
+
+func (sub *VolumeSubmission) cleanup() {
+	if os.Getenv("NOMAD_TEST_SKIPCLEANUP") == "1" {
+		return
+	}
+	if sub.noCleanup {
+		return
+	}
+	if sub.volID == "" {
+		return
+	}
+
+	sub.noCleanup = true // so this isn't attempted more than once
+	ctx, cancel := context.WithTimeout(context.Background(), sub.timeout)
+	defer cancel()
+
+	sub.logf("deleting volume %q", sub.volID)
+	err := exec.CommandContext(ctx,
+		"nomad", "volume", "delete",
+		"-type", "host", "-namespace", sub.namespace, sub.volID).Run()
+	must.NoError(sub.t, err)
+}
+
+func (sub *VolumeSubmission) logf(msg string, args ...any) {
+	sub.t.Helper()
+	util3.Log3(sub.t, sub.verbose, msg, args...)
+}
+
+// WithClient forces the submission to use the Nomad API client passed from the
+// calling test
+func WithClient(client *nomadapi.Client) Option {
+	return func(sub *VolumeSubmission) {
+		sub.nomadClient = client
+	}
+}
+
+// WithNamespace sets a specific namespace for the volume and the wait
+// query. The namespace should not be set in the spec if you're using this
+// option.
+func WithNamespace(ns string) Option {
+	return func(sub *VolumeSubmission) {
+		sub.namespace = ns
+	}
+}
+
+// WithTimeout changes the default timeout from 10s
+func WithTimeout(timeout time.Duration) Option {
+	return func(sub *VolumeSubmission) {
+		sub.timeout = timeout
+	}
+}
+
+// WithWaitState changes the default state we wait for after creating the volume
+// from the default of "ready"
+func WithWaitState(state api.HostVolumeState) Option {
+	return func(sub *VolumeSubmission) {
+		sub.waitState = state
+	}
+}
+
+// WithNoCleanup is used for test debugging to skip tearing down the volume
+func WithNoCleanup() Option {
+	return func(sub *VolumeSubmission) {
+		sub.noCleanup = true
+	}
+}
+
+// WithVerbose is used for test debugging to write more logs
+func WithVerbose() Option {
+	return func(sub *VolumeSubmission) {
+		sub.verbose = true
+	}
+}