Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 52 additions & 2 deletions crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use openfang_channels::slack::SlackAdapter;
use openfang_channels::teams::TeamsAdapter;
use openfang_channels::telegram::TelegramAdapter;
use openfang_channels::twitch::TwitchAdapter;
use openfang_channels::types::ChannelAdapter;
use openfang_channels::types::{ChannelAdapter, ChannelStreamEvent};
use openfang_channels::whatsapp::WhatsAppAdapter;
use openfang_channels::xmpp::XmppAdapter;
use openfang_channels::zulip::ZulipAdapter;
Expand Down Expand Up @@ -73,6 +73,54 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
Ok(result.response)
}

async fn send_message_streaming(
&self,
agent_id: AgentId,
message: &str,
) -> Result<tokio::sync::mpsc::Receiver<ChannelStreamEvent>, String> {
let (mut kernel_rx, _handle) = self
.kernel
.send_message_streaming(agent_id, message, None)
.map_err(|e| format!("{e}"))?;

let (tx, rx) = tokio::sync::mpsc::channel::<ChannelStreamEvent>(64);

// Spawn a task to convert StreamEvent → ChannelStreamEvent
tokio::spawn(async move {
use openfang_runtime::llm_driver::StreamEvent;

while let Some(event) = kernel_rx.recv().await {
let channel_event = match event {
StreamEvent::TextDelta { text } => ChannelStreamEvent::TextDelta { text },
StreamEvent::ToolUseStart { name, .. } => {
ChannelStreamEvent::ToolStart { name }
}
StreamEvent::ToolExecutionResult {
name,
result_preview,
is_error,
} => ChannelStreamEvent::ToolResult {
name,
preview: result_preview,
is_error,
},
StreamEvent::ContentComplete { .. } => ChannelStreamEvent::Complete,
// Skip events the channel layer doesn't care about
StreamEvent::ToolInputDelta { .. }
| StreamEvent::ToolUseEnd { .. }
| StreamEvent::ThinkingDelta { .. }
| StreamEvent::PhaseChange { .. } => continue,
};

if tx.send(channel_event).await.is_err() {
break; // receiver dropped
}
}
});

Ok(rx)
}

async fn find_agent_by_name(&self, name: &str) -> Result<Option<AgentId>, String> {
Ok(self.kernel.registry.find_by_name(name).map(|e| e.id))
}
Expand Down Expand Up @@ -1026,10 +1074,12 @@ pub async fn start_channel_bridge_with_config(
if let Some(ref tg_config) = config.telegram {
if let Some(token) = read_token(&tg_config.bot_token_env, "Telegram") {
let poll_interval = Duration::from_secs(tg_config.poll_interval_secs);
let adapter = Arc::new(TelegramAdapter::new(
let adapter = Arc::new(TelegramAdapter::with_streaming(
token,
tg_config.allowed_users.clone(),
poll_interval,
tg_config.stream_mode,
tg_config.stream_config.clone(),
));
adapters.push((adapter, tg_config.default_agent.clone()));
}
Expand Down
106 changes: 104 additions & 2 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

use crate::formatter;
use crate::router::AgentRouter;
use crate::types::{ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser};
use crate::types::{
ChannelAdapter, ChannelContent, ChannelMessage, ChannelStreamEvent, ChannelUser,
};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::StreamExt;
Expand Down Expand Up @@ -128,6 +130,18 @@ pub trait ChannelBridgeHandle: Send + Sync {
None
}

/// Send a message to an agent with streaming responses.
///
/// Returns a receiver for `ChannelStreamEvent`s. Default implementation
/// returns an error (streaming not supported).
async fn send_message_streaming(
&self,
_agent_id: AgentId,
_message: &str,
) -> Result<tokio::sync::mpsc::Receiver<ChannelStreamEvent>, String> {
Err("Streaming not supported".to_string())
}

// ── Automation: workflows, triggers, schedules, approvals ──

/// List all registered workflows as formatted text.
Expand Down Expand Up @@ -622,7 +636,28 @@ async fn dispatch_message(
// Send typing indicator (best-effort)
let _ = adapter.send_typing(&message.sender).await;

// Send to agent and relay response
// Try streaming path if adapter supports it
if adapter.supports_streaming() {
match dispatch_streaming(
&text,
agent_id,
handle,
adapter,
&message.sender,
message.is_group,
ct_str,
)
.await
{
Ok(()) => return,
Err(e) => {
debug!("Streaming dispatch failed, falling back to non-streaming: {e}");
// Fall through to non-streaming path
}
}
}

// Send to agent and relay response (non-streaming path)
match handle.send_message(agent_id, &text).await {
Ok(response) => {
send_response(adapter, &message.sender, response, thread_id, output_format).await;
Expand Down Expand Up @@ -654,6 +689,73 @@ async fn dispatch_message(
}
}

/// Dispatch a message using streaming output.
///
/// Creates a `StreamSink` from the adapter, then forwards `ChannelStreamEvent`s
/// from the kernel bridge handle into the sink for progressive display.
async fn dispatch_streaming(
text: &str,
agent_id: AgentId,
handle: &Arc<dyn ChannelBridgeHandle>,
adapter: &dyn ChannelAdapter,
user: &ChannelUser,
is_group: bool,
ct_str: &str,
) -> Result<(), Box<dyn std::error::Error>> {
// Get the stream receiver from the bridge handle
let mut rx = handle
.send_message_streaming(agent_id, text)
.await
.map_err(|e| -> Box<dyn std::error::Error> { e.into() })?;

// Create a streaming sink from the adapter
let mut sink = adapter.begin_stream(user, is_group).await?;

// Process stream events
while let Some(event) = rx.recv().await {
match event {
ChannelStreamEvent::TextDelta { text } => {
let push_err = match sink.push_text(&text).await {
Ok(()) => None,
Err(e) => Some(e.to_string()),
};
if let Some(err) = push_err {
warn!("Stream sink push_text failed: {err}");
sink.abort().await.ok();
return Err(err.into());
}
}
ChannelStreamEvent::ToolStart { name } => {
sink.push_tool_start(&name).await.ok();
}
ChannelStreamEvent::ToolResult {
name,
preview,
is_error,
} => {
sink.push_tool_result(&name, &preview, is_error).await.ok();
}
ChannelStreamEvent::Complete => {
break;
}
ChannelStreamEvent::Error { message } => {
warn!("Stream error from kernel: {message}");
sink.abort().await.ok();
return Err(message.into());
}
}
}

// Finalize the stream
sink.finalize().await?;

handle
.record_delivery(agent_id, ct_str, &user.platform_id, true, None)
.await;

Ok(())
}

/// Handle a bot command (returns the response text).
async fn handle_command(
name: &str,
Expand Down
42 changes: 42 additions & 0 deletions crates/openfang-channels/src/formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,48 @@ fn markdown_to_plain(text: &str) -> String {
result
}

/// Lightweight HTML entity escaping for streaming intermediate states.
///
/// Only escapes `&`, `<`, `>` — no Markdown conversion. Used during streaming
/// so partial text doesn't break Telegram's HTML parser.
pub fn escape_html_entities(text: &str) -> String {
text.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}

/// Find the best split point near `max_chars` for message splitting.
///
/// Prefers splitting at paragraph breaks (`\n\n`), then line breaks (`\n`),
/// then sentence endings (`. `), then word boundaries (` `).
/// Returns the byte offset to split at, or `max_chars` if no good point found.
pub fn find_split_point(text: &str, max_chars: usize) -> usize {
if text.len() <= max_chars {
return text.len();
}

let search_range = &text[..max_chars];

// Prefer paragraph break
if let Some(pos) = search_range.rfind("\n\n") {
return pos;
}
// Then line break
if let Some(pos) = search_range.rfind('\n') {
return pos;
}
// Then sentence end
if let Some(pos) = search_range.rfind(". ") {
return pos + 1; // include the period
}
// Then word boundary
if let Some(pos) = search_range.rfind(' ') {
return pos;
}

max_chars
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading