From d116ea5fab2d9c55482ecb946febb19628c9f03b Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Sat, 30 Jul 2022 09:33:51 -0700 Subject: [PATCH] ci: add job runner, closes #15 --- .dockerignore | 1 - Dockerfile | 5 + docs/etc/id_rsa | 49 +++++++ docs/etc/id_rsa.pub | 1 + docs/etc/id_rsa.randomart | 15 ++ src/WaitGroup.zig | 48 +++++++ src/db/Job.zig | 2 + src/docker.zig | 74 ++++++++++ src/job_doer.zig | 290 ++++++++++++++++++++++++++++++++++++++ src/main.zig | 16 ++- src/runner.zig | 48 +++++++ tools/gen_stage2.sh | 10 ++ zigmod.lock | 1 + zigmod.yml | 1 + 14 files changed, 559 insertions(+), 2 deletions(-) create mode 100644 docs/etc/id_rsa create mode 100644 docs/etc/id_rsa.pub create mode 100644 docs/etc/id_rsa.randomart create mode 100644 src/WaitGroup.zig create mode 100644 src/docker.zig create mode 100644 src/job_doer.zig create mode 100644 src/runner.zig diff --git a/.dockerignore b/.dockerignore index 642d0c8..21b44d0 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,5 +4,4 @@ iso images src tools -docs .git diff --git a/Dockerfile b/Dockerfile index a33282a..2ed0aa6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,13 @@ FROM alpine +USER root WORKDIR /app RUN apk add git RUN apk add wget tar unzip RUN apk add mercurial +RUN apk add openssh-client VOLUME /data +VOLUME /images +COPY ./docs/etc/id_rsa /root/.ssh/id_rsa +COPY ./docs/etc/id_rsa.pub /root/.ssh/id_rsa.pub COPY ./bin/aquila-linux-x86_64 /app/aquila ENTRYPOINT ["/app/aquila", "--port", "8000", "--db", "/data/access.db"] diff --git a/docs/etc/id_rsa b/docs/etc/id_rsa new file mode 100644 index 0000000..2ce6a1d --- /dev/null +++ b/docs/etc/id_rsa @@ -0,0 +1,49 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn +NhAAAAAwEAAQAAAgEAuILWxMAYcPR+ValpIFtxftpaeYjFmIxLvlpG7KMbZpxf1i2QTl7A +mS8O6e0vHM1KwyY/36dAJQhJdam1oGUyJXcG1aoutgkfuolf8SqMEpCXIgeeLJcC9I92Ow +lkXZwdL4nfF6NjGC7hXUBFltg3UbCmOyWstNhBJeMmmT+9g4543wx3/YpUieoFMEEexW2K +M9J+yAdWb7RXgQ3oEnih9dQHoemt+B79s1oap09XHAxZGx8+iuwDDYzQ9V9AoIMpG0JwR1 +wLSAhSUacISvjg3VBv8aTUphMS73ZMSkLacFwmymJ0WoZHDHl4TimKvYzKfVjzpCLKDKOb +64oQ86HJuVVaBEitAQUvjzJpze+Q+Wx0IvXx7IqwXzsJJsjNx+816bu+oBDKJVGKK5JfRM +mGswekoeGwQeb7cgSUAPY3GWmMQGwEVkVks0GLYuNmOrylrCT7QFMtjZ3f3K3cdkXw1AwU +Ohv9oWeCdbY6vZ1STRt8FgCtO3M/yZUDHlbEWd211QPEG3N6k1D3/Q8C/3aI3D1fJFxDLg +jvL0Xtrav9FuKoLsDe6AZhAO03GT9S2cimdHN8876AVw48TUL3vFiBLoO0/zF0TbUEHWia +JZslodP2OtWrGxK1OPTtiPF93ruyRAIwF2TORXkCwOIeyPCBdOikkTJ7BxqY282Fq2AqCm +8AAAdAQfD830Hw/N8AAAAHc3NoLXJzYQAAAgEAuILWxMAYcPR+ValpIFtxftpaeYjFmIxL +vlpG7KMbZpxf1i2QTl7AmS8O6e0vHM1KwyY/36dAJQhJdam1oGUyJXcG1aoutgkfuolf8S +qMEpCXIgeeLJcC9I92OwlkXZwdL4nfF6NjGC7hXUBFltg3UbCmOyWstNhBJeMmmT+9g454 +3wx3/YpUieoFMEEexW2KM9J+yAdWb7RXgQ3oEnih9dQHoemt+B79s1oap09XHAxZGx8+iu +wDDYzQ9V9AoIMpG0JwR1wLSAhSUacISvjg3VBv8aTUphMS73ZMSkLacFwmymJ0WoZHDHl4 +TimKvYzKfVjzpCLKDKOb64oQ86HJuVVaBEitAQUvjzJpze+Q+Wx0IvXx7IqwXzsJJsjNx+ +816bu+oBDKJVGKK5JfRMmGswekoeGwQeb7cgSUAPY3GWmMQGwEVkVks0GLYuNmOrylrCT7 +QFMtjZ3f3K3cdkXw1AwUOhv9oWeCdbY6vZ1STRt8FgCtO3M/yZUDHlbEWd211QPEG3N6k1 +D3/Q8C/3aI3D1fJFxDLgjvL0Xtrav9FuKoLsDe6AZhAO03GT9S2cimdHN8876AVw48TUL3 +vFiBLoO0/zF0TbUEHWiaJZslodP2OtWrGxK1OPTtiPF93ruyRAIwF2TORXkCwOIeyPCBdO +ikkTJ7BxqY282Fq2AqCm8AAAADAQABAAACAEMp3WPhSQRU+2fTMyFEKBw0/5od30+YQjoY +QpkBBohjg79C9rSQaStZpeQhInUphX1j/vw6tW7FhXf/Ps4UCBz7JtHAESDSUWpldzcidb +qMR3drrngswDalwjPbR12L5lPXrA2+u+OhrQd0zeAK9JgX5WrCXAu4bH6OIQ7H1QR+aCFJ +OKRiJkPNm/Xkvn6/UP9sMBpAEYa37uJbXY7bupjxhZW6qkkXfuI6ellM2DithgIJbYrEPp +hAGhA3jB7hn6T8mkJfjvu5i72NP8eKsM/e5+DeT0nkx4/DUCH4E+uyn6wsCo6pMlOoI9LP +7GDzTwIxzv6Aa2wOYkNrDqCg0/y0ezudJDp8nPPNAH+DrJpkSB4n0qbXIRLXJ92TPU8b2v +PAO+JFe7FsVNFTN5uQ3loUU4fa/rHQU2wl/a1Ocfgb+K1oVB9OkESTVyglUzA3l/vMwjvR +lYsVAdMT8ye2bpMQXhqjV6kJUqeiOIH8CsYJgz/kDN9UNiezpuU13pjUXgZ+63ueVkrCEx +4jqrGwg36pJC2b3A3ZLbnPijmh62Ow+8jXW+FQl8WpnjSlx2D8njrUYpVzJsnulRKc9Lzc +Vz2xiRCdrbYSnVP/BLX7LobhbPMDiP4lyb0TI5BPQVibP46Mo6a5DWn1pHiAU7Uq3utvHV +0RObRJpu61lIscz5uBAAABAQDXeCNGcASUuZnXuaesNYdh6kpOMfhBESp9CUwCNyw6bmJ1 +7/rxdHx48OrDJyywD4Cnlv4iDaeynDbQsHgQoFfVqEMVaAkNinap1rdypCxizvUYE/LOGp +NNR1w9ivjpIIopzlJQhAkPCe7HqSw9Vwzz1xCiK20CRWlPGyWrT/mIGypSzzF4NZ/xFi1i +X6jDaRdOluGnDgPjig37BOfmazvuuIoHXcWq44ilUUI+SK/cRGA7owFN2HlcybZA95cSTR +RUgAei2k2T2uWArCTe2J9TXDOMagFaLMtstxz1H6jJkQ5I8xUCUwuDY1inx90yRaWz6iOf +TrvAabmIKCcY7vsyAAABAQDffm6K+hAJRo/3Xi7cugEmxMP24YczDrK723Qoyg91WXEFhG +oPjG+/cAiaM268OUXmVMwqYDOISyHGpNm+Behqa5WgDYNRJFQ8jBVi6bVGDEaD+7wuyQxn +bvjo3NAPRFBZxyXyK/tDDM182x+ydEaiSBajy3a4gPwkGSi8t06seZslf6P4NLZsH/4dRX +lx2dvY15NuPFlVF9wbd9MI2NDNrXQD9iAx5tc+VE/oQBh++avJY7lpN76lH5JDuJdsG4Cf +c7Q+oKm3/NhyZi1tLpzYKBjrDTcZh8K7HlSp4U4YllukY+N/lGx/Noe/9ivaIS95k0qdjV +2li5JhdXzerFizAAABAQDTWOwvtRdvYd38gGmhj7yDICZWRwKC/GT7izjF172zRY3h16FM ++zA/cu3UMNQShMRSyUP2FV68CR5ZTN1CgHHri7SSmdqm0hus9mxF/OQfzeZZw9hngu5Mc4 +zUNneUskAPLTJd0YQiyfM0DKRkopiKndVj9MpiQzmBPXsLgHAeRmlKEmPNfsTEPn8hCKtT +87BrBgjGFt2/DITQypLoy/YcMkD4m/BjZzWD64/HZ9gUoQdORLu7I+IuvvidTPJC42zlYm +g8gKu2FMMyt/NfiXRqVj2N8X0j2HDvzRtaev/u6VLb+KLY0cRW39EIndLYAZk+Ng9PfXwX +3y6swg1ako1VAAAACW1lZ0BuaXhvcwE= +-----END OPENSSH PRIVATE KEY----- diff --git a/docs/etc/id_rsa.pub b/docs/etc/id_rsa.pub new file mode 100644 index 0000000..d15a954 --- /dev/null +++ b/docs/etc/id_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC4gtbEwBhw9H5VqWkgW3F+2lp5iMWYjEu+WkbsoxtmnF/WLZBOXsCZLw7p7S8czUrDJj/fp0AlCEl1qbWgZTIldwbVqi62CR+6iV/xKowSkJciB54slwL0j3Y7CWRdnB0vid8Xo2MYLuFdQEWW2DdRsKY7Jay02EEl4yaZP72DjnjfDHf9ilSJ6gUwQR7FbYoz0n7IB1ZvtFeBDegSeKH11Aeh6a34Hv2zWhqnT1ccDFkbHz6K7AMNjND1X0CggykbQnBHXAtICFJRpwhK+ODdUG/xpNSmExLvdkxKQtpwXCbKYnRahkcMeXhOKYq9jMp9WPOkIsoMo5vrihDzocm5VVoESK0BBS+PMmnN75D5bHQi9fHsirBfOwkmyM3H7zXpu76gEMolUYorkl9EyYazB6Sh4bBB5vtyBJQA9jcZaYxAbARWRWSzQYti42Y6vKWsJPtAUy2Nnd/crdx2RfDUDBQ6G/2hZ4J1tjq9nVJNG3wWAK07cz/JlQMeVsRZ3bXVA8Qbc3qTUPf9DwL/dojcPV8kXEMuCO8vRe2tq/0W4qguwN7oBmEA7TcZP1LZyKZ0c3zzvoBXDjxNQve8WIEug7T/MXRNtQQdaJolmyWh0/Y61asbErU49O2I8X3eu7JEAjAXZM5FeQLA4h7I8IF06KSRMnsHGpjbzYWrYCoKbw== aquila insecure public key diff --git a/docs/etc/id_rsa.randomart b/docs/etc/id_rsa.randomart new file mode 100644 index 0000000..715e146 --- /dev/null +++ b/docs/etc/id_rsa.randomart @@ -0,0 +1,15 @@ +The key fingerprint is: +SHA256:qd4LldwWuSZvsv2efgLa5ViyPUa3wm3NGXe36hlhk6A aquila insecure public key + +The key's randomart image is: ++---[RSA 4096]----+ +| | +| . | +| o. | +| . +.o. . | +| SE= = | +| o =o = +.+| +| o .oo% + +B| +| . o.=+ O.Boo| +| . +..=*X. | ++----[SHA256]-----+ diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig new file mode 100644 index 0000000..e64bfac --- /dev/null +++ b/src/WaitGroup.zig @@ -0,0 +1,48 @@ +// brought over from https://github.com/ziglang/zig/blob/c9006d9/src/WaitGroup.zig + +const std = @import("std"); +const Atomic = std.atomic.Atomic; +const assert = std.debug.assert; +const WaitGroup = @This(); + +const is_waiting: usize = 1 << 0; +const one_pending: usize = 1 << 1; + +state: Atomic(usize) = Atomic(usize).init(0), +event: std.Thread.ResetEvent = .{}, + +pub fn start(self: *WaitGroup) void { + const state = self.state.fetchAdd(one_pending, .Monotonic); + assert((state / one_pending) < (std.math.maxInt(usize) / one_pending)); +} + +pub fn finish(self: *WaitGroup) void { + const state = self.state.fetchSub(one_pending, .Release); + assert((state / one_pending) > 0); + + if (state == (one_pending | is_waiting)) { + self.state.fence(.Acquire); + self.event.set(); + } +} + +pub fn wait(self: *WaitGroup) void { + var state = self.state.fetchAdd(is_waiting, .Acquire); + assert(state & is_waiting == 0); + + if ((state / one_pending) > 0) { + self.event.wait(); + } +} + +pub fn reset(self: *WaitGroup) void { + self.state.store(0, .Monotonic); + self.event.reset(); +} + +pub fn isDone(wg: *WaitGroup) bool { + const state = wg.state.load(.Acquire); + assert(state & is_waiting == 0); + + return (state / one_pending) == 0; +} diff --git a/src/db/Job.zig b/src/db/Job.zig index ba46331..3d061f8 100644 --- a/src/db/Job.zig +++ b/src/db/Job.zig @@ -120,6 +120,8 @@ pub fn create(alloc: std.mem.Allocator, package: Package, commit: string, arch: .arch = Arch{ .tag = arch }, .os = Os{ .tag = os }, }); + std.log.info("queued job {s} @ {s} for {s} {s}", .{ package.name, commit, arch, os }); + runner.pickup_tracker.start(); return j; } diff --git a/src/docker.zig b/src/docker.zig new file mode 100644 index 0000000..f57f99c --- /dev/null +++ b/src/docker.zig @@ -0,0 +1,74 @@ +//! https://docs.docker.com/engine/api/v1.41/ + +const std = @import("std"); +const string = []const u8; +const UrlValues = @import("UrlValues"); +const zfetch = @import("zfetch"); + +const max_len = 1024 * 1024 * 5; + +/// https://docs.docker.com/engine/api/v1.41/#tag/Container/operation/ContainerCreate +pub fn containerCreate(alloc: std.mem.Allocator, payload: anytype) !std.json.ValueTree { + const url = "http://localhost/v1.41/containers/create"; + var docker_conn = try zfetch.Connection.connect(alloc, .{ .protocol = .unix, .hostname = "/var/run/docker.sock" }); + defer docker_conn.close(); + var req = try zfetch.Request.fromConnection(alloc, docker_conn, url); + var headers = zfetch.Headers.init(alloc); + try headers.appendValue("Content-Type", "application/json"); + try req.do(.POST, headers, try std.json.stringifyAlloc(alloc, payload, .{})); + const r = req.reader(); + const body_content = try r.readAllAlloc(alloc, max_len); + std.log.debug("{d}: {s}", .{ req.status.code, url }); + if (req.status.code != 201) std.log.debug("{s}", .{body_content}); + return try std.json.Parser.init(alloc, false).parse(body_content); +} + +/// https://docs.docker.com/engine/api/v1.41/#tag/Container/operation/ContainerStart +pub fn containerStart(alloc: std.mem.Allocator, id: string) !void { + const url = try std.fmt.allocPrint(alloc, "http://localhost/v1.41/containers/{s}/start", .{id}); + var docker_conn = try zfetch.Connection.connect(alloc, .{ .protocol = .unix, .hostname = "/var/run/docker.sock" }); + defer docker_conn.close(); + var req = try zfetch.Request.fromConnection(alloc, docker_conn, url); + var headers = zfetch.Headers.init(alloc); + try headers.appendValue("Content-Type", "application/json"); + try req.do(.POST, headers, "{}"); + std.log.debug("{d}: {s}", .{ req.status.code, url }); +} + +/// https://docs.docker.com/engine/api/v1.41/#tag/Container/operation/ContainerInspect +pub fn containerInspect(alloc: std.mem.Allocator, id: string) !std.json.ValueTree { + const url = try std.fmt.allocPrint(alloc, "http://localhost/v1.41/containers/{s}/json", .{id}); + var docker_conn = try zfetch.Connection.connect(alloc, .{ .protocol = .unix, .hostname = "/var/run/docker.sock" }); + defer docker_conn.close(); + var req = try zfetch.Request.fromConnection(alloc, docker_conn, url); + try req.do(.GET, null, null); + const r = req.reader(); + const body_content = try r.readAllAlloc(alloc, max_len); + std.log.debug("{d}: {s}", .{ req.status.code, url }); + return try std.json.Parser.init(alloc, false).parse(body_content); +} + +/// https://docs.docker.com/engine/api/v1.41/#tag/Network/operation/NetworkConnect +pub fn networkConnect(alloc: std.mem.Allocator, network_id: string, container_id: string) !void { + const url = try std.fmt.allocPrint(alloc, "http://localhost/v1.41/networks/{s}/connect", .{network_id}); + var docker_conn = try zfetch.Connection.connect(alloc, .{ .protocol = .unix, .hostname = "/var/run/docker.sock" }); + defer docker_conn.close(); + var req = try zfetch.Request.fromConnection(alloc, docker_conn, url); + var headers = zfetch.Headers.init(alloc); + try headers.appendValue("Content-Type", "application/json"); + try req.do(.POST, headers, try std.json.stringifyAlloc(alloc, .{ .Container = container_id }, .{})); + const r = req.reader(); + const body_content = try r.readAllAlloc(alloc, max_len); + std.log.debug("{d}: {s}", .{ req.status.code, url }); + std.log.debug("{s}", .{body_content}); +} + +/// https://docs.docker.com/engine/api/v1.41/#tag/Container/operation/ContainerDelete +pub fn containerDelete(alloc: std.mem.Allocator, id: string) !void { + const url = try std.fmt.allocPrint(alloc, "http://localhost/v1.41/containers/{s}", .{id}); + var docker_conn = try zfetch.Connection.connect(alloc, .{ .protocol = .unix, .hostname = "/var/run/docker.sock" }); + defer docker_conn.close(); + var req = try zfetch.Request.fromConnection(alloc, docker_conn, url); + try req.do(.DELETE, null, null); + std.log.debug("{d}: {s}", .{ req.status.code, url }); +} diff --git a/src/job_doer.zig b/src/job_doer.zig new file mode 100644 index 0000000..f6c46ea --- /dev/null +++ b/src/job_doer.zig @@ -0,0 +1,290 @@ +const std = @import("std"); +const string = []const u8; +const stringL = []const string; +const stringLL = []const stringL; +const db = @import("./db/_db.zig"); +const UrlValues = @import("UrlValues"); +const zfetch = @import("zfetch"); +const root = @import("root"); +const docker = @import("./docker.zig"); +const WaitGroup = @import("./WaitGroup.zig"); + +// https://hub.docker.com/r/nektro/qemu-system/tags +// https://github.com/nektro/docker-qemu-system/blob/master/Dockerfile +fn getImageName(arch: db.Job.Arch.Tag) string { + return switch (arch) { + .x86_64 => "nektro/qemu-system:x86_64@sha256:1f3d8a4058b35f0f6d40c7a11862be312b377ceb6b138a40db2c2dad90005da1", + }; +} + +const Mount = struct { + Type: string = "bind", + Source: string, + Destination: string, + Mode: string = "ro", + RW: bool = false, + Propagation: string = "rprivate", +}; + +// pub fn start(allocator: std.mem.Allocator, job: *const db.Job) Error!void { +pub fn start(allocator: std.mem.Allocator, job: *db.Job, run_tracker: *WaitGroup) !void { + defer allocator.destroy(job); + defer run_tracker.finish(); + std.log.info("started job {} for {d} - {s} - {s}", .{ job.uuid, job.package, job.arch, job.os }); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const alloc = arena.allocator(); + + try job.update(alloc, .state, .pending); + + // start qemu-system docker container, get id + // TODO use qemu-kvm or cpu=host + // TODO remove absolute path in image bind + const id = blk: { + const tree = try docker.containerCreate(alloc, .{ + .Image = getImageName(job.arch.tag), + .Env = &[_]string{ + try std.fmt.allocPrint(alloc, "arch={s}", .{@tagName(job.arch.tag)}), + try std.fmt.allocPrint(alloc, "os={s}", .{@tagName(job.os.tag)}), + try std.fmt.allocPrint(alloc, "image=/images/{s}/{s}/stage4.qcow2", .{ @tagName(job.arch.tag), @tagName(job.os.tag) }), + }, + .Volumes = .{ + .@"/images" = .{}, + }, + .HostConfig = .{ + .Binds = &[_]string{ + "/home/meg/other/dev/aquila/images:/images:ro", + }, + }, + .Mounts = &[_]Mount{ + .{ .Source = "/home/meg/other/dev/aquila/images", .Destination = "/images" }, + }, + }); + break :blk tree.root.Object.get("Id").?.String; + }; + + // start container + try docker.containerStart(alloc, id); + + // connect container to aquila network + // + { + const own_id = try ownDockerId(alloc); + const json = try docker.containerInspect(alloc, own_id); + const network_id = json.root.Object.get("NetworkSettings").?.Object.get("Networks").?.Object.values()[0].Object.get("NetworkID").?.String; + try docker.networkConnect(alloc, network_id, id); + } + + // wait for ssh to be available + { + // TODO do this better + std.time.sleep(std.time.ns_per_s * 15); + } + + // ssh into system and store results + { + var data_dir = try std.fs.cwd().openDir(root.datadirpath, .{}); + defer data_dir.close(); + + // TODO put this file in a better place + var job_file = try data_dir.createFile("job.jsonl", .{}); + defer job_file.close(); + const w = job_file.writer(); + + const pkg = try db.Package.byKey(alloc, .id, job.package); + const clone_url = try pkg.?.cloneUrl(alloc); + const folder_name = std.mem.splitBackwards(u8, clone_url, "/").next(); + const work_name = try std.fmt.allocPrint(alloc, "workspace/{s}", .{folder_name}); + const host_name = id[0..12]; + + try jsonWriteLine(w, .{ .package = job.package }); + try jsonWriteLine(w, .{ .at = job.commit }); + try jsonWriteLine(w, .{ .arch = job.arch }); + try jsonWriteLine(w, .{ .os = job.os }); + + // TODO fail job if a command exits with non-0 + for (printSysInfoCmd(job.os)) |item| { + try doJobLine(allocator, w, host_name, item); + } + try doJobLine(allocator, w, host_name, &.{ "cd", "llvm-project", "&&", "git", "describe", "--tags" }); + try doJobLine(allocator, w, host_name, &.{ "cd", "zig", "&&", "git", "describe", "--tags" }); + try doJobLine(allocator, w, host_name, &.{ "cd", "workspace", "&&", "git", "clone", clone_url }); + try doJobLine(allocator, w, host_name, &.{ "cd", work_name, "&&", "~/zigmod", "ci" }); + try doJobLine(allocator, w, host_name, &.{ "cd", work_name, "&&", "~/zig/build/zig", "build" }); + try doJobLine(allocator, w, host_name, &.{ "cd", work_name, "&&", "~/zig/build/zig", "build", "test" }); + _ = try execRemoteCmd(allocator, host_name, poweroffCmd(job.os), null); + } + + // remove container + try docker.containerDelete(alloc, id); + + // we're done! + try job.update(alloc, .state, .success); + + std.log.info("job done: {s}", .{job.uuid}); +} + +fn ownDockerId(alloc: std.mem.Allocator) !string { + var file = try std.fs.cwd().openFile("/etc/hostname", .{}); + defer file.close(); + const content = try file.reader().readAllAlloc(alloc, std.math.maxInt(usize)); + return std.mem.trimRight(u8, content, "\n"); +} + +fn jsonWriteLine(writer: std.fs.File.Writer, value: anytype) !void { + try std.json.stringify(value, .{ .whitespace = .{ .indent = .None, .separator = false } }, writer); + try writer.writeByte('\n'); +} + +fn printSysInfoCmd(os: db.Job.Os) stringLL { + return switch (os.tag) { + .debian => &.{ + &.{ "uname", "-a" }, + &.{"free"}, + }, + }; +} + +fn poweroffCmd(os: db.Job.Os) stringL { + return switch (os.tag) { + .debian => &[_]string{ "shutdown", "-h", "now" }, + }; +} + +fn doJobLine(allocator: std.mem.Allocator, writer: std.fs.File.Writer, host_name: string, args: stringL) !void { + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const alloc = arena.allocator(); + + var list = std.ArrayList(u8).init(alloc); + errdefer list.deinit(); + + try jsonWriteLine(writer, .{ .cmd = args }); + const cmd = try execRemoteCmd(alloc, host_name, args, &list); + + // TODO print output as it happens + // TODO send lines to websocket + var fbs = std.io.fixedBufferStream(list.toOwnedSlice()); + const r = fbs.reader(); + while (try r.readUntilDelimiterOrEofAlloc(alloc, '\n', std.math.maxInt(usize))) |line| { + // TODO filter out ansi escapse + // TODO skip if line length is >0 before and =0 after transform + // TODO filter out `Warning: Permanently added '[5332fa241874]:2222' (ED25519) to the list of known hosts.` messages + try jsonWriteLine(writer, [_]string{std.mem.trimRight(u8, line, "\r")}); + } + try jsonWriteLine(writer, cmd); +} + +const CmdResult = struct { + exit: u32, + duration: u64, +}; + +fn execRemoteCmd(allocator: std.mem.Allocator, host_name: string, args: stringL, stdout: ?*std.ArrayList(u8)) !CmdResult { + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const alloc = arena.allocator(); + + // construct args + var list = std.ArrayList(string).init(alloc); + errdefer list.deinit(); + try list.ensureTotalCapacityPrecise(16 + args.len); + list.appendSliceAssumeCapacity(&.{"ssh"}); + list.appendSliceAssumeCapacity(&.{ "-o", "StrictHostKeychecking=no" }); + list.appendSliceAssumeCapacity(&.{ "-o", "ConnectionAttempts=1" }); + list.appendSliceAssumeCapacity(&.{ "-o", "RequestTTY=no" }); + list.appendSliceAssumeCapacity(&.{ "-o", "PreferredAuthentications=publickey" }); + list.appendSliceAssumeCapacity(&.{ "-o", "UserKnownHostsFile=/dev/null" }); + list.appendSliceAssumeCapacity(&.{ "-o", "BatchMode=yes" }); + list.appendSliceAssumeCapacity(&.{ "-p", "2222" }); + list.appendAssumeCapacity(try std.fmt.allocPrint(alloc, "root@{s}", .{host_name})); + list.appendSliceAssumeCapacity(args); + + // exec ssh + const begin = @intCast(u64, std.time.milliTimestamp()); + var child = std.ChildProcess.init(list.toOwnedSlice(), alloc); + child.stdout_behavior = .Pipe; + child.stderr_behavior = .Pipe; + + try child.spawn(); + + // collect output + if (stdout) |_| { + try collectOutputPosix(child, stdout.?, stdout.?, std.math.maxInt(usize)); + } + const term = try child.wait(); + const end = @intCast(u64, std.time.milliTimestamp()); + return CmdResult{ + .exit = @intCast(u32, term.Exited), + .duration = (end - begin) / 1000, + }; +} + +// picked out from std.ChildProcess internal code +fn collectOutputPosix(child: std.ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void { + const os = std.os; + var poll_fds = [_]os.pollfd{ + .{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined }, + .{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined }, + }; + + var dead_fds: usize = 0; + // We ask for ensureTotalCapacity with this much extra space. This has more of an + // effect on small reads because once the reads start to get larger the amount + // of space an ArrayList will allocate grows exponentially. + const bump_amt = 512; + + const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP; + + while (dead_fds < poll_fds.len) { + const events = try os.poll(&poll_fds, std.math.maxInt(i32)); + if (events == 0) continue; + + var remove_stdout = false; + var remove_stderr = false; + // Try reading whatever is available before checking the error + // conditions. + // It's still possible to read after a POLL.HUP is received, always + // check if there's some data waiting to be read first. + if (poll_fds[0].revents & os.POLL.IN != 0) { + // stdout is ready. + const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes); + try stdout.ensureTotalCapacity(new_capacity); + const buf = stdout.unusedCapacitySlice(); + if (buf.len == 0) return error.StdoutStreamTooLong; + const nread = try os.read(poll_fds[0].fd, buf); + stdout.items.len += nread; + + // Remove the fd when the EOF condition is met. + remove_stdout = nread == 0; + } else { + remove_stdout = poll_fds[0].revents & err_mask != 0; + } + + if (poll_fds[1].revents & os.POLL.IN != 0) { + // stderr is ready. + const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes); + try stderr.ensureTotalCapacity(new_capacity); + const buf = stderr.unusedCapacitySlice(); + if (buf.len == 0) return error.StderrStreamTooLong; + const nread = try os.read(poll_fds[1].fd, buf); + stderr.items.len += nread; + + // Remove the fd when the EOF condition is met. + remove_stderr = nread == 0; + } else { + remove_stderr = poll_fds[1].revents & err_mask != 0; + } + + // Exclude the fds that signaled an error. + if (remove_stdout) { + poll_fds[0].fd = -1; + dead_fds += 1; + } + if (remove_stderr) { + poll_fds[1].fd = -1; + dead_fds += 1; + } + } +} diff --git a/src/main.zig b/src/main.zig index be95c96..f294eec 100644 --- a/src/main.zig +++ b/src/main.zig @@ -14,6 +14,7 @@ const signal = @import("signal"); const handler = @import("./handler/_handler.zig"); const db = @import("./db/_db.zig"); +const runner = @import("./runner.zig"); pub const build_options = @import("build_options"); pub const files = @import("self/files"); @@ -28,7 +29,7 @@ pub var domain: string = ""; pub var disable_import_repo = false; pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + var gpa = std.heap.GeneralPurposeAllocator(.{ .thread_safe = true }){}; const alloc = gpa.allocator(); { @@ -61,6 +62,7 @@ pub fn main() !void { try flag.addSingle("port"); try flag.addMulti("oauth2-client"); try flag.addSingle("disable-import-repo"); + try flag.addSingle("ci"); _ = try flag.parse(.double); try flag.parseEnv(); @@ -130,8 +132,16 @@ pub fn main() !void { } } + // + + if (std.fmt.parseInt(u1, flag.getSingle("ci") orelse "0", 2)) { + std.debug.assert(try docker.amInside()); + (try std.Thread.spawn(.{}, runner.start, .{alloc})).detach(); + } + const port = try std.fmt.parseUnsigned(u16, flag.getSingle("port") orelse "8000", 10); std.log.info("starting server on port {d}", .{port}); + // TODO make this a Server instance and implement proper stop try http.listenAndServe( alloc, try std.net.Address.parseIp("0.0.0.0", port), @@ -141,6 +151,10 @@ pub fn main() !void { } fn handle_sig() void { + std.log.info("ensuring all CI jobs are in stopped state...", .{}); + runner.should_run = false; + runner.control.wait(); + std.log.info("closing database connection...", .{}); db.close(); diff --git a/src/runner.zig b/src/runner.zig new file mode 100644 index 0000000..407d0b1 --- /dev/null +++ b/src/runner.zig @@ -0,0 +1,48 @@ +const std = @import("std"); +const string = []const u8; +const builtin = @import("builtin"); +const WaitGroup = @import("./WaitGroup.zig"); +const db = @import("./db/_db.zig"); +const job_doer = @import("./job_doer.zig"); + +pub var should_run = true; +pub var control = WaitGroup{}; +pub var pickup_tracker = WaitGroup{}; + +var run_tracker = WaitGroup{}; + +pub fn start(allocator: std.mem.Allocator) void { + control.start(); + defer control.finish(); + + while (should_run) { + if (pickup_tracker.isDone()) { + // no work found, sleep 5s so we're not hogging cpu + std.time.sleep(std.time.ns_per_s * 5); + continue; + } + + // find queued jobs and launch them + // TODO add a way to limit the number of jobs + // TODO add integration with docker swarm and checking available workers + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const alloc = arena.allocator(); + + const candidates = db.Job.byKeyAll(alloc, .state, .queued, .asc) catch continue; + for (candidates) |item| { + const dupe = allocator.create(db.Job) catch continue; + dupe.* = item; + (std.Thread.spawn(.{}, job_doer.start, .{ allocator, dupe, &run_tracker }) catch { + allocator.destroy(dupe); + continue; + }).detach(); + run_tracker.start(); + pickup_tracker.finish(); + } + } + + // wait for all running jobs to finish + // TODO pause docker containers, resume again on startup + run_tracker.wait(); +} diff --git a/tools/gen_stage2.sh b/tools/gen_stage2.sh index 103e548..2ce26aa 100755 --- a/tools/gen_stage2.sh +++ b/tools/gen_stage2.sh @@ -30,34 +30,44 @@ qemu-kvm -m 20480 -hda $after -net nic -net user # TODO automate installer # TODO code/cli add sshd_config +# TODO code/cli add id_rsa.pub # TODO code ssh run rest of commands case "$os" in debian) # apt install curl + # mkdir .ssh # curl -s https://clbin.com/sSR2s > /etc/ssh/sshd_config + # curl -s https://clbin.com/6LSMP > /root/.ssh/authorized_keys # apt install git # git clone https://github.com/llvm/llvm-project # git clone https://github.com/ziglang/zig # mkdir out + # mkdir workspace # shutdown -h now ;; alpine) # apk add curl + # mkdir .ssh # curl -s https://clbin.com/sSR2s > /etc/ssh/sshd_config + # curl -s https://clbin.com/6LSMP > /root/.ssh/authorized_keys # apk add git # git clone https://github.com/llvm/llvm-project # git clone https://github.com/ziglang/zig # mkdir out + # mkdir workspace # poweroff ;; freebsd) # pkg install curl + # mkdir .ssh # curl -s https://clbin.com/sSR2s > /etc/ssh/sshd_config + # curl -s https://clbin.com/6LSMP > /root/.ssh/authorized_keys # pkg install git # git clone https://github.com/llvm/llvm-project # git clone https://github.com/ziglang/zig # mkdir out + # mkdir workspace # shutdown -p now ;; esac diff --git a/zigmod.lock b/zigmod.lock index 54350bb..8ff3f4c 100644 --- a/zigmod.lock +++ b/zigmod.lock @@ -36,3 +36,4 @@ git https://gist.github.com/nektro/a0045048d8e7458e42520e9ffb90becd commit-3fdb9 git https://gist.github.com/nektro/64c7da67a00d8b94091d754dd2cf1aa9 commit-4d6ac0b23bd3e55c1907eeced651fca04721a88d git https://gist.github.com/nektro/5e20de7db63af25d72ecee35e9f8d16a commit-5841714041a5c6fe15c9b0ef6c65f826aeab6854 git https://gist.github.com/nektro/6621bee406a94139cee26f7643af6183 commit-c221d16744cf7902b931d2255623949ea000b675 +git https://gist.github.com/nektro/917368c242e9be1a832816bc66d10d63 commit-d917db1e4b68faaaed6ecbdd7774f1959dc76fa5 diff --git a/zigmod.yml b/zigmod.yml index 931811f..f279ed7 100644 --- a/zigmod.yml +++ b/zigmod.yml @@ -22,3 +22,4 @@ root_dependencies: - src: git https://gist.github.com/nektro/64c7da67a00d8b94091d754dd2cf1aa9 # docker - src: git https://gist.github.com/nektro/5e20de7db63af25d72ecee35e9f8d16a # signal - src: git https://gist.github.com/nektro/6621bee406a94139cee26f7643af6183 # mime + - src: git https://gist.github.com/nektro/917368c242e9be1a832816bc66d10d63 # UrlValues