Skip to content

Commit

Permalink
Merge pull request #123 from depot/health-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwgillespie authored Oct 11, 2024
2 parents 02ad4e8 + 0dbfc57 commit 70dae85
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 50 deletions.
9 changes: 9 additions & 0 deletions proto/depot/cloud/v3/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package depot.cloud.v3;
service MachineService {
rpc RegisterMachine(RegisterMachineRequest) returns (stream RegisterMachineResponse);
rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse);
rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse);
rpc Usage(UsageRequest) returns (UsageResponse);
}

Expand Down Expand Up @@ -134,6 +135,14 @@ message PingMachineHealthResponse {
bool should_terminate = 1;
}

message ReportMachineHealthRequest {
repeated DiskSpace disks = 1;
}

message ReportMachineHealthResponse {
bool should_terminate = 1;
}

message Cert {
string cert = 1;
string key = 2;
Expand Down
13 changes: 12 additions & 1 deletion src/gen/ts/depot/cloud/v3/machine_connect.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protoc-gen-connect-es v0.12.0 with parameter "target=ts,import_extension=none"
// @generated by protoc-gen-connect-es v1.4.0 with parameter "target=ts,import_extension=none"
// @generated from file depot/cloud/v3/machine.proto (package depot.cloud.v3, syntax proto3)
/* eslint-disable */
// @ts-nocheck
Expand All @@ -9,6 +9,8 @@ import {
PingMachineHealthResponse,
RegisterMachineRequest,
RegisterMachineResponse,
ReportMachineHealthRequest,
ReportMachineHealthResponse,
UsageRequest,
UsageResponse,
} from './machine_pb'
Expand Down Expand Up @@ -37,6 +39,15 @@ export const MachineService = {
O: PingMachineHealthResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v3.MachineService.ReportMachineHealth
*/
reportMachineHealth: {
name: 'ReportMachineHealth',
I: ReportMachineHealthRequest,
O: ReportMachineHealthResponse,
kind: MethodKind.ClientStreaming,
},
/**
* @generated from rpc depot.cloud.v3.MachineService.Usage
*/
Expand Down
82 changes: 81 additions & 1 deletion src/gen/ts/depot/cloud/v3/machine_pb.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protoc-gen-es v1.3.0 with parameter "target=ts,import_extension=none"
// @generated by protoc-gen-es v1.10.0 with parameter "target=ts,import_extension=none"
// @generated from file depot/cloud/v3/machine.proto (package depot.cloud.v3, syntax proto3)
/* eslint-disable */
// @ts-nocheck
Expand Down Expand Up @@ -900,6 +900,86 @@ export class PingMachineHealthResponse extends Message<PingMachineHealthResponse
}
}

/**
* @generated from message depot.cloud.v3.ReportMachineHealthRequest
*/
export class ReportMachineHealthRequest extends Message<ReportMachineHealthRequest> {
/**
* @generated from field: repeated depot.cloud.v3.DiskSpace disks = 1;
*/
disks: DiskSpace[] = []

constructor(data?: PartialMessage<ReportMachineHealthRequest>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ReportMachineHealthRequest'
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{no: 1, name: 'disks', kind: 'message', T: DiskSpace, repeated: true},
])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ReportMachineHealthRequest {
return new ReportMachineHealthRequest().fromBinary(bytes, options)
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ReportMachineHealthRequest {
return new ReportMachineHealthRequest().fromJson(jsonValue, options)
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ReportMachineHealthRequest {
return new ReportMachineHealthRequest().fromJsonString(jsonString, options)
}

static equals(
a: ReportMachineHealthRequest | PlainMessage<ReportMachineHealthRequest> | undefined,
b: ReportMachineHealthRequest | PlainMessage<ReportMachineHealthRequest> | undefined,
): boolean {
return proto3.util.equals(ReportMachineHealthRequest, a, b)
}
}

/**
* @generated from message depot.cloud.v3.ReportMachineHealthResponse
*/
export class ReportMachineHealthResponse extends Message<ReportMachineHealthResponse> {
/**
* @generated from field: bool should_terminate = 1;
*/
shouldTerminate = false

constructor(data?: PartialMessage<ReportMachineHealthResponse>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ReportMachineHealthResponse'
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{no: 1, name: 'should_terminate', kind: 'scalar', T: 8 /* ScalarType.BOOL */},
])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ReportMachineHealthResponse {
return new ReportMachineHealthResponse().fromBinary(bytes, options)
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ReportMachineHealthResponse {
return new ReportMachineHealthResponse().fromJson(jsonValue, options)
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ReportMachineHealthResponse {
return new ReportMachineHealthResponse().fromJsonString(jsonString, options)
}

static equals(
a: ReportMachineHealthResponse | PlainMessage<ReportMachineHealthResponse> | undefined,
b: ReportMachineHealthResponse | PlainMessage<ReportMachineHealthResponse> | undefined,
): boolean {
return proto3.util.equals(ReportMachineHealthResponse, a, b)
}
}

/**
* @generated from message depot.cloud.v3.Cert
*/
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/buildkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ keepBytes = ${cacheSizeBytes}
try {
await Promise.all([
buildkit,
reportHealth({machineId, signal, headers, path: rootDir}),
reportHealth({signal, headers, path: rootDir}),
reportUsage({machineId, signal, headers}),
])
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export async function startEngine(message: RegisterMachineResponse, task: Regist
try {
await Promise.all([
engine,
reportEngineHealth({machineId, signal, headers, path: '/var/lib/engine'}),
reportEngineHealth({signal, headers, path: '/var/lib/engine'}),
// reportUsage({machineId, signal, headers}),
])
} catch (error) {
Expand Down
45 changes: 22 additions & 23 deletions src/tasks/engineHealth.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {execa} from 'execa'
import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb'
import {sleep} from '../utils/common'
import {stats} from '../utils/disk'
import {client} from '../utils/grpc'

export interface ReportHealthParams {
machineId: string
signal: AbortSignal
headers: HeadersInit
path: string
}

export async function reportEngineHealth({machineId, signal, headers, path}: ReportHealthParams) {
while (true) {
if (signal.aborted) return

export async function reportEngineHealth({signal, headers, path}: ReportHealthParams) {
while (!signal.aborted) {
await waitForWorkers(signal)

try {
while (true) {
if (signal.aborted) return

const disk_stats = await stats(path)
const disk_space = disk_stats
? [
{
path,
freeMb: disk_stats.freeMb,
totalMb: disk_stats.totalMb,
freeInodes: disk_stats.freeInodes,
totalInodes: disk_stats.totalInodes,
},
]
: undefined

await client.pingMachineHealth({machineId, disks: disk_space}, {headers, signal})
await sleep(1000)
async function* stream() {
while (!signal.aborted) {
const diskStats = await stats(path)
if (diskStats) {
const diskSpace: PlainMessage<DiskSpace> = {
path,
freeMb: diskStats.freeMb,
totalMb: diskStats.totalMb,
freeInodes: diskStats.freeInodes,
totalInodes: diskStats.totalInodes,
}
yield {disks: [diskSpace]}
}
await sleep(1000)
}
}

const res = await client.reportMachineHealth(stream(), {headers, signal})
if (res.shouldTerminate) return
} catch (error) {
console.log('Error reporting health:', error)
}
Expand Down
45 changes: 22 additions & 23 deletions src/tasks/health.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {execa} from 'execa'
import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb'
import {sleep} from '../utils/common'
import {stats} from '../utils/disk'
import {client} from '../utils/grpc'

export interface ReportHealthParams {
machineId: string
signal: AbortSignal
headers: HeadersInit
path: string
}

export async function reportHealth({machineId, signal, headers, path}: ReportHealthParams) {
while (true) {
if (signal.aborted) return

export async function reportHealth({signal, headers, path}: ReportHealthParams) {
while (!signal.aborted) {
await waitForBuildKitWorkers(signal)

try {
while (true) {
if (signal.aborted) return

const disk_stats = await stats(path)
const disk_space = disk_stats
? [
{
path,
freeMb: disk_stats.freeMb,
totalMb: disk_stats.totalMb,
freeInodes: disk_stats.freeInodes,
totalInodes: disk_stats.totalInodes,
},
]
: undefined

await client.pingMachineHealth({machineId, disks: disk_space}, {headers, signal})
await sleep(1000)
async function* stream() {
while (!signal.aborted) {
const diskStats = await stats(path)
if (diskStats) {
const diskSpace: PlainMessage<DiskSpace> = {
path,
freeMb: diskStats.freeMb,
totalMb: diskStats.totalMb,
freeInodes: diskStats.freeInodes,
totalInodes: diskStats.totalInodes,
}
yield {disks: [diskSpace]}
}
await sleep(1000)
}
}

const res = await client.reportMachineHealth(stream(), {headers, signal})
if (res.shouldTerminate) return
} catch (error) {
console.log('Error reporting health:', error)
}
Expand Down

0 comments on commit 70dae85

Please sign in to comment.