Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 117 additions & 109 deletions packages/integration-tests/src/daemon-children.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,29 @@
import { spawn, execFile } from "node:child_process";
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { spawn, execFile, type ChildProcess } from "node:child_process";
import { existsSync, mkdirSync, writeFileSync } from "node:fs";
import { mkdtemp, rm, realpath } from "node:fs/promises";
import { createServer } from "node:net";
import { tmpdir } from "node:os";
import { dirname, join, resolve } from "node:path";
import { fileURLToPath } from "node:url";
import { promisify } from "node:util";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { isWindows, killProcessTree } from "@aoagents/ao-core";
import {
clearDaemonChildrenRegistry,
getDaemonChildren,
isWindows,
killProcessTree,
registerDaemonChild,
} from "@aoagents/ao-core";
import { sleep } from "./helpers/polling.js";

const execFileAsync = promisify(execFile);
const __dirname = dirname(fileURLToPath(import.meta.url));
const repoRoot = resolve(__dirname, "../../..");
const cliEntry = join(repoRoot, "packages/cli/src/index.ts");
const tsxBin = join(repoRoot, "packages/cli/node_modules/.bin/tsx");
const dashboardEntry = join(repoRoot, "packages/web/dist-server/start-all.js");

const canRun = !isWindows() && existsSync(tsxBin) && existsSync(dashboardEntry);

async function getFreePort(): Promise<number> {
return new Promise((resolve, reject) => {
const server = createServer();
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
const address = server.address();
server.close(() => {
if (address && typeof address === "object") resolve(address.port);
else reject(new Error("Could not allocate a free port"));
});
});
});
}
const repoRoot = resolve(__dirname, "..", "..", "..");
const cliEntry = join(repoRoot, "packages", "cli", "dist", "index.js");

async function readChildPids(pid: number): Promise<number[]> {
try {
const { stdout } = await execFileAsync("pgrep", ["-P", String(pid)]);
return stdout
.split(/\s+/)
.map((value) => Number(value))
.filter((value) => Number.isFinite(value) && value > 0);
} catch {
return [];
}
}
const canRun = !isWindows() && existsSync(cliEntry);

function isAlive(pid: number): boolean {
if (pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
Expand All @@ -54,96 +32,126 @@ function isAlive(pid: number): boolean {
}
}

function childHasExited(child: ChildProcess): boolean {
return child.exitCode !== null || child.signalCode !== null;
}

function requirePid(child: ChildProcess, role: string): number {
expect(child.pid, `${role} pid`).toBeTypeOf("number");
return child.pid as number;
}

function spawnSleeper(): ChildProcess {
return spawn(process.execPath, ["-e", "setInterval(() => {}, 30_000)"], {
stdio: "ignore",
});
}

async function waitForChildExit(
child: ChildProcess,
pid: number,
timeoutMs = 5_000,
): Promise<boolean> {
if (childHasExited(child) || !isAlive(pid)) return true;

await Promise.race([
new Promise<void>((resolve) => child.once("exit", () => resolve())),
sleep(timeoutMs),
]);

return childHasExited(child) || !isAlive(pid);
}

async function terminateChild(child: ChildProcess | undefined): Promise<void> {
const pid = child?.pid;
if (!child || typeof pid !== "number") return;
if (isAlive(pid)) {
await killProcessTree(pid, "SIGKILL");
}
await waitForChildExit(child, pid, 2_000);
}

function writeFakeRunningState(home: string, daemonPid: number): void {
const stateDir = join(home, ".agent-orchestrator");
mkdirSync(stateDir, { recursive: true });
writeFileSync(
join(stateDir, "running.json"),
JSON.stringify(
{
pid: daemonPid,
configPath: join(home, "agent-orchestrator.yaml"),
port: 0,
startedAt: new Date().toISOString(),
projects: ["daemon-int"],
},
null,
2,
),
);
}

describe.skipIf(!canRun)("daemon child reaping (integration)", () => {
let tmpHome: string;
let repoPath: string;
let configPath: string;
let startPid: number | undefined;
let port: number;
let originalHome: string | undefined;
let originalUserProfile: string | undefined;
let daemonParent: ChildProcess | undefined;
let registeredChild: ChildProcess | undefined;

beforeEach(async () => {
tmpHome = await realpath(await mkdtemp(join(tmpdir(), "ao-daemon-int-home-")));
port = await getFreePort();
repoPath = join(tmpHome, "repo");
mkdirSync(repoPath, { recursive: true });
await execFileAsync("git", ["init"], { cwd: repoPath });
await execFileAsync("git", ["config", "user.email", "test@example.com"], { cwd: repoPath });
await execFileAsync("git", ["config", "user.name", "Test User"], { cwd: repoPath });
writeFileSync(join(repoPath, "README.md"), "# daemon child reaping\n");
await execFileAsync("git", ["add", "."], { cwd: repoPath });
await execFileAsync("git", ["commit", "-m", "Initial commit"], { cwd: repoPath });

configPath = join(repoPath, "agent-orchestrator.yaml");
writeFileSync(
configPath,
["runtime: process", "agent: claude-code", "workspace: worktree"].join("\n"),
);

const globalConfigPath = join(tmpHome, "global-agent-orchestrator.yaml");
writeFileSync(
globalConfigPath,
[
`port: ${port}`,
"defaults:",
" runtime: process",
" agent: claude-code",
" workspace: worktree",
" notifiers: []",
"projects:",
" daemon-int:",
" displayName: Daemon Integration",
` path: ${JSON.stringify(repoPath)}`,
" defaultBranch: main",
" sessionPrefix: daemon-int",
].join("\n"),
);
configPath = globalConfigPath;
originalHome = process.env["HOME"];
originalUserProfile = process.env["USERPROFILE"];
process.env["HOME"] = tmpHome;
process.env["USERPROFILE"] = tmpHome;
clearDaemonChildrenRegistry();
}, 30_000);

afterEach(async () => {
if (startPid && isAlive(startPid)) {
await killProcessTree(startPid, "SIGKILL");
}
await terminateChild(registeredChild);
await terminateChild(daemonParent);
clearDaemonChildrenRegistry();
if (originalHome === undefined) delete process.env["HOME"];
else process.env["HOME"] = originalHome;
if (originalUserProfile === undefined) delete process.env["USERPROFILE"];
else process.env["USERPROFILE"] = originalUserProfile;
await rm(tmpHome, { recursive: true, force: true }).catch(() => {});
}, 30_000);

it("ao stop terminates children spawned by ao start", async () => {
it("ao stop --all terminates registered daemon children without starting the dashboard", async () => {
daemonParent = spawnSleeper();
registeredChild = spawnSleeper();

const daemonPid = requirePid(daemonParent, "daemon parent");
const childPid = requirePid(registeredChild, "registered child");

registerDaemonChild({
pid: childPid,
parentPid: daemonPid,
role: "test-dashboard",
command: "node dummy-dashboard.js",
});
writeFakeRunningState(tmpHome, daemonPid);

const env = {
...process.env,
HOME: tmpHome,
USERPROFILE: tmpHome,
AO_CALLER_TYPE: "agent",
AO_CONFIG_PATH: configPath,
AO_GLOBAL_CONFIG: configPath,
PORT: String(port),
};
const start = spawn(tsxBin, [cliEntry, "start", "--no-orchestrator", "--reap-orphans"], {
cwd: repoPath,

const { stdout } = await execFileAsync(process.execPath, [cliEntry, "stop", "--all"], {
cwd: tmpHome,
env,
stdio: "ignore",
timeout: 20_000,
});
startPid = start.pid;
expect(startPid).toBeTypeOf("number");

const runningPath = join(tmpHome, ".agent-orchestrator/running.json");
let runningPid: number | undefined;
for (let i = 0; i < 100; i++) {
if (existsSync(runningPath)) {
const running = JSON.parse(readFileSync(runningPath, "utf-8")) as { pid?: number };
runningPid = running.pid;
break;
}
await sleep(100);
}
expect(runningPid).toBeTypeOf("number");

const childPids = await readChildPids(runningPid!);
expect(childPids.length).toBeGreaterThan(0);

await execFileAsync(tsxBin, [cliEntry, "stop", "--all"], { cwd: repoPath, env, timeout: 20_000 });
await sleep(5_000);

const stillAlive = childPids.filter(isAlive);
expect(stillAlive).toEqual([]);
expect(isAlive(runningPid!)).toBe(false);
}, 60_000);

expect(stdout).toContain("Swept 1 registered daemon child");
const [registeredChildExited, daemonParentExited] = await Promise.all([
waitForChildExit(registeredChild, childPid),
waitForChildExit(daemonParent, daemonPid),
]);
expect(registeredChildExited).toBe(true);
expect(daemonParentExited).toBe(true);
expect(getDaemonChildren()).toEqual([]);
}, 30_000);
});
Loading