-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy pathclient.ts
More file actions
221 lines (200 loc) · 7.03 KB
/
client.ts
File metadata and controls
221 lines (200 loc) · 7.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/**
* Bridge client — JSON-RPC over stdio to the Python altimate-engine sidecar.
*
* Usage:
* const result = await Bridge.call("sql.execute", { sql: "SELECT 1" })
* Bridge.stop()
*/
import { spawn, type ChildProcess } from "child_process"
import { existsSync } from "fs"
import path from "path"
import { ensureEngine, enginePythonPath } from "./engine"
import type { BridgeMethod, BridgeMethods } from "./protocol"
import { Telemetry } from "../telemetry"
import { Log } from "../../util/log"
/**
 * Build the path of the Python executable inside a virtualenv directory.
 *
 * On Windows (`process.platform === "win32"`) the result is
 * `<venvDir>/Scripts/python.exe`; on every other platform it is
 * `<venvDir>/bin/python`.
 *
 * @param venvDir Root directory of the virtual environment.
 * @returns Path to the interpreter binary (not checked for existence).
 */
function venvPythonBin(venvDir: string): string {
  if (process.platform === "win32") {
    return path.join(venvDir, "Scripts", "python.exe")
  }
  return path.join(venvDir, "bin", "python")
}
/**
 * Pick the Python interpreter used to launch the engine sidecar.
 * Exported for testing — not part of the public API.
 *
 * Lookup order (first hit wins):
 *  1. The `OPENCODE_PYTHON` environment variable, when set to a non-empty value.
 *  2. A `.venv` inside the `altimate-engine` directory three levels above
 *     this file (local development checkout).
 *  3. The managed engine venv reported by `enginePythonPath()`.
 *  4. A `.venv` in the current working directory.
 *  5. The bare command `python3`.
 *
 * @returns An interpreter path, or `"python3"` when no candidate exists on disk.
 */
export function resolvePython(): string {
  const override = process.env.OPENCODE_PYTHON
  if (override) return override

  // Each probe is lazy so later locations are only computed when the
  // earlier ones are missing from disk.
  const probes: Array<() => string> = [
    // Local dev: .venv next to the altimate-engine package.
    () => venvPythonBin(path.join(path.resolve(__dirname, "..", "..", "..", "altimate-engine"), ".venv")),
    // Managed venv. Must be probed before the CWD venv so an unrelated
    // .venv in the user's project directory cannot shadow it.
    () => enginePythonPath(),
    // .venv in the directory the process was started from.
    () => venvPythonBin(path.join(process.cwd(), ".venv")),
  ]

  for (const probe of probes) {
    const candidate = probe()
    if (existsSync(candidate)) return candidate
  }

  // Nothing found on disk: fall back to the bare command name.
  return "python3"
}
/**
 * JSON-RPC client for the Python `altimate_engine.server` sidecar.
 *
 * The sidecar is spawned on demand by `call()` and addressed over its stdio
 * pipes using newline-delimited JSON-RPC 2.0 messages. Module-level state
 * tracks the current child process, the in-flight requests and how many
 * times the process has failed.
 */
export namespace Bridge {
  /** Book-keeping for one in-flight request. */
  interface PendingCall {
    /**
     * Process the request was written to. A process that exits or errors
     * only fails the requests it owns, so a late event from a previous
     * process cannot reject requests sent to its replacement.
     */
    owner: ChildProcess
    resolve: (value: any) => void
    reject: (reason: any) => void
  }

  let child: ChildProcess | undefined
  let requestId = 0
  let restartCount = 0
  const MAX_RESTARTS = 2
  const CALL_TIMEOUT_MS = 30_000
  const pending = new Map<number, PendingCall>()
  // Mutex to prevent concurrent start() calls from spawning duplicate processes
  let pendingStart: Promise<void> | null = null

  /** Reject (and forget) every in-flight request that was sent to `owner`. */
  function rejectPendingFor(owner: ChildProcess, message: string): void {
    for (const [id, entry] of pending) {
      if (entry.owner !== owner) continue
      pending.delete(id)
      entry.reject(new Error(message))
    }
  }

  /**
   * Send one JSON-RPC request to the sidecar and wait for its response.
   *
   * Starts the sidecar first when it is not running. Each outcome
   * (success, error, timeout) is reported through `Telemetry.track`.
   *
   * @param method JSON-RPC method name.
   * @param params Parameters for `method`.
   * @returns The `result` field of the matching response.
   * @throws Error when the restart budget (`MAX_RESTARTS`) is exhausted,
   *   when startup fails, when the response carries an `error`, when the
   *   process exits before answering, or after `CALL_TIMEOUT_MS` without
   *   a response.
   */
  export async function call<M extends BridgeMethod>(
    method: M,
    params: (typeof BridgeMethods)[M] extends { params: infer P } ? P : never,
  ): Promise<(typeof BridgeMethods)[M] extends { result: infer R } ? R : never> {
    const startTime = Date.now()
    if (!child || child.exitCode !== null) {
      if (restartCount >= MAX_RESTARTS) throw new Error("Python bridge failed after max restarts")
      if (pendingStart) {
        await pendingStart
        // Re-check: the process may have died between startup and now
        if (!child || child.exitCode !== null) {
          throw new Error("Bridge process died during startup")
        }
      } else {
        pendingStart = start()
        try {
          await pendingStart
        } finally {
          pendingStart = null
        }
      }
    }

    // Pin the process this request is sent to so it can be attributed
    // correctly if the process is replaced while the request is in flight.
    const proc = child
    const stdin = proc?.stdin
    if (!proc || !stdin) throw new Error("Bridge process is not running")

    const id = ++requestId
    const request = JSON.stringify({ jsonrpc: "2.0", method, params, id })
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        // Nothing to do if the request was already settled and removed.
        if (!pending.delete(id)) return
        const error = new Error(`Bridge timeout: ${method} (${CALL_TIMEOUT_MS}ms)`)
        Telemetry.track({
          type: "bridge_call",
          timestamp: Date.now(),
          session_id: Telemetry.getContext().sessionId,
          method,
          status: "error",
          duration_ms: Date.now() - startTime,
          error: error.message,
        })
        reject(error)
      }, CALL_TIMEOUT_MS)

      pending.set(id, {
        owner: proc,
        resolve: (value: any) => {
          // Cancel the timeout so a settled call does not keep the event
          // loop alive for the remainder of CALL_TIMEOUT_MS.
          clearTimeout(timer)
          Telemetry.track({
            type: "bridge_call",
            timestamp: Date.now(),
            session_id: Telemetry.getContext().sessionId,
            method,
            status: "success",
            duration_ms: Date.now() - startTime,
          })
          resolve(value)
        },
        reject: (reason: any) => {
          clearTimeout(timer)
          Telemetry.track({
            type: "bridge_call",
            timestamp: Date.now(),
            session_id: Telemetry.getContext().sessionId,
            method,
            status: "error",
            duration_ms: Date.now() - startTime,
            error: String(reason).slice(0, 500),
          })
          reject(reason)
        },
      })
      stdin.write(request + "\n")
    })
  }

  /**
   * Spawn the sidecar, wire up its stdio and lifecycle handlers, and verify
   * it answers a `ping`.
   *
   * @throws Error when the ping fails; the spawned process is killed first.
   */
  async function start() {
    await ensureEngine()
    const pythonCmd = resolvePython()
    // Propagate altimate-code's telemetry opt-out to the Python engine.
    // The engine calls altimate_core.init() lazily; this env var ensures
    // it won't send telemetry when the user has disabled it here.
    await Telemetry.init()
    const childEnv = { ...process.env }
    if (!Telemetry.isEnabled()) {
      childEnv.ALTIMATE_TELEMETRY_DISABLED = "true"
    }
    const proc: ChildProcess = spawn(pythonCmd, ["-m", "altimate_engine.server"], {
      stdio: ["pipe", "pipe", "pipe"],
      env: childEnv,
    })
    child = proc

    // Line buffer is per process so output from an old process can never
    // be spliced into the stream of its replacement.
    let buffer = ""
    // Decode as a UTF-8 stream: multi-byte characters split across chunks
    // are reassembled instead of being corrupted by per-chunk toString().
    proc.stdout!.setEncoding("utf8")
    proc.stdout!.on("data", (chunk: string) => {
      buffer += chunk
      const lines = buffer.split("\n")
      // The last element is the (possibly empty) incomplete trailing line.
      buffer = lines.pop() ?? ""
      for (const line of lines) {
        if (!line.trim()) continue
        try {
          const response = JSON.parse(line)
          const entry = pending.get(response.id)
          if (entry) {
            pending.delete(response.id)
            if (response.error) {
              entry.reject(new Error(response.error.message))
            } else {
              entry.resolve(response.result)
            }
          }
        } catch {
          // Skip non-JSON lines (Python startup messages, etc.)
        }
      }
    })

    proc.stderr!.setEncoding("utf8")
    proc.stderr!.on("data", (chunk: string) => {
      const msg = chunk.trim()
      if (msg) Log.Default.error("altimate-engine stderr", { message: msg })
    })

    // Writing to a process that is going away emits "error" on stdin; with
    // no listener that would be an unhandled error event. In-flight requests
    // are failed by the exit/error handlers below.
    proc.stdin!.on("error", (err) => {
      Log.Default.error("altimate-engine stdin error", { error: String(err) })
    })

    proc.on("error", (err) => {
      Log.Default.error("altimate-engine spawn error", { error: String(err) })
      restartCount++
      rejectPendingFor(proc, `Bridge process failed to spawn: ${err}`)
      // Only clear the shared handle if it still refers to this process.
      if (child === proc) child = undefined
    })

    proc.on("exit", (code) => {
      // A null code means the process was ended by a signal (e.g. kill()),
      // which is not counted against the restart budget.
      if (code !== null && code !== 0) restartCount++
      rejectPendingFor(proc, `Bridge process exited (code ${code})`)
      if (child === proc) child = undefined
    })

    // Verify the bridge is alive
    try {
      await call("ping", {} as any)
    } catch (e) {
      // Clean up the spawned process so subsequent call() invocations
      // correctly detect !child and trigger a restart instead of writing
      // to a non-functional process and hanging until timeout.
      proc.kill()
      if (child === proc) child = undefined
      throw new Error(`Failed to start Python bridge: ${e}`)
    }
  }

  /**
   * Kill the sidecar (if any) and reset the restart counter.
   * Requests still in flight are rejected when the process's exit event fires.
   */
  export function stop() {
    child?.kill()
    child = undefined
    restartCount = 0
  }

  /** Whether a sidecar process is currently tracked and has not exited. */
  export function isRunning(): boolean {
    return child !== undefined && child.exitCode === null
  }
}