diff --git a/internal/beads/handoff.go b/internal/beads/handoff.go index 6bfde94c69..10492f981b 100644 --- a/internal/beads/handoff.go +++ b/internal/beads/handoff.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" "github.com/steveyegge/gastown/internal/lock" @@ -44,6 +45,27 @@ func (b *Beads) FindHandoffBead(role string) (*Issue, error) { return nil, nil } +// FindAllHandoffBeads fetches all pinned beads once and returns a map from +// role name to handoff bead. This avoids the N+1 subprocess problem where +// FindHandoffBead is called once per agent, each spawning a bd subprocess. +func (b *Beads) FindAllHandoffBeads() (map[string]*Issue, error) { + issues, err := b.List(ListOptions{Status: StatusPinned, Priority: -1}) + if err != nil { + return nil, fmt.Errorf("listing pinned issues: %w", err) + } + + result := make(map[string]*Issue) + for _, issue := range issues { + // Handoff bead titles follow the pattern " Handoff" + if strings.HasSuffix(issue.Title, " Handoff") { + role := strings.TrimSuffix(issue.Title, " Handoff") + result[role] = issue + } + } + + return result, nil +} + // GetOrCreateHandoffBead returns the handoff bead for a role, creating it if needed. func (b *Beads) GetOrCreateHandoffBead(role string) (*Issue, error) { // Check if it exists diff --git a/internal/cmd/rig.go b/internal/cmd/rig.go index a9b46836dc..e33c7028ba 100644 --- a/internal/cmd/rig.go +++ b/internal/cmd/rig.go @@ -10,6 +10,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/spf13/cobra" @@ -1905,10 +1906,112 @@ func runRigStatus(cmd *cobra.Command, args []string) error { } fmt.Println() + // --- Parallel data gathering phase --- + // All expensive operations (tmux health checks, beads queries, git status) + // run concurrently. Display phase follows with pre-fetched data. + var dataWg sync.WaitGroup + // Witness status - fmt.Printf("%s\n", style.Bold.Render("Witness")) witMgr := witness.NewManager(r) - witnessRunning, _ := witMgr.IsRunning() + var witnessRunning bool + dataWg.Add(1) + go func() { + defer dataWg.Done() + witnessRunning, _ = witMgr.IsRunning() + }() + + // Refinery status + queue + refMgr := refinery.NewManager(r) + var refineryRunning bool + var refineryQueue []refinery.QueueItem + dataWg.Add(1) + go func() { + defer dataWg.Done() + refineryRunning, _ = refMgr.IsRunning() + if refineryRunning { + refineryQueue, _ = refMgr.Queue() + } + }() + + // Polecats list (involves per-polecat beads + git queries) + polecatGit := git.NewGit(r.Path) + polecatMgr := polecat.NewManager(r, polecatGit, t) + var polecats []*polecat.Polecat + var polecatsErr error + dataWg.Add(1) + go func() { + defer dataWg.Done() + polecats, polecatsErr = polecatMgr.List() + }() + + // Crew list + crewMgr := crew.NewManager(r, git.NewGit(townRoot)) + var crewWorkers []*crew.CrewWorker + var crewErr error + dataWg.Add(1) + go func() { + defer dataWg.Done() + crewWorkers, crewErr = crewMgr.List() + }() + + dataWg.Wait() + + // --- Polecat + Crew session checks (parallel, after List completes) --- + type polecatInfo struct { + name string + state polecat.State + issue string + hasSession bool + } + var pInfos []polecatInfo + type crewInfo struct { + name string + hasSession bool + branch string + dirty bool + } + var cInfos []crewInfo + + var sessionWg sync.WaitGroup + + if polecatsErr == nil && len(polecats) > 0 { + pInfos = make([]polecatInfo, len(polecats)) + for i, p := range polecats { + pInfos[i] = polecatInfo{name: p.Name, state: p.State, issue: p.Issue} + sessionWg.Add(1) + go func(idx int, p *polecat.Polecat) { + defer sessionWg.Done() + sessionName := session.PolecatSessionName(session.PrefixFor(rigName), p.Name) + pInfos[idx].hasSession, _ = t.HasSession(sessionName) + }(i, p) + } + } + + if crewErr == nil && len(crewWorkers) > 0 { + cInfos = make([]crewInfo, len(crewWorkers)) + for i, w := range crewWorkers { + cInfos[i] = crewInfo{name: w.Name} + sessionWg.Add(1) + go func(idx int, w *crew.CrewWorker) { + defer sessionWg.Done() + sessionName := crewSessionName(rigName, w.Name) + cInfos[idx].hasSession, _ = t.HasSession(sessionName) + crewGit := git.NewGit(w.ClonePath) + cInfos[idx].branch, _ = crewGit.CurrentBranch() + gitStatus, _ := crewGit.Status() + if gitStatus != nil && !gitStatus.Clean { + cInfos[idx].dirty = true + } + }(i, w) + } + } + + sessionWg.Wait() + + // --- Display phase (all data pre-fetched) --- + + // Witness + fmt.Printf("%s\n", style.Bold.Render("Witness")) if witnessRunning { fmt.Printf(" %s running\n", style.Success.Render("●")) } else { @@ -1916,16 +2019,12 @@ func runRigStatus(cmd *cobra.Command, args []string) error { } fmt.Println() - // Refinery status + // Refinery fmt.Printf("%s\n", style.Bold.Render("Refinery")) - refMgr := refinery.NewManager(r) - refineryRunning, _ := refMgr.IsRunning() if refineryRunning { fmt.Printf(" %s running\n", style.Success.Render("●")) - // Show queue size - queue, err := refMgr.Queue() - if err == nil && len(queue) > 0 { - fmt.Printf(" Queue: %d items\n", len(queue)) + if len(refineryQueue) > 0 { + fmt.Printf(" Queue: %d items\n", len(refineryQueue)) } } else { fmt.Printf(" %s stopped\n", style.Dim.Render("○")) @@ -1933,20 +2032,14 @@ func runRigStatus(cmd *cobra.Command, args []string) error { fmt.Println() // Polecats - polecatGit := git.NewGit(r.Path) - polecatMgr := polecat.NewManager(r, polecatGit, t) - polecats, err := polecatMgr.List() fmt.Printf("%s", style.Bold.Render("Polecats")) - if err != nil || len(polecats) == 0 { + if polecatsErr != nil || len(polecats) == 0 { fmt.Printf(" (none)\n") } else { fmt.Printf(" (%d)\n", len(polecats)) - for _, p := range polecats { - sessionName := session.PolecatSessionName(session.PrefixFor(rigName), p.Name) - hasSession, _ := t.HasSession(sessionName) - + for _, pi := range pInfos { sessionIcon := style.Dim.Render("○") - if hasSession { + if pi.hasSession { sessionIcon = style.Success.Render("●") } @@ -1957,51 +2050,41 @@ func runRigStatus(cmd *cobra.Command, args []string) error { // witness can detect unsubmitted work (gt-3071b). Previously this // showed "done" which masked failures where polecats died before // running gt done, leaving work stranded in worktrees. - displayState := p.State - if hasSession && displayState == polecat.StateDone { + displayState := pi.state + if pi.hasSession && displayState == polecat.StateDone { displayState = polecat.StateWorking - } else if !hasSession && displayState == polecat.StateWorking { + } else if !pi.hasSession && displayState == polecat.StateWorking { displayState = polecat.State("stalled") } stateStr := string(displayState) - if p.Issue != "" { - stateStr = fmt.Sprintf("%s → %s", displayState, p.Issue) + if pi.issue != "" { + stateStr = fmt.Sprintf("%s → %s", displayState, pi.issue) } - fmt.Printf(" %s %s: %s\n", sessionIcon, p.Name, stateStr) + fmt.Printf(" %s %s: %s\n", sessionIcon, pi.name, stateStr) } } fmt.Println() // Crew - crewMgr := crew.NewManager(r, git.NewGit(townRoot)) - crewWorkers, err := crewMgr.List() fmt.Printf("%s", style.Bold.Render("Crew")) - if err != nil || len(crewWorkers) == 0 { + if crewErr != nil || len(crewWorkers) == 0 { fmt.Printf(" (none)\n") } else { fmt.Printf(" (%d)\n", len(crewWorkers)) - for _, w := range crewWorkers { - sessionName := crewSessionName(rigName, w.Name) - hasSession, _ := t.HasSession(sessionName) - + for _, ci := range cInfos { sessionIcon := style.Dim.Render("○") - if hasSession { + if ci.hasSession { sessionIcon = style.Success.Render("●") } - // Get git info - crewGit := git.NewGit(w.ClonePath) - branch, _ := crewGit.CurrentBranch() - gitStatus, _ := crewGit.Status() - gitInfo := "" - if gitStatus != nil && !gitStatus.Clean { + if ci.dirty { gitInfo = style.Warning.Render(" (dirty)") } - fmt.Printf(" %s %s: %s%s\n", sessionIcon, w.Name, branch, gitInfo) + fmt.Printf(" %s %s: %s%s\n", sessionIcon, ci.name, ci.branch, gitInfo) } } diff --git a/internal/cmd/status.go b/internal/cmd/status.go index 9d4270f6e4..db3df46039 100644 --- a/internal/cmd/status.go +++ b/internal/cmd/status.go @@ -885,12 +885,42 @@ func gatherStatus() (TownStatus, error) { rs.CrewCount = len(workers) } + // Run hooks, agents, and MQ discovery concurrently within this rig. + // Each was previously sequential; now they overlap since they use + // independent bd/beads calls. + var rigWg sync.WaitGroup + // Discover hooks for all agents in this rig // In --fast mode, skip expensive handoff bead lookups. Hook info comes from // preloaded agent beads via discoverRigAgents instead. if !statusFast { - rs.Hooks = discoverRigHooks(r, rs.Crews) + rigWg.Add(1) + go func() { + defer rigWg.Done() + rs.Hooks = discoverRigHooks(r, rs.Crews) + }() } + + // Get MQ summary if rig has a refinery + // Skip in --fast mode to avoid expensive bd queries + if !statusFast { + rigWg.Add(1) + go func() { + defer rigWg.Done() + rs.MQ = getMQSummary(r) + }() + } + + // Discover runtime state for all agents in this rig + // (uses preloaded maps, so it's fast — but run concurrently with hooks/MQ) + rigWg.Add(1) + go func() { + defer rigWg.Done() + rs.Agents = discoverRigAgents(allSessions, r, rs.Crews, allAgentBeads, allHookBeads, mailRouter, statusFast) + }() + + rigWg.Wait() + activeHooks := 0 for _, hook := range rs.Hooks { if hook.HasWork { @@ -899,15 +929,6 @@ func gatherStatus() (TownStatus, error) { } rigActiveHooks[idx] = activeHooks - // Discover runtime state for all agents in this rig - rs.Agents = discoverRigAgents(allSessions, r, rs.Crews, allAgentBeads, allHookBeads, mailRouter, statusFast) - - // Get MQ summary if rig has a refinery - // Skip in --fast mode to avoid expensive bd queries - if !statusFast { - rs.MQ = getMQSummary(r) - } - status.Rigs[idx] = rs }(i, r) } @@ -1503,40 +1524,71 @@ func capitalizeFirst(s string) string { } // discoverRigHooks finds all hook attachments for agents in a rig. -// It scans polecats, crew workers, witness, and refinery for handoff beads. +// It fetches all pinned handoff beads in a single bd call, then resolves +// each agent's hook in-memory. This replaces the previous N+1 pattern where +// each agent triggered a separate bd subprocess. func discoverRigHooks(r *rig.Rig, crews []string) []AgentHookInfo { var hooks []AgentHookInfo // Create beads instance for the rig b := beads.New(r.Path) + // Batch-fetch all handoff beads in one bd call + allHandoffs, err := b.FindAllHandoffBeads() + if err != nil { + // On error, return empty hooks for all agents rather than failing + allHandoffs = make(map[string]*beads.Issue) + } + // Check polecats for _, name := range r.Polecats { - hook := getAgentHook(b, name, r.Name+"/"+name, constants.RolePolecat) - hooks = append(hooks, hook) + hooks = append(hooks, resolveHookFromMap(allHandoffs, name, r.Name+"/"+name, constants.RolePolecat)) } // Check crew workers for _, name := range crews { - hook := getAgentHook(b, name, r.Name+"/crew/"+name, constants.RoleCrew) - hooks = append(hooks, hook) + hooks = append(hooks, resolveHookFromMap(allHandoffs, name, r.Name+"/crew/"+name, constants.RoleCrew)) } // Check witness if r.HasWitness { - hook := getAgentHook(b, constants.RoleWitness, r.Name+"/witness", constants.RoleWitness) - hooks = append(hooks, hook) + hooks = append(hooks, resolveHookFromMap(allHandoffs, constants.RoleWitness, r.Name+"/witness", constants.RoleWitness)) } // Check refinery if r.HasRefinery { - hook := getAgentHook(b, constants.RoleRefinery, r.Name+"/refinery", constants.RoleRefinery) - hooks = append(hooks, hook) + hooks = append(hooks, resolveHookFromMap(allHandoffs, constants.RoleRefinery, r.Name+"/refinery", constants.RoleRefinery)) } return hooks } +// resolveHookFromMap builds an AgentHookInfo from a pre-fetched map of handoff beads. +// This is the in-memory equivalent of getAgentHook, avoiding per-agent bd subprocess calls. +func resolveHookFromMap(allHandoffs map[string]*beads.Issue, role, agentAddress, roleType string) AgentHookInfo { + hook := AgentHookInfo{ + Agent: agentAddress, + Role: roleType, + } + + handoff, ok := allHandoffs[role] + if !ok || handoff == nil { + return hook + } + + attachment := beads.ParseAttachmentFields(handoff) + if attachment != nil && attachment.AttachedMolecule != "" { + hook.HasWork = true + hook.Molecule = attachment.AttachedMolecule + hook.Title = handoff.Title + } else if handoff.Description != "" { + hook.HasWork = true + hook.Title = handoff.Title + } + + return hook +} + // discoverGlobalAgents checks runtime state for town-level agents (Mayor, Deacon). // Uses parallel fetching for performance. If skipMail is true, mail lookups are skipped. // allSessions is a preloaded map of tmux sessions for O(1) lookup. @@ -1800,7 +1852,8 @@ func discoverRigAgents(allSessions map[string]bool, r *rig.Rig, crews []string, } // getMQSummary queries beads for merge-request issues and returns a summary. - +// Uses a single bd call to fetch all non-closed merge-requests, then splits +// open vs in_progress in memory. Previously used two separate bd calls. // Returns nil if the rig has no refinery or no MQ issues. func getMQSummary(r *rig.Rig) *MQSummary { if !r.HasRefinery { @@ -1810,38 +1863,39 @@ func getMQSummary(r *rig.Rig) *MQSummary { // Create beads instance for the rig b := beads.New(r.BeadsPath()) - // Query for all open merge-request issues + // Single query for all non-closed merge-request issues. + // Status "all" fetches everything; we filter open/in_progress in memory. opts := beads.ListOptions{ Label: "gt:merge-request", - Status: "open", + Status: "all", Priority: -1, // No priority filter } - openMRs, err := b.List(opts) - if err != nil { - return nil - } - - // Query for in-progress merge-requests - opts.Status = "in_progress" - inProgressMRs, err := b.List(opts) + allMRs, err := b.List(opts) if err != nil { return nil } - // Count pending (open with no blockers) vs blocked + // Split by status in memory pending := 0 blocked := 0 - for _, mr := range openMRs { - if len(mr.BlockedBy) > 0 || mr.BlockedByCount > 0 { - blocked++ - } else { - pending++ + inProgress := 0 + for _, mr := range allMRs { + switch mr.Status { + case "open": + if len(mr.BlockedBy) > 0 || mr.BlockedByCount > 0 { + blocked++ + } else { + pending++ + } + case "in_progress": + inProgress++ } + // closed/other statuses are ignored } // Determine queue state state := "idle" - if len(inProgressMRs) > 0 { + if inProgress > 0 { state = "processing" } else if pending > 0 { state = "idle" // Has work but not processing yet @@ -1851,24 +1905,24 @@ func getMQSummary(r *rig.Rig) *MQSummary { // Determine queue health health := "empty" - total := pending + len(inProgressMRs) + blocked + total := pending + inProgress + blocked if total > 0 { health = "healthy" // Check for potential issues - if pending > 10 && len(inProgressMRs) == 0 { + if pending > 10 && inProgress == 0 { // Large queue but nothing processing - may be stuck health = "stale" } } // Only return summary if there's something to show - if pending == 0 && len(inProgressMRs) == 0 && blocked == 0 { + if pending == 0 && inProgress == 0 && blocked == 0 { return nil } return &MQSummary{ Pending: pending, - InFlight: len(inProgressMRs), + InFlight: inProgress, Blocked: blocked, State: state, Health: health, diff --git a/internal/polecat/manager.go b/internal/polecat/manager.go index edda76c11c..687f7fde88 100644 --- a/internal/polecat/manager.go +++ b/internal/polecat/manager.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "time" @@ -1877,6 +1878,7 @@ func (m *Manager) PoolStatus() (active int, names []string) { } // List returns all polecats in the rig. +// Loads polecat state in parallel to avoid sequential bd subprocess overhead. func (m *Manager) List() ([]*Polecat, error) { polecatsDir := filepath.Join(m.rig.Path, "polecats") @@ -1888,7 +1890,8 @@ func (m *Manager) List() ([]*Polecat, error) { return nil, fmt.Errorf("reading polecats dir: %w", err) } - var polecats []*Polecat + // Filter to valid directories first + var names []string for _, entry := range entries { if !entry.IsDir() { continue @@ -1896,12 +1899,32 @@ func (m *Manager) List() ([]*Polecat, error) { if strings.HasPrefix(entry.Name(), ".") { continue } + names = append(names, entry.Name()) + } - polecat, err := m.Get(entry.Name()) - if err != nil { - continue // Skip invalid polecats + // Load all polecats in parallel — each loadFromBeads call involves + // multiple bd/git subprocess calls that are independent per polecat. + results := make([]*Polecat, len(names)) + var wg sync.WaitGroup + for i, name := range names { + wg.Add(1) + go func(idx int, name string) { + defer wg.Done() + p, err := m.Get(name) + if err != nil { + return // Skip invalid polecats (leaves nil in results) + } + results[idx] = p + }(i, name) + } + wg.Wait() + + // Compact — remove nil entries from failed Gets + polecats := make([]*Polecat, 0, len(results)) + for _, p := range results { + if p != nil { + polecats = append(polecats, p) } - polecats = append(polecats, polecat) } return polecats, nil