Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions crates/octos-bus/src/cli_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,37 @@ use chrono::Utc;
use eyre::Result;
use octos_core::{InboundMessage, OutboundMessage};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tokio::sync::{Notify, mpsc};

use crate::channel::Channel;

pub struct CliChannel {
shutdown: Arc<AtomicBool>,
shutdown_notify: Arc<Notify>,
}

impl CliChannel {
pub fn new(shutdown: Arc<AtomicBool>) -> Self {
Self { shutdown }
Self::with_shutdown_notify(shutdown, Arc::new(Notify::new()))
}

pub fn with_shutdown_notify(shutdown: Arc<AtomicBool>, shutdown_notify: Arc<Notify>) -> Self {
Self {
shutdown,
shutdown_notify,
}
}

pub fn is_exit_command(input: &str) -> bool {
matches!(
input.trim().to_ascii_lowercase().as_str(),
"quit" | "exit" | "/quit" | "/exit" | ":q"
)
}

fn request_shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
self.shutdown_notify.notify_waiters();
}
}

Expand All @@ -36,7 +56,22 @@ impl Channel for CliChannel {
stdout.write_all(b"octos gateway> ").await?;
stdout.flush().await?;

while let Ok(Some(line)) = reader.next_line().await {
loop {
let shutdown_notified = self.shutdown_notify.notified();
tokio::pin!(shutdown_notified);

if self.shutdown.load(Ordering::SeqCst) {
break;
}

let line = tokio::select! {
biased;
_ = &mut shutdown_notified => break,
line = reader.next_line() => line,
};
let Some(line) = line? else {
break;
};
let trimmed = line.trim().to_string();

if trimmed.is_empty() {
Expand All @@ -45,8 +80,8 @@ impl Channel for CliChannel {
continue;
}

if trimmed == "/quit" || trimmed == "/exit" {
self.shutdown.store(true, Ordering::SeqCst);
if Self::is_exit_command(&trimmed) {
self.request_shutdown();
break;
}

Expand Down Expand Up @@ -79,7 +114,23 @@ impl Channel for CliChannel {
}

async fn stop(&self) -> Result<()> {
self.shutdown.store(true, Ordering::SeqCst);
self.request_shutdown();
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn recognizes_gateway_exit_commands() {
for cmd in ["quit", "exit", "/quit", "/exit", ":q", " QUIT "] {
assert!(CliChannel::is_exit_command(cmd), "{cmd} should exit");
}

for cmd in ["", "quit now", "/sessions", "please exit"] {
assert!(!CliChannel::is_exit_command(cmd), "{cmd} should not exit");
}
}
}
7 changes: 6 additions & 1 deletion crates/octos-cli/src/commands/gateway/adapters/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use octos_bus::{ChannelManager, CliChannel};
use tokio::sync::Notify;

use crate::config::ChannelEntry;

pub fn register(
channel_mgr: &mut ChannelManager,
_entry: &ChannelEntry,
shutdown: &Arc<AtomicBool>,
shutdown_notify: &Arc<Notify>,
) -> eyre::Result<()> {
channel_mgr.register(Arc::new(CliChannel::new(shutdown.clone())));
channel_mgr.register(Arc::new(CliChannel::with_shutdown_notify(
shutdown.clone(),
shutdown_notify.clone(),
)));
Ok(())
}
5 changes: 3 additions & 2 deletions crates/octos-cli/src/commands/gateway/adapters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use octos_bus::{ChannelManager, SessionManager};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Notify};

use crate::config::ChannelEntry;

Expand Down Expand Up @@ -76,6 +76,7 @@ pub type TaskRelaunchCb =
#[allow(dead_code)]
pub struct ChannelRegistrationCtx<'a> {
pub shutdown: &'a Arc<AtomicBool>,
pub shutdown_notify: &'a Arc<Notify>,
pub media_dir: &'a Path,
pub data_dir: &'a Path,
pub session_mgr: &'a Arc<Mutex<SessionManager>>,
Expand Down Expand Up @@ -107,7 +108,7 @@ pub fn register_all(
) -> eyre::Result<()> {
for entry in entries {
match entry.channel_type.as_str() {
"cli" => cli::register(channel_mgr, entry, ctx.shutdown)?,
"cli" => cli::register(channel_mgr, entry, ctx.shutdown, ctx.shutdown_notify)?,
#[cfg(feature = "telegram")]
"telegram" => telegram::register(channel_mgr, entry, ctx.shutdown, ctx.media_dir)?,
#[cfg(feature = "discord")]
Expand Down
22 changes: 18 additions & 4 deletions crates/octos-cli/src/commands/gateway/gateway_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use octos_core::MAIN_PROFILE_ID;
use super::matrix_integration::*;

const PROFILE_PROMPT_CACHE_CAP: usize = 128;
const CLI_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);

// `discover_ominix_url` and `push_runtime_plugin_env` live in
// `crate::skills_scope` so the `serve` plugin loader can reuse them.
Expand Down Expand Up @@ -1385,6 +1386,7 @@ impl GatewayRuntime {
gateway_profile_id: profile_id.as_deref(),
api_port_override: cmd.api_port,
wechat_bridge_url: cmd.wechat_bridge_url.as_deref(),
shutdown_notify: &shutdown_notify,
on_session_deleted: Some(Arc::new(move |id: &str| {
let _ = delete_tx.send(id.to_string());
})),
Expand Down Expand Up @@ -1455,7 +1457,7 @@ impl GatewayRuntime {
eprintln!("[gateway] ready");
println!(
"{}",
"Gateway ready. Type a message or /quit to exit.".dimmed()
"Gateway ready. Type a message, quit, exit, /quit, or /exit.".dimmed()
);
println!();

Expand Down Expand Up @@ -2007,12 +2009,14 @@ impl GatewayRuntime {
// Timeout prevents hung actors from blocking the entire sequence.
// CLI shutdown should return control to the terminal promptly.
// Hung actors will be abandoned and then torn down by runtime shutdown.
let shutdown_timeout = Duration::from_secs(1);
if tokio::time::timeout(shutdown_timeout, self.actor_registry.shutdown_all())
if tokio::time::timeout(CLI_SHUTDOWN_TIMEOUT, self.actor_registry.shutdown_all())
.await
.is_err()
{
warn!("actor shutdown timed out after {shutdown_timeout:?}, forcing exit");
warn!(
"actor shutdown timed out after {:?}, forcing exit",
CLI_SHUTDOWN_TIMEOUT
);
}

// Stop background services concurrently
Expand All @@ -2027,3 +2031,13 @@ impl GatewayRuntime {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn cli_shutdown_timeout_stays_prompt() {
assert!(CLI_SHUTDOWN_TIMEOUT <= Duration::from_secs(1));
}
}
14 changes: 11 additions & 3 deletions crates/octos-cli/src/commands/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,20 @@ pub(crate) fn build_profiled_session_key(

impl Executable for GatewayCommand {
fn execute(self) -> Result<()> {
tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_stack_size(8 * 1024 * 1024) // 8MB stack for deep agent futures
.build()
.wrap_err("failed to create tokio runtime")?
.block_on(self.run_async())
.wrap_err("failed to create tokio runtime")?;
let result = runtime.block_on(self.run_async());

// The CLI channel uses Tokio stdin, whose blocking read can keep the
// runtime teardown attached to the terminal after a clean shutdown.
// `serve` uses the same pattern for long-lived background tasks.
if result.is_ok() {
std::process::exit(0);
}
result
}
}

Expand Down
Loading