Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions internal/control/handler_create_worktree.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import (
"time"

"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/git"
"github.com/newhook/co/internal/logging"
"github.com/newhook/co/internal/mise"
"github.com/newhook/co/internal/project"
"github.com/newhook/co/internal/worktree"
)

// HandleCreateWorktreeTask handles a scheduled worktree creation task
func HandleCreateWorktreeTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
func (cp *ControlPlane) HandleCreateWorktreeTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
workID := task.WorkID
branchName := task.Metadata["branch"]
baseBranch := task.Metadata["base_branch"]
Expand Down Expand Up @@ -47,13 +44,11 @@ func HandleCreateWorktreeTask(ctx context.Context, proj *project.Project, task *
}

mainRepoPath := proj.MainRepoPath()
gitOps := git.NewOperations()
wtOps := worktree.NewOperations()
var branchExistsOnRemote bool

// For existing branches, check if branch exists on remote
if useExisting {
_, branchExistsOnRemote, _ = gitOps.ValidateExistingBranch(ctx, mainRepoPath, branchName)
_, branchExistsOnRemote, _ = cp.Git.ValidateExistingBranch(ctx, mainRepoPath, branchName)
}

// If worktree path is already set and exists, skip creation
Expand All @@ -72,32 +67,33 @@ func HandleCreateWorktreeTask(ctx context.Context, proj *project.Project, task *

if useExisting {
// For existing branches, fetch the branch first
if err := gitOps.FetchBranch(ctx, mainRepoPath, branchName); err != nil {
if err := cp.Git.FetchBranch(ctx, mainRepoPath, branchName); err != nil {
// Ignore fetch errors - branch might only exist locally
logging.Debug("Could not fetch branch from origin (may only exist locally)", "branch", branchName)
}

// Create worktree from existing branch
if err := wtOps.CreateFromExisting(ctx, mainRepoPath, worktreePath, branchName); err != nil {
if err := cp.Worktree.CreateFromExisting(ctx, mainRepoPath, worktreePath, branchName); err != nil {
_ = os.RemoveAll(workDir)
return fmt.Errorf("failed to create worktree from existing branch: %w", err)
}
} else {
// Fetch latest from origin for the base branch
if err := gitOps.FetchBranch(ctx, mainRepoPath, baseBranch); err != nil {
if err := cp.Git.FetchBranch(ctx, mainRepoPath, baseBranch); err != nil {
_ = os.RemoveAll(workDir)
return fmt.Errorf("failed to fetch base branch: %w", err)
}

// Create git worktree with new branch based on origin/<baseBranch>
if err := wtOps.Create(ctx, mainRepoPath, worktreePath, branchName, "origin/"+baseBranch); err != nil {
if err := cp.Worktree.Create(ctx, mainRepoPath, worktreePath, branchName, "origin/"+baseBranch); err != nil {
_ = os.RemoveAll(workDir)
return fmt.Errorf("failed to create worktree: %w", err)
}
}

// Initialize mise if configured
if err := mise.InitializeWithOutput(worktreePath, io.Discard); err != nil {
miseOps := cp.Mise(worktreePath)
if err := miseOps.InitializeWithOutput(io.Discard); err != nil {
logging.Warn("mise initialization failed", "error", err)
// Non-fatal, continue
}
Expand All @@ -114,7 +110,7 @@ func HandleCreateWorktreeTask(ctx context.Context, proj *project.Project, task *
if useExisting && branchExistsOnRemote {
logging.Info("Skipping git push - branch already exists on remote", "branch", branchName)
} else {
if err := gitOps.PushSetUpstream(ctx, branchName, work.WorktreePath); err != nil {
if err := cp.Git.PushSetUpstream(ctx, branchName, work.WorktreePath); err != nil {
return fmt.Errorf("git push failed: %w", err)
}
}
Expand Down
5 changes: 2 additions & 3 deletions internal/control/handler_destroy_worktree.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/logging"
"github.com/newhook/co/internal/project"
"github.com/newhook/co/internal/work"
)

// HandleDestroyWorktreeTask handles a scheduled worktree destruction task
func HandleDestroyWorktreeTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
func (cp *ControlPlane) HandleDestroyWorktreeTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
workID := task.WorkID

logging.Info("Destroying worktree for work",
Expand All @@ -30,7 +29,7 @@ func HandleDestroyWorktreeTask(ctx context.Context, proj *project.Project, task
}

// Delegate to the shared DestroyWork function
if err := work.DestroyWork(ctx, proj, workID, io.Discard); err != nil {
if err := cp.WorkDestroyer.DestroyWork(ctx, proj, workID, io.Discard); err != nil {
return err
}

Expand Down
5 changes: 2 additions & 3 deletions internal/control/handler_git_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"fmt"

"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/git"
"github.com/newhook/co/internal/logging"
"github.com/newhook/co/internal/project"
)

// HandleGitPushTask handles a scheduled git push task with retry support.
// Returns nil on success, error on failure (caller handles retry/completion).
func HandleGitPushTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
func (cp *ControlPlane) HandleGitPushTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
workID := task.WorkID
// Get branch and directory from metadata
branch := task.Metadata["branch"]
Expand All @@ -34,7 +33,7 @@ func HandleGitPushTask(ctx context.Context, proj *project.Project, task *db.Sche

logging.Info("Executing git push", "branch", branch, "dir", dir, "attempt", task.AttemptCount+1)

if err := git.NewOperations().PushSetUpstream(ctx, branch, dir); err != nil {
if err := cp.Git.PushSetUpstream(ctx, branch, dir); err != nil {
return err
}

Expand Down
5 changes: 2 additions & 3 deletions internal/control/handler_pr_feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"time"

"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/feedback"
"github.com/newhook/co/internal/logging"
"github.com/newhook/co/internal/project"
)

// HandlePRFeedbackTask handles a scheduled PR feedback check.
// Returns nil on success, error on failure (caller handles retry/completion).
func HandlePRFeedbackTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
func (cp *ControlPlane) HandlePRFeedbackTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
workID := task.WorkID
logging.Debug("Starting PR feedback check task", "task_id", task.ID, "work_id", workID)

Expand All @@ -28,7 +27,7 @@ func HandlePRFeedbackTask(ctx context.Context, proj *project.Project, task *db.S
logging.Debug("Checking PR feedback", "pr_url", work.PRURL, "work_id", workID)

// Process PR feedback - creates beads but doesn't add them to work
createdCount, err := feedback.ProcessPRFeedbackQuiet(ctx, proj, proj.DB, workID)
createdCount, err := cp.FeedbackProcessor.ProcessPRFeedback(ctx, proj, proj.DB, workID)
if err != nil {
return fmt.Errorf("failed to check PR feedback: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions internal/control/handler_spawn_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"fmt"
"io"

"github.com/newhook/co/internal/claude"
"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/logging"
"github.com/newhook/co/internal/project"
)

// HandleSpawnOrchestratorTask handles a scheduled orchestrator spawn task
func HandleSpawnOrchestratorTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
func (cp *ControlPlane) HandleSpawnOrchestratorTask(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error {
workID := task.WorkID
workerName := task.Metadata["worker_name"]

Expand All @@ -36,7 +35,7 @@ func HandleSpawnOrchestratorTask(ctx context.Context, proj *project.Project, tas
}

// Spawn the orchestrator
if err := claude.SpawnWorkOrchestrator(ctx, workID, proj.Config.Project.Name, work.WorktreePath, workerName, io.Discard); err != nil {
if err := cp.OrchestratorSpawner.SpawnWorkOrchestrator(ctx, workID, proj.Config.Project.Name, work.WorktreePath, workerName, io.Discard); err != nil {
return fmt.Errorf("failed to spawn orchestrator: %w", err)
}

Expand Down
35 changes: 19 additions & 16 deletions internal/control/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ import (
trackingwatcher "github.com/newhook/co/internal/tracking/watcher"
)

// RunControlPlaneLoop runs the main control plane event loop
// RunControlPlaneLoop runs the main control plane event loop with default dependencies.
func RunControlPlaneLoop(ctx context.Context, proj *project.Project, procManager *procmon.Manager) error {
cp := NewControlPlane()
return RunControlPlaneLoopWithControlPlane(ctx, proj, procManager, cp)
}

// RunControlPlaneLoopWithControlPlane runs the main control plane event loop with provided dependencies.
// This allows testing with mock dependencies.
func RunControlPlaneLoopWithControlPlane(ctx context.Context, proj *project.Project, procManager *procmon.Manager, cp *ControlPlane) error {
// Initialize tracking database watcher
trackingDBPath := filepath.Join(proj.Root, ".co", "tracking.db")
watcher, err := trackingwatcher.New(trackingwatcher.DefaultConfig(trackingDBPath))
Expand Down Expand Up @@ -59,13 +66,13 @@ func RunControlPlaneLoop(ctx context.Context, proj *project.Project, procManager
// Handle database change event
if event.Payload.Type == trackingwatcher.DBChanged {
logging.Debug("Database changed, checking scheduled tasks")
ProcessAllDueTasks(ctx, proj)
ProcessAllDueTasksWithControlPlane(ctx, proj, cp)
}

case <-checkTimer.C:
// Periodic check as a safety net
logging.Debug("Control plane periodic check")
ProcessAllDueTasks(ctx, proj)
ProcessAllDueTasksWithControlPlane(ctx, proj, cp)
checkTimer.Reset(checkInterval)

case <-cleanupTimer.C:
Expand All @@ -82,21 +89,17 @@ func RunControlPlaneLoop(ctx context.Context, proj *project.Project, procManager
// TaskHandler is the signature for all scheduled task handlers.
type TaskHandler func(ctx context.Context, proj *project.Project, task *db.ScheduledTask) error

// taskHandlers maps task types to their handler functions.
var taskHandlers = map[string]TaskHandler{
db.TaskTypeCreateWorktree: HandleCreateWorktreeTask,
db.TaskTypeImportPR: HandleImportPRTask,
db.TaskTypeSpawnOrchestrator: HandleSpawnOrchestratorTask,
db.TaskTypePRFeedback: HandlePRFeedbackTask,
db.TaskTypeCommentResolution: HandleCommentResolutionTask,
db.TaskTypeGitPush: HandleGitPushTask,
db.TaskTypeGitHubComment: HandleGitHubCommentTask,
db.TaskTypeGitHubResolveThread: HandleGitHubResolveThreadTask,
db.TaskTypeDestroyWorktree: HandleDestroyWorktreeTask,
// ProcessAllDueTasks checks for and executes any scheduled tasks that are due across all works.
// This uses the default ControlPlane with production dependencies.
func ProcessAllDueTasks(ctx context.Context, proj *project.Project) {
cp := NewControlPlane()
ProcessAllDueTasksWithControlPlane(ctx, proj, cp)
}

// ProcessAllDueTasks checks for and executes any scheduled tasks that are due across all works
func ProcessAllDueTasks(ctx context.Context, proj *project.Project) {
// ProcessAllDueTasksWithControlPlane checks for and executes any scheduled tasks with provided dependencies.
func ProcessAllDueTasksWithControlPlane(ctx context.Context, proj *project.Project, cp *ControlPlane) {
taskHandlers := cp.GetTaskHandlers()

// Get the next due task globally (not work-specific)
for {
task, err := proj.DB.GetNextScheduledTask(ctx)
Expand Down
106 changes: 106 additions & 0 deletions internal/control/plane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package control

import (
"context"
"io"

"github.com/newhook/co/internal/claude"
"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/feedback"
"github.com/newhook/co/internal/git"
"github.com/newhook/co/internal/mise"
"github.com/newhook/co/internal/project"
"github.com/newhook/co/internal/work"
"github.com/newhook/co/internal/worktree"
"github.com/newhook/co/internal/zellij"
)

// OrchestratorSpawner defines the interface for spawning work orchestrators.
// This abstraction enables testing without actual zellij operations.
type OrchestratorSpawner interface {
SpawnWorkOrchestrator(ctx context.Context, workID, projectName, workDir, friendlyName string, w io.Writer) error
}

// WorkDestroyer defines the interface for destroying work units.
// This abstraction enables testing without actual file system operations.
type WorkDestroyer interface {
DestroyWork(ctx context.Context, proj *project.Project, workID string, w io.Writer) error
}

// DefaultOrchestratorSpawner implements OrchestratorSpawner using the claude package.
type DefaultOrchestratorSpawner struct{}

// SpawnWorkOrchestrator implements OrchestratorSpawner.
func (d *DefaultOrchestratorSpawner) SpawnWorkOrchestrator(ctx context.Context, workID, projectName, workDir, friendlyName string, w io.Writer) error {
return claude.SpawnWorkOrchestrator(ctx, workID, projectName, workDir, friendlyName, w)
}

// DefaultWorkDestroyer implements WorkDestroyer using the work package.
type DefaultWorkDestroyer struct{}

// DestroyWork implements WorkDestroyer.
func (d *DefaultWorkDestroyer) DestroyWork(ctx context.Context, proj *project.Project, workID string, w io.Writer) error {
return work.DestroyWork(ctx, proj, workID, w)
}

// ControlPlane manages the execution of scheduled tasks with injectable dependencies.
// It allows for testing without actual CLI tools, services, or file system operations.
type ControlPlane struct {
Git git.Operations
Worktree worktree.Operations
Zellij zellij.SessionManager
Mise func(dir string) mise.Operations
FeedbackProcessor feedback.Processor
OrchestratorSpawner OrchestratorSpawner
WorkDestroyer WorkDestroyer
}

// NewControlPlane creates a new ControlPlane with default production dependencies.
func NewControlPlane() *ControlPlane {
return &ControlPlane{
Git: git.NewOperations(),
Worktree: worktree.NewOperations(),
Zellij: zellij.New(),
Mise: mise.NewOperations,
FeedbackProcessor: feedback.NewProcessor(),
OrchestratorSpawner: &DefaultOrchestratorSpawner{},
WorkDestroyer: &DefaultWorkDestroyer{},
}
}

// NewControlPlaneWithDeps creates a new ControlPlane with provided dependencies for testing.
func NewControlPlaneWithDeps(
gitOps git.Operations,
wtOps worktree.Operations,
zellijMgr zellij.SessionManager,
miseOps func(dir string) mise.Operations,
feedbackProc feedback.Processor,
orchestratorSpawner OrchestratorSpawner,
workDestroyer WorkDestroyer,
) *ControlPlane {
return &ControlPlane{
Git: gitOps,
Worktree: wtOps,
Zellij: zellijMgr,
Mise: miseOps,
FeedbackProcessor: feedbackProc,
OrchestratorSpawner: orchestratorSpawner,
WorkDestroyer: workDestroyer,
}
}

// GetTaskHandlers returns the task handler map for the control plane.
func (cp *ControlPlane) GetTaskHandlers() map[string]TaskHandler {
return map[string]TaskHandler{
db.TaskTypeCreateWorktree: cp.HandleCreateWorktreeTask,
db.TaskTypeSpawnOrchestrator: cp.HandleSpawnOrchestratorTask,
db.TaskTypePRFeedback: cp.HandlePRFeedbackTask,
db.TaskTypeGitPush: cp.HandleGitPushTask,
db.TaskTypeDestroyWorktree: cp.HandleDestroyWorktreeTask,
// These handlers don't need ControlPlane dependencies - keep as standalone functions
db.TaskTypeImportPR: HandleImportPRTask,
db.TaskTypeCommentResolution: HandleCommentResolutionTask,
db.TaskTypeGitHubComment: HandleGitHubCommentTask,
db.TaskTypeGitHubResolveThread: HandleGitHubResolveThreadTask,
}
}
4 changes: 2 additions & 2 deletions internal/feedback/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/newhook/co/internal/project"
)

// ProcessPRFeedbackQuiet processes PR feedback without outputting to stdout.
// processPRFeedbackQuiet processes PR feedback without outputting to stdout.
// This is used by the scheduler to avoid interfering with the TUI.
// Returns the number of beads created and any error.
func ProcessPRFeedbackQuiet(ctx context.Context, proj *project.Project, database *db.DB, workID string) (int, error) {
func processPRFeedbackQuiet(ctx context.Context, proj *project.Project, database *db.DB, workID string) (int, error) {
return processPRFeedbackInternal(ctx, proj, database, workID, true)
}

Expand Down
32 changes: 32 additions & 0 deletions internal/feedback/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package feedback

import (
"context"

"github.com/newhook/co/internal/db"
"github.com/newhook/co/internal/project"
)

// Processor defines the interface for processing PR feedback.
// This abstraction enables testing without actual GitHub API calls.
type Processor interface {
// ProcessPRFeedback processes PR feedback for a work and creates beads.
// Returns the number of beads created and any error.
ProcessPRFeedback(ctx context.Context, proj *project.Project, database *db.DB, workID string) (int, error)
}

// DefaultProcessor implements Processor using the actual feedback processing logic.
type DefaultProcessor struct{}

// Compile-time check that DefaultProcessor implements Processor.
var _ Processor = (*DefaultProcessor)(nil)

// NewProcessor creates a new default Processor implementation.
func NewProcessor() Processor {
return &DefaultProcessor{}
}

// ProcessPRFeedback implements Processor.ProcessPRFeedback.
func (p *DefaultProcessor) ProcessPRFeedback(ctx context.Context, proj *project.Project, database *db.DB, workID string) (int, error) {
return processPRFeedbackQuiet(ctx, proj, database, workID)
}