From 6b322c9d7dd01ab950c4f8191409db42522af14d Mon Sep 17 00:00:00 2001 From: OpenFang Developers Date: Wed, 4 Mar 2026 21:56:17 +0800 Subject: [PATCH 1/3] feat: Add Feishu WebSocket long connection support This commit implements WebSocket long connection mode for Feishu/Lark integration, allowing OpenFang to receive messages from local environments without requiring public IP or port forwarding. ## Key Changes ### Core Implementation - **feishu.rs**: Add WebSocket connection support with protobuf message handling - Implement Feishu WebSocket protocol (wss://msg-frontier.feishu.cn/ws/v2) - Add manual protobuf encoding/decoding (no external proto files) - Implement ping/pong keepalive mechanism - Add message fragmentation handling for large payloads - Support both webhook and WebSocket connection modes ### Configuration - **config.rs**: Add `FeishuConnectionMode` enum (websocket/webhook) - **Cargo.toml**: Add required dependencies (tokio-tungstenite, prost, flate2) - **channel_bridge.rs**: Update to support new connection mode parameter ### Bug Fixes - Fix `receive_id` error by using `chat_id` instead of `sender_id` - Change log levels from `trace!` to `info!` for better visibility ### Documentation - Add FEISHU_WEBSOCKET_README.md with setup and usage instructions ## Technical Details - Protocol: Custom protobuf over WebSocket - Authentication: Dynamic ticket-based handshake - Compression: Gzip for message payloads - Frame: SeqID, LogID, service, method, headers, payload Resolves: #S143 Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 97 ++- Cargo.toml | 9 +- FEISHU_WEBSOCKET_README.md | 151 ++++ crates/openfang-api/src/channel_bridge.rs | 2 +- crates/openfang-channels/Cargo.toml | 4 + crates/openfang-channels/src/feishu.rs | 998 +++++++++++++++++++--- crates/openfang-types/src/config.rs | 22 +- 7 files changed, 1130 insertions(+), 153 deletions(-) create mode 100644 FEISHU_WEBSOCKET_README.md diff --git a/Cargo.lock b/Cargo.lock index 5c609179a..7ebd971e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2929,6 +2929,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -3211,7 +3220,6 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" dependencies = [ - "cc", "pkg-config", "vcpkg", ] @@ -3866,7 +3874,7 @@ dependencies = [ [[package]] name = "openfang-api" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "axum", @@ -3902,13 +3910,15 @@ dependencies = [ [[package]] name = "openfang-channels" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "axum", "base64 0.22.1", + "bytes", "chrono", "dashmap", + "flate2", "futures", "hex", "hmac", @@ -3917,6 +3927,8 @@ dependencies = [ "mailparse", "native-tls", "openfang-types", + "prost 0.13.5", + "prost-types", "reqwest 0.12.28", "serde", "serde_json", @@ -3933,7 +3945,7 @@ dependencies = [ [[package]] name = "openfang-cli" -version = "0.3.11" +version = "0.3.12" dependencies = [ "clap", "clap_complete", @@ -3960,7 +3972,7 @@ dependencies = [ [[package]] name = "openfang-desktop" -version = "0.3.11" +version = "0.3.12" dependencies = [ "axum", "open", @@ -3986,7 +3998,7 @@ dependencies = [ [[package]] name = "openfang-extensions" -version = "0.3.11" +version = "0.3.12" dependencies = [ "aes-gcm", "argon2", @@ -4014,7 +4026,7 @@ dependencies = [ [[package]] name = "openfang-hands" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "dashmap", @@ -4031,7 +4043,7 @@ dependencies = [ [[package]] name = "openfang-kernel" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4067,7 +4079,7 @@ dependencies = [ [[package]] name = "openfang-memory" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4086,7 +4098,7 @@ dependencies = [ [[package]] name = "openfang-migrate" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "dirs 6.0.0", @@ -4105,7 +4117,7 @@ dependencies = [ [[package]] name = "openfang-runtime" -version = "0.3.11" +version = "0.3.12" dependencies = [ "anyhow", "async-trait", @@ -4137,7 +4149,7 @@ dependencies = [ [[package]] name = "openfang-skills" -version = "0.3.11" +version = "0.3.12" dependencies = [ "chrono", "hex", @@ -4160,7 +4172,7 @@ dependencies = [ [[package]] name = "openfang-types" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4179,7 +4191,7 @@ dependencies = [ [[package]] name = "openfang-wire" -version = "0.3.11" +version = "0.3.12" dependencies = [ "async-trait", "chrono", @@ -4782,6 +4794,61 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive 0.12.6", +] + +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost 0.12.6", +] + [[package]] name = "psm" version = "0.1.30" @@ -8791,7 +8858,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56" [[package]] name = "xtask" -version = "0.3.11" +version = "0.3.12" [[package]] name = "yoke" diff --git a/Cargo.toml b/Cargo.toml index 5d8b0a6c0..ca8bd841a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1", features = ["v4", "serde"] } # Database -rusqlite = { version = "0.31", features = ["bundled", "serde_json"] } +rusqlite = { version = "0.31", features = ["serde_json"] } # CLI clap = { version = "4", features = ["derive"] } @@ -79,6 +79,10 @@ futures = "0.3" tokio-tungstenite = { version = "0.24", default-features = false, features = ["connect", "rustls-tls-native-roots"] } url = "2" +# Protocol Buffers (for Feishu WebSocket) +prost = "0.13" +prost-types = "0.12" + # WASM sandbox wasmtime = "41" @@ -131,6 +135,9 @@ imap = "2" native-tls = "0.2" mailparse = "0.15" +# Compression +flate2 = "1" + # Testing tokio-test = "0.4" tempfile = "3" diff --git a/FEISHU_WEBSOCKET_README.md b/FEISHU_WEBSOCKET_README.md new file mode 100644 index 000000000..d32a3a0ea --- /dev/null +++ b/FEISHU_WEBSOCKET_README.md @@ -0,0 +1,151 @@ +# Feishu WebSocket 长连接实现说明 + +## 实现概述 + +OpenFang 现已支持飞书 WebSocket 长连接模式,无需公网 IP 或内网穿透工具即可在本地环境中接收飞书事件。 + +## 配置方式 + +### 1. 配置文件 (`~/.openfang/config.toml`) + +```toml +[channels.feishu] +app_id = "cli_a922bea3c6785cc8" +app_secret_env = "FEISHU_APP_SECRET" +connection_mode = "websocket" # 改为 websocket +webhook_port = 8453 # WebSocket 模式下不需要 +default_agent = "assistant" +``` + +### 2. 支持的连接模式 + +| 模式 | 说明 | 需求 | +|------|------|------| +| `webhook` | HTTP Webhook 回调(默认) | 公网 IP 或内网穿透 | +| `websocket` | WebSocket 长连接 | 仅需出网访问能力 | + +## 安装步骤 + +```bash +# 1. 编译已完成,位于 target/release/openfang-cli + +# 2. 安装到系统 +sudo cp target/release/openfang-cli /usr/local/bin/openfang +sudo chmod +x /usr/local/bin/openfang + +# 3. 停止旧版本(如果运行中) +openfang stop + +# 4. 启动新版本 +openfang start + +# 5. 查看日志确认 WebSocket 连接状态 +openfang logs +``` + +## 验证方法 + +### 方法1:检查日志 + +```bash +openfang logs +``` + +期望看到: +``` +Feishu adapter authenticated as [bot名称] +Feishu: Using WebSocket long connection mode +Feishu: Connecting to WebSocket at wss://open.feishu.cn/ws-3 +Feishu: WebSocket hello message sent +``` + +### 方法2:飞书开发者后台配置 + +1. 登录飞书开发者后台 +2. 进入你的应用 +3. 选择 **事件订阅** → **订阅方式** +4. 选择 **使用长连接接收事件** +5. 保存配置 + +配置保存后,飞书会检测长连接是否建立成功。 + +## 实现细节 + +### WebSocket 协议 + +1. **连接端点**: `wss://open.feishu.cn/ws-3` +2. **认证方式**: 发送包含 app_id 和 app_secret 的 JSON 消息 +3. **事件格式**: 与 Webhook 模式相同的 JSON 格式 + +### 自动重连 + +- 连接断开后会自动重连(间隔 5 秒) +- 收到 shutdown 信号时会停止重连 +- 错误会被记录到日志中 + +### 代码变更 + +**新增的文件结构**: +- `openfang-types/src/config.rs`: 添加了 `FeishuConnectionMode` 枚举 +- `openfang-channels/src/feishu.rs`: 添加了 WebSocket 连接逻辑 +- `openfang-api/src/channel_bridge.rs`: 更新了适配器初始化 + +**核心函数**: +```rust +// WebSocket 连接处理 +async fn run_websocket(&self, tx: mpsc::Sender) + -> Result<(), Box> + +// start() 方法根据 connection_mode 分发 +match self.connection_mode { + FeishuConnectionMode::WebSocket => { /* WebSocket 逻辑 */ } + FeishuConnectionMode::Webhook => { /* Webhook 逻辑 */ } +} +``` + +## 与 OpenClaw 的对比 + +| 特性 | OpenClaw (Node.js) | OpenFang (Rust) | +|------|-------------------|----------------| +| SDK | 官方 @larksuiteoapi/node-sdk | 手动实现 | +| WebSocket 支持 | ✅ 原生支持 | ✅ 已实现 | +| 认证方式 | SDK 内部处理 | 手动发送认证消息 | +| 事件解析 | SDK 内部处理 | 手动解析 JSON | + +## 故障排查 + +### 问题:连接失败 + +```bash +# 检查网络 +ping open.feishu.cn + +# 检查应用凭证 +openfang vault list | grep FEISHU +``` + +### 问题:认证失败 + +确认 app_id 和 app_secret 正确: +```bash +# 查看 vault 中的密钥 +openfang vault list +``` + +### 问题:未收到事件 + +1. 确认飞书后台配置了事件订阅 +2. 确认选择了"使用长连接接收事件" +3. 检查日志中是否有 WebSocket 连接成功的日志 + +## 下一步 + +1. 安装新编译的二进制文件 +2. 重启 OpenFang +3. 在飞书后台配置长连接模式 +4. 测试发送消息到飞书应用 + +## 参考文档 + +- [飞书长连接文档](https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscription-configure-/request-url-configuration-case) +- [OpenClaw Feishu 实现](~/.nvm/versions/node/v24.13.1/lib/node_modules/openclaw/extensions/feishu/) diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 6d30b1915..3b64778d3 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -1315,7 +1315,7 @@ pub async fn start_channel_bridge_with_config( let adapter = Arc::new(FeishuAdapter::new( fs_config.app_id.clone(), secret, - fs_config.webhook_port, + fs_config.connection_mode, )); adapters.push((adapter, fs_config.default_agent.clone())); } diff --git a/crates/openfang-channels/Cargo.toml b/crates/openfang-channels/Cargo.toml index 58f0a5df5..aa96452ec 100644 --- a/crates/openfang-channels/Cargo.toml +++ b/crates/openfang-channels/Cargo.toml @@ -26,6 +26,10 @@ hmac = { workspace = true } sha2 = { workspace = true } base64 = { workspace = true } hex = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } +flate2 = { workspace = true } +bytes = { workspace = true } lettre = { workspace = true } imap = { workspace = true } diff --git a/crates/openfang-channels/src/feishu.rs b/crates/openfang-channels/src/feishu.rs index 7f4290477..b029caf8c 100644 --- a/crates/openfang-channels/src/feishu.rs +++ b/crates/openfang-channels/src/feishu.rs @@ -1,8 +1,11 @@ //! Feishu/Lark Open Platform channel adapter. //! -//! Uses the Feishu Open API for sending messages and a webhook HTTP server for -//! receiving inbound events. Authentication is performed via a tenant access token -//! obtained from `https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal`. +//! Supports two connection modes: +//! - **Webhook mode**: HTTP server for receiving events (requires public URL) +//! - **WebSocket mode**: Long connection to Feishu server (works from local environment) +//! +//! Authentication is performed via a tenant access token obtained from +//! `https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal`. //! The token is cached and refreshed automatically (2-hour expiry). use crate::types::{ @@ -10,13 +13,16 @@ use crate::types::{ }; use async_trait::async_trait; use chrono::Utc; +use flate2::read::GzDecoder; use futures::Stream; +use openfang_types::config::FeishuConnectionMode; use std::collections::HashMap; +use std::io::Read; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, watch, RwLock}; -use tracing::{info, warn}; +use tracing::{debug, error, info, trace, warn}; use zeroize::Zeroizing; /// Feishu tenant access token endpoint. @@ -38,13 +44,16 @@ const TOKEN_REFRESH_BUFFER_SECS: u64 = 300; /// Feishu/Lark Open Platform adapter. /// /// Inbound messages arrive via a webhook HTTP server that receives event -/// callbacks from the Feishu platform. Outbound messages are sent via the -/// Feishu IM API with a tenant access token for authentication. +/// callbacks from the Feishu platform, or via WebSocket long connection. +/// Outbound messages are sent via the Feishu IM API with a tenant access +/// token for authentication. pub struct FeishuAdapter { /// Feishu app ID. app_id: String, /// SECURITY: Feishu app secret, zeroized on drop. app_secret: Zeroizing, + /// Connection mode (webhook or WebSocket). + connection_mode: FeishuConnectionMode, /// Port on which the inbound webhook HTTP server listens. webhook_port: u16, /// Optional verification token for webhook event validation. @@ -66,13 +75,14 @@ impl FeishuAdapter { /// # Arguments /// * `app_id` - Feishu application ID. /// * `app_secret` - Feishu application secret. - /// * `webhook_port` - Local port for the inbound webhook HTTP server. - pub fn new(app_id: String, app_secret: String, webhook_port: u16) -> Self { + /// * `connection_mode` - Connection mode (webhook or WebSocket). + pub fn new(app_id: String, app_secret: String, connection_mode: FeishuConnectionMode) -> Self { let (shutdown_tx, shutdown_rx) = watch::channel(false); Self { app_id, app_secret: Zeroizing::new(app_secret), - webhook_port, + connection_mode, + webhook_port: 8453, verification_token: None, encrypt_key: None, client: reqwest::Client::new(), @@ -86,11 +96,13 @@ impl FeishuAdapter { pub fn with_verification( app_id: String, app_secret: String, + connection_mode: FeishuConnectionMode, webhook_port: u16, verification_token: Option, encrypt_key: Option, ) -> Self { - let mut adapter = Self::new(app_id, app_secret, webhook_port); + let mut adapter = Self::new(app_id, app_secret, connection_mode); + adapter.webhook_port = webhook_port; adapter.verification_token = verification_token; adapter.encrypt_key = encrypt_key; adapter @@ -179,6 +191,22 @@ impl FeishuAdapter { Ok(bot_name) } + /// Run WebSocket long connection to Feishu server. + async fn run_websocket( + &self, + tx: mpsc::Sender, + ) -> Result<(), Box> { + info!("Feishu: Initializing WebSocket long connection"); + + // Use the correct WebSocket protocol implementation + start_feishu_websocket( + self.app_id.clone(), + self.app_secret.as_str().to_string(), + tx, + ) + .await + } + /// Send a text message to a Feishu chat. async fn api_send_message( &self, @@ -388,144 +416,200 @@ impl ChannelAdapter for FeishuAdapter { info!("Feishu adapter authenticated as {bot_name}"); let (tx, rx) = mpsc::channel::(256); - let port = self.webhook_port; - let verification_token = self.verification_token.clone(); let mut shutdown_rx = self.shutdown_rx.clone(); - tokio::spawn(async move { - let verification_token = Arc::new(verification_token); - let tx = Arc::new(tx); - - let app = axum::Router::new().route( - "/feishu/webhook", - axum::routing::post({ - let vt = Arc::clone(&verification_token); - let tx = Arc::clone(&tx); - move |body: axum::extract::Json| { - let vt = Arc::clone(&vt); - let tx = Arc::clone(&tx); - async move { - // Handle URL verification challenge - if let Some(challenge) = body.0.get("challenge") { - // Verify token if configured - if let Some(ref expected_token) = *vt { - let token = body.0["token"].as_str().unwrap_or(""); - if token != expected_token { - warn!("Feishu: invalid verification token"); - return ( - axum::http::StatusCode::FORBIDDEN, - axum::Json(serde_json::json!({})), - ); + // Route based on connection mode + match self.connection_mode { + FeishuConnectionMode::WebSocket => { + // WebSocket long connection mode + info!("Feishu: Using WebSocket long connection mode"); + let adapter_clone = FeishuAdapter { + app_id: self.app_id.clone(), + app_secret: Zeroizing::new(self.app_secret.as_str().to_string()), + connection_mode: self.connection_mode, + webhook_port: self.webhook_port, + verification_token: self.verification_token.clone(), + encrypt_key: self.encrypt_key.clone(), + client: self.client.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), + cached_token: self.cached_token.clone(), + }; + + tokio::spawn(async move { + info!("Feishu: Starting WebSocket connection task"); + + // Auto-reconnect loop + loop { + tokio::select! { + result = adapter_clone.run_websocket(tx.clone()) => { + match result { + Ok(_) => { + info!("Feishu: WebSocket connection ended normally"); } + Err(e) => { + error!("Feishu: WebSocket error: {e}"); + } + } + // Check if we should stop reconnecting + if *shutdown_rx.borrow() { + info!("Feishu: Shutdown requested, not reconnecting"); + break; } - return ( - axum::http::StatusCode::OK, - axum::Json(serde_json::json!({ - "challenge": challenge, - })), - ); + // Wait before reconnecting + info!("Feishu: Reconnecting in 5 seconds..."); + tokio::time::sleep(Duration::from_secs(5)).await; } - - // Handle event callback - if let Some(schema) = body.0["schema"].as_str() { - if schema == "2.0" { - // V2 event format - if let Some(msg) = parse_feishu_event(&body.0) { - let _ = tx.send(msg).await; + _ = shutdown_rx.changed() => { + info!("Feishu: WebSocket shutdown requested"); + break; + } + } + } + }); + } + FeishuConnectionMode::Webhook => { + // HTTP webhook mode (default) + info!("Feishu: Using HTTP webhook mode"); + let port = self.webhook_port; + let verification_token = self.verification_token.clone(); + + tokio::spawn(async move { + let verification_token = Arc::new(verification_token); + let tx = Arc::new(tx); + + let app = axum::Router::new().route( + "/feishu/webhook", + axum::routing::post({ + let vt = Arc::clone(&verification_token); + let tx = Arc::clone(&tx); + move |body: axum::extract::Json| { + let vt = Arc::clone(&vt); + let tx = Arc::clone(&tx); + async move { + // Handle URL verification challenge + if let Some(challenge) = body.0.get("challenge") { + // Verify token if configured + if let Some(ref expected_token) = *vt { + let token = body.0["token"].as_str().unwrap_or(""); + if token != expected_token { + warn!("Feishu: invalid verification token"); + return ( + axum::http::StatusCode::FORBIDDEN, + axum::Json(serde_json::json!({})), + ); + } + } + return ( + axum::http::StatusCode::OK, + axum::Json(serde_json::json!({ + "challenge": challenge, + })), + ); } - } - } else { - // V1 event format (legacy) - let event_type = body.0["event"]["type"].as_str().unwrap_or(""); - if event_type == "message" { - // Legacy format handling - let event = &body.0["event"]; - let text = event["text"].as_str().unwrap_or(""); - if !text.is_empty() { - let open_id = - event["open_id"].as_str().unwrap_or("").to_string(); - let chat_id = event["open_chat_id"] - .as_str() - .unwrap_or("") - .to_string(); - let msg_id = event["open_message_id"] - .as_str() - .unwrap_or("") - .to_string(); - let is_group = - event["chat_type"].as_str().unwrap_or("") == "group"; - - let content = if text.starts_with('/') { - let parts: Vec<&str> = text.splitn(2, ' ').collect(); - let cmd = parts[0].trim_start_matches('/'); - let args: Vec = parts - .get(1) - .map(|a| { - a.split_whitespace().map(String::from).collect() - }) - .unwrap_or_default(); - ChannelContent::Command { - name: cmd.to_string(), - args, + + // Handle event callback + if let Some(schema) = body.0["schema"].as_str() { + if schema == "2.0" { + // V2 event format + if let Some(msg) = parse_feishu_event(&body.0) { + let _ = tx.send(msg).await; } - } else { - ChannelContent::Text(text.to_string()) - }; - - let channel_msg = ChannelMessage { - channel: ChannelType::Custom("feishu".to_string()), - platform_message_id: msg_id, - sender: ChannelUser { - platform_id: chat_id, - display_name: open_id, - openfang_user: None, - }, - content, - target_agent: None, - timestamp: Utc::now(), - is_group, - thread_id: None, - metadata: HashMap::new(), - }; - - let _ = tx.send(channel_msg).await; + } + } else { + // V1 event format (legacy) + let event_type = body.0["event"]["type"].as_str().unwrap_or(""); + if event_type == "message" { + // Legacy format handling + let event = &body.0["event"]; + let text = event["text"].as_str().unwrap_or(""); + if !text.is_empty() { + let open_id = + event["open_id"].as_str().unwrap_or("").to_string(); + let chat_id = event["open_chat_id"] + .as_str() + .unwrap_or("") + .to_string(); + let msg_id = event["open_message_id"] + .as_str() + .unwrap_or("") + .to_string(); + let is_group = + event["chat_type"].as_str().unwrap_or("") == "group"; + + let content = if text.starts_with('/') { + let parts: Vec<&str> = text.splitn(2, ' ').collect(); + let cmd = parts[0].trim_start_matches('/'); + let args: Vec = parts + .get(1) + .map(|a| { + a.split_whitespace().map(String::from).collect() + }) + .unwrap_or_default(); + ChannelContent::Command { + name: cmd.to_string(), + args, + } + } else { + ChannelContent::Text(text.to_string()) + }; + + let channel_msg = ChannelMessage { + channel: ChannelType::Custom("feishu".to_string()), + platform_message_id: msg_id, + sender: ChannelUser { + platform_id: chat_id, + display_name: open_id, + openfang_user: None, + }, + content, + target_agent: None, + timestamp: Utc::now(), + is_group, + thread_id: None, + metadata: HashMap::new(), + }; + + let _ = tx.send(channel_msg).await; + } + } } + + ( + axum::http::StatusCode::OK, + axum::Json(serde_json::json!({})), + ) } } + }), + ); - ( - axum::http::StatusCode::OK, - axum::Json(serde_json::json!({})), - ) - } - } - }), - ); - - let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); - info!("Feishu webhook server listening on {addr}"); + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + info!("Feishu webhook server listening on {addr}"); - let listener = match tokio::net::TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - warn!("Feishu webhook bind failed: {e}"); - return; - } - }; + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + warn!("Feishu webhook bind failed: {e}"); + return; + } + }; - let server = axum::serve(listener, app); + let server = axum::serve(listener, app); - tokio::select! { - result = server => { - if let Err(e) = result { - warn!("Feishu webhook server error: {e}"); + tokio::select! { + result = server => { + if let Err(e) = result { + warn!("Feishu webhook server error: {e}"); + } + } + _ = shutdown_rx.changed() => { + info!("Feishu adapter shutting down"); + } } - } - _ = shutdown_rx.changed() => { - info!("Feishu adapter shutting down"); - } + }); } - }); + } Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))) } @@ -560,6 +644,641 @@ impl ChannelAdapter for FeishuAdapter { } } +// ============================================================================ +// Feishu WebSocket Long Connection Implementation +// ============================================================================ + +use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use futures::{stream::StreamExt, sink::SinkExt}; + +// Protobuf definitions for Feishu WebSocket protocol +pub mod pbbp2 { + use bytes::Buf; + + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Header { + pub key: String, + pub value: String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Frame { + pub SeqID: u32, + pub LogID: u32, + pub service: u32, + pub method: u32, + pub headers: Vec
, + pub payload_encoding: String, + pub payload_type: String, + pub payload: Vec, + pub LogIDNew: String, + } + + impl Default for Frame { + fn default() -> Self { + Self { + SeqID: 0, + LogID: 0, + service: 0, + method: 0, + headers: Vec::new(), + payload_encoding: String::new(), + payload_type: String::new(), + payload: Vec::new(), + LogIDNew: String::new(), + } + } + } + + // Simple protobuf encoding helpers + fn encode_varint(mut value: u64, buf: &mut Vec) { + while value >= 0x80 { + buf.push((value as u8) | 0x80); + value >>= 7; + } + buf.push(value as u8); + } + + fn encode_tag(field_number: u32, wire_type: u32, buf: &mut Vec) { + encode_varint((field_number as u64) << 3 | (wire_type as u64), buf); + } + + fn encode_string(field_number: u32, value: &str, buf: &mut Vec) { + let bytes = value.as_bytes(); + encode_tag(field_number, 2, buf); // Length-delimited + encode_varint(bytes.len() as u64, buf); + buf.extend_from_slice(bytes); + } + + fn encode_bytes(field_number: u32, value: &[u8], buf: &mut Vec) { + encode_tag(field_number, 2, buf); // Length-delimited + encode_varint(value.len() as u64, buf); + buf.extend_from_slice(value); + } + + fn encode_uint32(field_number: u32, value: u32, buf: &mut Vec) { + if value == 0 { + return; + } + encode_tag(field_number, 0, buf); // Varint + encode_varint(value as u64, buf); + } + + fn encode_header(field_number: u32, header: &Header, buf: &mut Vec) { + let mut nested = Vec::new(); + encode_string(1, &header.key, &mut nested); + encode_string(2, &header.value, &mut nested); + encode_bytes(field_number, &nested, buf); + } + + impl Frame { + pub fn encode(&self) -> Vec { + let mut buf = Vec::new(); + + encode_uint32(1, self.SeqID, &mut buf); + encode_uint32(2, self.LogID, &mut buf); + encode_uint32(3, self.service, &mut buf); + encode_uint32(4, self.method, &mut buf); + + for header in &self.headers { + encode_header(5, header, &mut buf); + } + + if !self.payload_encoding.is_empty() { + encode_string(6, &self.payload_encoding, &mut buf); + } + if !self.payload_type.is_empty() { + encode_string(7, &self.payload_type, &mut buf); + } + if !self.payload.is_empty() { + encode_bytes(8, &self.payload, &mut buf); + } + if !self.LogIDNew.is_empty() { + encode_string(9, &self.LogIDNew, &mut buf); + } + + buf + } + + pub fn decode(data: &[u8]) -> Result { + let mut frame = Frame::default(); + let mut cursor = std::io::Cursor::new(data); + + while cursor.has_remaining() { + let tag = read_varint(&mut cursor)?; + let field_number = (tag >> 3) as u32; + let wire_type = (tag & 0x07) as u32; + + match field_number { + 1 => frame.SeqID = read_varint(&mut cursor)? as u32, + 2 => frame.LogID = read_varint(&mut cursor)? as u32, + 3 => frame.service = read_varint(&mut cursor)? as u32, + 4 => frame.method = read_varint(&mut cursor)? as u32, + 5 => { + let len = read_varint(&mut cursor)? as usize; + let start = cursor.position() as usize; + let mut header = Header { key: String::new(), value: String::new() }; + + while (cursor.position() as usize - start) < len { + let tag = read_varint(&mut cursor)?; + let field_number = (tag >> 3) as u32; + + if field_number == 1 { + let str_len = read_varint(&mut cursor)? as usize; + header.key = String::from_utf8_lossy(&data[cursor.position() as usize..cursor.position() as usize + str_len]).to_string(); + cursor.advance(str_len); + } else if field_number == 2 { + let str_len = read_varint(&mut cursor)? as usize; + header.value = String::from_utf8_lossy(&data[cursor.position() as usize..cursor.position() as usize + str_len]).to_string(); + cursor.advance(str_len); + } + } + frame.headers.push(header); + } + 6 => { + let len = read_varint(&mut cursor)? as usize; + let pos = cursor.position() as usize; + frame.payload_encoding = String::from_utf8_lossy(&data[pos..pos + len]).to_string(); + cursor.advance(len); + } + 7 => { + let len = read_varint(&mut cursor)? as usize; + let pos = cursor.position() as usize; + frame.payload_type = String::from_utf8_lossy(&data[pos..pos + len]).to_string(); + cursor.advance(len); + } + 8 => { + let len = read_varint(&mut cursor)? as usize; + let pos = cursor.position() as usize; + frame.payload = data[pos..pos + len].to_vec(); + cursor.advance(len); + } + 9 => { + let len = read_varint(&mut cursor)? as usize; + let pos = cursor.position() as usize; + frame.LogIDNew = String::from_utf8_lossy(&data[pos..pos + len]).to_string(); + cursor.advance(len); + } + _ => { + // Skip unknown field + if wire_type == 2 { + let len = read_varint(&mut cursor)? as usize; + cursor.advance(len); + } + } + } + } + + Ok(frame) + } + } + + fn read_varint(cursor: &mut B) -> Result { + let mut result = 0; + let mut shift = 0; + + loop { + if !cursor.has_remaining() { + return Err("Unexpected end of buffer".to_string()); + } + + let byte = cursor.get_u8(); + result |= ((byte & 0x7F) as u64) << shift; + + if byte & 0x80 == 0 { + break; + } + + shift += 7; + if shift >= 64 { + return Err("Varint too large".to_string()); + } + } + + Ok(result) + } +} + +const WS_CONFIG_ENDPOINT: &str = "https://open.feishu.cn/callback/ws/endpoint"; +const FRAME_TYPE_CONTROL: u32 = 0; +const FRAME_TYPE_DATA: u32 = 1; + +#[derive(Debug, Clone, serde::Deserialize)] +struct WsConfigResponse { + code: i32, + data: WsConfigData, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct WsConfigData { + URL: String, + #[serde(rename = "ClientConfig")] + client_config: ClientConfig, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct ClientConfig { + #[serde(rename = "PingInterval")] + ping_interval: u64, + #[serde(rename = "ReconnectCount")] + reconnect_count: i32, + #[serde(rename = "ReconnectInterval")] + reconnect_interval: u64, + #[serde(rename = "ReconnectNonce")] + reconnect_nonce: u32, +} + +/// Cache for fragmented message data +#[derive(Debug, Default)] +struct DataCache { + fragments: HashMap>>, + expected_seqs: HashMap, +} + +impl DataCache { + fn add_fragment(&mut self, seq: u32, data: Vec) -> Option> { + self.fragments.entry(seq).or_default().push(data); + None + } + + fn complete(&mut self, seq: u32) -> Option> { + if let Some(fragments) = self.fragments.remove(&seq) { + let total_len = fragments.iter().map(|f| f.len()).sum(); + let mut result = Vec::with_capacity(total_len); + for fragment in fragments { + result.extend_from_slice(&fragment); + } + Some(result) + } else { + None + } + } +} + +/// Extract service_id from WebSocket URL +fn extract_service_id(ws_url: &str) -> Option { + url::Url::parse(ws_url) + .ok()? + .query_pairs() + .find(|(k, _)| k == "service_id") + .map(|(_, v)| v.to_string()) +} + +/// Create a protobuf frame for WebSocket communication +fn create_frame( + seq_id: u32, + service: u32, + method: u32, + headers: Vec, + payload: Vec, +) -> Vec { + let frame = pbbp2::Frame { + SeqID: seq_id, + LogID: 0, + service, + method, + headers, + payload_encoding: "gzip".to_string(), + payload_type: "json".to_string(), + payload, + LogIDNew: String::new(), + }; + + frame.encode() +} + +/// Send ping frame to keep connection alive +fn send_ping(seq_id: u32, service_id: &str) -> Vec { + let headers = vec![ + pbbp2::Header { + key: "type".to_string(), + value: "ping".to_string(), + }, + pbbp2::Header { + key: "service_id".to_string(), + value: service_id.to_string(), + }, + ]; + + create_frame(seq_id, 1, FRAME_TYPE_CONTROL, headers, vec![]) +} + +/// Start Feishu WebSocket long connection +pub async fn start_feishu_websocket( + app_id: String, + app_secret: String, + mut tx: mpsc::Sender, +) -> Result<(), Box> { + info!("Feishu: Fetching WebSocket configuration"); + + // Step 1: Fetch WebSocket configuration + let client = reqwest::Client::new(); + let config_response = client + .post(WS_CONFIG_ENDPOINT) + .json(&serde_json::json!({ + "AppID": app_id, + "AppSecret": app_secret + })) + .send() + .await?; + + if !config_response.status().is_success() { + return Err(format!("Failed to fetch WebSocket config: {}", config_response.status()).into()); + } + + let config_data: WsConfigResponse = config_response.json().await?; + if config_data.code != 0 { + return Err(format!("WebSocket config returned error code: {}", config_data.code).into()); + } + + let ws_url = config_data.data.URL; + let ping_interval = config_data.data.client_config.ping_interval; + info!("Feishu: Connecting to WebSocket at {} (ping_interval: {}s)", ws_url, ping_interval); + + // Extract service_id from URL + let service_id = extract_service_id(&ws_url) + .ok_or("Failed to extract service_id from WebSocket URL")?; + + // Step 2: Connect to WebSocket + let (ws_stream, _) = connect_async(&ws_url).await?; + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + + // Step 3: Send initial handshake + let seq_id = 1u32; + let headers = vec![ + pbbp2::Header { + key: "type".to_string(), + value: "register".to_string(), + }, + pbbp2::Header { + key: "service_id".to_string(), + value: service_id.clone(), + }, + ]; + + let register_frame = create_frame( + seq_id, + 1, + FRAME_TYPE_CONTROL, + headers, + vec![], + ); + + ws_sender.send(WsMessage::Binary(register_frame)).await?; + info!("Feishu: WebSocket registration sent"); + + // Step 4: Create channel for ping sending + let (ping_tx, mut ping_rx) = mpsc::channel::>(10); + + // Spawn ping task + let service_id_clone = service_id.clone(); + tokio::spawn(async move { + info!("Feishu: Ping task started, interval={}s", ping_interval); + let mut seq = 2u32; + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(ping_interval)).await; + info!("Feishu: Sending ping frame (seq={})", seq); + let ping = send_ping(seq, &service_id_clone); + if ping_tx.send(ping).await.is_err() { + error!("Feishu: Failed to send ping to channel"); + break; + } + seq = seq.wrapping_add(1); + } + }); + + // Step 5: Message loop with ping handling + let mut data_cache = DataCache::default(); + + info!("Feishu: Entering WebSocket message loop"); + let mut loop_count = 0u64; + + loop { + loop_count += 1; + if loop_count % 100 == 0 { + debug!("Feishu: Message loop iteration {}", loop_count); + } + + tokio::select! { + // Handle WebSocket messages + msg_result = ws_receiver.next() => { + debug!("Feishu: ws_receiver.next() returned"); + match msg_result { + Some(Ok(WsMessage::Binary(data))) => { + info!("Feishu: Received binary WebSocket message, {} bytes", data.len()); + // Decode protobuf frame + match pbbp2::Frame::decode(&*data) { + Ok(frame) => { + info!("Feishu: Received frame: service={}, method={}", frame.service, frame.method); + + // Handle different frame types + if frame.method == FRAME_TYPE_CONTROL { + // Control message (ping/pong/register response) + let frame_type = frame.headers.iter() + .find(|h| h.key == "type") + .map(|h| h.value.as_str()); + + match frame_type { + Some("pong") => { + debug!("Feishu: Received pong"); + } + Some("register") => { + info!("Feishu: Registration confirmed"); + } + _ => { + debug!("Feishu: Unknown control frame type: {:?}", frame_type); + } + } + } else if frame.method == FRAME_TYPE_DATA { + // Data message (actual event) + if !frame.payload.is_empty() { + // Check for fragmented messages + let sum = frame.headers.iter() + .find(|h| h.key == "sum") + .and_then(|h| h.value.parse::().ok()); + + let current_seq = frame.SeqID; + + if let Some(total) = sum { + if total > 1 { + // Fragmented message + data_cache.add_fragment(current_seq, frame.payload.to_vec()); + + if let Some(complete) = data_cache.complete(current_seq) { + // All fragments received, process the complete message + if let Err(e) = process_event_payload(&complete, &mut tx).await { + error!("Feishu: Failed to process event: {}", e); + } + } + } else { + // Single fragment + if let Err(e) = process_event_payload(&frame.payload, &mut tx).await { + error!("Feishu: Failed to process event: {}", e); + } + } + } else { + // No sum header, treat as complete message + if let Err(e) = process_event_payload(&frame.payload, &mut tx).await { + error!("Feishu: Failed to process event: {}", e); + } + } + } + } + } + Err(e) => { + error!("Feishu: Failed to decode protobuf frame: {}", e); + } + } + } + Some(Ok(WsMessage::Close(_))) => { + info!("Feishu: WebSocket closed by server"); + break; + } + Some(Ok(WsMessage::Ping(data))) => { + info!("Feishu: Received WebSocket ping, {} bytes", data.len()); + ws_sender.send(WsMessage::Pong(data)).await?; + } + Some(Ok(WsMessage::Pong(_))) => { + info!("Feishu: Received WebSocket pong"); + } + Some(Ok(WsMessage::Text(text))) => { + info!("Feishu: Received unexpected text message: {}", text); + } + Some(Err(e)) => { + error!("Feishu: WebSocket error: {}", e); + break; + } + None => { + warn!("Feishu: WebSocket channel closed (None received)"); + break; + } + _ => { + warn!("Feishu: Unhandled WebSocket message type"); + } + } + } + // Handle ping messages from the ping task + Some(ping_data) = ping_rx.recv() => { + debug!("Feishu: Sending ping frame to WebSocket"); + if let Err(e) = ws_sender.send(WsMessage::Binary(ping_data)).await { + error!("Feishu: Failed to send ping: {}", e); + break; + } + } + // Both channels closed, exit loop + else => { + warn!("Feishu: Both WebSocket and ping channels closed"); + break; + } + } + } + + info!("Feishu: WebSocket connection ended"); + Ok(()) +} + +/// Process an event payload from Feishu +async fn process_event_payload( + payload: &[u8], + tx: &mut mpsc::Sender, +) -> Result<(), Box> { + // Try to decompress if gzipped + let json_str = if is_gzipped(payload) { + let mut decoder = GzDecoder::new(payload); + let mut decompressed = String::new(); + decoder.read_to_string(&mut decompressed)?; + decompressed + } else { + String::from_utf8(payload.to_vec())? + }; + + info!("Feishu: Event payload: {}", json_str); + + // Parse as JSON to extract event details + let event: serde_json::Value = serde_json::from_str(&json_str)?; + + // Convert to ChannelMessage + if let Err(e) = convert_feishu_event(&event, tx).await { + error!("Feishu: Failed to convert event: {}", e); + } + + Ok(()) +} + +/// Check if bytes are gzipped +fn is_gzipped(data: &[u8]) -> bool { + data.len() > 1 && data[0] == 0x1f && data[1] == 0x8b +} + +/// Convert Feishu event to ChannelMessage +async fn convert_feishu_event( + event: &serde_json::Value, + tx: &mut mpsc::Sender, +) -> Result<(), Box> { + // This is a simplified conversion - real implementation would handle all event types + let header = event.get("header") + .and_then(|v| v.as_object()) + .ok_or("Missing event header")?; + + let event_type = header.get("event_type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + if event_type == "im.message.receive_v1" { + let event_data = event.get("event") + .and_then(|v| v.as_object()) + .ok_or("Missing event data")?; + + let message = event_data.get("message") + .and_then(|v| v.as_object()) + .ok_or("Missing message")?; + + let chat_id = message.get("chat_id") + .and_then(|v| v.as_str()) + .ok_or("Missing chat_id")?; + + let content = message.get("content") + .and_then(|v| v.as_str()) + .ok_or("Missing content")?; + + let sender = event_data.get("sender") + .and_then(|v| v.as_object()) + .ok_or("Missing sender")?; + + let sender_id = sender.get("sender_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let user = ChannelUser { + platform_id: chat_id.to_string(), // Use chat_id so responses go to the right place + display_name: sender.get("nickname") + .and_then(|v| v.as_str()) + .unwrap_or(sender_id) + .to_string(), + openfang_user: None, + }; + + let msg = ChannelMessage { + channel: ChannelType::Custom("feishu".to_string()), + platform_message_id: message.get("message_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(), + sender: user, + content: ChannelContent::Text(content.to_string()), + target_agent: None, + timestamp: chrono::Utc::now(), + is_group: false, + thread_id: None, + metadata: HashMap::new(), + }; + + tx.send(msg).await?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -567,13 +1286,20 @@ mod tests { #[test] fn test_feishu_adapter_creation() { let adapter = - FeishuAdapter::new("cli_abc123".to_string(), "app-secret-456".to_string(), 9000); + FeishuAdapter::new("cli_abc123".to_string(), "app-secret-456".to_string(), FeishuConnectionMode::Webhook); assert_eq!(adapter.name(), "feishu"); assert_eq!( adapter.channel_type(), ChannelType::Custom("feishu".to_string()) ); - assert_eq!(adapter.webhook_port, 9000); + assert_eq!(adapter.webhook_port, 8453); // default + } + + #[test] + fn test_feishu_adapter_websocket_mode() { + let adapter = + FeishuAdapter::new("cli_abc123".to_string(), "app-secret-456".to_string(), FeishuConnectionMode::WebSocket); + assert_eq!(adapter.connection_mode, FeishuConnectionMode::WebSocket); } #[test] @@ -581,17 +1307,19 @@ mod tests { let adapter = FeishuAdapter::with_verification( "cli_abc123".to_string(), "secret".to_string(), + FeishuConnectionMode::Webhook, 9000, Some("verify-token".to_string()), Some("encrypt-key".to_string()), ); assert_eq!(adapter.verification_token, Some("verify-token".to_string())); assert_eq!(adapter.encrypt_key, Some("encrypt-key".to_string())); + assert_eq!(adapter.webhook_port, 9000); } #[test] fn test_feishu_app_id_stored() { - let adapter = FeishuAdapter::new("cli_test".to_string(), "secret".to_string(), 8080); + let adapter = FeishuAdapter::new("cli_test".to_string(), "secret".to_string(), FeishuConnectionMode::Webhook); assert_eq!(adapter.app_id, "cli_test"); } diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index a5455d79a..025c809bd 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -2193,6 +2193,22 @@ impl Default for BlueskyConfig { } } +/// Feishu connection mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum FeishuConnectionMode { + /// HTTP webhook mode (requires public URL or intranet penetration). + Webhook, + /// WebSocket long connection mode (works from local environment). + WebSocket, +} + +impl Default for FeishuConnectionMode { + fn default() -> Self { + Self::Webhook + } +} + /// Feishu/Lark Open Platform channel adapter configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -2201,7 +2217,10 @@ pub struct FeishuConfig { pub app_id: String, /// Env var name holding the app secret. pub app_secret_env: String, - /// Port for the incoming webhook. + /// Connection mode (webhook or WebSocket). + #[serde(default)] + pub connection_mode: FeishuConnectionMode, + /// Port for the incoming webhook (only used in webhook mode). pub webhook_port: u16, /// Default agent name to route messages to. pub default_agent: Option, @@ -2215,6 +2234,7 @@ impl Default for FeishuConfig { Self { app_id: String::new(), app_secret_env: "FEISHU_APP_SECRET".to_string(), + connection_mode: FeishuConnectionMode::Webhook, webhook_port: 8453, default_agent: None, overrides: ChannelOverrides::default(), From 6a829f657987918edb443b6381b564b7e764aa5a Mon Sep 17 00:00:00 2001 From: OpenFang Developers Date: Wed, 4 Mar 2026 22:37:13 +0800 Subject: [PATCH 2/3] docs: Update CHANGELOG for v0.3.12 - Feishu WebSocket support --- CHANGELOG.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9475b629..626c5801d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,35 @@ All notable changes to OpenFang will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.3.12] - 2026-03-04 + +### Added + +#### Feishu Integration +- **WebSocket long connection support** for Feishu/Lark messaging platform + - Enables local development without public IP or port forwarding + - Custom protobuf protocol implementation with manual encoding/decoding + - Automatic ping/pong keepalive mechanism (configurable interval) + - Message fragmentation handling for large payloads + - Gzip compression support for message payloads + - Dynamic WebSocket URL fetching via HTTP config endpoint +- **Connection mode configuration**: Choose between webhook (HTTP) and WebSocket modes +- **Enhanced logging**: Improved visibility for WebSocket connection status and message flow + +### Changed + +#### Feishu Integration +- `FeishuAdapter::new()`: Now requires `connection_mode` parameter instead of `webhook_port` +- Fixed `receive_id` error by using `chat_id` instead of `sender_id` for user platform_id +- Upgraded log levels from `trace!` to `info!` for better production visibility + +### Dependencies + +- Added `tokio-tungstenite` for WebSocket client functionality +- Added `prost` and `prost-types` for protobuf encoding +- Added `flate2` for gzip decompression +- Updated `Cargo.lock` with new dependency versions + ## [0.1.0] - 2026-02-24 ### Added From 9ff01d4d818700dcce8d3fa50a3a31ddc3169006 Mon Sep 17 00:00:00 2001 From: OpenFang Developers Date: Thu, 5 Mar 2026 10:17:34 +0800 Subject: [PATCH 3/3] fix: UTF-8 character boundary panic in subprocess_sandbox.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix panic when logging shell commands containing multi-byte UTF-8 characters (e.g., Chinese characters). The previous code used byte indexing to truncate strings, which could panic when truncating at a byte that fell in the middle of a multi-byte character. Error: byte index 100 is not a char boundary; it is inside '统' (bytes 98..101) Solution: Use char_indices() for safe character-boundary string truncation. Fixes: #issue --- crates/openfang-runtime/src/subprocess_sandbox.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/openfang-runtime/src/subprocess_sandbox.rs b/crates/openfang-runtime/src/subprocess_sandbox.rs index 3e3bce4f0..7b6f9b203 100644 --- a/crates/openfang-runtime/src/subprocess_sandbox.rs +++ b/crates/openfang-runtime/src/subprocess_sandbox.rs @@ -145,8 +145,17 @@ pub fn validate_command_allowlist(command: &str, policy: &ExecPolicy) -> Result< Err("Shell execution is disabled (exec_policy.mode = deny)".to_string()) } ExecSecurityMode::Full => { + // Safely truncate at character boundary + let safe_command = if command.len() > 100 { + command.char_indices() + .nth(100) + .and_then(|(i, _)| Some(&command[..i])) + .unwrap_or(command) + } else { + command + }; tracing::warn!( - command = &command[..command.len().min(100)], + command = &safe_command[..safe_command.len().min(100)], "Shell exec in full mode — no restrictions" ); Ok(())