Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ type Daemon struct {
// triggers a zombie restart, debouncing transient gaps during handoffs.
// Only accessed from heartbeat loop goroutine - no sync needed.
mayorZombieCount int

// rigPool runs per-rig heartbeat operations (witness checks, refinery checks,
// polecat health, idle reaping, branch pruning) with bounded concurrency and a
// per-rig context timeout. The timeout only interrupts operations that observe
// their context, and each pass still waits for every rig to return.
rigPool *RigWorkerPool
}

// sessionDeath records a detected session death for mass death analysis.
Expand Down Expand Up @@ -315,6 +320,7 @@ func New(config *Config) (*Daemon, error) {
restartTracker: restartTracker,
otelProvider: otelProvider,
metrics: dm,
rigPool: newRigWorkerPool(0, 0, logger), // defaults: 10 workers, 30s timeout
}, nil
}

Expand Down Expand Up @@ -1412,9 +1418,10 @@ func (d *Daemon) checkDeaconHeartbeat() {
// Respects the rigs filter in daemon.json patrol config.
func (d *Daemon) ensureWitnessesRunning() {
rigs := d.getPatrolRigs("witness")
for _, rigName := range rigs {
d.rigPool.runPerRig(d.ctx, rigs, func(ctx context.Context, rigName string) error {
d.ensureWitnessRunning(rigName)
}
return nil
})
}

// hasPendingEvents checks if there are pending .event files in the given channel directory.
Expand Down Expand Up @@ -1488,9 +1495,10 @@ func (d *Daemon) ensureWitnessRunning(rigName string) {
// Respects the rigs filter in daemon.json patrol config.
func (d *Daemon) ensureRefineriesRunning() {
rigs := d.getPatrolRigs("refinery")
for _, rigName := range rigs {
d.rigPool.runPerRig(d.ctx, rigs, func(ctx context.Context, rigName string) error {
d.ensureRefineryRunning(rigName)
}
return nil
})
}

// ensureRefineryRunning ensures the refinery for a specific rig is running.
Expand Down Expand Up @@ -1626,7 +1634,7 @@ func (d *Daemon) killDeaconSessions() {
// killWitnessSessions kills leftover witness tmux sessions for all rigs.
// Called when the witness patrol is disabled. (hq-2mstj)
func (d *Daemon) killWitnessSessions() {
for _, rigName := range d.getKnownRigs() {
d.rigPool.runPerRig(d.ctx, d.getKnownRigs(), func(ctx context.Context, rigName string) error {
name := session.WitnessSessionName(session.PrefixFor(rigName))
exists, _ := d.tmux.HasSession(name)
if exists {
Expand All @@ -1635,13 +1643,14 @@ func (d *Daemon) killWitnessSessions() {
d.logger.Printf("Error killing %s session: %v", name, err)
}
}
}
return nil
})
}

// killRefinerySessions kills leftover refinery tmux sessions for all rigs.
// Called when the refinery patrol is disabled. (hq-2mstj)
func (d *Daemon) killRefinerySessions() {
for _, rigName := range d.getKnownRigs() {
d.rigPool.runPerRig(d.ctx, d.getKnownRigs(), func(ctx context.Context, rigName string) error {
name := session.RefinerySessionName(session.PrefixFor(rigName))
exists, _ := d.tmux.HasSession(name)
if exists {
Expand All @@ -1650,7 +1659,8 @@ func (d *Daemon) killRefinerySessions() {
d.logger.Printf("Error killing %s session: %v", name, err)
}
}
}
return nil
})
}

// killDefaultPrefixGhosts kills tmux sessions that use the default "gt" prefix
Expand Down Expand Up @@ -2234,10 +2244,10 @@ func KillOrphanedDaemons(townRoot string) (int, error) {
// When a crash is detected, the polecat is automatically restarted.
// This provides faster recovery than waiting for GUPP timeout or Witness detection.
func (d *Daemon) checkPolecatSessionHealth() {
rigs := d.getKnownRigs()
for _, rigName := range rigs {
d.rigPool.runPerRig(d.ctx, d.getKnownRigs(), func(ctx context.Context, rigName string) error {
d.checkRigPolecatHealth(rigName)
}
return nil
})
}

// checkRigPolecatHealth checks polecat session health for a specific rig.
Expand Down Expand Up @@ -2511,12 +2521,12 @@ Restart deferred to stuck-agent-dog plugin for context-aware recovery.`,
// This reaper checks heartbeat state and kills sessions idle longer than the threshold.
func (d *Daemon) reapIdlePolecats() {
opCfg := d.loadOperationalConfig().GetDaemonConfig()
timeout := opCfg.PolecatIdleSessionTimeoutD()
idleTimeout := opCfg.PolecatIdleSessionTimeoutD()

rigs := d.getKnownRigs()
for _, rigName := range rigs {
d.reapRigIdlePolecats(rigName, timeout)
}
d.rigPool.runPerRig(d.ctx, d.getKnownRigs(), func(ctx context.Context, rigName string) error {
d.reapRigIdlePolecats(rigName, idleTimeout)
return nil
})
}

// reapRigIdlePolecats checks all polecats in a rig and kills idle sessions.
Expand Down Expand Up @@ -2688,11 +2698,12 @@ func (d *Daemon) pruneStaleBranches() {
}
}

// Prune in each rig's git directory
for _, rigName := range d.getKnownRigs() {
// Prune in each rig's git directory (parallel — each rig is independent).
d.rigPool.runPerRig(d.ctx, d.getKnownRigs(), func(ctx context.Context, rigName string) error {
rigPath := filepath.Join(d.config.TownRoot, rigName)
pruneInDir(rigPath, rigName)
}
return nil
})

// Also prune in the town root itself (mayor clone)
pruneInDir(d.config.TownRoot, "town-root")
Expand Down
99 changes: 99 additions & 0 deletions internal/daemon/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package daemon

import (
"context"
"log"
"sync"
"time"
)

const (
defaultRigConcurrency = 10
defaultRigTimeout = 30 * time.Second
)

// RigWorkerPool runs per-rig heartbeat operations with bounded concurrency
// and per-rig context timeouts. This prevents a slow or hung rig from
// blocking heartbeat operations on all other rigs.
//
// With N rigs and a serial loop, the heartbeat takes O(N × max_op_time).
// With the pool, it takes O(max_op_time) — one slow rig no longer gates all others.
type RigWorkerPool struct {
concurrency int
timeout time.Duration
logger *log.Logger
}

// newRigWorkerPool creates a RigWorkerPool.
// Zero or negative values for concurrency and timeout fall back to package defaults.
func newRigWorkerPool(concurrency int, timeout time.Duration, logger *log.Logger) *RigWorkerPool {
if concurrency <= 0 {
concurrency = defaultRigConcurrency
}
if timeout <= 0 {
timeout = defaultRigTimeout
}
return &RigWorkerPool{
concurrency: concurrency,
timeout: timeout,
logger: logger,
}
}

// runPerRig executes fn once for each rig, with bounded concurrency and per-rig timeouts.
//
// Each invocation of fn receives a child context derived from parent with the pool's
// per-rig timeout applied. If fn respects its context (checks ctx.Done()), it will
// be canceled when the timeout fires.
//
// runPerRig blocks until all goroutines complete. Errors are counted and a single
// summary line is logged rather than per-rig noise.
func (p *RigWorkerPool) runPerRig(
parent context.Context,
rigs []string,
fn func(ctx context.Context, rigName string) error,
) {
if len(rigs) == 0 {
return
}

sem := make(chan struct{}, p.concurrency)
var wg sync.WaitGroup
var mu sync.Mutex
var errCount int

for _, r := range rigs {
wg.Add(1)
go func(rigName string) {
defer wg.Done()

// Acquire a worker slot; block until one is available.
sem <- struct{}{}
defer func() { <-sem }()

// Each rig gets its own timeout-bounded context so a slow rig
// can be signaled to stop without affecting other rigs.
ctx, cancel := context.WithTimeout(parent, p.timeout)
defer cancel()

if err := fn(ctx, rigName); err != nil {
mu.Lock()
errCount++
mu.Unlock()
if p.logger != nil {
p.logger.Printf("rig_worker: %s: %v", rigName, err)
}
}
}(r)
}

wg.Wait()

mu.Lock()
count := errCount
mu.Unlock()

if count > 0 && p.logger != nil {
p.logger.Printf("rig_worker: %d/%d rig(s) had errors", count, len(rigs))
}
}
Loading