diff --git a/Cargo.lock b/Cargo.lock index 9c79d496b..136d843a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3866,7 +3866,7 @@ dependencies = [ [[package]] name = "openfang-api" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "axum", @@ -3902,7 +3902,7 @@ dependencies = [ [[package]] name = "openfang-channels" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "axum", @@ -3933,7 +3933,7 @@ dependencies = [ [[package]] name = "openfang-cli" -version = "0.3.6" +version = "0.3.7" dependencies = [ "clap", "clap_complete", @@ -3960,7 +3960,7 @@ dependencies = [ [[package]] name = "openfang-desktop" -version = "0.3.6" +version = "0.3.7" dependencies = [ "axum", "open", @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "openfang-extensions" -version = "0.3.6" +version = "0.3.7" dependencies = [ "aes-gcm", "argon2", @@ -4014,7 +4014,7 @@ dependencies = [ [[package]] name = "openfang-hands" -version = "0.3.6" +version = "0.3.7" dependencies = [ "chrono", "dashmap", @@ -4031,7 +4031,7 @@ dependencies = [ [[package]] name = "openfang-kernel" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "chrono", @@ -4067,7 +4067,7 @@ dependencies = [ [[package]] name = "openfang-memory" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "chrono", @@ -4086,7 +4086,7 @@ dependencies = [ [[package]] name = "openfang-migrate" -version = "0.3.6" +version = "0.3.7" dependencies = [ "chrono", "dirs 6.0.0", @@ -4105,7 +4105,7 @@ dependencies = [ [[package]] name = "openfang-runtime" -version = "0.3.6" +version = "0.3.7" dependencies = [ "anyhow", "async-trait", @@ -4137,7 +4137,7 @@ dependencies = [ [[package]] name = "openfang-skills" -version = "0.3.6" +version = "0.3.7" dependencies = [ "chrono", "hex", @@ -4160,7 +4160,7 @@ dependencies = [ [[package]] name = "openfang-types" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "chrono", @@ -4179,7 +4179,7 @@ dependencies = [ [[package]] name = "openfang-wire" -version = "0.3.6" +version = "0.3.7" dependencies = [ "async-trait", "chrono", @@ -8791,7 +8791,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56" [[package]] name = "xtask" -version = "0.3.6" +version = "0.3.7" [[package]] name = "yoke" diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 6d30b1915..2cd0485ee 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -17,7 +17,7 @@ use openfang_channels::slack::SlackAdapter; use openfang_channels::teams::TeamsAdapter; use openfang_channels::telegram::TelegramAdapter; use openfang_channels::twitch::TwitchAdapter; -use openfang_channels::types::ChannelAdapter; +use openfang_channels::types::{ChannelAdapter, ChannelStreamEvent}; use openfang_channels::whatsapp::WhatsAppAdapter; use openfang_channels::xmpp::XmppAdapter; use openfang_channels::zulip::ZulipAdapter; @@ -73,6 +73,54 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { Ok(result.response) } + async fn send_message_streaming( + &self, + agent_id: AgentId, + message: &str, + ) -> Result, String> { + let (mut kernel_rx, _handle) = self + .kernel + .send_message_streaming(agent_id, message, None) + .map_err(|e| format!("{e}"))?; + + let (tx, rx) = tokio::sync::mpsc::channel::(64); + + // Spawn a task to convert StreamEvent → ChannelStreamEvent + tokio::spawn(async move { + use openfang_runtime::llm_driver::StreamEvent; + + while let Some(event) = kernel_rx.recv().await { + let channel_event = match event { + StreamEvent::TextDelta { text } => ChannelStreamEvent::TextDelta { text }, + StreamEvent::ToolUseStart { name, .. } => { + ChannelStreamEvent::ToolStart { name } + } + StreamEvent::ToolExecutionResult { + name, + result_preview, + is_error, + } => ChannelStreamEvent::ToolResult { + name, + preview: result_preview, + is_error, + }, + StreamEvent::ContentComplete { .. } => ChannelStreamEvent::Complete, + // Skip events the channel layer doesn't care about + StreamEvent::ToolInputDelta { .. } + | StreamEvent::ToolUseEnd { .. } + | StreamEvent::ThinkingDelta { .. } + | StreamEvent::PhaseChange { .. } => continue, + }; + + if tx.send(channel_event).await.is_err() { + break; // receiver dropped + } + } + }); + + Ok(rx) + } + async fn find_agent_by_name(&self, name: &str) -> Result, String> { Ok(self.kernel.registry.find_by_name(name).map(|e| e.id)) } @@ -1026,10 +1074,12 @@ pub async fn start_channel_bridge_with_config( if let Some(ref tg_config) = config.telegram { if let Some(token) = read_token(&tg_config.bot_token_env, "Telegram") { let poll_interval = Duration::from_secs(tg_config.poll_interval_secs); - let adapter = Arc::new(TelegramAdapter::new( + let adapter = Arc::new(TelegramAdapter::with_streaming( token, tg_config.allowed_users.clone(), poll_interval, + tg_config.stream_mode, + tg_config.stream_config.clone(), )); adapters.push((adapter, tg_config.default_agent.clone())); } diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 85dae6a59..f851dfa94 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -5,7 +5,9 @@ use crate::formatter; use crate::router::AgentRouter; -use crate::types::{ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser}; +use crate::types::{ + ChannelAdapter, ChannelContent, ChannelMessage, ChannelStreamEvent, ChannelUser, +}; use async_trait::async_trait; use dashmap::DashMap; use futures::StreamExt; @@ -128,6 +130,18 @@ pub trait ChannelBridgeHandle: Send + Sync { None } + /// Send a message to an agent with streaming responses. + /// + /// Returns a receiver for `ChannelStreamEvent`s. Default implementation + /// returns an error (streaming not supported). + async fn send_message_streaming( + &self, + _agent_id: AgentId, + _message: &str, + ) -> Result, String> { + Err("Streaming not supported".to_string()) + } + // ── Automation: workflows, triggers, schedules, approvals ── /// List all registered workflows as formatted text. @@ -622,7 +636,28 @@ async fn dispatch_message( // Send typing indicator (best-effort) let _ = adapter.send_typing(&message.sender).await; - // Send to agent and relay response + // Try streaming path if adapter supports it + if adapter.supports_streaming() { + match dispatch_streaming( + &text, + agent_id, + handle, + adapter, + &message.sender, + message.is_group, + ct_str, + ) + .await + { + Ok(()) => return, + Err(e) => { + debug!("Streaming dispatch failed, falling back to non-streaming: {e}"); + // Fall through to non-streaming path + } + } + } + + // Send to agent and relay response (non-streaming path) match handle.send_message(agent_id, &text).await { Ok(response) => { send_response(adapter, &message.sender, response, thread_id, output_format).await; @@ -654,6 +689,73 @@ async fn dispatch_message( } } +/// Dispatch a message using streaming output. +/// +/// Creates a `StreamSink` from the adapter, then forwards `ChannelStreamEvent`s +/// from the kernel bridge handle into the sink for progressive display. +async fn dispatch_streaming( + text: &str, + agent_id: AgentId, + handle: &Arc, + adapter: &dyn ChannelAdapter, + user: &ChannelUser, + is_group: bool, + ct_str: &str, +) -> Result<(), Box> { + // Get the stream receiver from the bridge handle + let mut rx = handle + .send_message_streaming(agent_id, text) + .await + .map_err(|e| -> Box { e.into() })?; + + // Create a streaming sink from the adapter + let mut sink = adapter.begin_stream(user, is_group).await?; + + // Process stream events + while let Some(event) = rx.recv().await { + match event { + ChannelStreamEvent::TextDelta { text } => { + let push_err = match sink.push_text(&text).await { + Ok(()) => None, + Err(e) => Some(e.to_string()), + }; + if let Some(err) = push_err { + warn!("Stream sink push_text failed: {err}"); + sink.abort().await.ok(); + return Err(err.into()); + } + } + ChannelStreamEvent::ToolStart { name } => { + sink.push_tool_start(&name).await.ok(); + } + ChannelStreamEvent::ToolResult { + name, + preview, + is_error, + } => { + sink.push_tool_result(&name, &preview, is_error).await.ok(); + } + ChannelStreamEvent::Complete => { + break; + } + ChannelStreamEvent::Error { message } => { + warn!("Stream error from kernel: {message}"); + sink.abort().await.ok(); + return Err(message.into()); + } + } + } + + // Finalize the stream + sink.finalize().await?; + + handle + .record_delivery(agent_id, ct_str, &user.platform_id, true, None) + .await; + + Ok(()) +} + /// Handle a bot command (returns the response text). async fn handle_command( name: &str, diff --git a/crates/openfang-channels/src/formatter.rs b/crates/openfang-channels/src/formatter.rs index 0ffface72..b78530665 100644 --- a/crates/openfang-channels/src/formatter.rs +++ b/crates/openfang-channels/src/formatter.rs @@ -197,6 +197,48 @@ fn markdown_to_plain(text: &str) -> String { result } +/// Lightweight HTML entity escaping for streaming intermediate states. +/// +/// Only escapes `&`, `<`, `>` — no Markdown conversion. Used during streaming +/// so partial text doesn't break Telegram's HTML parser. +pub fn escape_html_entities(text: &str) -> String { + text.replace('&', "&") + .replace('<', "<") + .replace('>', ">") +} + +/// Find the best split point near `max_chars` for message splitting. +/// +/// Prefers splitting at paragraph breaks (`\n\n`), then line breaks (`\n`), +/// then sentence endings (`. `), then word boundaries (` `). +/// Returns the byte offset to split at, or `max_chars` if no good point found. +pub fn find_split_point(text: &str, max_chars: usize) -> usize { + if text.len() <= max_chars { + return text.len(); + } + + let search_range = &text[..max_chars]; + + // Prefer paragraph break + if let Some(pos) = search_range.rfind("\n\n") { + return pos; + } + // Then line break + if let Some(pos) = search_range.rfind('\n') { + return pos; + } + // Then sentence end + if let Some(pos) = search_range.rfind(". ") { + return pos + 1; // include the period + } + // Then word boundary + if let Some(pos) = search_range.rfind(' ') { + return pos; + } + + max_chars +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 3b07b08f0..83aebcce2 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -3,9 +3,12 @@ //! Uses long-polling via `getUpdates` with exponential backoff on failures. //! No external Telegram crate — just `reqwest` for full control over error handling. +use crate::formatter; use crate::types::{ split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, + StreamSink, }; +use openfang_types::config::{TelegramStreamConfig, TelegramStreamMode}; use async_trait::async_trait; use futures::Stream; use std::collections::HashMap; @@ -32,6 +35,10 @@ pub struct TelegramAdapter { poll_interval: Duration, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, + /// Streaming mode configuration. + stream_mode: TelegramStreamMode, + /// Streaming behavior configuration. + stream_config: TelegramStreamConfig, } impl TelegramAdapter { @@ -48,6 +55,29 @@ impl TelegramAdapter { poll_interval, shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, + stream_mode: TelegramStreamMode::default(), + stream_config: TelegramStreamConfig::default(), + } + } + + /// Create a new Telegram adapter with streaming configuration. + pub fn with_streaming( + token: String, + allowed_users: Vec, + poll_interval: Duration, + stream_mode: TelegramStreamMode, + stream_config: TelegramStreamConfig, + ) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + Self { + token: Zeroizing::new(token), + client: reqwest::Client::new(), + allowed_users, + poll_interval, + shutdown_tx: Arc::new(shutdown_tx), + shutdown_rx, + stream_mode, + stream_config, } } @@ -74,6 +104,16 @@ impl TelegramAdapter { chat_id: i64, text: &str, ) -> Result<(), Box> { + self.api_send_message_with_id(chat_id, text).await?; + Ok(()) + } + + /// Call `sendMessage` on the Telegram API, returning the message_id of the last chunk. + async fn api_send_message_with_id( + &self, + chat_id: i64, + text: &str, + ) -> Result> { let url = format!( "https://api.telegram.org/bot{}/sendMessage", self.token.as_str() @@ -86,6 +126,7 @@ impl TelegramAdapter { // Telegram has a 4096 character limit per message — split if needed let chunks = split_message(&sanitized, 4096); + let mut last_message_id: i64 = 0; for chunk in chunks { let body = serde_json::json!({ "chat_id": chat_id, @@ -98,9 +139,12 @@ impl TelegramAdapter { if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); warn!("Telegram sendMessage failed ({status}): {body_text}"); + } else { + let resp_body: serde_json::Value = resp.json().await.unwrap_or_default(); + last_message_id = resp_body["result"]["message_id"].as_i64().unwrap_or(0); } } - Ok(()) + Ok(last_message_id) } /// Call `sendPhoto` on the Telegram API. @@ -441,6 +485,455 @@ impl ChannelAdapter for TelegramAdapter { let _ = self.shutdown_tx.send(true); Ok(()) } + + fn supports_streaming(&self) -> bool { + self.stream_mode != TelegramStreamMode::Off + } + + async fn begin_stream( + &self, + user: &ChannelUser, + is_group: bool, + ) -> Result, Box> { + let chat_id: i64 = user + .platform_id + .parse() + .map_err(|_| format!("Invalid Telegram chat_id: {}", user.platform_id))?; + + // Resolve effective mode: Auto → Edit (Draft requires Bot API 9.5 which isn't widely available yet) + let effective_mode = match self.stream_mode { + TelegramStreamMode::Off => { + return Err("Streaming is disabled".into()); + } + TelegramStreamMode::Edit => StreamMode::Edit, + TelegramStreamMode::Draft => { + if is_group { + // Draft doesn't work in groups — fall back to Edit + StreamMode::Edit + } else { + StreamMode::Draft + } + } + TelegramStreamMode::Auto => { + // Auto: always use Edit for now (Draft support is experimental) + StreamMode::Edit + } + }; + + Ok(Box::new(TelegramStreamSink::new( + self.token.as_str().to_string(), + self.client.clone(), + chat_id, + effective_mode, + self.stream_config.clone(), + ))) + } +} + +/// Internal mode enum for the active streaming strategy. +#[derive(Debug, Clone, Copy, PartialEq)] +enum StreamMode { + Edit, + Draft, +} + +/// Streaming output sink for Telegram — buffers text and flushes via edits. +struct TelegramStreamSink { + token: String, + client: reqwest::Client, + chat_id: i64, + mode: StreamMode, + config: TelegramStreamConfig, + + /// Accumulated full text buffer. + buffer: String, + /// Message ID of the message being edited (Edit mode). + message_id: Option, + /// Whether the first message has been sent. + first_sent: bool, + /// Last flush instant for rate limiting. + last_flush: tokio::time::Instant, + /// Characters in buffer at last flush. + chars_at_last_flush: usize, + /// Number of consecutive 429 errors for adaptive backoff. + consecutive_rate_limits: u32, + /// Current extra delay from rate limiting. + rate_limit_delay: Duration, + /// Messages already sent (for multi-message splitting). + sent_messages: Vec, + /// Whether finalize has been called. + finalized: bool, +} + +impl TelegramStreamSink { + fn new( + token: String, + client: reqwest::Client, + chat_id: i64, + mode: StreamMode, + config: TelegramStreamConfig, + ) -> Self { + Self { + token, + client, + chat_id, + mode, + config, + buffer: String::new(), + message_id: None, + first_sent: false, + last_flush: tokio::time::Instant::now(), + chars_at_last_flush: 0, + consecutive_rate_limits: 0, + rate_limit_delay: Duration::ZERO, + sent_messages: Vec::new(), + finalized: false, + } + } + + /// Whether enough time and chars have accumulated for a flush. + fn should_flush(&self) -> bool { + let elapsed = self.last_flush.elapsed(); + let interval = Duration::from_millis(self.config.flush_interval_ms) + self.rate_limit_delay; + let chars_added = self.buffer.len().saturating_sub(self.chars_at_last_flush); + + elapsed >= interval && chars_added >= self.config.min_chars_per_flush + } + + /// Flush the current buffer to Telegram. + async fn flush(&mut self) -> Result<(), Box> { + if self.buffer.is_empty() { + return Ok(()); + } + + // Check if we need to split the message + if self.buffer.len() > self.config.max_message_chars && self.message_id.is_some() { + return self.split_and_continue().await; + } + + // Prepare display text: escape HTML entities + cursor indicator + let display_text = + formatter::escape_html_entities(&self.buffer) + &self.config.cursor_indicator; + + match self.mode { + StreamMode::Edit => { + if !self.first_sent { + // First flush: send a new message + match self.send_message_raw(&display_text, None).await { + Ok(msg_id) => { + self.message_id = Some(msg_id); + self.first_sent = true; + } + Err(e) => { + warn!("Telegram stream: sendMessage failed: {e}"); + return Err(e); + } + } + } else if let Some(msg_id) = self.message_id { + // Subsequent flushes: edit the existing message + let edit_err = match self.edit_message_raw(msg_id, &display_text, None).await { + Ok(()) => None, + Err(e) => Some(e.to_string()), + }; + if let Some(err_str) = edit_err { + if err_str.contains("429") { + self.handle_rate_limit(&err_str).await; + } else { + warn!("Telegram stream: editMessageText failed: {err_str}"); + } + } + } + } + StreamMode::Draft => { + // Draft mode: use sendMessageDraft (experimental) + // Falls back to Edit if it fails + let draft_failed = match self.send_draft(&display_text).await { + Ok(()) => false, + Err(e) => { + warn!("Telegram stream: Draft failed, falling back to Edit: {e}"); + true + } + }; + if draft_failed { + self.mode = StreamMode::Edit; + // Inline the Edit path instead of recursive flush() + match self.send_message_raw(&display_text, None).await { + Ok(msg_id) => { + self.message_id = Some(msg_id); + self.first_sent = true; + } + Err(e) => { + warn!("Telegram stream: fallback sendMessage failed: {e}"); + return Err(e); + } + } + } + } + } + + self.last_flush = tokio::time::Instant::now(); + self.chars_at_last_flush = self.buffer.len(); + self.consecutive_rate_limits = 0; + + Ok(()) + } + + /// Split a long message: finalize the current message and start a new one. + async fn split_and_continue(&mut self) -> Result<(), Box> { + let split_at = + formatter::find_split_point(&self.buffer, self.config.max_message_chars); + let completed_text = self.buffer[..split_at].to_string(); + let remaining = self.buffer[split_at..].trim_start().to_string(); + + // Finalize the current message with the completed chunk + if let Some(msg_id) = self.message_id { + let formatted = formatter::escape_html_entities(&completed_text); + self.edit_message_raw(msg_id, &formatted, None).await.ok(); + self.sent_messages.push(msg_id); + } + + // Start a new message with the remaining text + cursor + self.buffer = remaining; + let display = + formatter::escape_html_entities(&self.buffer) + &self.config.cursor_indicator; + match self.send_message_raw(&display, None).await { + Ok(msg_id) => { + self.message_id = Some(msg_id); + self.chars_at_last_flush = self.buffer.len(); + } + Err(e) => { + warn!("Telegram stream: failed to start continuation message: {e}"); + } + } + + Ok(()) + } + + /// Handle a 429 rate limit response with adaptive backoff. + async fn handle_rate_limit(&mut self, err_text: &str) { + self.consecutive_rate_limits += 1; + + // Parse retry_after from error if available + let base_delay = if let Some(pos) = err_text.find("retry_after") { + err_text[pos..] + .split(|c: char| c.is_ascii_digit()) + .nth(0) + .and_then(|_| { + err_text[pos..] + .chars() + .filter(|c| c.is_ascii_digit()) + .collect::() + .parse::() + .ok() + }) + .map(Duration::from_secs) + .unwrap_or(Duration::from_secs(1)) + } else { + Duration::from_secs(1) + }; + + // Adaptive: double interval after 3 consecutive rate limits + if self.consecutive_rate_limits >= 3 { + self.rate_limit_delay = (self.rate_limit_delay + base_delay) * 2; + } else { + self.rate_limit_delay = base_delay; + } + + let delay = self.rate_limit_delay.min(Duration::from_secs(10)); + debug!( + "Telegram stream: rate limited, sleeping {delay:?} (consecutive: {})", + self.consecutive_rate_limits + ); + tokio::time::sleep(delay).await; + } + + /// Raw sendMessage call that returns the message_id. + async fn send_message_raw( + &self, + text: &str, + parse_mode: Option<&str>, + ) -> Result> { + let url = format!( + "https://api.telegram.org/bot{}/sendMessage", + self.token + ); + + let mut body = serde_json::json!({ + "chat_id": self.chat_id, + "text": text, + }); + if let Some(pm) = parse_mode { + body["parse_mode"] = serde_json::Value::String(pm.to_string()); + } + + let resp = self.client.post(&url).json(&body).send().await?; + let status = resp.status(); + let resp_body: serde_json::Value = resp.json().await.unwrap_or_default(); + + if !status.is_success() { + let desc = resp_body["description"].as_str().unwrap_or("unknown error"); + return Err(format!("sendMessage failed ({status}): {desc}").into()); + } + + let msg_id = resp_body["result"]["message_id"] + .as_i64() + .unwrap_or(0); + Ok(msg_id) + } + + /// Raw editMessageText call. + async fn edit_message_raw( + &self, + message_id: i64, + text: &str, + parse_mode: Option<&str>, + ) -> Result<(), Box> { + let url = format!( + "https://api.telegram.org/bot{}/editMessageText", + self.token + ); + + let mut body = serde_json::json!({ + "chat_id": self.chat_id, + "message_id": message_id, + "text": text, + }); + if let Some(pm) = parse_mode { + body["parse_mode"] = serde_json::Value::String(pm.to_string()); + } + + let resp = self.client.post(&url).json(&body).send().await?; + let status = resp.status(); + if !status.is_success() { + let body_text = resp.text().await.unwrap_or_default(); + if status.as_u16() == 400 && body_text.contains("message is not modified") { + return Ok(()); + } + return Err(format!("{status}: {body_text}").into()); + } + Ok(()) + } + + /// Send a draft message (Bot API 9.5 experimental). + async fn send_draft(&self, text: &str) -> Result<(), Box> { + let url = format!( + "https://api.telegram.org/bot{}/sendMessageDraft", + self.token + ); + + let body = serde_json::json!({ + "chat_id": self.chat_id, + "text": text, + }); + + let resp = self.client.post(&url).json(&body).send().await?; + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + return Err(format!("sendMessageDraft failed: {body_text}").into()); + } + Ok(()) + } +} + +#[async_trait] +impl StreamSink for TelegramStreamSink { + async fn push_text(&mut self, text: &str) -> Result<(), Box> { + self.buffer.push_str(text); + + if self.should_flush() { + self.flush().await?; + } + + Ok(()) + } + + async fn push_tool_start(&mut self, tool_name: &str) -> Result<(), Box> { + // Append a tool indicator to the buffer + if !self.buffer.is_empty() && !self.buffer.ends_with('\n') { + self.buffer.push('\n'); + } + self.buffer + .push_str(&format!("\n[Running: {tool_name}...]\n")); + + // Force flush to show tool activity + self.flush().await?; + Ok(()) + } + + async fn push_tool_result( + &mut self, + _tool_name: &str, + _preview: &str, + _is_error: bool, + ) -> Result<(), Box> { + // Tool results are typically not shown inline during streaming — + // the LLM response after tool use will contain the relevant info. + Ok(()) + } + + async fn finalize(&mut self) -> Result<(), Box> { + if self.finalized { + return Ok(()); + } + self.finalized = true; + + if self.buffer.is_empty() { + return Ok(()); + } + + match self.mode { + StreamMode::Edit => { + if let Some(msg_id) = self.message_id { + // Final edit with full Markdown→HTML formatting (no cursor) + let formatted = + formatter::format_for_channel(&self.buffer, openfang_types::config::OutputFormat::TelegramHtml); + let sanitized = sanitize_telegram_html(&formatted); + + // If the final text is too long, we need to split + if sanitized.len() > 4096 { + let chunks = split_message(&sanitized, 4096); + // Edit existing message with first chunk + if let Some(first) = chunks.first() { + self.edit_message_raw(msg_id, first, Some("HTML")) + .await + .ok(); + } + // Send remaining chunks as new messages + for chunk in &chunks[1..] { + self.send_message_raw(chunk, Some("HTML")).await.ok(); + } + } else { + self.edit_message_raw(msg_id, &sanitized, Some("HTML")) + .await + .ok(); + } + } else { + // Never sent a first message — send the complete text + let formatted = + formatter::format_for_channel(&self.buffer, openfang_types::config::OutputFormat::TelegramHtml); + let sanitized = sanitize_telegram_html(&formatted); + self.send_message_raw(&sanitized, Some("HTML")).await.ok(); + } + } + StreamMode::Draft => { + // Draft messages disappear — send a final real message + let formatted = + formatter::format_for_channel(&self.buffer, openfang_types::config::OutputFormat::TelegramHtml); + let sanitized = sanitize_telegram_html(&formatted); + self.send_message_raw(&sanitized, Some("HTML")).await.ok(); + } + } + + Ok(()) + } + + async fn abort(&mut self) -> Result<(), Box> { + if self.finalized { + return Ok(()); + } + // Best-effort: finalize whatever we have + self.finalize().await + } } /// Parse a Telegram update JSON into a `ChannelMessage`, or `None` if filtered/unparseable. diff --git a/crates/openfang-channels/src/types.rs b/crates/openfang-channels/src/types.rs index bfd3fe1b2..e74ca3041 100644 --- a/crates/openfang-channels/src/types.rs +++ b/crates/openfang-channels/src/types.rs @@ -261,6 +261,71 @@ pub trait ChannelAdapter: Send + Sync { ) -> Result<(), Box> { self.send(user, content).await } + + /// Whether this adapter supports streaming output. + fn supports_streaming(&self) -> bool { + false + } + + /// Begin a streaming session, returning a `StreamSink` for progressive output. + /// + /// `is_group` indicates if the target is a group chat (affects mode selection). + /// Default implementation returns an error. + async fn begin_stream( + &self, + _user: &ChannelUser, + _is_group: bool, + ) -> Result, Box> { + Err("Streaming not supported by this adapter".into()) + } +} + +/// Stream events relevant to the channel layer. +/// +/// This is a channel-local enum mirroring the subset of `openfang_runtime::StreamEvent` +/// that channels care about, avoiding a dependency on `openfang-runtime`. +#[derive(Debug, Clone)] +pub enum ChannelStreamEvent { + /// Incremental text from the LLM. + TextDelta { text: String }, + /// A tool execution has started. + ToolStart { name: String }, + /// A tool execution completed. + ToolResult { + name: String, + preview: String, + is_error: bool, + }, + /// The response is complete. + Complete, + /// An error occurred during streaming. + Error { message: String }, +} + +/// Trait for a streaming output sink that progressively delivers content to a channel. +/// +/// Implementations buffer text and flush to the platform at appropriate intervals. +#[async_trait] +pub trait StreamSink: Send + Sync { + /// Push a text delta into the buffer. + async fn push_text(&mut self, text: &str) -> Result<(), Box>; + + /// Push a tool start notification. + async fn push_tool_start(&mut self, tool_name: &str) -> Result<(), Box>; + + /// Push a tool result notification. + async fn push_tool_result( + &mut self, + tool_name: &str, + preview: &str, + is_error: bool, + ) -> Result<(), Box>; + + /// Finalize the stream — send the complete formatted message. + async fn finalize(&mut self) -> Result<(), Box>; + + /// Abort the stream — best-effort send of whatever we have. + async fn abort(&mut self) -> Result<(), Box>; } /// Split a message into chunks of at most `max_len` characters, diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index a5455d79a..344256908 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1523,6 +1523,46 @@ pub struct ChannelsConfig { pub linkedin: Option, } +/// Telegram streaming mode. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TelegramStreamMode { + /// Disable streaming — send complete response at once. + Off, + /// Edit mode: sendMessage + editMessageText (works everywhere). + Edit, + /// Draft mode: sendMessageDraft (Bot API 9.5, DM only). + Draft, + /// Auto-select best mode (Draft for DM, Edit for groups). + #[default] + Auto, +} + +/// Configuration for Telegram streaming behavior. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct TelegramStreamConfig { + /// Minimum interval between edits in milliseconds. + pub flush_interval_ms: u64, + /// Minimum new characters before flushing an edit. + pub min_chars_per_flush: usize, + /// Max characters per message before splitting (Telegram limit is 4096). + pub max_message_chars: usize, + /// Cursor indicator appended during streaming. + pub cursor_indicator: String, +} + +impl Default for TelegramStreamConfig { + fn default() -> Self { + Self { + flush_interval_ms: 500, + min_chars_per_flush: 40, + max_message_chars: 3900, + cursor_indicator: "\u{258C}".to_string(), // ▌ + } + } +} + /// Telegram channel adapter configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -1538,6 +1578,12 @@ pub struct TelegramConfig { /// Per-channel behavior overrides. #[serde(default)] pub overrides: ChannelOverrides, + /// Streaming mode for progressive message display. + #[serde(default)] + pub stream_mode: TelegramStreamMode, + /// Streaming configuration (timing, thresholds). + #[serde(default)] + pub stream_config: TelegramStreamConfig, } impl Default for TelegramConfig { @@ -1548,6 +1594,8 @@ impl Default for TelegramConfig { default_agent: None, poll_interval_secs: 1, overrides: ChannelOverrides::default(), + stream_mode: TelegramStreamMode::default(), + stream_config: TelegramStreamConfig::default(), } } }