forked from RapidAI/MaClaw
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremote_execution_helpers.go
More file actions
171 lines (148 loc) · 4.8 KB
/
remote_execution_helpers.go
File metadata and controls
171 lines (148 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
// --- Executable Validation ---
// resolveExecutablePath validates and resolves a command path.
//
// A non-absolute execPath is resolved with exec.LookPath; an absolute one is
// used as given. The resulting path must then be stat-able and must not be a
// directory. All execution strategies share this logic.
//
// It returns the resolved path on success. On failure it returns an empty
// string and an error that wraps the underlying LookPath or Stat error
// (the directory rejection has no underlying error to wrap).
func resolveExecutablePath(execPath string) (string, error) {
	if !filepath.IsAbs(execPath) {
		resolved, err := exec.LookPath(execPath)
		if err != nil {
			// Include the number of PATH entries as a debugging aid.
			// strings.Split on an empty string yields one empty element,
			// so an empty or unset PATH is counted explicitly as zero
			// instead of being misreported as one entry.
			pathEntries := 0
			if pathEnv := os.Getenv("PATH"); pathEnv != "" {
				pathEntries = len(strings.Split(pathEnv, string(os.PathListSeparator)))
			}
			return "", fmt.Errorf("command not found: %s (PATH contains %d entries): %w",
				execPath, pathEntries, err)
		}
		execPath = resolved
	}

	info, err := os.Stat(execPath)
	if err != nil {
		return "", fmt.Errorf("command not accessible: %s: %w", execPath, err)
	}
	if info.IsDir() {
		return "", fmt.Errorf("command is a directory: %s", execPath)
	}
	return execPath, nil
}
// buildExecCmd assembles the *exec.Cmd shared by every execution strategy.
//
// The command runs execPath with args, uses cwd as its working directory,
// and takes its environment from buildSDKEnvList(env). hideCommandWindow is
// then applied to the command (intended to suppress the console window on
// Windows). The command is returned unstarted.
func buildExecCmd(execPath string, args []string, cwd string, env map[string]string) *exec.Cmd {
	cmd := exec.Command(execPath, args...)
	cmd.Dir, cmd.Env = cwd, buildSDKEnvList(env)
	hideCommandWindow(cmd)
	return cmd
}
// --- Pipe Creation ---
// ProcessPipes holds the stdin/stdout/stderr pipes for a child process.
type ProcessPipes struct {
	Stdin  io.WriteCloser // from exec.Cmd.StdinPipe
	Stdout io.ReadCloser  // from exec.Cmd.StdoutPipe
	Stderr io.ReadCloser  // from exec.Cmd.StderrPipe
}

// createProcessPipes creates stdin, stdout, and stderr pipes on the given
// exec.Cmd. This eliminates the repetitive pipe-creation boilerplate in
// every execution strategy.
//
// On success all three fields of the returned ProcessPipes are set. On
// failure it returns a zero ProcessPipes and an error prefixed with the name
// of the pipe that failed. Because the caller never receives the pipes that
// were already created in that case, their parent-side ends are closed here
// so they are not left open.
func createProcessPipes(c *exec.Cmd) (ProcessPipes, error) {
	stdin, err := c.StdinPipe()
	if err != nil {
		return ProcessPipes{}, fmt.Errorf("stdin pipe: %w", err)
	}

	stdout, err := c.StdoutPipe()
	if err != nil {
		// Best-effort cleanup; the pipe-creation error is the one reported.
		_ = stdin.Close()
		return ProcessPipes{}, fmt.Errorf("stdout pipe: %w", err)
	}

	stderr, err := c.StderrPipe()
	if err != nil {
		// Best-effort cleanup; the pipe-creation error is the one reported.
		_ = stdin.Close()
		_ = stdout.Close()
		return ProcessPipes{}, fmt.Errorf("stderr pipe: %w", err)
	}

	return ProcessPipes{Stdin: stdin, Stdout: stdout, Stderr: stderr}, nil
}
// --- Reader Goroutine Coordination ---
// ReaderCoordinator tracks a group of reader goroutines that all publish
// into one shared output channel. Once every registered reader has reported
// completion, the channel can be closed automatically via CloseWhenDone.
type ReaderCoordinator struct {
	wg sync.WaitGroup
	ch chan []byte
}

// NewReaderCoordinator returns a coordinator whose output channel has a
// buffer of bufSize elements.
func NewReaderCoordinator(bufSize int) *ReaderCoordinator {
	coord := new(ReaderCoordinator)
	coord.ch = make(chan []byte, bufSize)
	return coord
}

// Add registers n reader goroutines. Call it before starting them.
func (rc *ReaderCoordinator) Add(n int) { rc.wg.Add(n) }

// Done reports that one reader goroutine has finished.
func (rc *ReaderCoordinator) Done() { rc.wg.Done() }

// Output exposes the shared output channel.
func (rc *ReaderCoordinator) Output() chan []byte { return rc.ch }

// CloseWhenDone spawns a goroutine that blocks until all registered readers
// have called Done and then closes the output channel.
func (rc *ReaderCoordinator) CloseWhenDone() {
	go rc.closeAfterReaders()
}

// closeAfterReaders waits for the reader group to drain, then closes the
// output channel.
func (rc *ReaderCoordinator) closeAfterReaders() {
	rc.wg.Wait()
	close(rc.ch)
}
// --- Output Result Application ---
// applyOutputResult folds an OutputResult into the in-memory state of a
// RemoteSession. The caller must hold s.mu.
//
// It always refreshes s.UpdatedAt. A non-nil Summary replaces s.Summary and
// sets s.Status from the summary's status. A non-nil PreviewDelta updates the
// preview metadata and appends its lines, keeping only the newest 500 preview
// lines. Every event is passed through appendRecentEvents with
// maxRecentImportantEvents.
func applyOutputResult(s *RemoteSession, result OutputResult) {
	const previewLineLimit = 500

	s.UpdatedAt = time.Now()

	if summary := result.Summary; summary != nil {
		s.Summary = *summary
		s.Status = SessionStatus(summary.Status)
	}

	if delta := result.PreviewDelta; delta != nil {
		s.Preview.SessionID = s.ID
		s.Preview.OutputSeq = delta.OutputSeq
		s.Preview.UpdatedAt = delta.UpdatedAt

		lines := append(s.Preview.PreviewLines, delta.AppendLines...)
		// Drop the oldest lines once the preview exceeds its limit.
		if excess := len(lines) - previewLineLimit; excess > 0 {
			lines = lines[excess:]
		}
		s.Preview.PreviewLines = lines
	}

	for _, event := range result.Events {
		s.Events = appendRecentEvents(s.Events, event, maxRecentImportantEvents)
	}
}
// syncOutputResult forwards the contents of an OutputResult to the Hub
// client. The caller must NOT hold s.mu.
//
// A nil hubClient makes this a no-op. Otherwise the summary and preview delta
// are sent when present, followed by each event in order. Errors returned by
// the send calls are discarded.
func syncOutputResult(hubClient *RemoteHubClient, result OutputResult) {
	if hubClient == nil {
		return
	}

	if summary := result.Summary; summary != nil {
		_ = hubClient.SendSessionSummary(*summary)
	}

	if delta := result.PreviewDelta; delta != nil {
		_ = hubClient.SendPreviewDelta(*delta)
	}

	for _, event := range result.Events {
		_ = hubClient.SendImportantEvent(event)
	}
}
// appendRawOutputLines adds lines to the end of the session's raw output
// buffer and then discards the oldest entries so that at most 2000 lines
// remain. The caller must hold s.mu.
func appendRawOutputLines(s *RemoteSession, lines []string) {
	const rawLineLimit = 2000

	buffered := append(s.RawOutputLines, lines...)
	if excess := len(buffered) - rawLineLimit; excess > 0 {
		buffered = buffered[excess:]
	}
	s.RawOutputLines = buffered
}