Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/cli/args.zig
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub const Command = union(enum) {
rate_limit: UserQuery,
stake: StakeArgs,
vault: VaultArgs,
watch: WatchArgs,
ledger: LedgerArgs,
approve_builder: ApproveBuilderArgs,
subaccount: SubAccountArgs,
Expand Down Expand Up @@ -82,6 +83,7 @@ pub const HelpTopic = enum {
rate_limit,
stake,
vault,
watch,
ledger,
approve_builder,
subaccount,
Expand Down Expand Up @@ -279,6 +281,14 @@ pub const VaultArgs = struct {

pub const VaultAction = enum { info, deposit, withdraw };

pub const WatchArgs = struct {
coin: []const u8,
above: ?[]const u8 = null,
below: ?[]const u8 = null,
cmd: ?[]const u8 = null,
repeat: bool = false,
};

pub const LedgerArgs = struct {
address: ?[]const u8 = null,
start_time: ?[]const u8 = null,
Expand Down Expand Up @@ -484,6 +494,8 @@ pub fn parse(allocator: std.mem.Allocator) ParseError!ParseResult {
.{ .stake = parseStake(rest) }
else if (std.mem.eql(u8, cmd_str, "vault"))
.{ .vault = parseVault(rest) }
else if (std.mem.eql(u8, cmd_str, "watch"))
.{ .watch = parseWatch(rest) orelse return error.MissingArgument }
else if (std.mem.eql(u8, cmd_str, "ledger"))
.{ .ledger = parseLedger(rest) }
else if (std.mem.eql(u8, cmd_str, "approve-builder"))
Expand Down Expand Up @@ -703,6 +715,7 @@ fn canonicalHelpTopic(name: []const u8) ?HelpTopic {
if (std.mem.eql(u8, name, "rate-limit") or std.mem.eql(u8, name, "ratelimit")) return .rate_limit;
if (std.mem.eql(u8, name, "stake") or std.mem.eql(u8, name, "staking")) return .stake;
if (std.mem.eql(u8, name, "vault")) return .vault;
if (std.mem.eql(u8, name, "watch")) return .watch;
if (std.mem.eql(u8, name, "ledger")) return .ledger;
if (std.mem.eql(u8, name, "approve-builder")) return .approve_builder;
if (std.mem.eql(u8, name, "subaccount")) return .subaccount;
Expand Down Expand Up @@ -974,6 +987,32 @@ fn parseVault(args: []const []const u8) VaultArgs {
return result;
}

// watch BTC --above 100000 --cmd "echo triggered" --repeat
fn parseWatch(args: []const []const u8) ?WatchArgs {
if (args.len < 1) return null;
var result = WatchArgs{ .coin = args[0] };
var i: usize = 1;
while (i < args.len) : (i += 1) {
const a = args[i];
if (std.mem.eql(u8, a, "--above") and i + 1 < args.len) {
i += 1;
result.above = args[i];
} else if (std.mem.eql(u8, a, "--below") and i + 1 < args.len) {
i += 1;
result.below = args[i];
} else if (std.mem.eql(u8, a, "--cmd") and i + 1 < args.len) {
i += 1;
result.cmd = args[i];
} else if (std.mem.eql(u8, a, "--repeat")) {
result.repeat = true;
}
}
// Must have exactly one condition — not both, not neither
if (result.above == null and result.below == null) return null;
if (result.above != null and result.below != null) return null;
return result;
}

fn parseLedger(args: []const []const u8) LedgerArgs {
var result = LedgerArgs{};
var i: usize = 0;
Expand Down
283 changes: 283 additions & 0 deletions src/cli/commands.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2754,6 +2754,258 @@ pub fn stream(allocator: std.mem.Allocator, w: *Writer, config: Config, a: args_
}
}

pub fn watch(allocator: std.mem.Allocator, w: *Writer, config: Config, a: args_mod.WatchArgs) CmdError!void {
const is_json = w.format == .json or !w.is_tty;
const stderr = std.fs.File.stderr();
const stdout = std.fs.File.stdout();

// Parse threshold
const threshold_src = a.above orelse a.below orelse
return fail(w, "either --above or --below is required");
const threshold = std.fmt.parseFloat(f64, threshold_src) catch
return failFmt(w, "invalid threshold: {s}", .{threshold_src});
const is_above = a.above != null;
const threshold_str = threshold_src;

// Resolve coin name — spot pairs (e.g. HYPE/USDC) need @index for allMids
var coin_buf: [16]u8 = undefined;
var spot_idx_buf: [16]u8 = undefined;
const display_coin = upperCoin(a.coin, &coin_buf);
const is_spot = std.mem.indexOf(u8, a.coin, "/") != null;
const lookup_coin = if (is_spot)
resolveSpotCoin(allocator, config, display_coin, &spot_idx_buf)
else
display_coin;

if (!is_json) {
var msg_buf: [256]u8 = undefined;
const msg = std.fmt.bufPrint(&msg_buf, "Watching {s} {s} {s}...\r\n", .{
display_coin,
if (is_above) @as([]const u8, "--above") else @as([]const u8, "--below"),
threshold_str,
}) catch "Watching...\r\n";
stderr.writeAll(msg) catch {};
}

// SIGINT handling — reuse the stream pattern
stream_shutdown.store(false, .release);
const S = struct {
fn handler(_: c_int) callconv(.c) void {
stream_shutdown.store(true, .release);
const fd = stream_socket_fd.load(.acquire);
if (fd != -1) {
_ = std.c.shutdown(fd, 2); // SHUT_RDWR = 2
}
}
};
const act = posix.Sigaction{
.handler = .{ .handler = S.handler },
.mask = std.mem.zeroes(posix.sigset_t),
.flags = 0,
};
posix.sigaction(posix.SIG.INT, &act, null);
posix.sigaction(posix.SIG.TERM, &act, null);

// Reconnect loop
const max_retries: u8 = 10;
var retries: u8 = 0;
while (!stream_shutdown.load(.acquire)) {
var conn = WsConnection.connect(std.heap.page_allocator, config.chain) catch |e| {
if (stream_shutdown.load(.acquire)) break;
retries += 1;
if (retries > max_retries) {
return failFmt(w, "WebSocket connection failed after {d} retries: {s}", .{ max_retries, @errorName(e) });
}
if (!is_json) {
var err_buf: [256]u8 = undefined;
const err_msg = std.fmt.bufPrint(&err_buf, "Connection failed ({s}), retrying ({d}/{d})...\r\n", .{ @errorName(e), retries, max_retries }) catch "Connection failed, retrying...\r\n";
stderr.writeAll(err_msg) catch {};
}
std.Thread.sleep(2 * std.time.ns_per_s);
continue;
};

stream_socket_fd.store(conn.socket_fd, .release);

conn.subscribe(.{ .allMids = .{ .dex = null } }) catch {
conn.close();
retries += 1;
if (retries > max_retries) {
return fail(w, "Failed to subscribe after retries");
}
std.Thread.sleep(2 * std.time.ns_per_s);
continue;
};

if (!is_json) {
if (retries > 0) {
stderr.writeAll("Reconnected \xe2\x9c\x93\r\n") catch {};
} else {
stderr.writeAll("Connected \xe2\x9c\x93 (Ctrl+C to quit)\r\n") catch {};
}
}
retries = 0;

const result = watchReadLoop(conn, a, is_json, is_above, threshold, threshold_str, display_coin, lookup_coin, stdout, stderr);
conn.close();

switch (result) {
.exit => return,
.invalid_coin => return failFmt(w, "unknown coin: {s} — not found in allMids", .{display_coin}),
.reconnect => {},
}

// Connection lost — reconnect unless shutting down
if (stream_shutdown.load(.acquire)) break;
if (!is_json) {
stderr.writeAll("Disconnected, reconnecting...\r\n") catch {};
}
std.Thread.sleep(1 * std.time.ns_per_s);
}

if (!is_json) {
stderr.writeAll("\r\n") catch {};
}
}

/// Inner read loop for watch. Returns true when caller should exit.
/// Uses edge detection: only triggers when condition transitions from
/// not-met to met, preventing repeated firing while price stays on the
/// same side of the threshold.
const WatchResult = enum { exit, reconnect, invalid_coin };

fn watchReadLoop(
conn: *WsConnection,
a: args_mod.WatchArgs,
is_json: bool,
is_above: bool,
threshold: f64,
threshold_str: []const u8,
display_coin: []const u8,
lookup_coin: []const u8,
stdout: std.fs.File,
stderr: std.fs.File,
) WatchResult {
// Edge detection state: tracks whether the condition was met on the
// previous tick. We only fire when transitioning from false → true.
var was_met: bool = false;
var coin_validated: bool = false;

while (!stream_shutdown.load(.acquire)) {
const event = conn.next() catch |e| {
if (!is_json and !stream_shutdown.load(.acquire)) {
if (e == error.EndOfStream) {
stderr.writeAll("Connection closed by server\r\n") catch {};
} else {
var err_buf: [256]u8 = undefined;
const err_msg = std.fmt.bufPrint(&err_buf, "Connection error: {s}\r\n", .{@errorName(e)}) catch "Connection error\r\n";
stderr.writeAll(err_msg) catch {};
}
}
return .reconnect;
};

switch (event) {
.timeout => continue,
.closed => return .reconnect,
.message => |msg| {
if (msg.channel != .allMids) continue;

// Validate coin exists on first allMids message
if (!coin_validated) {
coin_validated = true;
if (extractMidPrice(msg.raw_json, lookup_coin) == null) {
return .invalid_coin;
}
}

const price_str = extractMidPrice(msg.raw_json, lookup_coin) orelse continue;
const mid_price = std.fmt.parseFloat(f64, price_str) catch continue;

const is_met = if (is_above) mid_price >= threshold else mid_price <= threshold;

if (!is_met) {
// Price moved back — re-arm for next crossing
was_met = false;
continue;
}

if (was_met) continue; // still on the triggered side, no edge
was_met = true;

// --- Edge triggered: condition just became true ---
const now: u64 = @intCast(std.time.milliTimestamp());

if (is_json) {
var json_buf: [512]u8 = undefined;
const json_out = std.fmt.bufPrint(&json_buf,
\\{{"coin":"{s}","price":"{s}","condition":"{s}","threshold":"{s}","timestamp":{d}}}
, .{
display_coin,
price_str,
if (is_above) @as([]const u8, "above") else @as([]const u8, "below"),
threshold_str,
now,
}) catch continue;
stdout.writeAll(json_out) catch return .exit;
stdout.writeAll("\n") catch return .exit;
} else {
var alert_buf: [256]u8 = undefined;
const alert = std.fmt.bufPrint(&alert_buf, "\xe2\x9a\xa1 {s} hit {s} (was watching: {s} {s})\r\n", .{
display_coin,
price_str,
if (is_above) @as([]const u8, "--above") else @as([]const u8, "--below"),
threshold_str,
}) catch "\xe2\x9a\xa1 Alert triggered!\r\n";
stdout.writeAll(alert) catch {};
}

// Execute shell command if provided
if (a.cmd) |cmd_str| {
var child = std.process.Child.init(
&.{ "/bin/sh", "-c", cmd_str },
std.heap.page_allocator,
);
// In JSON/pipe mode, redirect child output to stderr
// to keep stdout clean for machine-readable JSON.
if (is_json) {
child.stdout_behavior = .Ignore;
child.stderr_behavior = .Inherit;
} else {
child.stdout_behavior = .Inherit;
child.stderr_behavior = .Inherit;
}
child.stdin_behavior = .Close;
child.spawn() catch |e| {
if (!is_json) {
var err_buf: [256]u8 = undefined;
const err_msg = std.fmt.bufPrint(&err_buf, "Failed to execute command: {s}\r\n", .{@errorName(e)}) catch "Failed to execute command\r\n";
stderr.writeAll(err_msg) catch {};
}
if (!a.repeat) return .exit;
continue;
};
_ = child.wait() catch {};
}

if (!a.repeat) return .exit;
},
}
}
return .exit; // SIGINT
}

/// Extract a coin's mid price from an allMids WS message.
/// Looks for `"COIN":"price"` in the `"mids":{...}` object.
fn extractMidPrice(json: []const u8, coin: []const u8) ?[]const u8 {
var key_buf: [32]u8 = undefined;
const key = std.fmt.bufPrint(&key_buf, "\"{s}\":\"", .{coin}) catch return null;
const idx = std.mem.indexOf(u8, json, key) orelse return null;
const val_start = idx + key.len;
const val_end = std.mem.indexOfPos(u8, json, val_start, "\"") orelse return null;
return json[val_start..val_end];
}

fn streamPretty(stdout: std.fs.File, kind: args_mod.StreamKind, text: []const u8) void {
const channel = ws_types.parseChannel(text);
const data_slice = ws_types.extractData(text);
Expand Down Expand Up @@ -4493,3 +4745,34 @@ pub fn subaccountCmd(allocator: std.mem.Allocator, w: *Writer, config: Config, a
},
}
}

test "extractMidPrice: finds coin price in allMids message" {
const msg =
\\{"channel":"allMids","data":{"mids":{"BTC":"100234.5","ETH":"3456.7","SOL":"178.9"}}}
;
try std.testing.expectEqualStrings("100234.5", extractMidPrice(msg, "BTC").?);
try std.testing.expectEqualStrings("3456.7", extractMidPrice(msg, "ETH").?);
try std.testing.expectEqualStrings("178.9", extractMidPrice(msg, "SOL").?);
try std.testing.expectEqual(@as(?[]const u8, null), extractMidPrice(msg, "DOGE"));
}

test "extractMidPrice: returns null for missing coin" {
const msg =
\\{"channel":"allMids","data":{"mids":{"BTC":"50000"}}}
;
try std.testing.expectEqual(@as(?[]const u8, null), extractMidPrice(msg, "ETH"));
}

test "extractMidPrice: returns null for empty or malformed input" {
try std.testing.expectEqual(@as(?[]const u8, null), extractMidPrice("", "BTC"));
try std.testing.expectEqual(@as(?[]const u8, null), extractMidPrice("{}", "BTC"));
}

test "extractMidPrice: handles spot index format" {
const msg =
\\{"channel":"allMids","data":{"mids":{"@1":"25.123","@2":"1.05","BTC":"100000"}}}
;
try std.testing.expectEqualStrings("25.123", extractMidPrice(msg, "@1").?);
try std.testing.expectEqualStrings("1.05", extractMidPrice(msg, "@2").?);
try std.testing.expectEqualStrings("100000", extractMidPrice(msg, "BTC").?);
}
Loading
Loading