diff --git a/README.md b/README.md index 24d0a2d..30b6a76 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,36 @@ Herd is built around three core interfaces: --- +## 📊 Monitoring + +`Pool.Stats()` returns a point-in-time snapshot of both **pool state** and **host resource usage**, powered by the `herd/observer` subpackage. + +```go +import ( + "fmt" + "github.com/hackstrix/herd" +) + +stats := pool.Stats() + +fmt.Printf("Workers : %d total, %d available\n", + stats.TotalWorkers, stats.AvailableWorkers) +fmt.Printf("Sessions: %d active, %d acquiring\n", + stats.ActiveSessions, stats.InflightAcquires) + +// Node-level resource snapshot (Linux only; zero on macOS/Windows) +fmt.Printf("Host RAM: %d MB total, %d MB available\n", + stats.Node.TotalMemoryBytes/1024/1024, + stats.Node.AvailableMemoryBytes/1024/1024) +fmt.Printf("CPU Idle: %.1f%%\n", stats.Node.CPUIdle*100) +``` + +> **Note:** On Linux, `Stats()` blocks for ~100 ms to measure CPU idle via two `/proc/stat` samples. Cache the result if you expose it on a high-traffic metrics endpoint. + +The `Node` field is zero-valued on non-Linux platforms — treat a zero `TotalMemoryBytes` as "metrics unavailable" rather than "machine has no RAM." + +--- + ## 📄 License MIT License. See `LICENSE` for details. diff --git a/observer/observer.go b/observer/observer.go new file mode 100644 index 0000000..e660f9a --- /dev/null +++ b/observer/observer.go @@ -0,0 +1,46 @@ +// Package observer provides lightweight, OS-level resource sampling. +// +// # Overview +// +// PollNodeStats returns a point-in-time snapshot of host memory and CPU +// availability. It is designed to be called from Pool.Stats so that +// dashboards and alerting systems can see not just pool-level metrics +// (workers, sessions) but also whether the *host* is under pressure. +// +// # Platform support +// +// - Linux: reads /proc/meminfo and /proc/stat (two samples, 100 ms apart). +// - All other platforms: returns a zero-valued NodeStats with no error. 
+// This allows the observer package to compile and be imported on macOS and +// Windows without any build-tag gymnastics in the caller. +// +// # CPU measurement latency +// +// On Linux, PollNodeStats blocks for ~100 ms because computing CPU idle +// requires two /proc/stat snapshots separated by a measurement window. +// If you call Pool.Stats() in a hot path, cache the result or call +// PollNodeStats from a background goroutine. +package observer + +// NodeStats is a point-in-time snapshot of host resource availability. +// All fields are zero on non-Linux platforms. +type NodeStats struct { + // TotalMemoryBytes is the total physical RAM on the host, in bytes. + TotalMemoryBytes int64 + + // AvailableMemoryBytes is the amount of memory available for new + // processes without swapping (corresponds to /proc/meminfo MemAvailable). + AvailableMemoryBytes int64 + + // CPUIdle is the fraction of CPU time that was idle during the + // measurement window (0.0 = fully busy, 1.0 = fully idle). + // Averaged across all logical cores. + CPUIdle float64 +} + +// PollNodeStats returns a current snapshot of host resources. +// On non-Linux platforms it returns a zero NodeStats and nil error. +// On Linux it blocks for ~100 ms to measure CPU idle. +func PollNodeStats() (NodeStats, error) { + return pollNodeStats() +} diff --git a/observer/observer_linux.go b/observer/observer_linux.go new file mode 100644 index 0000000..294dc6d --- /dev/null +++ b/observer/observer_linux.go @@ -0,0 +1,166 @@ +//go:build linux + +package observer + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "time" +) + +// pollNodeStats is the Linux implementation of PollNodeStats. +// It reads /proc/meminfo for memory and /proc/stat twice (100 ms apart) +// for CPU idle. 
+func pollNodeStats() (NodeStats, error) { + mem, err := readMemInfo() + if err != nil { + return NodeStats{}, fmt.Errorf("observer: readMemInfo: %w", err) + } + + cpu, err := measureCPUIdle(100 * time.Millisecond) + if err != nil { + return NodeStats{}, fmt.Errorf("observer: measureCPUIdle: %w", err) + } + + return NodeStats{ + TotalMemoryBytes: mem.total, + AvailableMemoryBytes: mem.available, + CPUIdle: cpu, + }, nil +} + +// --------------------------------------------------------------------------- +// /proc/meminfo +// --------------------------------------------------------------------------- + +type memInfo struct { + total int64 // bytes + available int64 // bytes +} + +// readMemInfo parses /proc/meminfo and returns MemTotal and MemAvailable. +// Values in the file are in kibibytes (kB); we convert to bytes. +func readMemInfo() (memInfo, error) { + f, err := os.Open("/proc/meminfo") + if err != nil { + return memInfo{}, err + } + defer f.Close() + + var info memInfo + found := 0 + scanner := bufio.NewScanner(f) + for scanner.Scan() && found < 2 { + line := scanner.Text() + var key string + var val int64 + // Format: "MemTotal: 16384000 kB" + parts := strings.Fields(line) + if len(parts) < 2 { + continue + } + key = strings.TrimSuffix(parts[0], ":") + val, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil { + continue + } + val *= 1024 // kB → bytes + switch key { + case "MemTotal": + info.total = val + found++ + case "MemAvailable": + info.available = val + found++ + } + } + if err := scanner.Err(); err != nil { + return memInfo{}, err + } + return info, nil +} + +// --------------------------------------------------------------------------- +// /proc/stat CPU idle +// --------------------------------------------------------------------------- + +// cpuStat holds the raw CPU tick counters from /proc/stat's "cpu" line. 
type cpuStat struct {
	// Fields are declared in the order the counters appear on the line,
	// immediately after the leading "cpu" label.
	user    int64
	nice    int64
	system  int64
	idle    int64
	iowait  int64
	irq     int64
	softirq int64
	steal   int64
}

// total returns the sum of every counter tracked in s.
func (s cpuStat) total() int64 {
	return s.user + s.nice + s.system + s.idle + s.iowait + s.irq + s.softirq + s.steal
}

// idleTotal returns the ticks treated as idle time: idle plus iowait.
func (s cpuStat) idleTotal() int64 {
	return s.idle + s.iowait
}

// readCPUStat reads the first "cpu" line from /proc/stat.
//
// It returns an error if the file cannot be opened or read, if no line
// starting with "cpu " exists, or if that line cannot be parsed.
func readCPUStat() (cpuStat, error) {
	f, err := os.Open("/proc/stat")
	if err != nil {
		return cpuStat{}, err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		// The trailing space excludes the per-core "cpu0", "cpu1", ... lines.
		if strings.HasPrefix(line, "cpu ") {
			return parseCPUStatLine(line)
		}
	}
	// Distinguish a read failure from a file that simply has no "cpu" line.
	if err := scanner.Err(); err != nil {
		return cpuStat{}, err
	}
	return cpuStat{}, fmt.Errorf("observer: 'cpu' line not found in /proc/stat")
}

// parseCPUStatLine decodes one aggregate "cpu ..." line into a cpuStat.
//
// At least four counters (user, nice, system, idle) must be present. Any of
// the later counters that are missing from a shorter line are left at zero
// instead of being indexed out of range. A counter that is not a valid
// integer is reported as an error rather than silently read as zero.
func parseCPUStatLine(line string) (cpuStat, error) {
	// "cpu  264230 882 69154 1490779 3965 0 3080 0 0 0"
	fields := strings.Fields(line)
	if len(fields) < 5 {
		return cpuStat{}, fmt.Errorf("observer: unexpected /proc/stat format: %q", line)
	}

	var s cpuStat
	counters := []*int64{
		&s.user, &s.nice, &s.system, &s.idle,
		&s.iowait, &s.irq, &s.softirq, &s.steal,
	}
	for i, dst := range counters {
		idx := i + 1 // fields[0] is the "cpu" label
		if idx >= len(fields) {
			break // shorter line: the remaining counters stay zero
		}
		v, err := strconv.ParseInt(fields[idx], 10, 64)
		if err != nil {
			return cpuStat{}, fmt.Errorf("observer: unexpected /proc/stat format: %q: %w", line, err)
		}
		*dst = v
	}
	return s, nil
}

// measureCPUIdle takes two /proc/stat snapshots separated by window and
// returns the fraction of CPU time spent idle (0.0 = fully busy, 1.0 = idle).
+func measureCPUIdle(window time.Duration) (float64, error) { + before, err := readCPUStat() + if err != nil { + return 0, err + } + time.Sleep(window) + after, err := readCPUStat() + if err != nil { + return 0, err + } + + totalDelta := after.total() - before.total() + idleDelta := after.idleTotal() - before.idleTotal() + if totalDelta <= 0 { + return 1.0, nil // no ticks elapsed — treat as fully idle + } + return float64(idleDelta) / float64(totalDelta), nil +} diff --git a/observer/observer_test.go b/observer/observer_test.go new file mode 100644 index 0000000..99014df --- /dev/null +++ b/observer/observer_test.go @@ -0,0 +1,43 @@ +package observer + +import ( + "runtime" + "testing" +) + +func TestPollNodeStats(t *testing.T) { + stats, err := PollNodeStats() + if err != nil { + t.Fatalf("PollNodeStats() returned error: %v", err) + } + + if runtime.GOOS != "linux" { + // On macOS / Windows the stub returns zeroes — that's expected. + t.Logf("non-Linux platform (%s): NodeStats is zero-valued by design", runtime.GOOS) + if stats.TotalMemoryBytes != 0 || stats.AvailableMemoryBytes != 0 || stats.CPUIdle != 0 { + t.Errorf("expected zero NodeStats on non-Linux, got %+v", stats) + } + return + } + + // On Linux all fields must be populated. 
+ if stats.TotalMemoryBytes <= 0 { + t.Errorf("TotalMemoryBytes should be > 0 on Linux, got %d", stats.TotalMemoryBytes) + } + if stats.AvailableMemoryBytes <= 0 { + t.Errorf("AvailableMemoryBytes should be > 0 on Linux, got %d", stats.AvailableMemoryBytes) + } + if stats.AvailableMemoryBytes > stats.TotalMemoryBytes { + t.Errorf("AvailableMemoryBytes (%d) > TotalMemoryBytes (%d) — not possible", + stats.AvailableMemoryBytes, stats.TotalMemoryBytes) + } + if stats.CPUIdle < 0.0 || stats.CPUIdle > 1.0 { + t.Errorf("CPUIdle must be in [0, 1], got %f", stats.CPUIdle) + } + + t.Logf("NodeStats: total=%d MB available=%d MB cpuIdle=%.2f%%", + stats.TotalMemoryBytes/1024/1024, + stats.AvailableMemoryBytes/1024/1024, + stats.CPUIdle*100, + ) +} diff --git a/observer/observer_unsupported.go b/observer/observer_unsupported.go new file mode 100644 index 0000000..e92f9f8 --- /dev/null +++ b/observer/observer_unsupported.go @@ -0,0 +1,11 @@ +//go:build !linux + +package observer + +// pollNodeStats returns a zeroed NodeStats on non-Linux platforms. +// This is intentional — there is no /proc filesystem on macOS or Windows. +// Callers should treat a zero NodeStats as "metrics unavailable" rather +// than "machine has zero memory." +func pollNodeStats() (NodeStats, error) { + return NodeStats{}, nil +} diff --git a/pool.go b/pool.go index d1f2690..ffee418 100644 --- a/pool.go +++ b/pool.go @@ -55,6 +55,8 @@ import ( "log" "sync" "time" + + "github.com/hackstrix/herd/observer" ) // --------------------------------------------------------------------------- @@ -114,6 +116,13 @@ type PoolStats struct { // InflightAcquires is the number of Acquire calls currently in the "slow path" // (waiting for a worker to become available). Useful for queue-depth alerting. InflightAcquires int + + // Node is a snapshot of host-level resource availability (memory, CPU idle). + // Populated by observer.PollNodeStats(); zero-valued on non-Linux platforms + // or if the poll fails. 
+ // + // Note: on Linux, Stats() blocks for ~100 ms to measure CPU idle. + Node observer.NodeStats } // --------------------------------------------------------------------------- @@ -347,7 +356,7 @@ func (p *Pool[C]) release(sessionID string, w Worker[C]) { p.mu.Unlock() if !isValid { - log.Printf("[pool] release(%q): worker %s returned to pool", sessionID, w.ID()) + log.Printf("[pool] release(%q): worker %s already evicted (crash or health-check), discarding", sessionID, w.ID()) return } @@ -518,7 +527,12 @@ func (p *Pool[C]) healthCheckLoop() { // Stats returns a point-in-time snapshot of pool state. // Safe to call concurrently. +// +// On Linux this blocks for ~100 ms to measure CPU idle via /proc/stat. +// Cache the result if you call Stats() in a hot path. func (p *Pool[C]) Stats() PoolStats { + nodeStats, _ := observer.PollNodeStats() + p.mu.Lock() defer p.mu.Unlock() return PoolStats{ @@ -526,6 +540,7 @@ func (p *Pool[C]) Stats() PoolStats { AvailableWorkers: len(p.available), ActiveSessions: len(p.sessions), InflightAcquires: len(p.inflight), + Node: nodeStats, } } @@ -533,9 +548,16 @@ func (p *Pool[C]) Stats() PoolStats { // It closes all background goroutines and then kills every worker. // In-flight Acquire calls will receive a context cancellation error if // the caller's ctx is tied to the application lifetime. +// +// Two signals are sent deliberately: +// - p.cancel() cancels p.ctx, which unblocks any in-flight addWorker +// goroutines that are blocking on factory.Spawn (they use a +// context.WithTimeout derived from p.ctx). +// - close(p.done) signals the healthCheckLoop and runTTLSweep goroutines +// to exit their ticker loops cleanly. 
func (p *Pool[C]) Shutdown(ctx context.Context) error { - p.cancel() - close(p.done) + p.cancel() // kill in-flight factory.Spawn calls + close(p.done) // stop background health-check and TTL loops p.mu.Lock() workers := make([]Worker[C], len(p.workers)) copy(workers, p.workers) diff --git a/pool_test.go b/pool_test.go index 3ac2bce..8b2ef87 100644 --- a/pool_test.go +++ b/pool_test.go @@ -94,7 +94,12 @@ func newTestPool(t *testing.T, workers ...*stubWorker) *Pool[*stubClient] { t.Helper() factory := newStubFactory(workers...) - // Build pool with min=0 so New() doesn't call Spawn at startup + // Build pool with min=0 so New() doesn't call Spawn at startup. + // ctx/cancel must be wired so that addWorker (called by maybeScaleUp) + // can derive its spawn timeout via context.WithTimeout(p.ctx, ...). + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // ensure no goroutine leaks after the test exits + p := &Pool[*stubClient]{ factory: factory, cfg: defaultConfig(), @@ -105,6 +110,8 @@ func newTestPool(t *testing.T, workers ...*stubWorker) *Pool[*stubClient] { workers: make([]Worker[*stubClient], 0, len(workers)), available: make(chan Worker[*stubClient], len(workers)), done: make(chan struct{}), + ctx: ctx, + cancel: cancel, } // Manually wire workers (same logic as New → wireWorker, minus crash hookup)