Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ func (j *Job) execute(ctx context.Context) (sandbox.Report, error) {
cfg.Stdin = proc.Stdin

containerId := fmt.Sprintf("%s-%d", j.ID, j.step)
s, err := sandbox.GetManager().NewSandbox(containerId, cfg)
defer sandbox.GetManager().DestroySandbox(containerId)

if err != nil {
if err := sandbox.GetManager().NewSandbox(containerId, cfg); err != nil {
return sandbox.Report{}, fmt.Errorf("cannot create sandbox for process %d: %v", j.step, err)
}
defer sandbox.GetManager().DestroySandbox(containerId)

report, err := s.Run(ctx)
report, err := sandbox.GetManager().RunSandbox(ctx, containerId)
if err != nil {
return sandbox.Report{}, fmt.Errorf("error running process %d: %v", j.step, err)
}
Expand Down
2 changes: 1 addition & 1 deletion job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestMain(m *testing.M) {
config.UseDefaults()

job.NewJobPool()
sandbox.NewManager()
sandbox.NewManager(config.MaxConcurrency)

exitCode := m.Run()

Expand Down
44 changes: 33 additions & 11 deletions sandbox/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sandbox

import (
"context"
"fmt"
"sync"

Expand All @@ -13,21 +14,22 @@ type Manager struct {
sandboxes map[string]*Sandbox
allocatedRanges map[string]int

allocator *allocator.Allocator
allocator *allocator.Allocator
maxConcurrency int

mu sync.Mutex
mu sync.Mutex
sem chan struct{}
}

func NewManager() error {
alloc, err := allocator.NewAllocator()
if err != nil {
return err
}
func NewManager(maxConcurrency int) error {
alloc := allocator.NewAllocator()

m = &Manager{
sandboxes: make(map[string]*Sandbox),
allocatedRanges: make(map[string]int),
allocator: alloc,
maxConcurrency: maxConcurrency,
sem: make(chan struct{}, maxConcurrency),
}
return nil
}
Expand All @@ -36,17 +38,17 @@ func GetManager() *Manager {
return m
}

func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) {
func (m *Manager) NewSandbox(id string, cfg *Config) error {
m.mu.Lock()
defer m.mu.Unlock()

if _, exists := m.sandboxes[id]; exists {
return nil, fmt.Errorf("sandbox with id %q already exists", id)
return fmt.Errorf("sandbox with id %q already exists", id)
}

idx, rng := m.allocator.Allocate()
if idx == -1 {
return nil, fmt.Errorf("no available uid/gid ranges")
return fmt.Errorf("no available uid/gid ranges")
}

cfg.UserNamespace = &UserNamespaceConfig{
Expand All @@ -66,7 +68,27 @@ func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) {
m.sandboxes[id] = sandbox
m.allocatedRanges[id] = idx

return sandbox, nil
return nil
}

func (m *Manager) RunSandbox(ctx context.Context, id string) (Report, error) {
m.sem <- struct{}{}
defer func() { <-m.sem }()

m.mu.Lock()
sandbox, exists := m.sandboxes[id]
m.mu.Unlock()

if !exists {
return Report{}, fmt.Errorf("sandbox with id %q does not exist", id)
}

report, err := sandbox.Run(ctx)
if err != nil {
return Report{}, fmt.Errorf("error running sandbox %q: %w", id, err)
}

return report, nil
}

func (m *Manager) DestroySandbox(id string) error {
Expand Down
8 changes: 7 additions & 1 deletion sandbox/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"
"syscall"
"time"
)

type Status string
Expand All @@ -29,6 +30,9 @@ type Report struct {
CPUTime uint64
Memory uint64
WallTime int64

StartAt time.Time
FinishAt time.Time
}

func (r Report) String() string {
Expand All @@ -45,7 +49,7 @@ func (r Report) String() string {
return fmt.Sprintf("status: %s\nexit code: %d\nsignal: %d\nstdout: %s\nstderr:%s\ncpu:%d usec\nmemory:%d bytes\n", r.Status, r.ExitCode, r.Signal, stdoutTrim, stderrTrim, r.CPUTime, r.Memory)
}

func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool) (Report, error) {
func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool, startAt, finishAt time.Time) (Report, error) {
stdout, err := io.ReadAll(stdoutBuf)
if err != nil {
return Report{}, fmt.Errorf("error reading stdout: %w", err)
Expand Down Expand Up @@ -87,5 +91,7 @@ func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessSt
Stderr: string(stderr),
CPUTime: stats.GetCPU().GetUsageUsec(),
Memory: stats.GetMemory().GetMaxUsage(),
StartAt: startAt,
FinishAt: finishAt,
}, nil
}
6 changes: 5 additions & 1 deletion sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) {
Init: true,
}

startAt := time.Now()

if err := container.Run(process); err != nil {
return Report{}, fmt.Errorf("error running container: %w", err)
}
Expand All @@ -99,7 +101,9 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) {
state, _ := process.Wait()
processFinished <- struct{}{}

return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded)
finishAt := time.Now()

return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded, startAt, finishAt)
}

func getRlimits(cfg *RlimitConfig) []configs.Rlimit {
Expand Down
40 changes: 38 additions & 2 deletions sandbox/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sandbox_test
import (
"os"
"path/filepath"
"sort"
"testing"

"github.com/joshjms/castletown/config"
Expand All @@ -14,7 +15,7 @@ func TestMain(m *testing.M) {
sandbox.Init()
config.UseDefaults()

sandbox.NewManager()
sandbox.NewManager(2)

files, err := os.ReadDir("test_files")
require.NoError(nil, err, "failed to read test files directory: %v", err)
Expand Down Expand Up @@ -113,7 +114,8 @@ func TestSandboxRusageConsistency(t *testing.T) {
var minCpuUsage, maxCpuUsage uint64

for i := 0; i < 10; i++ {
report := tc.Run(t)
reports := tc.Run(t)
report := reports[0]

if i == 0 {
minCpuUsage = report.CPUTime
Expand All @@ -128,3 +130,37 @@ func TestSandboxRusageConsistency(t *testing.T) {

require.Less(t, maxCpuUsage-minCpuUsage, uint64(10000), "cpu usage inconsistent")
}

func TestSandboxConcurrency(t *testing.T) {
expectedStatus := sandbox.STATUS_OK

tc := sandbox.Testcase{
File: "test_files/sleep.cpp",
ExpectedStatus: &expectedStatus,
TimeLimitMs: 3000,
Concurrency: 5,
}

reports := tc.Run(t)

startTimes := make([]int64, len(reports))
finishTimes := make([]int64, len(reports))

for i, report := range reports {
startTimes[i] = report.StartAt.UnixMilli()
finishTimes[i] = report.FinishAt.UnixMilli()
}

sort.Slice(startTimes, func(i, j int) bool {
return startTimes[i] < startTimes[j]
})
sort.Slice(finishTimes, func(i, j int) bool {
return finishTimes[i] < finishTimes[j]
})

for i := 2; i < len(startTimes); i++ {
require.Less(t, finishTimes[i-2], startTimes[i], "semaphore didn't work correctly")
}

tc.Run(t)
}
20 changes: 20 additions & 0 deletions sandbox/test_files/sleep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <chrono>
#include <iostream>

int main() {
using clock = std::chrono::steady_clock;
using namespace std::chrono;

std::cout << "Spinning for ~2 seconds...\n";

const auto start = clock::now();
const auto target = start + seconds(2);

// Busy loop until we reach target time (100% CPU on one core)
while (clock::now() < target) {
// nothing: pure spin
}

std::cout << "Done.\n";
return 0;
}
Loading