Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ "1.21.x", "1.22.x", "1.23.x", "1.24.x", "1.25.x", "1.26.x"]
# 1.21.x dropped: go.mod requires >= 1.22 (needed for SysProcAttr.CgroupFD)
go-version: [ "1.22.x", "1.23.x", "1.24.x", "1.25.x", "1.26.x"]

steps:
- uses: actions/checkout@v4
Expand All @@ -35,6 +36,42 @@ jobs:
- name: Run tests with race detector
run: go test -v -race -coverprofile=coverage.txt -covermode=atomic ./...

# cgroup-integration: Layer 3 tests that require real cgroupv2 and root access.
# Runs only on the latest stable Go version to keep CI fast.
cgroup-integration:
name: Cgroup Integration Tests
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.26.x"

- name: Verify cgroupv2 is available
run: |
if ! grep -q cgroup2 /proc/mounts; then
echo "cgroupv2 not mounted — skipping integration tests"
echo "CGROUP_AVAILABLE=false" >> "$GITHUB_ENV"
else
echo "CGROUP_AVAILABLE=true" >> "$GITHUB_ENV"
fi

- name: Build test dependencies (healthworker)
run: go build ./testdata/healthworker/...

- name: Run cgroup integration tests (as root)
if: env.CGROUP_AVAILABLE == 'true'
run: |
sudo --preserve-env=PATH,GOPATH,GOCACHE,HOME \
env HERD_CGROUP_TEST=1 \
$(which go) test -v -run TestSandbox -timeout 60s ./...
env:
GOPATH: ${{ env.GOPATH }}
GOCACHE: ${{ env.GOCACHE }}

lint:
name: Lint Code
runs-on: ubuntu-latest
Expand All @@ -44,7 +81,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.21.x"
go-version: "1.22.x"
cache: false # golangci-lint-action handles its own caching

- name: golangci-lint
Expand Down
24 changes: 24 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Build stage: compile the playwright example server with the Go toolchain.
FROM golang:1.24-alpine AS builder

WORKDIR /herd

COPY . .

WORKDIR /herd/examples/playwright

RUN go mod download

# CGO_ENABLED=0 produces a static binary, so the artifact built on
# musl-based Alpine runs unchanged on the glibc-based runtime image.
RUN CGO_ENABLED=0 go build -o main main.go

# Runtime stage: Ubuntu with Node.js so Playwright can install Chromium.
FROM ubuntu:24.04 AS runner

# apt-get (not apt) is the stable CLI for scripts; skip recommended
# packages and drop the package lists to keep the image small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends nodejs npm \
    && rm -rf /var/lib/apt/lists/*

RUN npx playwright install chromium --with-deps

WORKDIR /app

COPY --from=builder /herd/examples/playwright/main .

CMD ["./main"]
5 changes: 5 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Builds the Playwright example image (see Dockerfile in this directory)
# and publishes its HTTP server on host port 8080.
services:
  playwright:
    build: .
    ports:
      - "8080:8080"
137 changes: 137 additions & 0 deletions factory_cgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// factory_cgroup_test.go — Unit tests for ProcessFactory cgroup configuration.
//
// No build tag: runs on all platforms (macOS, Linux, Windows).
// No processes are spawned; only field values and option validation are tested.
package herd

import (
"testing"
)

// TestNewProcessFactory_DefaultCgroupPIDs checks the out-of-the-box PID cap.
func TestNewProcessFactory_DefaultCgroupPIDs(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	if got := pf.cgroupPIDs; got != 100 {
		t.Errorf("expected default cgroupPIDs=100, got %d", got)
	}
}

// TestNewProcessFactory_DefaultMemoryCPUUnlimited checks that memory and CPU
// start out unrestricted (zero value).
func TestNewProcessFactory_DefaultMemoryCPUUnlimited(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	if got := pf.cgroupMemory; got != 0 {
		t.Errorf("expected default cgroupMemory=0 (unlimited), got %d", got)
	}
	if got := pf.cgroupCPU; got != 0 {
		t.Errorf("expected default cgroupCPU=0 (unlimited), got %d", got)
	}
}

// TestWithMemoryLimit_StoresBytes checks the byte count is stored verbatim.
func TestWithMemoryLimit_StoresBytes(t *testing.T) {
	const limit int64 = 512 << 20 // 512 MB
	pf := NewProcessFactory("./fake-binary").WithMemoryLimit(limit)
	if pf.cgroupMemory != limit {
		t.Errorf("expected cgroupMemory=%d, got %d", limit, pf.cgroupMemory)
	}
}

// TestWithMemoryLimit_Zero_DisablesLimit checks that 0 clears a prior limit.
func TestWithMemoryLimit_Zero_DisablesLimit(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	pf.WithMemoryLimit(512 << 20)
	pf.WithMemoryLimit(0)
	if pf.cgroupMemory != 0 {
		t.Errorf("expected cgroupMemory=0 after zeroing, got %d", pf.cgroupMemory)
	}
}

// TestWithMemoryLimit_NegativePanics checks that a negative byte count is rejected.
func TestWithMemoryLimit_NegativePanics(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Error("expected panic for negative WithMemoryLimit")
		}
	}()
	NewProcessFactory("./fake-binary").WithMemoryLimit(-1)
}

// TestWithCPULimit_HalfCore checks the cores-to-quota conversion for 0.5.
func TestWithCPULimit_HalfCore(t *testing.T) {
	pf := NewProcessFactory("./fake-binary").WithCPULimit(0.5)
	if got := pf.cgroupCPU; got != 50_000 {
		t.Errorf("expected cgroupCPU=50000 for 0.5 cores, got %d", got)
	}
}

// TestWithCPULimit_TwoCores checks the cores-to-quota conversion for 2.0.
func TestWithCPULimit_TwoCores(t *testing.T) {
	pf := NewProcessFactory("./fake-binary").WithCPULimit(2.0)
	if got := pf.cgroupCPU; got != 200_000 {
		t.Errorf("expected cgroupCPU=200000 for 2.0 cores, got %d", got)
	}
}

// TestWithCPULimit_Zero_DisablesLimit checks that 0 clears a prior quota.
func TestWithCPULimit_Zero_DisablesLimit(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	pf.WithCPULimit(1.0)
	pf.WithCPULimit(0)
	if pf.cgroupCPU != 0 {
		t.Errorf("expected cgroupCPU=0 after zeroing, got %d", pf.cgroupCPU)
	}
}

// TestWithCPULimit_NegativePanics checks that a negative core count is rejected.
func TestWithCPULimit_NegativePanics(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Error("expected panic for negative WithCPULimit")
		}
	}()
	NewProcessFactory("./fake-binary").WithCPULimit(-0.5)
}

// TestWithPIDsLimit_Explicit checks a positive PID cap is stored verbatim.
func TestWithPIDsLimit_Explicit(t *testing.T) {
	pf := NewProcessFactory("./fake-binary").WithPIDsLimit(50)
	if got := pf.cgroupPIDs; got != 50 {
		t.Errorf("expected cgroupPIDs=50, got %d", got)
	}
}

// TestWithPIDsLimit_Unlimited checks -1 is accepted as the "no cap" sentinel.
func TestWithPIDsLimit_Unlimited(t *testing.T) {
	pf := NewProcessFactory("./fake-binary").WithPIDsLimit(-1)
	if got := pf.cgroupPIDs; got != -1 {
		t.Errorf("expected cgroupPIDs=-1 for unlimited, got %d", got)
	}
}

// TestWithPIDsLimit_ZeroPanics checks that a zero PID cap is rejected.
func TestWithPIDsLimit_ZeroPanics(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Error("expected panic for WithPIDsLimit(0)")
		}
	}()
	NewProcessFactory("./fake-binary").WithPIDsLimit(0)
}

// TestWithPIDsLimit_LessThanNegativeOnePanics checks that values below the -1
// sentinel are rejected.
func TestWithPIDsLimit_LessThanNegativeOnePanics(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Error("expected panic for WithPIDsLimit(-2)")
		}
	}()
	NewProcessFactory("./fake-binary").WithPIDsLimit(-2)
}

// TestWithPIDsLimit_Chaining checks the builder hands back the identical
// factory pointer so calls can be chained fluently.
func TestWithPIDsLimit_Chaining(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	if pf.WithPIDsLimit(25) != pf {
		t.Error("WithPIDsLimit should return the same *ProcessFactory for chaining")
	}
}

// TestWithMemoryLimit_Chaining checks the builder hands back the identical
// factory pointer so calls can be chained fluently.
func TestWithMemoryLimit_Chaining(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	if pf.WithMemoryLimit(1024) != pf {
		t.Error("WithMemoryLimit should return the same *ProcessFactory for chaining")
	}
}

// TestWithCPULimit_Chaining checks the builder hands back the identical
// factory pointer so calls can be chained fluently.
func TestWithCPULimit_Chaining(t *testing.T) {
	pf := NewProcessFactory("./fake-binary")
	if pf.WithCPULimit(1.0) != pf {
		t.Error("WithCPULimit should return the same *ProcessFactory for chaining")
	}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/hackstrix/herd

go 1.21
go 1.22
86 changes: 79 additions & 7 deletions process_worker_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type processWorker struct {
healthPath string // e.g. "/health" or "/"
client *http.Client

cgroupHandle sandboxHandle

mu sync.Mutex
cmd *exec.Cmd
sessionID string // guarded by mu
Expand Down Expand Up @@ -134,6 +136,9 @@ func (w *processWorker) monitor() {

// broadcast to all the listeners that the worker is dead.
close(w.dead)
if w.cgroupHandle != nil {
w.cgroupHandle.Cleanup()
}

w.mu.Lock()
prevSession := w.sessionID
Expand Down Expand Up @@ -166,6 +171,10 @@ type ProcessFactory struct {
healthPath string // path to poll for liveness; defaults to "/health"
startTimeout time.Duration // maximum time to wait for the first successful health check
startHealthCheckDelay time.Duration // delay the health check for the first time.
enableSandbox bool // true by default for isolation
cgroupMemory int64 // bytes; 0 means unlimited
cgroupCPU int64 // quota in micros per 100ms period; 0 means unlimited
cgroupPIDs int64 // max pids; -1 means unlimited
counter atomic.Int64
}

Expand All @@ -183,6 +192,8 @@ func NewProcessFactory(binary string, args ...string) *ProcessFactory {
healthPath: "/health",
startTimeout: 30 * time.Second,
startHealthCheckDelay: 1 * time.Second,
enableSandbox: true,
cgroupPIDs: 100,
}
}

Expand Down Expand Up @@ -229,6 +240,48 @@ func (f *ProcessFactory) WithStartHealthCheckDelay(d time.Duration) *ProcessFact
return f
}

// WithMemoryLimit configures the cgroup memory ceiling, in bytes, applied to
// every worker the factory spawns. Passing 0 removes the memory limit.
// It returns the factory for fluent chaining and panics on negative input.
func (f *ProcessFactory) WithMemoryLimit(bytes int64) *ProcessFactory {
	if bytes >= 0 {
		f.cgroupMemory = bytes
		return f
	}
	panic("herd: WithMemoryLimit bytes must be >= 0")
}

// WithCPULimit sets the cgroup CPU quota in cores for each spawned worker.
// For example, 0.5 means half a CPU and 2 means two CPUs. A value of 0
// disables the limit. The quota is stored as microseconds of CPU time per
// 100ms cgroup period (cpu.max semantics), so 1 core == 100_000 µs.
// It panics if cores is negative.
func (f *ProcessFactory) WithCPULimit(cores float64) *ProcessFactory {
	if cores < 0 {
		panic("herd: WithCPULimit cores must be >= 0")
	}
	if cores == 0 {
		f.cgroupCPU = 0
		return f
	}
	// Round to the nearest microsecond rather than truncate: float
	// multiplication can land a hair below the intended quota (e.g.
	// 0.29*100_000 == 28999.999...), and plain int64() conversion would
	// silently shave a microsecond off the limit.
	f.cgroupCPU = int64(cores*100_000 + 0.5)
	return f
}

// WithPIDsLimit caps how many PIDs each spawned worker's cgroup may hold.
// Pass -1 for unlimited; 0 and anything below -1 are invalid and panic.
// It returns the factory for fluent chaining.
func (f *ProcessFactory) WithPIDsLimit(n int64) *ProcessFactory {
	valid := n > 0 || n == -1
	if !valid {
		panic("herd: WithPIDsLimit n must be > 0 or -1 for unlimited")
	}
	f.cgroupPIDs = n
	return f
}

// WithInsecureSandbox turns off the namespace/cgroup sandbox entirely.
// Reserve it for local debugging on non-Linux machines or for workloads
// whose spawned processes are explicitly trusted.
func (f *ProcessFactory) WithInsecureSandbox() *ProcessFactory {
	f.enableSandbox = false
	return f
}

func streamLogs(workerID string, pipe io.ReadCloser, isError bool) {
// bufio.Scanner guarantees we read line-by-line, preventing torn logs.
scanner := bufio.NewScanner(pipe)
Expand Down Expand Up @@ -271,6 +324,21 @@ func (f *ProcessFactory) Spawn(ctx context.Context) (Worker[*http.Client], error
// During program exits, this should be cleaned up by the Shutdown method
cmd := exec.Command(f.binary, resolvedArgs...)
cmd.Env = append(os.Environ(), append([]string{"PORT=" + portStr}, resolvedEnv...)...)
var cgroupHandle sandboxHandle

if f.enableSandbox {
h, err := applySandboxFlags(cmd, id, sandboxConfig{
memoryMaxBytes: f.cgroupMemory,
cpuMaxMicros: f.cgroupCPU,
pidsMax: f.cgroupPIDs,
})
if err != nil {
return nil, fmt.Errorf("herd: ProcessFactory: failed to apply sandbox: %w", err)
}
cgroupHandle = h
} else {
log.Printf("[%s] WARNING: running UN-SANDBOXED. Not recommended for production.", id)
}

stdout, err := cmd.StdoutPipe()
if err != nil {
Expand All @@ -284,20 +352,24 @@ func (f *ProcessFactory) Spawn(ctx context.Context) (Worker[*http.Client], error
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("herd: ProcessFactory: start %s: %w", f.binary, err)
}
if cgroupHandle != nil {
cgroupHandle.PostStart()
}
log.Printf("[%s] started pid=%d addr=%s", id, cmd.Process.Pid, address)

// Stream logs in background
go streamLogs(id, stdout, false)
go streamLogs(id, stderr, true)

w := &processWorker{
id: id,
port: port,
address: address,
healthPath: f.healthPath,
client: &http.Client{Timeout: 3 * time.Second},
cmd: cmd,
dead: make(chan struct{}),
id: id,
port: port,
address: address,
healthPath: f.healthPath,
client: &http.Client{Timeout: 3 * time.Second},
cgroupHandle: cgroupHandle,
cmd: cmd,
dead: make(chan struct{}),
}

// Monitor the process in background — fires onCrash if it exits unexpectedly
Expand Down
16 changes: 16 additions & 0 deletions sandbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package herd

// sandboxConfig contains per-worker sandbox resource constraints.
// A value of 0 means "unlimited" for memory and CPU; pidsMax uses -1 as
// its unlimited sentinel (0 is never a valid PID cap).
type sandboxConfig struct {
	memoryMaxBytes int64 // memory ceiling in bytes; 0 = no limit
	cpuMaxMicros   int64 // CPU quota in µs per 100ms period; 0 = no limit
	pidsMax        int64 // maximum number of PIDs; -1 = unlimited
}

// sandboxHandle owns post-start and cleanup hooks for sandbox resources.
// Implementations may be no-op on unsupported or soft-fail paths.
//
// Spawn invokes PostStart right after the child process starts; the
// worker's monitor goroutine invokes Cleanup once the process is observed
// dead. Neither method returns an error, so implementations are expected
// to handle failures internally (best-effort).
type sandboxHandle interface {
	PostStart()
	Cleanup()
}
Loading
Loading