E2E: dynamic host volume tests for sticky volumes #24869

Merged · 1 commit merged on Feb 7, 2025
223 changes: 176 additions & 47 deletions e2e/dynamic_host_volumes/dynamic_host_volumes_test.go
@@ -4,13 +4,16 @@
package dynamic_host_volumes

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/api"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/v3/jobs3"
"github.com/hashicorp/nomad/e2e/v3/volumes3"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
@@ -24,54 +24,13 @@ func TestDynamicHostVolumes_CreateWorkflow(t *testing.T) {
e2eutil.WaitForLeader(t, nomad)
e2eutil.WaitForNodesReady(t, nomad, 1)

out, err := e2eutil.Command("nomad", "volume", "create",
"-detach", "input/volume-create.nomad.hcl")
must.NoError(t, err)

split := strings.Split(out, " ")
volID := strings.TrimSpace(split[len(split)-1])
t.Logf("[%v] volume %q created", time.Since(start), volID)

t.Cleanup(func() {
_, err := e2eutil.Command("nomad", "volume", "delete", "-type", "host", volID)
must.NoError(t, err)
})

out, err = e2eutil.Command("nomad", "volume", "status", "-type", "host", volID)
must.NoError(t, err)

nodeID, err := e2eutil.GetField(out, "Node ID")
must.NoError(t, err)
must.NotEq(t, "", nodeID)
t.Logf("[%v] waiting for volume %q to be ready", time.Since(start), volID)

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
node, _, err := nomad.Nodes().Info(nodeID, nil)
if err != nil {
return err
}
_, ok := node.HostVolumes["created-volume"]
if !ok {
return fmt.Errorf("node %q did not fingerprint volume %q", nodeID, volID)
}
vol, _, err := nomad.HostVolumes().Get(volID, nil)
if err != nil {
return err
}
if vol.State != "ready" {
return fmt.Errorf("node fingerprinted volume but status was not updated")
}
t.Logf("[%v] volume %q is ready", time.Since(start), volID)
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
_, cleanupVol := volumes3.Create(t, "input/volume-create.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanupVol)

t.Logf("[%v] submitting mounter job", time.Since(start))
_, cleanup := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
t.Cleanup(cleanup)
_, cleanupJob := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
t.Cleanup(cleanupJob)
t.Logf("[%v] test complete, cleaning up", time.Since(start))
}

@@ -184,3 +146,170 @@ func TestDynamicHostVolumes_RegisterWorkflow(t *testing.T) {
t.Cleanup(cleanup2)
t.Logf("[%v] test complete, cleaning up", time.Since(start))
}

// TestDynamicHostVolumes_StickyVolumes tests the case where a job marks a
// volume as sticky: its allocations should keep strong associations with
// specific volumes as they are replaced
func TestDynamicHostVolumes_StickyVolumes(t *testing.T) {

start := time.Now()
nomad := e2eutil.NomadClient(t)
e2eutil.WaitForLeader(t, nomad)
e2eutil.WaitForNodesReady(t, nomad, 2)

// TODO: if we create # of volumes == # of nodes, we can make test flakes
// stand out more easily

_, cleanup1 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanup1)

_, cleanup2 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanup2)

t.Logf("[%v] submitting sticky volume mounter job", time.Since(start))
jobSub, cleanupJob := jobs3.Submit(t, "./input/sticky.nomad.hcl")
t.Cleanup(cleanupJob)

allocID1 := jobSub.Allocs()[0].ID
alloc, _, err := nomad.Allocations().Info(allocID1, nil)
must.NoError(t, err)

must.Len(t, 1, alloc.HostVolumeIDs)
selectedVolID := alloc.HostVolumeIDs[0]
selectedNodeID := alloc.NodeID
t.Logf("[%v] volume %q on node %q was selected",
time.Since(start), selectedVolID, selectedNodeID)

// Test: force reschedule

_, err = nomad.Allocations().Stop(alloc, nil)
must.NoError(t, err)

t.Logf("[%v] stopped allocation %q", time.Since(start), alloc.ID)

var allocID2 string

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
must.NoError(t, err)
if len(allocs) != 2 {
return fmt.Errorf("alloc not started")
}
for _, a := range allocs {
if a.ID != allocID1 {
allocID2 = a.ID
if a.ClientStatus != api.AllocClientStatusRunning {
return fmt.Errorf("replacement alloc not running")
}
}
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))

newAlloc, _, err := nomad.Allocations().Info(allocID2, nil)
must.NoError(t, err)
must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
must.Eq(t, selectedNodeID, newAlloc.NodeID)
t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)

// Test: drain node

t.Logf("[%v] draining node %q", time.Since(start), selectedNodeID)
cleanup, err := drainNode(nomad, selectedNodeID, time.Second*20)
t.Cleanup(cleanup)
must.NoError(t, err)

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
evals, _, err := nomad.Jobs().Evaluations(jobSub.JobID(), nil)
if err != nil {
return err
}

got := map[string]string{}

for _, eval := range evals {
got[eval.ID[:8]] = fmt.Sprintf("status=%q trigger=%q create_index=%d",
eval.Status,
eval.TriggeredBy,
eval.CreateIndex,
)
if eval.Status == nomadapi.EvalStatusBlocked {
return nil
}
}

return fmt.Errorf("expected blocked eval, got evals => %#v", got)
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))

t.Logf("[%v] undraining node %q", time.Since(start), selectedNodeID)
cleanup()

var allocID3 string
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
must.NoError(t, err)
if len(allocs) != 3 {
return fmt.Errorf("alloc not started")
}
for _, a := range allocs {
if a.ID != allocID1 && a.ID != allocID2 {
allocID3 = a.ID
if a.ClientStatus != api.AllocClientStatusRunning {
return fmt.Errorf("replacement alloc %q not running", allocID3)
}
}
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))

newAlloc, _, err = nomad.Allocations().Info(allocID3, nil)
must.NoError(t, err)
must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
must.Eq(t, selectedNodeID, newAlloc.NodeID)
t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)

}

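// drainNode puts the given node into drain mode and blocks until the drain
// completes or the timeout expires. The returned cleanup function removes the
// drain and restores the node's scheduling eligibility.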
func drainNode(nomad *nomadapi.Client, nodeID string, timeout time.Duration) (func(), error) {
resp, err := nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
DrainSpec: &nomadapi.DrainSpec{},
MarkEligible: false,
}, nil)
if err != nil {
return func() {}, err
}

cleanup := func() {
nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
MarkEligible: true}, nil)
}

ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
drainCh := nomad.Nodes().MonitorDrain(ctx, nodeID, resp.EvalCreateIndex, false)

for {
select {
case <-ctx.Done():
return cleanup, ctx.Err()
case msg := <-drainCh:
if msg == nil {
return cleanup, nil
}
}
}
}
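
The sticky association asserted above is visible on the allocation itself through its HostVolumeIDs field. Below is a minimal standalone sketch (not part of this PR) that reads an allocation and prints which dynamic host volumes it is bound to, using the same github.com/hashicorp/nomad/api calls the test relies on; the argument handling and DefaultConfig address resolution are assumptions for illustration only.

package main

import (
	"fmt"
	"log"
	"os"

	nomadapi "github.com/hashicorp/nomad/api"
)

func main() {
	// DefaultConfig reads NOMAD_ADDR and related environment variables; this
	// is an assumption about how the sketch is run, not something the PR does.
	client, err := nomadapi.NewClient(nomadapi.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	if len(os.Args) < 2 {
		log.Fatal("usage: allocvols <alloc-id>")
	}

	alloc, _, err := client.Allocations().Info(os.Args[1], nil)
	if err != nil {
		log.Fatal(err)
	}

	// For a sticky volume, replacement allocations should report the same
	// volume ID and land on the same node, which is what the test asserts.
	fmt.Printf("alloc %s on node %s uses host volumes %v\n",
		alloc.ID, alloc.NodeID, alloc.HostVolumeIDs)
}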
5 changes: 5 additions & 0 deletions e2e/dynamic_host_volumes/input/mount-created.nomad.hcl
@@ -7,6 +7,11 @@ job "example" {

group "web" {

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

network {
mode = "bridge"
port "www" {
5 changes: 5 additions & 0 deletions e2e/dynamic_host_volumes/input/mount-registered.nomad.hcl
@@ -7,6 +7,11 @@ job "example" {

group "web" {

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

network {
mode = "bridge"
port "www" {
62 changes: 62 additions & 0 deletions e2e/dynamic_host_volumes/input/sticky.nomad.hcl
@@ -0,0 +1,62 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

job "example" {

# this job will get deployed and rescheduled a lot in this test, so make sure
# it happens as quickly as possible

update {
min_healthy_time = "1s"
}

reschedule {
delay = "5s"
delay_function = "constant"
unlimited = true
}

group "web" {

network {
mode = "bridge"
port "www" {
to = 8001
}
}

restart {
attempts = 0
mode = "fail"
}

volume "data" {
type = "host"
source = "sticky-volume"
sticky = true
}

task "http" {

driver = "docker"
config {
image = "busybox:1"
command = "httpd"
args = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
ports = ["www"]
}

volume_mount {
volume = "data"
destination = "/var/www"
}

resources {
cpu = 128
memory = 128
}

}
}

}
5 changes: 5 additions & 0 deletions e2e/dynamic_host_volumes/input/volume-create.nomad.hcl
@@ -9,3 +9,8 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
16 changes: 16 additions & 0 deletions e2e/dynamic_host_volumes/input/volume-sticky.nomad.hcl
@@ -0,0 +1,16 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

name = "sticky-volume"
type = "host"
plugin_id = "mkdir"

capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
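
As a usage note, this spec is consumed by the test through the new volumes3 helper rather than the nomad volume create CLI that the older workflow used. A minimal sketch of that setup, lifted from the calls in the test above (whether the helper itself waits for the volume to become ready is assumed here, not shown in this diff):

package dynamic_host_volumes

import (
	"testing"

	"github.com/hashicorp/nomad/e2e/e2eutil"
	"github.com/hashicorp/nomad/e2e/v3/jobs3"
	"github.com/hashicorp/nomad/e2e/v3/volumes3"
)

func setupStickyVolume(t *testing.T) {
	nomad := e2eutil.NomadClient(t)
	e2eutil.WaitForLeader(t, nomad)
	e2eutil.WaitForNodesReady(t, nomad, 1)

	// volumes3.Create registers the volume from the spec file above; the
	// returned cleanup function deletes it when the test finishes.
	_, cleanupVol := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
		volumes3.WithClient(nomad))
	t.Cleanup(cleanupVol)

	// jobs3.Submit runs the mounter job that claims the sticky volume.
	_, cleanupJob := jobs3.Submit(t, "./input/sticky.nomad.hcl")
	t.Cleanup(cleanupJob)
}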