From 5eb5fed49b7585b6a0bcdb5670751fcbfef622ee Mon Sep 17 00:00:00 2001 From: "luodong.seu" Date: Mon, 2 Mar 2026 11:32:35 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(feishu):=20=E6=B7=BB=E5=8A=A0WebSocket?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E6=94=AF=E6=8C=81=E5=B9=B6=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E9=80=82=E9=85=8D=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加Feishu通道的WebSocket模式支持,允许通过长连接接收事件而无需公网IP 重构适配器代码,分离Webhook和WebSocket实现,优化错误处理和日志记录 --- crates/openfang-channels/src/feishu.rs | 572 +++++++++++++++++-------- 1 file changed, 394 insertions(+), 178 deletions(-) diff --git a/crates/openfang-channels/src/feishu.rs b/crates/openfang-channels/src/feishu.rs index 7f4290477..5d4263d3e 100644 --- a/crates/openfang-channels/src/feishu.rs +++ b/crates/openfang-channels/src/feishu.rs @@ -1,8 +1,11 @@ //! Feishu/Lark Open Platform channel adapter. //! -//! Uses the Feishu Open API for sending messages and a webhook HTTP server for -//! receiving inbound events. Authentication is performed via a tenant access token -//! obtained from `https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal`. +//! Uses the Feishu Open API for sending messages. Supports two modes for receiving inbound events: +//! 1. Webhook mode: HTTP server for receiving event callbacks +//! 2. WebSocket mode: WebSocket long connection for receiving events (no public IP required) +//! +//! Authentication is performed via a tenant access token obtained from +//! `https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal`. //! The token is cached and refreshed automatically (2-hour expiry). use crate::types::{ @@ -10,13 +13,14 @@ use crate::types::{ }; use async_trait::async_trait; use chrono::Utc; -use futures::Stream; +use futures::{SinkExt, Stream, StreamExt}; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, watch, RwLock}; -use tracing::{info, warn}; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use tracing::{debug, error, info, warn}; use zeroize::Zeroizing; /// Feishu tenant access token endpoint. @@ -35,21 +39,31 @@ const MAX_MESSAGE_LEN: usize = 4096; /// Token refresh buffer — refresh 5 minutes before actual expiry. const TOKEN_REFRESH_BUFFER_SECS: u64 = 300; +/// Feishu connection mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FeishuConnectionMode { + /// Webhook mode: HTTP server receives event callbacks. + Webhook, + /// WebSocket mode: Long connection receives events (no public IP required). + WebSocket, +} + /// Feishu/Lark Open Platform adapter. /// -/// Inbound messages arrive via a webhook HTTP server that receives event -/// callbacks from the Feishu platform. Outbound messages are sent via the -/// Feishu IM API with a tenant access token for authentication. +/// Inbound messages arrive via either a webhook HTTP server or WebSocket long connection. +/// Outbound messages are sent via the Feishu IM API with a tenant access token for authentication. pub struct FeishuAdapter { /// Feishu app ID. app_id: String, /// SECURITY: Feishu app secret, zeroized on drop. app_secret: Zeroizing, - /// Port on which the inbound webhook HTTP server listens. + /// Connection mode (Webhook or WebSocket). + connection_mode: FeishuConnectionMode, + /// Port on which the inbound webhook HTTP server listens (Webhook mode only). webhook_port: u16, - /// Optional verification token for webhook event validation. + /// Optional verification token for webhook event validation (Webhook mode only). verification_token: Option, - /// Optional encrypt key for webhook event decryption. + /// Optional encrypt key for webhook event decryption (Webhook mode only). encrypt_key: Option, /// HTTP client for API calls. client: reqwest::Client, @@ -61,7 +75,7 @@ pub struct FeishuAdapter { } impl FeishuAdapter { - /// Create a new Feishu adapter. + /// Create a new Feishu adapter in Webhook mode. /// /// # Arguments /// * `app_id` - Feishu application ID. @@ -72,6 +86,7 @@ impl FeishuAdapter { Self { app_id, app_secret: Zeroizing::new(app_secret), + connection_mode: FeishuConnectionMode::Webhook, webhook_port, verification_token: None, encrypt_key: None, @@ -82,7 +97,7 @@ impl FeishuAdapter { } } - /// Create a new Feishu adapter with webhook verification. + /// Create a new Feishu adapter in Webhook mode with verification. pub fn with_verification( app_id: String, app_secret: String, @@ -96,9 +111,31 @@ impl FeishuAdapter { adapter } + /// Create a new Feishu adapter in WebSocket mode. + /// + /// WebSocket mode does not require a public IP or webhook configuration. + /// + /// # Arguments + /// * `app_id` - Feishu application ID. + /// * `app_secret` - Feishu application secret. + pub fn new_websocket(app_id: String, app_secret: String) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + Self { + app_id, + app_secret: Zeroizing::new(app_secret), + connection_mode: FeishuConnectionMode::WebSocket, + webhook_port: 0, + verification_token: None, + encrypt_key: None, + client: reqwest::Client::new(), + shutdown_tx: Arc::new(shutdown_tx), + shutdown_rx, + cached_token: Arc::new(RwLock::new(None)), + } + } + /// Obtain a valid tenant access token, refreshing if expired or missing. async fn get_token(&self) -> Result> { - // Check cache first { let guard = self.cached_token.read().await; if let Some((ref token, expiry)) = *guard { @@ -108,7 +145,6 @@ impl FeishuAdapter { } } - // Fetch a new tenant access token let body = serde_json::json!({ "app_id": self.app_id, "app_secret": self.app_secret.as_str(), @@ -140,7 +176,6 @@ impl FeishuAdapter { .to_string(); let expire = resp_body["expire"].as_u64().unwrap_or(7200); - // Cache with safety buffer let expiry = Instant::now() + Duration::from_secs(expire.saturating_sub(TOKEN_REFRESH_BUFFER_SECS)); *self.cached_token.write().await = Some((tenant_access_token.clone(), expiry)); @@ -227,32 +262,283 @@ impl FeishuAdapter { Ok(()) } - /// Reply to a message in a thread. - #[allow(dead_code)] - async fn api_reply_message( - &self, - message_id: &str, - text: &str, - ) -> Result<(), Box> { - let token = self.get_token().await?; - let url = format!( - "https://open.feishu.cn/open-apis/im/v1/messages/{}/reply", - message_id - ); + /// Start webhook server (Webhook mode). + async fn start_webhook(&self, tx: mpsc::Sender) -> Result<(), Box> { + let port = self.webhook_port; + let verification_token = self.verification_token.clone(); + let mut shutdown_rx = self.shutdown_rx.clone(); + + tokio::spawn(async move { + let verification_token = Arc::new(verification_token); + let tx = Arc::new(tx); + + let app = axum::Router::new().route( + "/feishu/webhook", + axum::routing::post({ + let vt = Arc::clone(&verification_token); + let tx = Arc::clone(&tx); + move |body: axum::extract::Json| { + let vt = Arc::clone(&vt); + let tx = Arc::clone(&tx); + async move { + if let Some(challenge) = body.0.get("challenge") { + if let Some(ref expected_token) = *vt { + let token = body.0["token"].as_str().unwrap_or(""); + if token != expected_token { + warn!("Feishu: invalid verification token"); + return ( + axum::http::StatusCode::FORBIDDEN, + axum::Json(serde_json::json!({})), + ); + } + } + return ( + axum::http::StatusCode::OK, + axum::Json(serde_json::json!({ + "challenge": challenge, + })), + ); + } + + if let Some(schema) = body.0["schema"].as_str() { + if schema == "2.0" { + if let Some(msg) = parse_feishu_event(&body.0) { + let _ = tx.send(msg).await; + } + } + } else { + let event_type = body.0["event"]["type"].as_str().unwrap_or(""); + if event_type == "message" { + let event = &body.0["event"]; + let text = event["text"].as_str().unwrap_or(""); + if !text.is_empty() { + let open_id = + event["open_id"].as_str().unwrap_or("").to_string(); + let chat_id = event["open_chat_id"] + .as_str() + .unwrap_or("") + .to_string(); + let msg_id = event["open_message_id"] + .as_str() + .unwrap_or("") + .to_string(); + let is_group = + event["chat_type"].as_str().unwrap_or("") == "group"; + + let content = if text.starts_with('/') { + let parts: Vec<&str> = text.splitn(2, ' ').collect(); + let cmd = parts[0].trim_start_matches('/'); + let args: Vec = parts + .get(1) + .map(|a| { + a.split_whitespace().map(String::from).collect() + }) + .unwrap_or_default(); + ChannelContent::Command { + name: cmd.to_string(), + args, + } + } else { + ChannelContent::Text(text.to_string()) + }; + + let channel_msg = ChannelMessage { + channel: ChannelType::Custom("feishu".to_string()), + platform_message_id: msg_id, + sender: ChannelUser { + platform_id: chat_id, + display_name: open_id, + openfang_user: None, + }, + content, + target_agent: None, + timestamp: Utc::now(), + is_group, + thread_id: None, + metadata: HashMap::new(), + }; + + let _ = tx.send(channel_msg).await; + } + } + } + + ( + axum::http::StatusCode::OK, + axum::Json(serde_json::json!({})), + ) + } + } + }), + ); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + info!("Feishu webhook server listening on {addr}"); + + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + warn!("Feishu webhook bind failed: {e}"); + return; + } + }; + + let server = axum::serve(listener, app); + + tokio::select! { + result = server => { + if let Err(e) = result { + warn!("Feishu webhook server error: {e}"); + } + } + _ = shutdown_rx.changed() => { + info!("Feishu adapter shutting down"); + } + } + }); - let content = serde_json::json!({ - "text": text, + Ok(()) + } + + /// Start WebSocket connection loop (WebSocket mode). + async fn start_websocket_loop(&self, tx: mpsc::Sender) -> Result<(), Box> { + let self_arc = Arc::new(self.clone_adapter()); + + tokio::spawn(async move { + info!("Starting Feishu WebSocket mode"); + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(60); + + loop { + match Self::run_websocket_inner(self_arc.clone(), tx.clone()).await { + Ok(_) => { + info!("Feishu WebSocket connection closed, reconnecting..."); + } + Err(e) => { + error!("Feishu WebSocket error: {e}, reconnecting in {backoff:?}"); + } + } + + tokio::time::sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, max_backoff); + } }); + Ok(()) + } + + /// Clone adapter for use in async tasks. + fn clone_adapter(&self) -> FeishuAdapterClone { + FeishuAdapterClone { + app_id: self.app_id.clone(), + app_secret: self.app_secret.clone(), + client: self.client.clone(), + cached_token: self.cached_token.clone(), + shutdown_rx: self.shutdown_rx.clone(), + } + } + + /// Run WebSocket connection loop (inner implementation). + async fn run_websocket_inner( + adapter: Arc, + tx: mpsc::Sender, + ) -> Result<(), Box> { + let ws_url = adapter.get_websocket_endpoint().await?; + info!("Connecting to Feishu WebSocket endpoint: {ws_url}"); + + let (ws_stream, _) = connect_async(ws_url).await?; + info!("Feishu WebSocket connected successfully"); + + let (mut write, mut read) = ws_stream.split(); + let mut shutdown_rx = adapter.shutdown_rx.clone(); + + loop { + tokio::select! { + msg = read.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + debug!("Received Feishu WebSocket message: {text}"); + if let Ok(event) = serde_json::from_str::(&text) { + if let Some(msg) = parse_feishu_event(&event) { + let _ = tx.send(msg).await; + } + } + } + Some(Ok(Message::Binary(data))) => { + debug!("Received Feishu WebSocket binary message: {} bytes", data.len()); + if let Ok(text) = String::from_utf8(data) { + if let Ok(event) = serde_json::from_str::(&text) { + if let Some(msg) = parse_feishu_event(&event) { + let _ = tx.send(msg).await; + } + } + } + } + Some(Ok(Message::Close(_))) => { + info!("Feishu WebSocket connection closed by server"); + break; + } + Some(Ok(Message::Ping(_))) => { + debug!("Received Feishu WebSocket ping, sending pong"); + let _ = write.send(Message::Pong(Vec::new())).await; + } + Some(Ok(Message::Pong(_))) => { + debug!("Received Feishu WebSocket pong"); + } + Some(Ok(_)) => { + debug!("Received unhandled Feishu WebSocket message type"); + } + Some(Err(e)) => { + error!("Feishu WebSocket error: {e}"); + break; + } + None => { + info!("Feishu WebSocket stream ended"); + break; + } + } + } + _ = shutdown_rx.changed() => { + info!("Feishu WebSocket shutting down"); + let _ = write.close().await; + break; + } + } + } + + Ok(()) + } +} + +/// Cloneable Feishu adapter parts for use in async tasks. +struct FeishuAdapterClone { + app_id: String, + app_secret: Zeroizing, + client: reqwest::Client, + cached_token: Arc>>, + shutdown_rx: watch::Receiver, +} + +impl FeishuAdapterClone { + /// Get a valid tenant access token, refreshing if expired or missing. + async fn get_token(&self) -> Result> { + { + let guard = self.cached_token.read().await; + if let Some((ref token, expiry)) = *guard { + if Instant::now() < expiry { + return Ok(token.clone()); + } + } + } + let body = serde_json::json!({ - "msg_type": "text", - "content": content.to_string(), + "app_id": self.app_id, + "app_secret": self.app_secret.as_str(), }); let resp = self .client - .post(&url) - .bearer_auth(&token) + .post(FEISHU_TOKEN_URL) .json(&body) .send() .await?; @@ -260,10 +546,60 @@ impl FeishuAdapter { if !resp.status().is_success() { let status = resp.status(); let resp_body = resp.text().await.unwrap_or_default(); - return Err(format!("Feishu reply message error {status}: {resp_body}").into()); + return Err(format!("Feishu token request failed {status}: {resp_body}").into()); } - Ok(()) + let resp_body: serde_json::Value = resp.json().await?; + let code = resp_body["code"].as_i64().unwrap_or(-1); + if code != 0 { + let msg = resp_body["msg"].as_str().unwrap_or("unknown error"); + return Err(format!("Feishu token error: {msg}").into()); + } + + let tenant_access_token = resp_body["tenant_access_token"] + .as_str() + .ok_or("Missing tenant_access_token")? + .to_string(); + let expire = resp_body["expire"].as_u64().unwrap_or(7200); + + let expiry = + Instant::now() + Duration::from_secs(expire.saturating_sub(TOKEN_REFRESH_BUFFER_SECS)); + *self.cached_token.write().await = Some((tenant_access_token.clone(), expiry)); + + Ok(tenant_access_token) + } + + /// Get WebSocket endpoint from Feishu API. + async fn get_websocket_endpoint(&self) -> Result> { + let token = self.get_token().await?; + let url = "https://open.feishu.cn/open-apis/ws/v1/endpoint"; + + let resp = self + .client + .get(url) + .bearer_auth(&token) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let resp_body = resp.text().await.unwrap_or_default(); + return Err(format!("Feishu WebSocket endpoint request failed {status}: {resp_body}").into()); + } + + let resp_body: serde_json::Value = resp.json().await?; + let code = resp_body["code"].as_i64().unwrap_or(-1); + if code != 0 { + let msg = resp_body["msg"].as_str().unwrap_or("unknown error"); + return Err(format!("Feishu WebSocket endpoint error: {msg}").into()); + } + + let ws_url = resp_body["data"]["url"] + .as_str() + .ok_or("Missing WebSocket URL in response")? + .to_string(); + + Ok(ws_url) } } @@ -271,7 +607,6 @@ impl FeishuAdapter { /// /// Handles `im.message.receive_v1` events with text message type. fn parse_feishu_event(event: &serde_json::Value) -> Option { - // Feishu v2 event schema let header = event.get("header")?; let event_type = header["event_type"].as_str().unwrap_or(""); @@ -288,7 +623,6 @@ fn parse_feishu_event(event: &serde_json::Value) -> Option { return None; } - // Parse the content JSON string let content_str = message["content"].as_str().unwrap_or("{}"); let content_json: serde_json::Value = serde_json::from_str(content_str).unwrap_or_default(); let text = content_json["text"].as_str().unwrap_or(""); @@ -309,7 +643,6 @@ fn parse_feishu_event(event: &serde_json::Value) -> Option { .to_string(); let sender_type = sender["sender_type"].as_str().unwrap_or("user"); - // Skip bot messages if sender_type == "bot" { return None; } @@ -383,149 +716,19 @@ impl ChannelAdapter for FeishuAdapter { &self, ) -> Result + Send>>, Box> { - // Validate credentials let bot_name = self.validate().await?; info!("Feishu adapter authenticated as {bot_name}"); let (tx, rx) = mpsc::channel::(256); - let port = self.webhook_port; - let verification_token = self.verification_token.clone(); - let mut shutdown_rx = self.shutdown_rx.clone(); - - tokio::spawn(async move { - let verification_token = Arc::new(verification_token); - let tx = Arc::new(tx); - - let app = axum::Router::new().route( - "/feishu/webhook", - axum::routing::post({ - let vt = Arc::clone(&verification_token); - let tx = Arc::clone(&tx); - move |body: axum::extract::Json| { - let vt = Arc::clone(&vt); - let tx = Arc::clone(&tx); - async move { - // Handle URL verification challenge - if let Some(challenge) = body.0.get("challenge") { - // Verify token if configured - if let Some(ref expected_token) = *vt { - let token = body.0["token"].as_str().unwrap_or(""); - if token != expected_token { - warn!("Feishu: invalid verification token"); - return ( - axum::http::StatusCode::FORBIDDEN, - axum::Json(serde_json::json!({})), - ); - } - } - return ( - axum::http::StatusCode::OK, - axum::Json(serde_json::json!({ - "challenge": challenge, - })), - ); - } - - // Handle event callback - if let Some(schema) = body.0["schema"].as_str() { - if schema == "2.0" { - // V2 event format - if let Some(msg) = parse_feishu_event(&body.0) { - let _ = tx.send(msg).await; - } - } - } else { - // V1 event format (legacy) - let event_type = body.0["event"]["type"].as_str().unwrap_or(""); - if event_type == "message" { - // Legacy format handling - let event = &body.0["event"]; - let text = event["text"].as_str().unwrap_or(""); - if !text.is_empty() { - let open_id = - event["open_id"].as_str().unwrap_or("").to_string(); - let chat_id = event["open_chat_id"] - .as_str() - .unwrap_or("") - .to_string(); - let msg_id = event["open_message_id"] - .as_str() - .unwrap_or("") - .to_string(); - let is_group = - event["chat_type"].as_str().unwrap_or("") == "group"; - - let content = if text.starts_with('/') { - let parts: Vec<&str> = text.splitn(2, ' ').collect(); - let cmd = parts[0].trim_start_matches('/'); - let args: Vec = parts - .get(1) - .map(|a| { - a.split_whitespace().map(String::from).collect() - }) - .unwrap_or_default(); - ChannelContent::Command { - name: cmd.to_string(), - args, - } - } else { - ChannelContent::Text(text.to_string()) - }; - - let channel_msg = ChannelMessage { - channel: ChannelType::Custom("feishu".to_string()), - platform_message_id: msg_id, - sender: ChannelUser { - platform_id: chat_id, - display_name: open_id, - openfang_user: None, - }, - content, - target_agent: None, - timestamp: Utc::now(), - is_group, - thread_id: None, - metadata: HashMap::new(), - }; - - let _ = tx.send(channel_msg).await; - } - } - } - - ( - axum::http::StatusCode::OK, - axum::Json(serde_json::json!({})), - ) - } - } - }), - ); - let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); - info!("Feishu webhook server listening on {addr}"); - - let listener = match tokio::net::TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - warn!("Feishu webhook bind failed: {e}"); - return; - } - }; - - let server = axum::serve(listener, app); - - tokio::select! { - result = server => { - if let Err(e) = result { - warn!("Feishu webhook server error: {e}"); - } - } - _ = shutdown_rx.changed() => { - info!("Feishu adapter shutting down"); - } + match self.connection_mode { + FeishuConnectionMode::Webhook => { + self.start_webhook(tx).await?; } - }); + FeishuConnectionMode::WebSocket => { + self.start_websocket_loop(tx).await?; + } + } Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))) } @@ -537,7 +740,6 @@ impl ChannelAdapter for FeishuAdapter { ) -> Result<(), Box> { match content { ChannelContent::Text(text) => { - // Use chat_id as receive_id with chat_id type self.api_send_message(&user.platform_id, "chat_id", &text) .await?; } @@ -550,7 +752,6 @@ impl ChannelAdapter for FeishuAdapter { } async fn send_typing(&self, _user: &ChannelUser) -> Result<(), Box> { - // Feishu does not support typing indicators via REST API Ok(()) } @@ -574,6 +775,21 @@ mod tests { ChannelType::Custom("feishu".to_string()) ); assert_eq!(adapter.webhook_port, 9000); + assert_eq!(adapter.connection_mode, FeishuConnectionMode::Webhook); + } + + #[test] + fn test_feishu_websocket_adapter_creation() { + let adapter = FeishuAdapter::new_websocket( + "cli_abc123".to_string(), + "app-secret-456".to_string(), + ); + assert_eq!(adapter.name(), "feishu"); + assert_eq!( + adapter.channel_type(), + ChannelType::Custom("feishu".to_string()) + ); + assert_eq!(adapter.connection_mode, FeishuConnectionMode::WebSocket); } #[test] From 95b63628980bf9cd8cdc2113b4accf5cd961f62d Mon Sep 17 00:00:00 2001 From: xinuxZ Date: Wed, 4 Mar 2026 18:05:41 +0800 Subject: [PATCH 2/3] feat(feishu): wire websocket mode across config and bridge --- Cargo.lock | 52 ++- Cargo.toml | 1 + crates/openfang-api/src/channel_bridge.rs | 52 ++- crates/openfang-api/src/routes.rs | 384 ++++++++++++---- crates/openfang-channels/Cargo.toml | 1 + crates/openfang-channels/src/feishu.rs | 531 +++++++++++++++++----- crates/openfang-types/src/config.rs | 39 ++ 7 files changed, 818 insertions(+), 242 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c609179a..5357f152d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3866,7 +3866,7 @@ dependencies = [ [[package]] name = "openfang-api" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "axum", @@ -3902,7 +3902,7 @@ dependencies = [ [[package]] name = "openfang-channels" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "axum", @@ -3917,6 +3917,7 @@ dependencies = [ "mailparse", "native-tls", "openfang-types", + "prost", "reqwest 0.12.28", "serde", "serde_json", @@ -3933,7 +3934,7 @@ dependencies = [ [[package]] name = "openfang-cli" -version = "0.3.11" +version = "0.3.12" dependencies = [ "clap", "clap_complete", @@ -3960,7 +3961,7 @@ dependencies = [ [[package]] name = "openfang-desktop" -version = "0.3.11" +version = "0.3.12" dependencies = [ "axum", "open", @@ -3986,7 +3987,7 @@ dependencies = [ [[package]] name = "openfang-extensions" -version = "0.3.11" +version = "0.3.12" dependencies = [ "aes-gcm", "argon2", @@ -4014,7 +4015,7 @@ dependencies = [ [[package]] name = "openfang-hands" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "dashmap", @@ -4031,7 +4032,7 @@ dependencies = [ [[package]] name = "openfang-kernel" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4067,7 +4068,7 @@ dependencies = [ [[package]] name = "openfang-memory" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4086,7 +4087,7 @@ dependencies = [ [[package]] name = "openfang-migrate" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "dirs 6.0.0", @@ -4105,7 +4106,7 @@ dependencies = [ [[package]] name = "openfang-runtime" -version = "0.3.11" +version = "0.3.12" dependencies = [ "anyhow", "async-trait", @@ -4137,7 +4138,7 @@ dependencies = [ [[package]] name = "openfang-skills" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "hex", @@ -4160,7 +4161,7 @@ dependencies = [ [[package]] name = "openfang-types" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4179,7 +4180,7 @@ dependencies = [ [[package]] name = "openfang-wire" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4782,6 +4783,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "psm" version = "0.1.30" @@ -8791,7 +8815,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56" [[package]] name = "xtask" -version = "0.3.11" +version = "0.3.12" [[package]] name = "yoke" diff --git a/Cargo.toml b/Cargo.toml index 5d8b0a6c0..028f86ef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ bytes = "1" # Futures futures = "0.3" +prost = "0.13" # WebSocket client (for Discord/Slack gateway) tokio-tungstenite = { version = "0.24", default-features = false, features = ["connect", "rustls-tls-native-roots"] } diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 6d30b1915..71a6935a0 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -30,6 +30,7 @@ use openfang_channels::messenger::MessengerAdapter; use openfang_channels::reddit::RedditAdapter; use openfang_channels::revolt::RevoltAdapter; use openfang_channels::viber::ViberAdapter; +use openfang_types::config::FeishuMode; // Wave 4 use openfang_channels::flock::FlockAdapter; use openfang_channels::guilded::GuildedAdapter; @@ -1064,7 +1065,9 @@ pub async fn start_channel_bridge_with_config( // WhatsApp — supports Cloud API mode (access token) or Web/QR mode (gateway URL) if let Some(ref wa_config) = config.whatsapp { let cloud_token = read_token(&wa_config.access_token_env, "WhatsApp"); - let gateway_url = std::env::var(&wa_config.gateway_url_env).ok().filter(|u| !u.is_empty()); + let gateway_url = std::env::var(&wa_config.gateway_url_env) + .ok() + .filter(|u| !u.is_empty()); if cloud_token.is_some() || gateway_url.is_some() { let token = cloud_token.unwrap_or_default(); @@ -1312,11 +1315,17 @@ pub async fn start_channel_bridge_with_config( // Feishu/Lark if let Some(ref fs_config) = config.feishu { if let Some(secret) = read_token(&fs_config.app_secret_env, "Feishu") { - let adapter = Arc::new(FeishuAdapter::new( - fs_config.app_id.clone(), - secret, - fs_config.webhook_port, - )); + let adapter = match fs_config.mode { + FeishuMode::Webhook => Arc::new(FeishuAdapter::new( + fs_config.app_id.clone(), + secret, + fs_config.webhook_port, + )), + FeishuMode::Websocket => Arc::new(FeishuAdapter::new_websocket( + fs_config.app_id.clone(), + secret, + )), + }; adapters.push((adapter, fs_config.default_agent.clone())); } } @@ -1692,4 +1701,35 @@ mod tests { assert!(config.channels.webhook.is_none()); assert!(config.channels.linkedin.is_none()); } + + #[test] + fn test_feishu_bridge_mode_defaults_to_webhook() { + let config: openfang_types::config::KernelConfig = toml::from_str( + r#" + [channels.feishu] + app_id = "cli_test" + app_secret_env = "FEISHU_APP_SECRET" + "#, + ) + .unwrap(); + + let feishu = config.channels.feishu.expect("feishu config should exist"); + assert_eq!(feishu.mode, openfang_types::config::FeishuMode::Webhook); + } + + #[test] + fn test_feishu_bridge_mode_supports_websocket() { + let config: openfang_types::config::KernelConfig = toml::from_str( + r#" + [channels.feishu] + app_id = "cli_test" + app_secret_env = "FEISHU_APP_SECRET" + mode = "websocket" + "#, + ) + .unwrap(); + + let feishu = config.channels.feishu.expect("feishu config should exist"); + assert_eq!(feishu.mode, openfang_types::config::FeishuMode::Websocket); + } } diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index d2f66392c..88f55ab03 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -126,15 +126,13 @@ pub async fn list_agents(State(state): State>) -> impl IntoRespons .into_iter() .map(|e| { // Resolve "default" provider/model to actual kernel defaults - let provider = if e.manifest.model.provider.is_empty() - || e.manifest.model.provider == "default" - { - dm.provider.as_str() - } else { - e.manifest.model.provider.as_str() - }; - let model = if e.manifest.model.model.is_empty() - || e.manifest.model.model == "default" + let provider = + if e.manifest.model.provider.is_empty() || e.manifest.model.provider == "default" { + dm.provider.as_str() + } else { + e.manifest.model.provider.as_str() + }; + let model = if e.manifest.model.model.is_empty() || e.manifest.model.model == "default" { dm.model.as_str() } else { @@ -1474,11 +1472,12 @@ const CHANNEL_REGISTRY: &[ChannelMeta] = &[ fields: &[ ChannelField { key: "app_id", label: "App ID", field_type: FieldType::Text, env_var: None, required: true, placeholder: "cli_abc123", advanced: false }, ChannelField { key: "app_secret_env", label: "App Secret", field_type: FieldType::Secret, env_var: Some("FEISHU_APP_SECRET"), required: true, placeholder: "abc123...", advanced: false }, + ChannelField { key: "mode", label: "Receive Mode", field_type: FieldType::Text, env_var: None, required: false, placeholder: "webhook|websocket", advanced: true }, ChannelField { key: "webhook_port", label: "Webhook Port", field_type: FieldType::Number, env_var: None, required: false, placeholder: "8453", advanced: true }, ChannelField { key: "default_agent", label: "Default Agent", field_type: FieldType::Text, env_var: None, required: false, placeholder: "assistant", advanced: true }, ], setup_steps: &["Create an app at open.feishu.cn", "Copy App ID and Secret", "Paste them below"], - config_template: "[channels.feishu]\napp_id = \"\"\napp_secret_env = \"FEISHU_APP_SECRET\"", + config_template: "[channels.feishu]\napp_id = \"\"\napp_secret_env = \"FEISHU_APP_SECRET\"\nmode = \"webhook\"", }, ChannelMeta { name: "dingtalk", display_name: "DingTalk", icon: "DT", @@ -1850,9 +1849,7 @@ fn build_field_json( val.clone() }; field["value"] = display_val; - if !val.is_null() - && val.as_str().map(|s| !s.is_empty()).unwrap_or(true) - { + if !val.is_null() && val.as_str().map(|s| !s.is_empty()).unwrap_or(true) { field["has_value"] = serde_json::Value::Bool(true); } } @@ -1866,52 +1863,189 @@ fn find_channel_meta(name: &str) -> Option<&'static ChannelMeta> { CHANNEL_REGISTRY.iter().find(|c| c.name == name) } +#[cfg(test)] +mod channel_meta_tests { + use super::*; + + #[test] + fn feishu_channel_meta_includes_mode_field() { + let meta = find_channel_meta("feishu").expect("feishu channel meta should exist"); + assert!(meta.fields.iter().any(|f| f.key == "mode")); + } + + #[test] + fn feishu_channel_meta_template_includes_webhook_mode_default() { + let meta = find_channel_meta("feishu").expect("feishu channel meta should exist"); + assert!(meta.config_template.contains("mode = \"webhook\"")); + } +} + /// Serialize a channel's config to a JSON Value for pre-populating dashboard forms. fn channel_config_values( config: &openfang_types::config::ChannelsConfig, name: &str, ) -> Option { match name { - "telegram" => config.telegram.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "discord" => config.discord.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "slack" => config.slack.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "whatsapp" => config.whatsapp.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "signal" => config.signal.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "matrix" => config.matrix.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "email" => config.email.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "teams" => config.teams.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "mattermost" => config.mattermost.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "irc" => config.irc.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "google_chat" => config.google_chat.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "twitch" => config.twitch.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "rocketchat" => config.rocketchat.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "zulip" => config.zulip.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "xmpp" => config.xmpp.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "line" => config.line.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "viber" => config.viber.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "messenger" => config.messenger.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "reddit" => config.reddit.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "mastodon" => config.mastodon.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "bluesky" => config.bluesky.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "feishu" => config.feishu.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "revolt" => config.revolt.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "nextcloud" => config.nextcloud.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "guilded" => config.guilded.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "keybase" => config.keybase.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "threema" => config.threema.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "nostr" => config.nostr.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "webex" => config.webex.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "pumble" => config.pumble.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "flock" => config.flock.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "twist" => config.twist.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "mumble" => config.mumble.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "dingtalk" => config.dingtalk.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "discourse" => config.discourse.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "gitter" => config.gitter.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "ntfy" => config.ntfy.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "gotify" => config.gotify.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "webhook" => config.webhook.as_ref().and_then(|c| serde_json::to_value(c).ok()), - "linkedin" => config.linkedin.as_ref().and_then(|c| serde_json::to_value(c).ok()), + "telegram" => config + .telegram + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "discord" => config + .discord + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "slack" => config + .slack + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "whatsapp" => config + .whatsapp + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "signal" => config + .signal + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "matrix" => config + .matrix + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "email" => config + .email + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "teams" => config + .teams + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "mattermost" => config + .mattermost + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "irc" => config + .irc + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "google_chat" => config + .google_chat + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "twitch" => config + .twitch + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "rocketchat" => config + .rocketchat + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "zulip" => config + .zulip + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "xmpp" => config + .xmpp + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "line" => config + .line + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "viber" => config + .viber + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "messenger" => config + .messenger + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "reddit" => config + .reddit + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "mastodon" => config + .mastodon + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "bluesky" => config + .bluesky + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "feishu" => config + .feishu + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "revolt" => config + .revolt + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "nextcloud" => config + .nextcloud + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "guilded" => config + .guilded + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "keybase" => config + .keybase + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "threema" => config + .threema + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "nostr" => config + .nostr + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "webex" => config + .webex + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "pumble" => config + .pumble + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "flock" => config + .flock + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "twist" => config + .twist + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "mumble" => config + .mumble + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "dingtalk" => config + .dingtalk + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "discourse" => config + .discourse + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "gitter" => config + .gitter + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "ntfy" => config + .ntfy + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "gotify" => config + .gotify + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "webhook" => config + .webhook + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), + "linkedin" => config + .linkedin + .as_ref() + .and_then(|c| serde_json::to_value(c).ok()), _ => None, } } @@ -2033,7 +2167,10 @@ pub async fn configure_channel( ); } else { // Config field — collect for TOML write with type info - config_fields.insert(field_def.key.to_string(), (value.to_string(), field_def.field_type)); + config_fields.insert( + field_def.key.to_string(), + (value.to_string(), field_def.field_type), + ); } } @@ -2933,7 +3070,9 @@ pub async fn clawhub_search( "items": items, "next_cursor": null, }); - state.clawhub_cache.insert(cache_key, (Instant::now(), resp.clone())); + state + .clawhub_cache + .insert(cache_key, (Instant::now(), resp.clone())); (StatusCode::OK, Json(resp)) } Err(e) => { @@ -2947,9 +3086,7 @@ pub async fn clawhub_search( }; ( status, - Json( - serde_json::json!({"items": [], "next_cursor": null, "error": msg}), - ), + Json(serde_json::json!({"items": [], "next_cursor": null, "error": msg})), ) } } @@ -3002,7 +3139,9 @@ pub async fn clawhub_browse( "items": items, "next_cursor": results.next_cursor, }); - state.clawhub_cache.insert(cache_key, (Instant::now(), resp.clone())); + state + .clawhub_cache + .insert(cache_key, (Instant::now(), resp.clone())); (StatusCode::OK, Json(resp)) } Err(e) => { @@ -3015,9 +3154,7 @@ pub async fn clawhub_browse( }; ( status, - Json( - serde_json::json!({"items": [], "next_cursor": null, "error": msg}), - ), + Json(serde_json::json!({"items": [], "next_cursor": null, "error": msg})), ) } } @@ -3184,7 +3321,10 @@ pub async fn clawhub_install( StatusCode::FORBIDDEN } else if msg.contains("429") || msg.contains("rate limit") { StatusCode::TOO_MANY_REQUESTS - } else if msg.contains("Network error") || msg.contains("returned 4") || msg.contains("returned 5") { + } else if msg.contains("Network error") + || msg.contains("returned 4") + || msg.contains("returned 5") + { StatusCode::BAD_GATEWAY } else { StatusCode::INTERNAL_SERVER_ERROR @@ -3683,7 +3823,12 @@ pub async fn activate_hand( // If the hand agent has a non-reactive schedule (autonomous hands), // start its background loop so it begins running immediately. if let Some(agent_id) = instance.agent_id { - let entry = state.kernel.registry.list().into_iter().find(|e| e.id == agent_id); + let entry = state + .kernel + .registry + .list() + .into_iter() + .find(|e| e.id == agent_id); if let Some(entry) = entry { if !matches!( entry.manifest.schedule, @@ -3839,7 +3984,9 @@ pub async fn update_hand_settings( }, None => ( StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": format!("No active instance for hand: {hand_id}. Activate the hand first.")})), + Json( + serde_json::json!({"error": format!("No active instance for hand: {hand_id}. Activate the hand first.")}), + ), ), } } @@ -3966,7 +4113,10 @@ pub async fn hand_instance_browser( content = data["content"].as_str().unwrap_or("").to_string(); // Truncate content to avoid huge payloads (UTF-8 safe) if content.len() > 2000 { - content = format!("{}... (truncated)", openfang_types::truncate_str(&content, 2000)); + content = format!( + "{}... (truncated)", + openfang_types::truncate_str(&content, 2000) + ); } } } @@ -4622,7 +4772,9 @@ pub async fn update_agent_budget( if hourly.is_none() && daily.is_none() && monthly.is_none() { return ( StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": "Provide at least one of: max_cost_per_hour_usd, max_cost_per_day_usd, max_cost_per_month_usd"})), + Json( + serde_json::json!({"error": "Provide at least one of: max_cost_per_hour_usd, max_cost_per_day_usd, max_cost_per_month_usd"}), + ), ); } @@ -6125,10 +6277,7 @@ pub async fn set_agent_tools( .kernel .set_agent_tool_filters(agent_id, allowlist, blocklist) { - Ok(()) => ( - StatusCode::OK, - Json(serde_json::json!({"status": "ok"})), - ), + Ok(()) => (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("{e}")})), @@ -6600,8 +6749,7 @@ pub async fn set_provider_url( } // Probe reachability at the new URL - let probe = - openfang_runtime::provider_health::probe_provider(&name, &base_url).await; + let probe = openfang_runtime::provider_health::probe_provider(&name, &base_url).await; // Merge discovered models into catalog if !probe.discovered_models.is_empty() { @@ -7483,7 +7631,11 @@ pub async fn run_schedule( ); let kernel_handle: Arc = state.kernel.clone() as Arc; - match state.kernel.send_message_with_handle(target_agent, &run_message, Some(kernel_handle)).await { + match state + .kernel + .send_message_with_handle(target_agent, &run_message, Some(kernel_handle)) + .await + { Ok(result) => ( StatusCode::OK, Json(serde_json::json!({ @@ -7630,7 +7782,9 @@ pub async fn patch_agent_config( if name.len() > MAX_NAME_LEN { return ( StatusCode::PAYLOAD_TOO_LARGE, - Json(serde_json::json!({"error": format!("Name exceeds max length ({MAX_NAME_LEN} chars)")})), + Json( + serde_json::json!({"error": format!("Name exceeds max length ({MAX_NAME_LEN} chars)")}), + ), ); } } @@ -7638,7 +7792,9 @@ pub async fn patch_agent_config( if desc.len() > MAX_DESC_LEN { return ( StatusCode::PAYLOAD_TOO_LARGE, - Json(serde_json::json!({"error": format!("Description exceeds max length ({MAX_DESC_LEN} chars)")})), + Json( + serde_json::json!({"error": format!("Description exceeds max length ({MAX_DESC_LEN} chars)")}), + ), ); } } @@ -7646,7 +7802,9 @@ pub async fn patch_agent_config( if prompt.len() > MAX_PROMPT_LEN { return ( StatusCode::PAYLOAD_TOO_LARGE, - Json(serde_json::json!({"error": format!("System prompt exceeds max length ({MAX_PROMPT_LEN} chars)")})), + Json( + serde_json::json!({"error": format!("System prompt exceeds max length ({MAX_PROMPT_LEN} chars)")}), + ), ); } } @@ -8623,12 +8781,18 @@ pub async fn config_reload(State(state): State>) -> impl IntoRespo // --------------------------------------------------------------------------- /// GET /api/config/schema — Return a simplified JSON description of the config structure. -pub async fn config_schema( - State(state): State>, -) -> impl IntoResponse { +pub async fn config_schema(State(state): State>) -> impl IntoResponse { // Build provider/model options from model catalog for dropdowns - let catalog = state.kernel.model_catalog.read().unwrap_or_else(|e| e.into_inner()); - let provider_options: Vec = catalog.list_providers().iter().map(|p| p.id.clone()).collect(); + let catalog = state + .kernel + .model_catalog + .read() + .unwrap_or_else(|e| e.into_inner()); + let provider_options: Vec = catalog + .list_providers() + .iter() + .map(|p| p.id.clone()) + .collect(); let model_options: Vec = catalog .list_models() .iter() @@ -9481,8 +9645,7 @@ pub async fn copilot_oauth_start() -> impl IntoResponse { CopilotFlowState { device_code: resp.device_code, interval: resp.interval, - expires_at: Instant::now() - + std::time::Duration::from_secs(resp.expires_in), + expires_at: Instant::now() + std::time::Duration::from_secs(resp.expires_in), }, ); @@ -9546,7 +9709,9 @@ pub async fn copilot_oauth_poll( if let Err(e) = write_secret_env(&secrets_path, "GITHUB_TOKEN", &access_token) { return ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"status": "error", "error": format!("Failed to save token: {e}")})), + Json( + serde_json::json!({"status": "error", "error": format!("Failed to save token: {e}")}), + ), ); } @@ -9750,15 +9915,30 @@ fn audit_to_comms_event( // Format detail: "tokens_in=X, tokens_out=Y" → readable summary let detail = if entry.detail.starts_with("tokens_in=") { let parts: Vec<&str> = entry.detail.split(", ").collect(); - let in_tok = parts.first().and_then(|p| p.strip_prefix("tokens_in=")).unwrap_or("?"); - let out_tok = parts.get(1).and_then(|p| p.strip_prefix("tokens_out=")).unwrap_or("?"); + let in_tok = parts + .first() + .and_then(|p| p.strip_prefix("tokens_in=")) + .unwrap_or("?"); + let out_tok = parts + .get(1) + .and_then(|p| p.strip_prefix("tokens_out=")) + .unwrap_or("?"); if entry.outcome == "ok" { format!("{} in / {} out tokens", in_tok, out_tok) } else { - format!("{} in / {} out — {}", in_tok, out_tok, openfang_types::truncate_str(&entry.outcome, 80)) + format!( + "{} in / {} out — {}", + in_tok, + out_tok, + openfang_types::truncate_str(&entry.outcome, 80) + ) } } else if entry.outcome != "ok" { - format!("{} — {}", openfang_types::truncate_str(&entry.detail, 80), openfang_types::truncate_str(&entry.outcome, 80)) + format!( + "{} — {}", + openfang_types::truncate_str(&entry.detail, 80), + openfang_types::truncate_str(&entry.outcome, 80) + ) } else { openfang_types::truncate_str(&entry.detail, 200).to_string() }; @@ -9766,12 +9946,18 @@ fn audit_to_comms_event( } "AgentSpawn" => ( CommsEventKind::AgentSpawned, - format!("Agent spawned: {}", openfang_types::truncate_str(&entry.detail, 100)), + format!( + "Agent spawned: {}", + openfang_types::truncate_str(&entry.detail, 100) + ), "", ), "AgentKill" => ( CommsEventKind::AgentTerminated, - format!("Agent killed: {}", openfang_types::truncate_str(&entry.detail, 100)), + format!( + "Agent killed: {}", + openfang_types::truncate_str(&entry.detail, 100) + ), "", ), _ => return None, @@ -9783,8 +9969,16 @@ fn audit_to_comms_event( kind, source_id: entry.agent_id.clone(), source_name: resolve_name(&entry.agent_id), - target_id: if target_label.is_empty() { String::new() } else { target_label.to_string() }, - target_name: if target_label.is_empty() { String::new() } else { target_label.to_string() }, + target_id: if target_label.is_empty() { + String::new() + } else { + target_label.to_string() + }, + target_name: if target_label.is_empty() { + String::new() + } else { + target_label.to_string() + }, detail, }) } @@ -9835,9 +10029,7 @@ pub async fn comms_events( /// GET /api/comms/events/stream — SSE stream of inter-agent communication events. /// /// Polls the audit log every 500ms for new inter-agent events. -pub async fn comms_events_stream( - State(state): State>, -) -> axum::response::Response { +pub async fn comms_events_stream(State(state): State>) -> axum::response::Response { use axum::response::sse::{Event, KeepAlive, Sse}; let (tx, rx) = tokio::sync::mpsc::channel::< diff --git a/crates/openfang-channels/Cargo.toml b/crates/openfang-channels/Cargo.toml index 58f0a5df5..1997d8c7c 100644 --- a/crates/openfang-channels/Cargo.toml +++ b/crates/openfang-channels/Cargo.toml @@ -14,6 +14,7 @@ chrono = { workspace = true } dashmap = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } +prost = { workspace = true } reqwest = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } diff --git a/crates/openfang-channels/src/feishu.rs b/crates/openfang-channels/src/feishu.rs index 5d4263d3e..c5d0a7959 100644 --- a/crates/openfang-channels/src/feishu.rs +++ b/crates/openfang-channels/src/feishu.rs @@ -14,6 +14,7 @@ use crate::types::{ use async_trait::async_trait; use chrono::Utc; use futures::{SinkExt, Stream, StreamExt}; +use prost::Message as ProstMessage; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -21,6 +22,7 @@ use std::time::{Duration, Instant}; use tokio::sync::{mpsc, watch, RwLock}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use tracing::{debug, error, info, warn}; +use url::Url; use zeroize::Zeroizing; /// Feishu tenant access token endpoint. @@ -33,12 +35,57 @@ const FEISHU_SEND_URL: &str = "https://open.feishu.cn/open-apis/im/v1/messages"; /// Feishu bot info endpoint. const FEISHU_BOT_INFO_URL: &str = "https://open.feishu.cn/open-apis/bot/v3/info"; +/// Feishu websocket endpoint discovery API. +const FEISHU_WS_ENDPOINT_URL: &str = "https://open.feishu.cn/callback/ws/endpoint"; + /// Maximum Feishu message text length (characters). const MAX_MESSAGE_LEN: usize = 4096; /// Token refresh buffer — refresh 5 minutes before actual expiry. const TOKEN_REFRESH_BUFFER_SECS: u64 = 300; +const INITIAL_BACKOFF: Duration = Duration::from_secs(1); +const MAX_BACKOFF: Duration = Duration::from_secs(60); +const DEFAULT_WS_PING_INTERVAL_SECS: u64 = 30; + +/// Feishu websocket frame header. +#[derive(Clone, PartialEq, ::prost::Message)] +struct FeishuWsHeader { + #[prost(string, tag = "1")] + key: String, + #[prost(string, tag = "2")] + value: String, +} + +/// Feishu websocket frame (pbbp2.proto compatible). +#[derive(Clone, PartialEq, ::prost::Message)] +struct FeishuWsFrame { + #[prost(uint64, tag = "1")] + seq_id: u64, + #[prost(uint64, tag = "2")] + log_id: u64, + #[prost(int32, tag = "3")] + service: i32, + #[prost(int32, tag = "4")] + method: i32, + #[prost(message, repeated, tag = "5")] + headers: Vec, + #[prost(string, optional, tag = "6")] + payload_encoding: Option, + #[prost(string, optional, tag = "7")] + payload_type: Option, + #[prost(bytes, optional, tag = "8")] + payload: Option>, + #[prost(string, optional, tag = "9")] + log_id_new: Option, +} + +#[derive(Debug, Clone)] +struct FeishuWsEndpoint { + url: String, + ping_interval_secs: u64, +} + /// Feishu connection mode. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FeishuConnectionMode { @@ -263,7 +310,10 @@ impl FeishuAdapter { } /// Start webhook server (Webhook mode). - async fn start_webhook(&self, tx: mpsc::Sender) -> Result<(), Box> { + async fn start_webhook( + &self, + tx: mpsc::Sender, + ) -> Result<(), Box> { let port = self.webhook_port; let verification_token = self.verification_token.clone(); let mut shutdown_rx = self.shutdown_rx.clone(); @@ -401,111 +451,192 @@ impl FeishuAdapter { } /// Start WebSocket connection loop (WebSocket mode). - async fn start_websocket_loop(&self, tx: mpsc::Sender) -> Result<(), Box> { - let self_arc = Arc::new(self.clone_adapter()); - + async fn start_websocket_loop( + &self, + tx: mpsc::Sender, + ) -> Result<(), Box> { + let adapter = Arc::new(self.clone_adapter()); + tokio::spawn(async move { info!("Starting Feishu WebSocket mode"); - let mut backoff = Duration::from_secs(1); - let max_backoff = Duration::from_secs(60); - + let mut shutdown_rx = adapter.shutdown_rx.clone(); + let mut backoff = INITIAL_BACKOFF; + loop { - match Self::run_websocket_inner(self_arc.clone(), tx.clone()).await { - Ok(_) => { - info!("Feishu WebSocket connection closed, reconnecting..."); - } - Err(e) => { - error!("Feishu WebSocket error: {e}, reconnecting in {backoff:?}"); + if *shutdown_rx.borrow() { + break; + } + + if let Err(e) = Self::run_websocket_inner(adapter.clone(), tx.clone()).await { + error!("Feishu WebSocket error: {e}"); + } else { + info!("Feishu WebSocket connection closed"); + } + + if *shutdown_rx.borrow() { + break; + } + + warn!("Feishu WebSocket reconnecting in {backoff:?}"); + tokio::select! { + _ = tokio::time::sleep(backoff) => {} + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + break; + } } } - - tokio::time::sleep(backoff).await; - backoff = std::cmp::min(backoff * 2, max_backoff); + + backoff = (backoff * 2).min(MAX_BACKOFF); } + + info!("Feishu WebSocket loop stopped"); }); Ok(()) } - /// Clone adapter for use in async tasks. fn clone_adapter(&self) -> FeishuAdapterClone { FeishuAdapterClone { app_id: self.app_id.clone(), app_secret: self.app_secret.clone(), client: self.client.clone(), - cached_token: self.cached_token.clone(), shutdown_rx: self.shutdown_rx.clone(), } } - /// Run WebSocket connection loop (inner implementation). async fn run_websocket_inner( adapter: Arc, tx: mpsc::Sender, ) -> Result<(), Box> { - let ws_url = adapter.get_websocket_endpoint().await?; - info!("Connecting to Feishu WebSocket endpoint: {ws_url}"); + let endpoint = adapter.get_websocket_endpoint().await?; + let ws_url = endpoint.url; + let service_id = parse_service_id(&ws_url); - let (ws_stream, _) = connect_async(ws_url).await?; + info!("Connecting to Feishu WebSocket endpoint: {ws_url}"); + let (ws_stream, _) = connect_async(&ws_url).await?; info!("Feishu WebSocket connected successfully"); let (mut write, mut read) = ws_stream.split(); let mut shutdown_rx = adapter.shutdown_rx.clone(); + let mut ping_interval = + tokio::time::interval(Duration::from_secs(endpoint.ping_interval_secs)); + // consume first immediate tick + ping_interval.tick().await; + + let mut frame_parts: HashMap>> = HashMap::new(); loop { tokio::select! { msg = read.next() => { match msg { - Some(Ok(Message::Text(text))) => { - debug!("Received Feishu WebSocket message: {text}"); - if let Ok(event) = serde_json::from_str::(&text) { - if let Some(msg) = parse_feishu_event(&event) { - let _ = tx.send(msg).await; - } - } - } Some(Ok(Message::Binary(data))) => { - debug!("Received Feishu WebSocket binary message: {} bytes", data.len()); - if let Ok(text) = String::from_utf8(data) { - if let Ok(event) = serde_json::from_str::(&text) { - if let Some(msg) = parse_feishu_event(&event) { - let _ = tx.send(msg).await; + let frame = match FeishuWsFrame::decode(data.as_slice()) { + Ok(f) => f, + Err(e) => { + warn!("Feishu WS decode frame failed: {e}"); + continue; + } + }; + + match frame.method { + 0 => { + if let Some(new_interval) = parse_pong_interval(&frame) { + if new_interval > 0 { + debug!("Feishu WS update ping interval to {}s", new_interval); + ping_interval = tokio::time::interval(Duration::from_secs(new_interval)); + ping_interval.tick().await; + } } } + 1 => { + Self::handle_data_frame(frame, &mut write, &tx, &mut frame_parts).await?; + } + method => { + debug!("Feishu WS unhandled frame method: {method}"); + } } } - Some(Ok(Message::Close(_))) => { - info!("Feishu WebSocket connection closed by server"); + Some(Ok(Message::Text(text))) => { + // Feishu WS should be binary protobuf frames; keep this for diagnostics. + debug!("Feishu WS unexpected text message: {text}"); + } + Some(Ok(Message::Close(frame))) => { + info!("Feishu WebSocket closed by server: {frame:?}"); break; } - Some(Ok(Message::Ping(_))) => { - debug!("Received Feishu WebSocket ping, sending pong"); - let _ = write.send(Message::Pong(Vec::new())).await; + Some(Ok(Message::Ping(payload))) => { + let _ = write.send(Message::Pong(payload)).await; } Some(Ok(Message::Pong(_))) => { - debug!("Received Feishu WebSocket pong"); - } - Some(Ok(_)) => { - debug!("Received unhandled Feishu WebSocket message type"); - } - Some(Err(e)) => { - error!("Feishu WebSocket error: {e}"); - break; - } - None => { - info!("Feishu WebSocket stream ended"); - break; + debug!("Feishu WebSocket pong"); } + Some(Ok(_)) => {} + Some(Err(e)) => return Err(format!("Feishu WebSocket stream error: {e}").into()), + None => break, } } + _ = ping_interval.tick() => { + let ping_frame = build_ping_frame(service_id); + write.send(Message::Binary(ping_frame.encode_to_vec())).await?; + } _ = shutdown_rx.changed() => { - info!("Feishu WebSocket shutting down"); - let _ = write.close().await; - break; + if *shutdown_rx.borrow() { + info!("Feishu WebSocket shutting down"); + let _ = write.close().await; + break; + } + } + } + } + + Ok(()) + } + + async fn handle_data_frame( + mut frame: FeishuWsFrame, + write: &mut S, + tx: &mpsc::Sender, + frame_parts: &mut HashMap>>, + ) -> Result<(), Box> + where + S: SinkExt + Unpin, + >::Error: std::error::Error + Send + Sync + 'static, + { + let frame_type = ws_header(&frame.headers, "type").unwrap_or_default(); + if frame_type != "event" { + return Ok(()); + } + + let payload = match frame.payload.take() { + Some(p) => p, + None => return Ok(()), + }; + + let payload = match combine_payload(&frame.headers, payload, frame_parts) { + Some(p) => p, + None => return Ok(()), + }; + + let mut code = 200; + match serde_json::from_slice::(&payload) { + Ok(event) => { + if let Some(msg) = parse_feishu_event(&event) { + if tx.send(msg).await.is_err() { + return Ok(()); + } } } + Err(e) => { + warn!("Feishu WS event payload parse failed: {e}"); + code = 500; + } } + let ack_frame = build_ack_frame(&frame, code); + write + .send(Message::Binary(ack_frame.encode_to_vec())) + .await?; Ok(()) } } @@ -515,91 +646,175 @@ struct FeishuAdapterClone { app_id: String, app_secret: Zeroizing, client: reqwest::Client, - cached_token: Arc>>, shutdown_rx: watch::Receiver, } impl FeishuAdapterClone { - /// Get a valid tenant access token, refreshing if expired or missing. - async fn get_token(&self) -> Result> { - { - let guard = self.cached_token.read().await; - if let Some((ref token, expiry)) = *guard { - if Instant::now() < expiry { - return Ok(token.clone()); - } - } - } - - let body = serde_json::json!({ - "app_id": self.app_id, - "app_secret": self.app_secret.as_str(), - }); - + /// Get WebSocket endpoint from Feishu API. + async fn get_websocket_endpoint(&self) -> Result> { let resp = self .client - .post(FEISHU_TOKEN_URL) - .json(&body) + .post(FEISHU_WS_ENDPOINT_URL) + .json(&serde_json::json!({ + "AppID": self.app_id, + "AppSecret": self.app_secret.as_str(), + })) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let resp_body = resp.text().await.unwrap_or_default(); - return Err(format!("Feishu token request failed {status}: {resp_body}").into()); + return Err( + format!("Feishu WebSocket endpoint request failed {status}: {resp_body}").into(), + ); } let resp_body: serde_json::Value = resp.json().await?; - let code = resp_body["code"].as_i64().unwrap_or(-1); - if code != 0 { - let msg = resp_body["msg"].as_str().unwrap_or("unknown error"); - return Err(format!("Feishu token error: {msg}").into()); - } + parse_ws_endpoint_response(&resp_body) + } +} - let tenant_access_token = resp_body["tenant_access_token"] - .as_str() - .ok_or("Missing tenant_access_token")? - .to_string(); - let expire = resp_body["expire"].as_u64().unwrap_or(7200); +fn parse_ws_endpoint_response( + resp_body: &serde_json::Value, +) -> Result> { + let code = resp_body["code"].as_i64().unwrap_or(-1); + if code != 0 { + let msg = resp_body["msg"].as_str().unwrap_or("unknown error"); + return Err(format!("Feishu WebSocket endpoint error: {msg}").into()); + } - let expiry = - Instant::now() + Duration::from_secs(expire.saturating_sub(TOKEN_REFRESH_BUFFER_SECS)); - *self.cached_token.write().await = Some((tenant_access_token.clone(), expiry)); + let data = &resp_body["data"]; + let ws_url = data + .get("url") + .or_else(|| data.get("URL")) + .and_then(|v| v.as_str()) + .ok_or("Missing WebSocket URL in response")? + .to_string(); - Ok(tenant_access_token) + let ping_interval = data + .get("client_config") + .or_else(|| data.get("ClientConfig")) + .and_then(|cfg| cfg.get("ping_interval").or_else(|| cfg.get("PingInterval"))) + .and_then(|v| v.as_u64()) + .filter(|v| *v > 0) + .unwrap_or(DEFAULT_WS_PING_INTERVAL_SECS); + + Ok(FeishuWsEndpoint { + url: ws_url, + ping_interval_secs: ping_interval, + }) +} + +fn parse_service_id(ws_url: &str) -> i32 { + Url::parse(ws_url) + .ok() + .and_then(|u| { + u.query_pairs() + .find(|(k, _)| k == "service_id") + .and_then(|(_, v)| v.parse::().ok()) + }) + .unwrap_or(0) +} + +fn ws_header(headers: &[FeishuWsHeader], key: &str) -> Option { + headers + .iter() + .find(|h| h.key == key) + .map(|h| h.value.clone()) +} + +fn parse_pong_interval(frame: &FeishuWsFrame) -> Option { + let frame_type = ws_header(&frame.headers, "type")?; + if frame_type != "pong" { + return None; } - /// Get WebSocket endpoint from Feishu API. - async fn get_websocket_endpoint(&self) -> Result> { - let token = self.get_token().await?; - let url = "https://open.feishu.cn/open-apis/ws/v1/endpoint"; - - let resp = self - .client - .get(url) - .bearer_auth(&token) - .send() - .await?; + let payload = frame.payload.as_ref()?; + let value: serde_json::Value = serde_json::from_slice(payload).ok()?; + value + .get("ping_interval") + .or_else(|| value.get("PingInterval")) + .and_then(|v| v.as_u64()) +} - if !resp.status().is_success() { - let status = resp.status(); - let resp_body = resp.text().await.unwrap_or_default(); - return Err(format!("Feishu WebSocket endpoint request failed {status}: {resp_body}").into()); - } +fn combine_payload( + headers: &[FeishuWsHeader], + payload: Vec, + frame_parts: &mut HashMap>>, +) -> Option> { + let sum = ws_header(headers, "sum") + .and_then(|v| v.parse::().ok()) + .unwrap_or(1); + if sum <= 1 { + return Some(payload); + } - let resp_body: serde_json::Value = resp.json().await?; - let code = resp_body["code"].as_i64().unwrap_or(-1); - if code != 0 { - let msg = resp_body["msg"].as_str().unwrap_or("unknown error"); - return Err(format!("Feishu WebSocket endpoint error: {msg}").into()); - } + let seq = ws_header(headers, "seq") + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let msg_id = ws_header(headers, "message_id")?; - let ws_url = resp_body["data"]["url"] - .as_str() - .ok_or("Missing WebSocket URL in response")? - .to_string(); + if seq >= sum { + return None; + } - Ok(ws_url) + let entry = frame_parts + .entry(msg_id.clone()) + .or_insert_with(|| vec![Vec::new(); sum]); + + if entry.len() != sum { + *entry = vec![Vec::new(); sum]; + } + + entry[seq] = payload; + + if entry.iter().any(|part| part.is_empty()) { + return None; + } + + let mut combined = Vec::new(); + for part in entry.iter() { + combined.extend_from_slice(part); + } + frame_parts.remove(&msg_id); + Some(combined) +} + +fn build_ping_frame(service_id: i32) -> FeishuWsFrame { + FeishuWsFrame { + seq_id: 0, + log_id: 0, + service: service_id, + method: 0, + headers: vec![FeishuWsHeader { + key: "type".to_string(), + value: "ping".to_string(), + }], + payload_encoding: None, + payload_type: None, + payload: None, + log_id_new: None, + } +} + +fn build_ack_frame(request: &FeishuWsFrame, code: u16) -> FeishuWsFrame { + let payload = serde_json::json!({ + "code": code, + "headers": {}, + "data": [] + }); + + FeishuWsFrame { + seq_id: request.seq_id, + log_id: request.log_id, + service: request.service, + method: request.method, + headers: request.headers.clone(), + payload_encoding: None, + payload_type: None, + payload: Some(serde_json::to_vec(&payload).unwrap_or_default()), + log_id_new: request.log_id_new.clone(), } } @@ -722,12 +937,8 @@ impl ChannelAdapter for FeishuAdapter { let (tx, rx) = mpsc::channel::(256); match self.connection_mode { - FeishuConnectionMode::Webhook => { - self.start_webhook(tx).await?; - } - FeishuConnectionMode::WebSocket => { - self.start_websocket_loop(tx).await?; - } + FeishuConnectionMode::Webhook => self.start_webhook(tx).await?, + FeishuConnectionMode::WebSocket => self.start_websocket_loop(tx).await?, } Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))) @@ -765,6 +976,13 @@ impl ChannelAdapter for FeishuAdapter { mod tests { use super::*; + fn header(key: &str, value: &str) -> FeishuWsHeader { + FeishuWsHeader { + key: key.to_string(), + value: value.to_string(), + } + } + #[test] fn test_feishu_adapter_creation() { let adapter = @@ -780,10 +998,8 @@ mod tests { #[test] fn test_feishu_websocket_adapter_creation() { - let adapter = FeishuAdapter::new_websocket( - "cli_abc123".to_string(), - "app-secret-456".to_string(), - ); + let adapter = + FeishuAdapter::new_websocket("cli_abc123".to_string(), "app-secret-456".to_string()); assert_eq!(adapter.name(), "feishu"); assert_eq!( adapter.channel_type(), @@ -811,6 +1027,69 @@ mod tests { assert_eq!(adapter.app_id, "cli_test"); } + #[test] + fn test_parse_ws_endpoint_response_lowercase() { + let body = serde_json::json!({ + "code": 0, + "msg": "ok", + "data": { + "url": "wss://example/ws?service_id=123", + "client_config": { + "ping_interval": 42 + } + } + }); + + let endpoint = parse_ws_endpoint_response(&body).unwrap(); + assert_eq!(endpoint.url, "wss://example/ws?service_id=123"); + assert_eq!(endpoint.ping_interval_secs, 42); + } + + #[test] + fn test_parse_ws_endpoint_response_uppercase() { + let body = serde_json::json!({ + "code": 0, + "msg": "ok", + "data": { + "URL": "wss://example/ws?service_id=321", + "ClientConfig": { + "PingInterval": 24 + } + } + }); + + let endpoint = parse_ws_endpoint_response(&body).unwrap(); + assert_eq!(endpoint.url, "wss://example/ws?service_id=321"); + assert_eq!(endpoint.ping_interval_secs, 24); + } + + #[test] + fn test_combine_payload_multi_package() { + let mut frame_parts = HashMap::new(); + + let headers_1 = vec![ + header("message_id", "msg-1"), + header("sum", "2"), + header("seq", "0"), + ]; + let headers_2 = vec![ + header("message_id", "msg-1"), + header("sum", "2"), + header("seq", "1"), + ]; + + let r1 = combine_payload(&headers_1, b"Hello ".to_vec(), &mut frame_parts); + assert!(r1.is_none()); + let r2 = combine_payload(&headers_2, b"World".to_vec(), &mut frame_parts).unwrap(); + assert_eq!(r2, b"Hello World".to_vec()); + } + + #[test] + fn test_parse_service_id() { + assert_eq!(parse_service_id("wss://foo/bar?service_id=123"), 123); + assert_eq!(parse_service_id("wss://foo/bar"), 0); + } + #[test] fn test_parse_feishu_event_v2_text() { let event = serde_json::json!({ diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index a5455d79a..69a129cfa 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -2193,6 +2193,17 @@ impl Default for BlueskyConfig { } } +/// Feishu inbound event receive mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum FeishuMode { + /// Receive events via HTTP webhook callback. + #[default] + Webhook, + /// Receive events via WebSocket long connection. + Websocket, +} + /// Feishu/Lark Open Platform channel adapter configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -2201,6 +2212,8 @@ pub struct FeishuConfig { pub app_id: String, /// Env var name holding the app secret. pub app_secret_env: String, + /// Inbound receive mode (`webhook` or `websocket`). + pub mode: FeishuMode, /// Port for the incoming webhook. pub webhook_port: u16, /// Default agent name to route messages to. @@ -2215,6 +2228,7 @@ impl Default for FeishuConfig { Self { app_id: String::new(), app_secret_env: "FEISHU_APP_SECRET".to_string(), + mode: FeishuMode::Webhook, webhook_port: 8453, default_agent: None, overrides: ChannelOverrides::default(), @@ -3583,4 +3597,29 @@ mod tests { assert_eq!(config.web.fetch.max_response_bytes, fetch_bytes); assert_eq!(config.web.fetch.timeout_secs, fetch_timeout); } + + #[test] + fn test_feishu_mode_defaults_to_webhook() { + let toml_str = r#" + [channels.feishu] + app_id = "cli_test" + app_secret_env = "FEISHU_APP_SECRET" + "#; + let config: KernelConfig = toml::from_str(toml_str).unwrap(); + let feishu = config.channels.feishu.unwrap(); + assert_eq!(feishu.mode, FeishuMode::Webhook); + } + + #[test] + fn test_feishu_mode_parses_websocket() { + let toml_str = r#" + [channels.feishu] + app_id = "cli_test" + app_secret_env = "FEISHU_APP_SECRET" + mode = "websocket" + "#; + let config: KernelConfig = toml::from_str(toml_str).unwrap(); + let feishu = config.channels.feishu.unwrap(); + assert_eq!(feishu.mode, FeishuMode::Websocket); + } } From f9b87ae921e26ff4fcc74441a068868fd94ce7bc Mon Sep 17 00:00:00 2001 From: xinuxZ Date: Wed, 4 Mar 2026 18:20:14 +0800 Subject: [PATCH 3/3] feat(feishu): default receive mode to websocket --- crates/openfang-api/src/channel_bridge.rs | 4 +- crates/openfang-api/src/routes.rs | 6 +-- crates/openfang-types/src/config.rs | 8 ++-- docs/channel-adapters.md | 57 ++++++++++++++++++++++- 4 files changed, 65 insertions(+), 10 deletions(-) diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 71a6935a0..8e0aec066 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -1703,7 +1703,7 @@ mod tests { } #[test] - fn test_feishu_bridge_mode_defaults_to_webhook() { + fn test_feishu_bridge_mode_defaults_to_websocket() { let config: openfang_types::config::KernelConfig = toml::from_str( r#" [channels.feishu] @@ -1714,7 +1714,7 @@ mod tests { .unwrap(); let feishu = config.channels.feishu.expect("feishu config should exist"); - assert_eq!(feishu.mode, openfang_types::config::FeishuMode::Webhook); + assert_eq!(feishu.mode, openfang_types::config::FeishuMode::Websocket); } #[test] diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 88f55ab03..d13922611 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -1477,7 +1477,7 @@ const CHANNEL_REGISTRY: &[ChannelMeta] = &[ ChannelField { key: "default_agent", label: "Default Agent", field_type: FieldType::Text, env_var: None, required: false, placeholder: "assistant", advanced: true }, ], setup_steps: &["Create an app at open.feishu.cn", "Copy App ID and Secret", "Paste them below"], - config_template: "[channels.feishu]\napp_id = \"\"\napp_secret_env = \"FEISHU_APP_SECRET\"\nmode = \"webhook\"", + config_template: "[channels.feishu]\napp_id = \"\"\napp_secret_env = \"FEISHU_APP_SECRET\"\nmode = \"websocket\"", }, ChannelMeta { name: "dingtalk", display_name: "DingTalk", icon: "DT", @@ -1874,9 +1874,9 @@ mod channel_meta_tests { } #[test] - fn feishu_channel_meta_template_includes_webhook_mode_default() { + fn feishu_channel_meta_template_includes_websocket_mode_default() { let meta = find_channel_meta("feishu").expect("feishu channel meta should exist"); - assert!(meta.config_template.contains("mode = \"webhook\"")); + assert!(meta.config_template.contains("mode = \"websocket\"")); } } diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 69a129cfa..df3969c2b 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -2198,9 +2198,9 @@ impl Default for BlueskyConfig { #[serde(rename_all = "lowercase")] pub enum FeishuMode { /// Receive events via HTTP webhook callback. - #[default] Webhook, /// Receive events via WebSocket long connection. + #[default] Websocket, } @@ -2228,7 +2228,7 @@ impl Default for FeishuConfig { Self { app_id: String::new(), app_secret_env: "FEISHU_APP_SECRET".to_string(), - mode: FeishuMode::Webhook, + mode: FeishuMode::Websocket, webhook_port: 8453, default_agent: None, overrides: ChannelOverrides::default(), @@ -3599,7 +3599,7 @@ mod tests { } #[test] - fn test_feishu_mode_defaults_to_webhook() { + fn test_feishu_mode_defaults_to_websocket() { let toml_str = r#" [channels.feishu] app_id = "cli_test" @@ -3607,7 +3607,7 @@ mod tests { "#; let config: KernelConfig = toml::from_str(toml_str).unwrap(); let feishu = config.channels.feishu.unwrap(); - assert_eq!(feishu.mode, FeishuMode::Webhook); + assert_eq!(feishu.mode, FeishuMode::Websocket); } #[test] diff --git a/docs/channel-adapters.md b/docs/channel-adapters.md index 850625ab4..cd44652ea 100644 --- a/docs/channel-adapters.md +++ b/docs/channel-adapters.md @@ -14,6 +14,7 @@ All adapters share a common foundation: graceful shutdown via `watch::channel`, - [Discord](#discord) - [Slack](#slack) - [WhatsApp](#whatsapp) +- [Feishu / Lark](#feishu--lark) - [Signal](#signal) - [Matrix](#matrix) - [Email](#email) @@ -45,7 +46,7 @@ All adapters share a common foundation: graceful shutdown via `watch::channel`, | Mattermost | WebSocket + REST v4 | `MATTERMOST_TOKEN`, `MATTERMOST_URL` | `Mattermost` | | Google Chat | Service account webhook | `GOOGLE_CHAT_SA_KEY`, `GOOGLE_CHAT_SPACE` | `Custom("google_chat")` | | Webex | Bot SDK WebSocket | `WEBEX_BOT_TOKEN` | `Custom("webex")` | -| Feishu / Lark | Open Platform webhook | `FEISHU_APP_ID`, `FEISHU_APP_SECRET` | `Custom("feishu")` | +| Feishu / Lark | Open Platform Webhook / WebSocket | `FEISHU_APP_ID`, `FEISHU_APP_SECRET` | `Custom("feishu")` | | Rocket.Chat | REST polling | `ROCKETCHAT_TOKEN`, `ROCKETCHAT_URL` | `Custom("rocketchat")` | | Zulip | Event queue long-polling | `ZULIP_EMAIL`, `ZULIP_API_KEY`, `ZULIP_URL` | `Custom("zulip")` | | XMPP | XMPP protocol (stub) | `XMPP_JID`, `XMPP_PASSWORD`, `XMPP_SERVER` | `Custom("xmpp")` | @@ -433,6 +434,60 @@ The WhatsApp adapter runs an HTTP server (on the configured `webhook_port`) that --- +## Feishu / Lark + +### Prerequisites + +- A Feishu/Lark app created in [open.feishu.cn](https://open.feishu.cn/) +- App ID and App Secret + +### Setup + +1. Create a custom app in Feishu Open Platform. +2. Enable the IM message event subscription for your app. +3. Set environment variable: + +```bash +export FEISHU_APP_SECRET=cli_xxx_secret +``` + +4. Add to config (default: `websocket` mode): + +```toml +[channels.feishu] +app_id = "cli_xxx" +app_secret_env = "FEISHU_APP_SECRET" +mode = "websocket" +default_agent = "assistant" +``` + +5. Restart the daemon. + +### Webhook Compatibility Mode + +If you need the legacy callback flow, switch to `webhook` and expose a public callback URL: + +```toml +[channels.feishu] +app_id = "cli_xxx" +app_secret_env = "FEISHU_APP_SECRET" +mode = "webhook" +webhook_port = 8453 +default_agent = "assistant" +``` + +Then configure Feishu event callback to: + +`https://:8453/feishu/webhook` + +### How It Works + +- **websocket mode**: OpenFang obtains endpoint from Feishu and receives events via long connection (no public inbound webhook needed). +- **webhook mode**: OpenFang starts an HTTP callback server and receives Feishu push events. +- **send path (both modes)**: outbound messages still go through Feishu OpenAPI HTTP `im/v1/messages`. + +--- + ## Signal ### Prerequisites