diff --git a/job/job.go b/job/job.go index 4fa1fbd..f7b7618 100644 --- a/job/job.go +++ b/job/job.go @@ -95,14 +95,12 @@ func (j *Job) execute(ctx context.Context) (sandbox.Report, error) { cfg.Stdin = proc.Stdin containerId := fmt.Sprintf("%s-%d", j.ID, j.step) - s, err := sandbox.GetManager().NewSandbox(containerId, cfg) - defer sandbox.GetManager().DestroySandbox(containerId) - - if err != nil { + if err := sandbox.GetManager().NewSandbox(containerId, cfg); err != nil { return sandbox.Report{}, fmt.Errorf("cannot create sandbox for process %d: %v", j.step, err) } + defer sandbox.GetManager().DestroySandbox(containerId) - report, err := s.Run(ctx) + report, err := sandbox.GetManager().RunSandbox(ctx, containerId) if err != nil { return sandbox.Report{}, fmt.Errorf("error running process %d: %v", j.step, err) } diff --git a/job/job_test.go b/job/job_test.go index fc250e1..96cf4fb 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -17,7 +17,7 @@ func TestMain(m *testing.M) { config.UseDefaults() job.NewJobPool() - sandbox.NewManager() + sandbox.NewManager(config.MaxConcurrency) exitCode := m.Run() diff --git a/sandbox/manager.go b/sandbox/manager.go index dd7fa04..2deda48 100644 --- a/sandbox/manager.go +++ b/sandbox/manager.go @@ -1,6 +1,7 @@ package sandbox import ( + "context" "fmt" "sync" @@ -13,21 +14,22 @@ type Manager struct { sandboxes map[string]*Sandbox allocatedRanges map[string]int - allocator *allocator.Allocator + allocator *allocator.Allocator + maxConcurrency int - mu sync.Mutex + mu sync.Mutex + sem chan struct{} } -func NewManager() error { - alloc, err := allocator.NewAllocator() - if err != nil { - return err - } +func NewManager(maxConcurrency int) error { + alloc := allocator.NewAllocator() m = &Manager{ sandboxes: make(map[string]*Sandbox), allocatedRanges: make(map[string]int), allocator: alloc, + maxConcurrency: maxConcurrency, + sem: make(chan struct{}, maxConcurrency), } return nil } @@ -36,17 +38,17 @@ func GetManager() *Manager { return m } -func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) { +func (m *Manager) NewSandbox(id string, cfg *Config) error { m.mu.Lock() defer m.mu.Unlock() if _, exists := m.sandboxes[id]; exists { - return nil, fmt.Errorf("sandbox with id %q already exists", id) + return fmt.Errorf("sandbox with id %q already exists", id) } idx, rng := m.allocator.Allocate() if idx == -1 { - return nil, fmt.Errorf("no available uid/gid ranges") + return fmt.Errorf("no available uid/gid ranges") } cfg.UserNamespace = &UserNamespaceConfig{ @@ -66,7 +68,27 @@ func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) { m.sandboxes[id] = sandbox m.allocatedRanges[id] = idx - return sandbox, nil + return nil +} + +func (m *Manager) RunSandbox(ctx context.Context, id string) (Report, error) { + m.sem <- struct{}{} + defer func() { <-m.sem }() + + m.mu.Lock() + sandbox, exists := m.sandboxes[id] + m.mu.Unlock() + + if !exists { + return Report{}, fmt.Errorf("sandbox with id %q does not exist", id) + } + + report, err := sandbox.Run(ctx) + if err != nil { + return Report{}, fmt.Errorf("error running sandbox %q: %w", id, err) + } + + return report, nil } func (m *Manager) DestroySandbox(id string) error { diff --git a/sandbox/report.go b/sandbox/report.go index f316125..0a712f8 100644 --- a/sandbox/report.go +++ b/sandbox/report.go @@ -5,6 +5,7 @@ import ( "io" "os" "syscall" + "time" ) type Status string @@ -29,6 +30,9 @@ type Report struct { CPUTime uint64 Memory uint64 WallTime int64 + + StartAt time.Time + FinishAt time.Time } func (r Report) String() string { @@ -45,7 +49,7 @@ func (r Report) String() string { return fmt.Sprintf("status: %s\nexit code: %d\nsignal: %d\nstdout: %s\nstderr:%s\ncpu:%d usec\nmemory:%d bytes\n", r.Status, r.ExitCode, r.Signal, stdoutTrim, stderrTrim, r.CPUTime, r.Memory) } -func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool) (Report, error) { +func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool, startAt, finishAt time.Time) (Report, error) { stdout, err := io.ReadAll(stdoutBuf) if err != nil { return Report{}, fmt.Errorf("error reading stdout: %w", err) @@ -87,5 +91,7 @@ func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessSt Stderr: string(stderr), CPUTime: stats.GetCPU().GetUsageUsec(), Memory: stats.GetMemory().GetMaxUsage(), + StartAt: startAt, + FinishAt: finishAt, }, nil } diff --git a/sandbox/sandbox.go b/sandbox/sandbox.go index 6665abd..71a6416 100644 --- a/sandbox/sandbox.go +++ b/sandbox/sandbox.go @@ -80,6 +80,8 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) { Init: true, } + startAt := time.Now() + if err := container.Run(process); err != nil { return Report{}, fmt.Errorf("error running container: %w", err) } @@ -99,7 +101,9 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) { state, _ := process.Wait() processFinished <- struct{}{} - return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded) + finishAt := time.Now() + + return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded, startAt, finishAt) } func getRlimits(cfg *RlimitConfig) []configs.Rlimit { diff --git a/sandbox/sandbox_test.go b/sandbox/sandbox_test.go index 296ebb4..9e8cb54 100644 --- a/sandbox/sandbox_test.go +++ b/sandbox/sandbox_test.go @@ -3,6 +3,7 @@ package sandbox_test import ( "os" "path/filepath" + "sort" "testing" "github.com/joshjms/castletown/config" @@ -14,7 +15,7 @@ func TestMain(m *testing.M) { sandbox.Init() config.UseDefaults() - sandbox.NewManager() + sandbox.NewManager(2) files, err := os.ReadDir("test_files") require.NoError(nil, err, "failed to read test files directory: %v", err) @@ -113,7 +114,8 @@ func TestSandboxRusageConsistency(t *testing.T) { var minCpuUsage, maxCpuUsage uint64 for i := 0; i < 10; i++ { - report := tc.Run(t) + reports := tc.Run(t) + report := reports[0] if i == 0 { minCpuUsage = report.CPUTime @@ -128,3 +130,37 @@ func TestSandboxRusageConsistency(t *testing.T) { require.Less(t, maxCpuUsage-minCpuUsage, uint64(10000), "cpu usage inconsistent") } + +func TestSandboxConcurrency(t *testing.T) { + expectedStatus := sandbox.STATUS_OK + + tc := sandbox.Testcase{ + File: "test_files/sleep.cpp", + ExpectedStatus: &expectedStatus, + TimeLimitMs: 3000, + Concurrency: 5, + } + + reports := tc.Run(t) + + startTimes := make([]int64, len(reports)) + finishTimes := make([]int64, len(reports)) + + for i, report := range reports { + startTimes[i] = report.StartAt.UnixMilli() + finishTimes[i] = report.FinishAt.UnixMilli() + } + + sort.Slice(startTimes, func(i, j int) bool { + return startTimes[i] < startTimes[j] + }) + sort.Slice(finishTimes, func(i, j int) bool { + return finishTimes[i] < finishTimes[j] + }) + + for i := 2; i < len(startTimes); i++ { + require.Less(t, finishTimes[i-2], startTimes[i], "semaphore didn't work correctly") + } + + tc.Run(t) +} diff --git a/sandbox/test_files/sleep.cpp b/sandbox/test_files/sleep.cpp new file mode 100644 index 0000000..125c336 --- /dev/null +++ b/sandbox/test_files/sleep.cpp @@ -0,0 +1,20 @@ +#include +#include + +int main() { + using clock = std::chrono::steady_clock; + using namespace std::chrono; + + std::cout << "Spinning for ~2 seconds...\n"; + + const auto start = clock::now(); + const auto target = start + seconds(2); + + // Busy loop until we reach target time (100% CPU on one core) + while (clock::now() < target) { + // nothing: pure spin + } + + std::cout << "Done.\n"; + return 0; +} diff --git a/sandbox/test_utils.go b/sandbox/test_utils.go index 7e5c112..3b6da49 100644 --- a/sandbox/test_utils.go +++ b/sandbox/test_utils.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -21,9 +22,11 @@ type Testcase struct { ExpectedOutput *string TimeLimitMs int64 + + Concurrency int } -func (tc *Testcase) Run(t *testing.T) Report { +func (tc *Testcase) Run(t *testing.T) []Report { m := GetManager() require.NotNil(t, m, "failed to get manager") @@ -32,10 +35,7 @@ func (tc *Testcase) Run(t *testing.T) Report { defer os.RemoveAll(rootFileDir) compileFileDir := filepath.Join(rootFileDir, "proc-0") - execFileDir := filepath.Join(rootFileDir, "proc-1") - os.MkdirAll(compileFileDir, 0755) - os.MkdirAll(execFileDir, 0755) rootfsDir := "/tmp/castletown/images/gcc-15-bookworm" @@ -74,62 +74,91 @@ func (tc *Testcase) Run(t *testing.T) Report { }, } - compileSandbox, err := m.NewSandbox(fmt.Sprintf("%s-%d", id, 0), compileConfig) + compileId := fmt.Sprintf("%s-%d", id, 0) + err := m.NewSandbox(compileId, compileConfig) defer require.NoError(t, err, "failed to create compile sandbox: %v", err) - defer m.DestroySandbox(compileSandbox.GetId()) + defer m.DestroySandbox(compileId) ctx := context.Background() compileStartTime := time.Now() - compileReport, err := compileSandbox.Run(ctx) + compileReport, err := m.RunSandbox(ctx, compileId) require.NoError(t, err, "failed to compile code") compileElapsed := time.Since(compileStartTime) t.Logf("Compile took %v", compileElapsed) require.Equal(t, STATUS_OK, compileReport.Status, "compile status not ok") - execConfig := &Config{ - RootfsImageDir: rootfsDir, - BoxDir: execFileDir, - Args: []string{"./main"}, - Stdin: tc.Stdin, - Cwd: "/box", - Env: []string{ - "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", - }, - TimeLimitMs: tc.TimeLimitMs, - Cgroup: &CgroupConfig{ - CpuQuota: 100000, - Memory: 256 * 1024 * 1024, - PidsLimit: 1, - CpusetCpus: "0", - CpusetMems: "0", - }, - Files: []File{ - { - Src: filepath.Join(compileFileDir, "main"), - Dst: filepath.Join(execFileDir, "main"), - }, - }, + if tc.Concurrency < 1 { + tc.Concurrency = 1 } - execSandbox, err := m.NewSandbox(fmt.Sprintf("%s-%d", id, 1), execConfig) - defer m.DestroySandbox(execSandbox.GetId()) - require.NoError(t, err, "failed to create exec sandbox: %v", err) - - ctx = context.Background() - execStartTime := time.Now() - execReport, err := execSandbox.Run(ctx) - execElapsed := time.Since(execStartTime) - t.Logf("Execution took %v", execElapsed) - require.NoError(t, err, "failed to execute code") - - if tc.ExpectedStatus != nil { - require.Equal(t, *tc.ExpectedStatus, execReport.Status, "status != expectedStatus") + wg := sync.WaitGroup{} + + finishTimes := make([]time.Time, tc.Concurrency) + reports := make([]Report, tc.Concurrency) + + for i := 1; i <= tc.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + execId := fmt.Sprintf("%s-%d", id, i) + + execFileDir := filepath.Join(rootFileDir, fmt.Sprintf("proc-%d", i)) + os.MkdirAll(execFileDir, 0755) + + execConfig := &Config{ + RootfsImageDir: rootfsDir, + BoxDir: execFileDir, + Args: []string{"./main"}, + Stdin: tc.Stdin, + Cwd: "/box", + Env: []string{ + "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + }, + TimeLimitMs: tc.TimeLimitMs, + Cgroup: &CgroupConfig{ + CpuQuota: 100000, + Memory: 256 * 1024 * 1024, + PidsLimit: 1, + CpusetCpus: "0", + CpusetMems: "0", + }, + Files: []File{ + { + Src: filepath.Join(compileFileDir, "main"), + Dst: filepath.Join(execFileDir, "main"), + }, + }, + } + + err = m.NewSandbox(execId, execConfig) + defer m.DestroySandbox(execId) + require.NoError(t, err, "failed to create exec sandbox: %v", err) + + ctx = context.Background() + execStartTime := time.Now() + t.Logf("Starting execution %d at %v", i, execStartTime) + execReport, err := m.RunSandbox(ctx, execId) + execFinishTime := time.Now() + execElapsed := time.Since(execStartTime) + t.Logf("Finished execution %d at %v", i, execFinishTime) + finishTimes[i-1] = execFinishTime + t.Logf("Execution %d took %v", i, execElapsed) + require.NoError(t, err, "failed to execute code") + + if tc.ExpectedStatus != nil { + require.Equal(t, *tc.ExpectedStatus, execReport.Status, "status != expectedStatus") + } + + if tc.ExpectedOutput != nil { + require.Equal(t, *tc.ExpectedOutput, execReport.Stdout, "output != expectedOutput") + } + + reports[i-1] = execReport + }(i) } - if tc.ExpectedOutput != nil { - require.Equal(t, *tc.ExpectedOutput, execReport.Stdout, "output != expectedOutput") - } + wg.Wait() - return execReport + return reports }