Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions kubernetes/internal/task-executor/runtime/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,28 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error {
return fmt.Errorf("failed to resolve target PID: %w", err)
}

// Inherit environment variables from the target process (Main Container)
targetEnv, err := getProcEnviron(targetPID)
if err != nil {
return fmt.Errorf("failed to read target process environment: %w", err)
}

nsenterArgs := []string{
"-t", strconv.Itoa(targetPID),
"--mount", "--uts", "--ipc", "--net", "--pid",
"--",
"/bin/sh", "-c", shimScript,
}
cmd = exec.Command("nsenter", nsenterArgs...)
cmd.Env = targetEnv
klog.InfoS("Starting sidecar task", "id", task.Name, "targetPID", targetPID)

} else {
// Host Logic: Direct execution
// Use exec.Command instead of CommandContext to ensure the process survives
// after the HTTP request context is canceled.
cmd = exec.Command("/bin/sh", "-c", shimScript)
cmd.Env = os.Environ()
klog.InfoS("Starting host task", "name", task.Name, "cmd", safeCmdStr, "exitPath", exitPath)
}

Expand Down Expand Up @@ -145,8 +153,6 @@ func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPat

// Apply environment variables from ProcessTask spec
if task.Spec.Process != nil {
// Start with current environment
cmd.Env = os.Environ()
// Add task-specific environment variables
for _, env := range task.Spec.Process.Env {
if env.Name != "" {
Expand Down Expand Up @@ -454,3 +460,22 @@ func (e *processExecutor) findPidByEnvVar(envName, expectedValue string) (int, e

return 0, fmt.Errorf("no process found with environment variable %s=%s", envName, expectedValue)
}

// getProcEnviron reads the environment variables of a process from /proc/<pid>/environ.
// It returns a list of "KEY=VALUE" strings.
func getProcEnviron(pid int) ([]string, error) {
envPath := filepath.Join("/proc", strconv.Itoa(pid), "environ")
data, err := os.ReadFile(envPath)
if err != nil {
return nil, err
}

// Environment variables in /proc/<pid>/environ are separated by null bytes
var envs []string
for _, env := range strings.Split(string(data), "\x00") {
if len(env) > 0 {
envs = append(envs, env)
}
}
return envs, nil
}
56 changes: 56 additions & 0 deletions kubernetes/internal/task-executor/runtime/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types"
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/utils"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

func setupTestExecutor(t *testing.T) (Executor, string) {
Expand Down Expand Up @@ -245,3 +247,57 @@ func TestNewExecutor(t *testing.T) {
t.Error("NewExecutor should fail with nil config")
}
}

func TestProcessExecutor_EnvInheritance(t *testing.T) {
if _, err := exec.LookPath("sh"); err != nil {
t.Skip("sh not found")
}

// 1. Setup Host Environment
expectedHostVar := "HOST_TEST_VAR=host_value"
os.Setenv("HOST_TEST_VAR", "host_value")
defer os.Unsetenv("HOST_TEST_VAR")

executor, _ := setupTestExecutor(t)
pExecutor := executor.(*processExecutor)
ctx := context.Background()

// 2. Define Task with Custom Env
task := &types.Task{
Name: "env-test",
Spec: v1alpha1.TaskSpec{
Process: &v1alpha1.ProcessTask{
Command: []string{"env"},
Env: []corev1.EnvVar{
{Name: "TASK_TEST_VAR", Value: "task_value"},
},
},
},
}
expectedTaskVar := "TASK_TEST_VAR=task_value"

taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name)
assert.Nil(t, err)
os.MkdirAll(taskDir, 0755)

// 3. Start Task
if err := executor.Start(ctx, task); err != nil {
t.Fatalf("Start failed: %v", err)
}

// 4. Wait for completion
time.Sleep(200 * time.Millisecond)

status, err := executor.Inspect(ctx, task)
assert.Nil(t, err)
assert.Equal(t, types.TaskStateSucceeded, status.State)

// 5. Verify Output
stdoutPath := filepath.Join(taskDir, StdoutFile)
output, err := os.ReadFile(stdoutPath)
assert.Nil(t, err)
outputStr := string(output)

assert.Contains(t, outputStr, expectedHostVar, "Should inherit host environment variables")
assert.Contains(t, outputStr, expectedTaskVar, "Should include task-specific environment variables")
}
53 changes: 51 additions & 2 deletions kubernetes/test/e2e_task/task_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
TargetContainer = "task-e2e-target"
ExecutorContainer = "task-e2e-executor"
VolumeName = "task-e2e-vol"
HostPort = "38080"
HostPort = "5758"
)

var _ = Describe("Task Executor E2E", Ordered, func() {
Expand Down Expand Up @@ -63,6 +63,7 @@ var _ = Describe("Task Executor E2E", Ordered, func() {
targetCmd := exec.Command("docker", "run", "-d", "--name", TargetContainer,
"-v", fmt.Sprintf("%s:/tmp/tasks", VolumeName),
"-e", "SANDBOX_MAIN_CONTAINER=main",
"-e", "TARGET_VAR=hello-from-target",
"golang:1.24", "sleep", "infinity")
targetCmd.Stdout = os.Stdout
targetCmd.Stderr = os.Stderr
Expand All @@ -74,7 +75,7 @@ var _ = Describe("Task Executor E2E", Ordered, func() {
"--privileged",
"-u", "0",
"--pid=container:"+TargetContainer,
"-p", HostPort+":8080",
"-p", HostPort+":5758",
ImageName,
"-enable-sidecar-mode=true",
"-main-container-name=main",
Expand Down Expand Up @@ -148,4 +149,52 @@ var _ = Describe("Task Executor E2E", Ordered, func() {
}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
})
})

Context("When creating a task checking environment variables", func() {
taskName := "e2e-env-test"

It("should inherit environment variables from target container", func() {
By("Creating task running 'env'")
task := &api.Task{
Name: taskName,
Spec: v1alpha1.TaskSpec{
Process: &v1alpha1.ProcessTask{
Command: []string{"env"},
},
},
}
_, err := client.Set(context.Background(), task)
Expect(err).NotTo(HaveOccurred())

By("Waiting for task to succeed")
Eventually(func(g Gomega) {
got, err := client.Get(context.Background())
g.Expect(err).NotTo(HaveOccurred())
g.Expect(got).NotTo(BeNil())
g.Expect(got.Name).To(Equal(taskName))
g.Expect(got.Status.State.Terminated).NotTo(BeNil())
g.Expect(got.Status.State.Terminated.ExitCode).To(BeZero())
}, 10*time.Second, 1*time.Second).Should(Succeed())

By("Verifying stdout contains target container env")
// Read stdout.log from the executor container (which shares the volume)
out, err := exec.Command("docker", "exec", ExecutorContainer, "cat", fmt.Sprintf("/tmp/tasks/%s/stdout.log", taskName)).CombinedOutput()
Expect(err).NotTo(HaveOccurred(), "Failed to read stdout.log: %s", string(out))

outputStr := string(out)
Expect(outputStr).To(ContainSubstring("TARGET_VAR=hello-from-target"), "Task environment should inherit from target container")
})

It("should be deletable", func() {
By("Deleting task")
_, err := client.Set(context.Background(), nil)
Expect(err).NotTo(HaveOccurred())

By("Verifying deletion")
Eventually(func() *api.Task {
got, _ := client.Get(context.Background())
return got
}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
})
})
})
Loading