Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 65 additions & 15 deletions internal/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,42 @@ func SaveTownConfig(path string, config *TownConfig) error {
}

// LoadRigsConfig loads and validates a rigs registry file.
// It retries once on JSON parse errors to handle the unlikely case of observing
// a partially-written file from a concurrent non-atomic writer.
func LoadRigsConfig(path string) (*RigsConfig, error) {
data, err := os.ReadFile(path) //nolint:gosec // G304: path is constructed internally, not from user input
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %s", ErrNotFound, path)
readAndParse := func() (*RigsConfig, error) {
data, err := os.ReadFile(path) //nolint:gosec // G304: path is constructed internally, not from user input
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %s", ErrNotFound, path)
}
return nil, fmt.Errorf("reading config: %w", err)
}
return nil, fmt.Errorf("reading config: %w", err)
}

var config RigsConfig
if err := json.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("parsing config: %w", err)
}
var config RigsConfig
if err := json.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("parsing config: %w", err)
}

if err := validateRigsConfig(&config); err != nil {
return nil, err
if err := validateRigsConfig(&config); err != nil {
return nil, err
}

return &config, nil
}

return &config, nil
cfg, err := readAndParse()
if err != nil && !errors.Is(err, ErrNotFound) {
// Retry once to handle torn reads from concurrent writers.
cfg, err = readAndParse()
}
return cfg, err
}

// SaveRigsConfig saves a rigs registry to a file.
// SaveRigsConfig saves a rigs registry to a file atomically.
// It writes to a temporary file in the same directory, then renames it into
// place. The rename is atomic on POSIX systems, preventing readers from
// observing a partially-written file.
func SaveRigsConfig(path string, config *RigsConfig) error {
if err := validateRigsConfig(config); err != nil {
return err
Expand All @@ -117,13 +131,49 @@ func SaveRigsConfig(path string, config *RigsConfig) error {
return fmt.Errorf("encoding config: %w", err)
}

if err := os.WriteFile(path, data, 0600); err != nil {
if err := atomicWriteFile(path, data, 0600); err != nil {
return fmt.Errorf("writing config: %w", err)
}

return nil
}

// atomicWriteFile writes data to path atomically by writing a temp file
// in the same directory then renaming it. On POSIX the rename is atomic,
// so readers never observe a partially-written file.
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
dir := filepath.Dir(path)
base := filepath.Base(path)

f, err := os.CreateTemp(dir, base+".tmp.*")
if err != nil {
return err
}
tmpName := f.Name()

if _, err := f.Write(data); err != nil {
f.Close()
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}
if err := f.Close(); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

if err := os.Chmod(tmpName, perm); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

if err := os.Rename(tmpName, path); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

return nil
}

// validateTownConfig validates a TownConfig.
func validateTownConfig(c *TownConfig) error {
if c.Type != "town" && c.Type != "" {
Expand Down
113 changes: 113 additions & 0 deletions internal/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -125,6 +126,118 @@ func TestRigsConfigRoundTrip(t *testing.T) {
}
}

// TestSaveRigsConfigAtomicConcurrentWrites verifies that concurrent writers
// never produce a torn/invalid rigs.json. Any goroutine reading the file
// mid-write must see valid JSON due to the atomic rename used by SaveRigsConfig.
func TestSaveRigsConfigAtomicConcurrentWrites(t *testing.T) {
	t.Parallel()

	dir := t.TempDir()
	path := filepath.Join(dir, "mayor", "rigs.json")

	// Seed with an initial valid file.
	seed := &RigsConfig{
		Version: 1,
		Rigs:    map[string]RigEntry{},
	}
	if err := SaveRigsConfig(path, seed); err != nil {
		t.Fatalf("seed SaveRigsConfig: %v", err)
	}

	const writers = 10
	const iters = 50
	var wg sync.WaitGroup

	// N goroutines writing different versions of the file concurrently.
	for w := 0; w < writers; w++ {
		w := w // per-iteration copy (pre-Go 1.22 loop-variable semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			cfg := &RigsConfig{
				Version: 1,
				Rigs: map[string]RigEntry{
					"rig-" + string(rune('a'+w)): {
						GitURL:    "[email protected]:example/repo.git",
						LocalRepo: "/tmp/repo",
						AddedAt:   time.Now(),
					},
				},
			}
			for i := 0; i < iters; i++ {
				if err := SaveRigsConfig(path, cfg); err != nil {
					t.Errorf("writer %d iter %d: SaveRigsConfig: %v", w, i, err)
				}
			}
		}()
	}

	// One reader goroutine verifying the file is always valid JSON.
	// The reader closes done on exit so the main goroutine can wait for it
	// before touching readErr — reading readErr immediately after close(stop)
	// would race with the reader's final iteration.
	stop := make(chan struct{})
	done := make(chan struct{})
	var readErr error
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			default:
			}
			data, err := os.ReadFile(path)
			if err != nil {
				continue // file may briefly not exist between writes
			}
			var cfg RigsConfig
			if err := json.Unmarshal(data, &cfg); err != nil {
				readErr = err
				return
			}
		}
	}()

	wg.Wait()
	close(stop)
	<-done // reader has exited; readErr is now safe to read

	if readErr != nil {
		t.Errorf("reader observed invalid JSON (torn write): %v", readErr)
	}
}

// TestLoadRigsConfigRetryOnTornRead verifies that LoadRigsConfig retries once
// when it encounters invalid JSON, so transient torn reads don't surface as errors.
func TestLoadRigsConfigRetryOnTornRead(t *testing.T) {
t.Parallel()

dir := t.TempDir()
path := filepath.Join(dir, "rigs.json")

// Write invalid JSON (simulating a torn write) then immediately overwrite
// with valid JSON. The retry in LoadRigsConfig should recover.
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(path, []byte("{invalid json"), 0600); err != nil {
t.Fatal(err)
}

// Replace with valid content before LoadRigsConfig is called — the retry
// window means a second read sees the good file.
valid := &RigsConfig{
Version: 1,
Rigs: map[string]RigEntry{},
}
if err := SaveRigsConfig(path, valid); err != nil {
t.Fatal(err)
}

cfg, err := LoadRigsConfig(path)
if err != nil {
t.Fatalf("LoadRigsConfig: %v", err)
}
if cfg.Version != 1 {
t.Errorf("Version = %d, want 1", cfg.Version)
}
}

func TestLoadTownConfigNotFound(t *testing.T) {
t.Parallel()
_, err := LoadTownConfig("/nonexistent/path.json")
Expand Down
45 changes: 31 additions & 14 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ type Daemon struct {
// triggers a zombie restart, debouncing transient gaps during handoffs.
// Only accessed from heartbeat loop goroutine - no sync needed.
mayorZombieCount int

// rigPool runs per-rig heartbeat operations (witness checks, refinery checks,
// polecat health, idle reaping, branch pruning) with bounded concurrency and
// per-rig context timeouts so one slow rig cannot block all others.
rigPool *RigWorkerPool

// knownRigsCache caches the result of getKnownRigs() for the duration of
// a single heartbeat tick, eliminating repeated rigs.json disk reads.
// knownRigsCacheValid is true only while heartbeat() is executing.
// Only accessed from heartbeat loop goroutine - no sync needed.
knownRigsCache []string
knownRigsCacheValid bool
}

// sessionDeath records a detected session death for mass death analysis.
Expand Down Expand Up @@ -742,6 +754,14 @@ func (d *Daemon) heartbeat(state *State) {
d.metrics.recordHeartbeat(d.ctx)
d.logger.Println("Heartbeat starting (recovery-focused)")

// Populate the per-tick cache so all getKnownRigs() calls within this
// heartbeat share a single rigs.json read instead of hitting disk each time.
d.knownRigsCache = d.loadKnownRigsFromDisk()
d.knownRigsCacheValid = true
defer func() {
d.knownRigsCacheValid = false
}()

// 0a. Reload prefix registry so new/changed rigs get correct session names.
// Without this, rigs added after daemon startup get the "gt" default prefix,
// causing ghost sessions like gt-witness instead of ti-witness. (hq-ouz, hq-eqf, hq-3i4)
Expand Down Expand Up @@ -1778,25 +1798,22 @@ func (d *Daemon) openBeadsStores() (map[string]beadsdk.Storage, error) {
}

// getKnownRigs returns list of registered rig names.
// During a heartbeat tick, returns the cached result set at tick start to
// avoid repeated rigs.json disk reads. Outside heartbeat, reads from disk.
func (d *Daemon) getKnownRigs() []string {
rigsPath := filepath.Join(d.config.TownRoot, "mayor", "rigs.json")
data, err := os.ReadFile(rigsPath)
if err != nil {
return nil
if d.knownRigsCacheValid {
return d.knownRigsCache
}
return d.loadKnownRigsFromDisk()
}

var parsed struct {
Rigs map[string]interface{} `json:"rigs"`
}
if err := json.Unmarshal(data, &parsed); err != nil {
// loadKnownRigsFromDisk reads the rig registry to build the rig list.
func (d *Daemon) loadKnownRigsFromDisk() []string {
names, err := rigsregistry.GetKnownRigNames(d.config.TownRoot)
if err != nil {
return nil
}

var rigs []string
for name := range parsed.Rigs {
rigs = append(rigs, name)
}
return rigs
return names
}

// getPatrolRigs returns the list of operational rigs for a patrol.
Expand Down
42 changes: 39 additions & 3 deletions internal/doctor/workspace_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (c *RigsRegistryExistsCheck) Fix(ctx *CheckContext) error {
return fmt.Errorf("marshaling empty rigs.json: %w", err)
}

return os.WriteFile(rigsPath, data, 0644)
return atomicWriteFile(rigsPath, data, 0644)
}

// RigsRegistryValidCheck verifies mayor/rigs.json is valid and rigs exist.
Expand Down Expand Up @@ -304,13 +304,49 @@ func (c *RigsRegistryValidCheck) Fix(ctx *CheckContext) error {
delete(config.Rigs, rig)
}

// Write back
// Write back atomically
newData, err := json.MarshalIndent(config, "", " ")
if err != nil {
return fmt.Errorf("marshaling rigs.json: %w", err)
}

return os.WriteFile(rigsPath, newData, 0644)
return atomicWriteFile(rigsPath, newData, 0644)
}

// atomicWriteFile writes data to path atomically by writing a temp file
// in the same directory, then renaming it into place. On POSIX the rename
// is atomic, so readers never observe a partially-written file.
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
dir := filepath.Dir(path)
base := filepath.Base(path)

f, err := os.CreateTemp(dir, base+".tmp.*")
if err != nil {
return err
}
tmpName := f.Name()

if _, err := f.Write(data); err != nil {
f.Close()
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}
if err := f.Close(); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

if err := os.Chmod(tmpName, perm); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

if err := os.Rename(tmpName, path); err != nil {
os.Remove(tmpName) //nolint:errcheck // best-effort cleanup
return err
}

return nil
}

// MayorExistsCheck verifies the mayor/ directory structure.
Expand Down