Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,36 @@ Herd is built around three core interfaces:

---

## 📊 Monitoring

`Pool.Stats()` returns a point-in-time snapshot of both **pool state** and **host resource usage**, powered by the `herd/observer` subpackage.

```go
import (
"fmt"
"github.com/hackstrix/herd"
)

stats := pool.Stats()

fmt.Printf("Workers : %d total, %d available\n",
stats.TotalWorkers, stats.AvailableWorkers)
fmt.Printf("Sessions: %d active, %d acquiring\n",
stats.ActiveSessions, stats.InflightAcquires)

// Node-level resource snapshot (Linux only; zero on macOS/Windows)
fmt.Printf("Host RAM: %d MB total, %d MB available\n",
stats.Node.TotalMemoryBytes/1024/1024,
stats.Node.AvailableMemoryBytes/1024/1024)
fmt.Printf("CPU Idle: %.1f%%\n", stats.Node.CPUIdle*100)
```

> **Note:** On Linux, `Stats()` blocks for ~100 ms to measure CPU idle via two `/proc/stat` samples. Cache the result if you expose it on a high-traffic metrics endpoint.

The `Node` field is zero-valued on non-Linux platforms — treat a zero `TotalMemoryBytes` as "metrics unavailable" rather than "machine has no RAM."

---

## 📄 License

MIT License. See `LICENSE` for details.
46 changes: 46 additions & 0 deletions observer/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Package observer provides lightweight, OS-level resource sampling.
//
// # Overview
//
// PollNodeStats returns a point-in-time snapshot of host memory and CPU
// availability. It is designed to be called from Pool.Stats so that
// dashboards and alerting systems can see not just pool-level metrics
// (workers, sessions) but also whether the *host* is under pressure.
//
// # Platform support
//
// - Linux: reads /proc/meminfo and /proc/stat (two samples, 100 ms apart).
// - All other platforms: returns a zero-valued NodeStats with no error.
// This allows the observer package to compile and be imported on macOS and
// Windows without any build-tag gymnastics in the caller.
//
// # CPU measurement latency
//
// On Linux, PollNodeStats blocks for ~100 ms because computing CPU idle
// requires two /proc/stat snapshots separated by a measurement window.
// If you call Pool.Stats() in a hot path, cache the result or call
// PollNodeStats from a background goroutine.
package observer

// NodeStats is a point-in-time snapshot of host resource availability.
// All fields are zero on non-Linux platforms.
type NodeStats struct {
// TotalMemoryBytes is the total physical RAM on the host, in bytes.
TotalMemoryBytes int64

// AvailableMemoryBytes is the amount of memory available for new
// processes without swapping (corresponds to /proc/meminfo MemAvailable).
AvailableMemoryBytes int64

// CPUIdle is the fraction of CPU time that was idle during the
// measurement window (0.0 = fully busy, 1.0 = fully idle).
// Averaged across all logical cores.
CPUIdle float64
}

// PollNodeStats returns a current snapshot of host resources.
// On non-Linux platforms it returns a zero NodeStats and nil error.
// On Linux it blocks for ~100 ms to measure CPU idle.
func PollNodeStats() (NodeStats, error) {
return pollNodeStats()
}
166 changes: 166 additions & 0 deletions observer/observer_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//go:build linux

package observer

import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
"time"
)

// pollNodeStats is the Linux implementation of PollNodeStats.
// It reads /proc/meminfo for memory and /proc/stat twice (100 ms apart)
// for CPU idle.
func pollNodeStats() (NodeStats, error) {
mem, err := readMemInfo()
if err != nil {
return NodeStats{}, fmt.Errorf("observer: readMemInfo: %w", err)
}

cpu, err := measureCPUIdle(100 * time.Millisecond)
if err != nil {
return NodeStats{}, fmt.Errorf("observer: measureCPUIdle: %w", err)
}

return NodeStats{
TotalMemoryBytes: mem.total,
AvailableMemoryBytes: mem.available,
CPUIdle: cpu,
}, nil
}

// ---------------------------------------------------------------------------
// /proc/meminfo
// ---------------------------------------------------------------------------

type memInfo struct {
total int64 // bytes
available int64 // bytes
}

// readMemInfo parses /proc/meminfo and returns MemTotal and MemAvailable.
// Values in the file are in kibibytes (kB); we convert to bytes.
func readMemInfo() (memInfo, error) {
f, err := os.Open("/proc/meminfo")
if err != nil {
return memInfo{}, err
}
defer f.Close()

var info memInfo
found := 0
scanner := bufio.NewScanner(f)
for scanner.Scan() && found < 2 {
line := scanner.Text()
var key string
var val int64
// Format: "MemTotal: 16384000 kB"
parts := strings.Fields(line)
if len(parts) < 2 {
continue
}
key = strings.TrimSuffix(parts[0], ":")
val, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
continue
}
val *= 1024 // kB → bytes
switch key {
case "MemTotal":
info.total = val
found++
case "MemAvailable":
info.available = val
found++
}
}
if err := scanner.Err(); err != nil {
return memInfo{}, err
}
return info, nil
}

// ---------------------------------------------------------------------------
// /proc/stat CPU idle
// ---------------------------------------------------------------------------

// cpuStat holds the raw CPU tick counters from /proc/stat's "cpu" line.
type cpuStat struct {
user int64
nice int64
system int64
idle int64
iowait int64
irq int64
softirq int64
steal int64
}

func (s cpuStat) total() int64 {
return s.user + s.nice + s.system + s.idle + s.iowait + s.irq + s.softirq + s.steal
}

func (s cpuStat) idleTotal() int64 {
return s.idle + s.iowait
}

// readCPUStat reads the first "cpu" line from /proc/stat.
func readCPUStat() (cpuStat, error) {
f, err := os.Open("/proc/stat")
if err != nil {
return cpuStat{}, err
}
defer f.Close()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "cpu ") {
continue
}
// "cpu 264230 882 69154 1490779 3965 0 3080 0 0 0"
fields := strings.Fields(line)
if len(fields) < 5 {
return cpuStat{}, fmt.Errorf("observer: unexpected /proc/stat format: %q", line)
}
parse := func(i int) int64 {
v, _ := strconv.ParseInt(fields[i], 10, 64)
return v
}
return cpuStat{
user: parse(1),
nice: parse(2),
system: parse(3),
idle: parse(4),
iowait: parse(5),
irq: parse(6),
softirq: parse(7),
steal: parse(8),
}, nil
}
return cpuStat{}, fmt.Errorf("observer: 'cpu' line not found in /proc/stat")
}

// measureCPUIdle takes two /proc/stat snapshots separated by window and
// returns the fraction of CPU time spent idle (0.0 = fully busy, 1.0 = idle).
func measureCPUIdle(window time.Duration) (float64, error) {
before, err := readCPUStat()
if err != nil {
return 0, err
}
time.Sleep(window)
after, err := readCPUStat()
if err != nil {
return 0, err
}

totalDelta := after.total() - before.total()
idleDelta := after.idleTotal() - before.idleTotal()
if totalDelta <= 0 {
return 1.0, nil // no ticks elapsed — treat as fully idle
}
return float64(idleDelta) / float64(totalDelta), nil
}
43 changes: 43 additions & 0 deletions observer/observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package observer

import (
"runtime"
"testing"
)

func TestPollNodeStats(t *testing.T) {
stats, err := PollNodeStats()
if err != nil {
t.Fatalf("PollNodeStats() returned error: %v", err)
}

if runtime.GOOS != "linux" {
// On macOS / Windows the stub returns zeroes — that's expected.
t.Logf("non-Linux platform (%s): NodeStats is zero-valued by design", runtime.GOOS)
if stats.TotalMemoryBytes != 0 || stats.AvailableMemoryBytes != 0 || stats.CPUIdle != 0 {
t.Errorf("expected zero NodeStats on non-Linux, got %+v", stats)
}
return
}

// On Linux all fields must be populated.
if stats.TotalMemoryBytes <= 0 {
t.Errorf("TotalMemoryBytes should be > 0 on Linux, got %d", stats.TotalMemoryBytes)
}
if stats.AvailableMemoryBytes <= 0 {
t.Errorf("AvailableMemoryBytes should be > 0 on Linux, got %d", stats.AvailableMemoryBytes)
}
if stats.AvailableMemoryBytes > stats.TotalMemoryBytes {
t.Errorf("AvailableMemoryBytes (%d) > TotalMemoryBytes (%d) — not possible",
stats.AvailableMemoryBytes, stats.TotalMemoryBytes)
}
if stats.CPUIdle < 0.0 || stats.CPUIdle > 1.0 {
t.Errorf("CPUIdle must be in [0, 1], got %f", stats.CPUIdle)
}

t.Logf("NodeStats: total=%d MB available=%d MB cpuIdle=%.2f%%",
stats.TotalMemoryBytes/1024/1024,
stats.AvailableMemoryBytes/1024/1024,
stats.CPUIdle*100,
)
}
11 changes: 11 additions & 0 deletions observer/observer_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//go:build !linux

package observer

// pollNodeStats returns a zeroed NodeStats on non-Linux platforms.
// This is intentional — there is no /proc filesystem on macOS or Windows.
// Callers should treat a zero NodeStats as "metrics unavailable" rather
// than "machine has zero memory."
func pollNodeStats() (NodeStats, error) {
return NodeStats{}, nil
}
28 changes: 25 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import (
"log"
"sync"
"time"

"github.com/hackstrix/herd/observer"
)

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -114,6 +116,13 @@ type PoolStats struct {
// InflightAcquires is the number of Acquire calls currently in the "slow path"
// (waiting for a worker to become available). Useful for queue-depth alerting.
InflightAcquires int

// Node is a snapshot of host-level resource availability (memory, CPU idle).
// Populated by observer.PollNodeStats(); zero-valued on non-Linux platforms
// or if the poll fails.
//
// Note: on Linux, Stats() blocks for ~100 ms to measure CPU idle.
Node observer.NodeStats
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -347,7 +356,7 @@ func (p *Pool[C]) release(sessionID string, w Worker[C]) {
p.mu.Unlock()

if !isValid {
log.Printf("[pool] release(%q): worker %s returned to pool", sessionID, w.ID())
log.Printf("[pool] release(%q): worker %s already evicted (crash or health-check), discarding", sessionID, w.ID())
return
}

Expand Down Expand Up @@ -518,24 +527,37 @@ func (p *Pool[C]) healthCheckLoop() {

// Stats returns a point-in-time snapshot of pool state.
// Safe to call concurrently.
//
// On Linux this blocks for ~100 ms to measure CPU idle via /proc/stat.
// Cache the result if you call Stats() in a hot path.
func (p *Pool[C]) Stats() PoolStats {
nodeStats, _ := observer.PollNodeStats()

p.mu.Lock()
defer p.mu.Unlock()
return PoolStats{
TotalWorkers: len(p.workers),
AvailableWorkers: len(p.available),
ActiveSessions: len(p.sessions),
InflightAcquires: len(p.inflight),
Node: nodeStats,
}
}

// Shutdown gracefully stops the pool.
// It closes all background goroutines and then kills every worker.
// In-flight Acquire calls will receive a context cancellation error if
// the caller's ctx is tied to the application lifetime.
//
// Two signals are sent deliberately:
// - p.cancel() cancels p.ctx, which unblocks any in-flight addWorker
// goroutines that are blocking on factory.Spawn (they use a
// context.WithTimeout derived from p.ctx).
// - close(p.done) signals the healthCheckLoop and runTTLSweep goroutines
// to exit their ticker loops cleanly.
func (p *Pool[C]) Shutdown(ctx context.Context) error {
p.cancel()
close(p.done)
p.cancel() // kill in-flight factory.Spawn calls
close(p.done) // stop background health-check and TTL loops
p.mu.Lock()
workers := make([]Worker[C], len(p.workers))
copy(workers, p.workers)
Expand Down
Loading
Loading