diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index b72180670..32d8be8cb 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -43,6 +43,7 @@ use openfang_channels::webex::WebexAdapter; // Wave 5 use async_trait::async_trait; use openfang_channels::dingtalk::DingTalkAdapter; +use openfang_channels::dingtalk_stream::DingTalkStreamAdapter; use openfang_channels::discourse::DiscourseAdapter; use openfang_channels::gitter::GitterAdapter; use openfang_channels::gotify::GotifyAdapter; @@ -727,6 +728,7 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { // Wave 5 "mumble" => channels.mumble.as_ref().map(|c| c.overrides.clone()), "dingtalk" => channels.dingtalk.as_ref().map(|c| c.overrides.clone()), + "dingtalk_stream" => channels.dingtalk_stream.as_ref().map(|c| c.overrides.clone()), "discourse" => channels.discourse.as_ref().map(|c| c.overrides.clone()), "gitter" => channels.gitter.as_ref().map(|c| c.overrides.clone()), "ntfy" => channels.ntfy.as_ref().map(|c| c.overrides.clone()), @@ -1003,6 +1005,7 @@ pub async fn start_channel_bridge_with_config( // Wave 5 || config.mumble.is_some() || config.dingtalk.is_some() + || config.dingtalk_stream.is_some() || config.discourse.is_some() || config.gitter.is_some() || config.ntfy.is_some() @@ -1436,7 +1439,7 @@ pub async fn start_channel_bridge_with_config( } } - // DingTalk + // DingTalk (webhook mode) if let Some(ref dt_config) = config.dingtalk { if let Some(token) = read_token(&dt_config.access_token_env, "DingTalk") { let secret = read_token(&dt_config.secret_env, "DingTalk (secret)").unwrap_or_default(); @@ -1445,6 +1448,18 @@ pub async fn start_channel_bridge_with_config( } } + // DingTalk (stream mode) + if let Some(ref ds_config) = config.dingtalk_stream { + if let Some(app_key) = read_token(&ds_config.app_key_env, "DingTalk Stream (app_key)") { + if let Some(app_secret) = read_token(&ds_config.app_secret_env, "DingTalk Stream (app_secret)") { + let robot_code = read_token(&ds_config.robot_code_env, "DingTalk Stream (robot_code)") + .unwrap_or_else(|| app_key.clone()); + let adapter = Arc::new(DingTalkStreamAdapter::new(app_key, app_secret, robot_code)); + adapters.push((adapter, ds_config.default_agent.clone())); + } + } + } + // Discourse if let Some(ref dc_config) = config.discourse { if let Some(api_key) = read_token(&dc_config.api_key_env, "Discourse") { diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 89353f39d..bea31ce8f 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -1515,6 +1515,21 @@ const CHANNEL_REGISTRY: &[ChannelMeta] = &[ setup_steps: &["Create a robot in your DingTalk group", "Copy the token and signing secret", "Paste them below"], config_template: "[channels.dingtalk]\naccess_token_env = \"DINGTALK_ACCESS_TOKEN\"\nsecret_env = \"DINGTALK_SECRET\"", }, + ChannelMeta { + name: "dingtalk_stream", display_name: "DingTalk Stream", icon: "DS", + description: "DingTalk Stream Mode (WebSocket long-connection)", + category: "enterprise", difficulty: "Easy", setup_time: "~5 min", + quick_setup: "Create an Enterprise Internal App with Stream Mode enabled", + setup_type: "form", + fields: &[ + ChannelField { key: "app_key_env", label: "App Key", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_KEY"), required: true, placeholder: "ding...", advanced: false }, + ChannelField { key: "app_secret_env", label: "App Secret", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_SECRET"), required: true, placeholder: "uAn4...", advanced: false }, + ChannelField { key: "robot_code_env", label: "Robot Code", field_type: FieldType::Text, env_var: Some("DINGTALK_ROBOT_CODE"), required: false, placeholder: "ding... (same as App Key)", advanced: true }, + ChannelField { key: "default_agent", label: "Default Agent", field_type: FieldType::Text, env_var: None, required: false, placeholder: "assistant", advanced: true }, + ], + setup_steps: &["Create an Enterprise Internal App in DingTalk Open Platform", "Enable Stream Mode in the app settings", "Add robot capability and configure permissions", "Copy App Key and App Secret below"], + config_template: "[channels.dingtalk_stream]\napp_key_env = \"DINGTALK_APP_KEY\"\napp_secret_env = \"DINGTALK_APP_SECRET\"", + }, ChannelMeta { name: "pumble", display_name: "Pumble", icon: "PB", description: "Pumble bot adapter", @@ -1804,6 +1819,7 @@ fn is_channel_configured(config: &openfang_types::config::ChannelsConfig, name: "webex" => config.webex.is_some(), "feishu" => config.feishu.is_some(), "dingtalk" => config.dingtalk.is_some(), + "dingtalk_stream" => config.dingtalk_stream.is_some(), "pumble" => config.pumble.is_some(), "flock" => config.flock.is_some(), "twist" => config.twist.is_some(), @@ -1926,6 +1942,7 @@ fn channel_config_values( "twist" => config.twist.as_ref().and_then(|c| serde_json::to_value(c).ok()), "mumble" => config.mumble.as_ref().and_then(|c| serde_json::to_value(c).ok()), "dingtalk" => config.dingtalk.as_ref().and_then(|c| serde_json::to_value(c).ok()), + "dingtalk_stream" => config.dingtalk_stream.as_ref().and_then(|c| serde_json::to_value(c).ok()), "discourse" => config.discourse.as_ref().and_then(|c| serde_json::to_value(c).ok()), "gitter" => config.gitter.as_ref().and_then(|c| serde_json::to_value(c).ok()), "ntfy" => config.ntfy.as_ref().and_then(|c| serde_json::to_value(c).ok()), diff --git a/crates/openfang-channels/src/dingtalk_stream.rs b/crates/openfang-channels/src/dingtalk_stream.rs new file mode 100644 index 000000000..c8abf114e --- /dev/null +++ b/crates/openfang-channels/src/dingtalk_stream.rs @@ -0,0 +1,607 @@ +//! DingTalk Stream channel adapter. +//! +//! Uses DingTalk Stream Mode (WebSocket long-connection) instead of the +//! legacy webhook approach. The webhook adapter in `dingtalk.rs` is preserved +//! for backwards compatibility. +//! +//! Protocol: +//! 1. POST /v1.0/oauth2/accessToken → get access token +//! 2. POST /v1.0/gateway/connections/open → get WebSocket URL +//! 3. Connect via WebSocket, handle ping/pong and EVENT messages +//! 4. Outbound: POST /v1.0/robot/oToMessages/batchSend + +use crate::types::{ + split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, +}; +use async_trait::async_trait; +use chrono::Utc; +use futures::{SinkExt, Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::sync::{mpsc, watch}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{error, info, warn}; + +const API_BASE: &str = "https://api.dingtalk.com"; +const MAX_MESSAGE_LEN: usize = 20000; + +// ─── Adapter ───────────────────────────────────────────────────────────────── + +pub struct DingTalkStreamAdapter { + app_key: String, + app_secret: String, + robot_code: String, + client: reqwest::Client, + token_cache: Arc>, + shutdown_tx: Arc>, + shutdown_rx: watch::Receiver, +} + +impl DingTalkStreamAdapter { + pub fn new(app_key: String, app_secret: String, robot_code: String) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + Self { + app_key, + app_secret, + robot_code, + client: reqwest::Client::new(), + token_cache: Arc::new(Mutex::new(TokenCache::default())), + shutdown_tx: Arc::new(shutdown_tx), + shutdown_rx, + } + } + + async fn get_token(&self) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + { + let c = self.token_cache.lock().unwrap(); + if !c.token.is_empty() && c.expire_at > now + 300 { + return Ok(c.token.clone()); + } + } + + let resp: serde_json::Value = self + .client + .post(format!("{API_BASE}/v1.0/oauth2/accessToken")) + .json(&serde_json::json!({ + "appKey": self.app_key, + "appSecret": self.app_secret, + })) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let token = resp["accessToken"] + .as_str() + .ok_or("missing accessToken")? + .to_string(); + let expire_in = resp["expireIn"].as_u64().unwrap_or(7200); + + { + let mut c = self.token_cache.lock().unwrap(); + c.token = token.clone(); + c.expire_at = now + expire_in; + } + Ok(token) + } + + async fn send_to_ids( + &self, + user_ids: &[&str], + content: ChannelContent, + ) -> Result<(), Box> { + let token = self.get_token().await.map_err(|e| -> Box { e })?; + + let (msg_key, msg_param) = match &content { + ChannelContent::Text(t) => ( + "sampleText", + serde_json::json!({ "content": t }).to_string(), + ), + _ => ( + "sampleText", + serde_json::json!({ "content": "(unsupported content type)" }).to_string(), + ), + }; + + let text = match &content { + ChannelContent::Text(t) => t.as_str(), + _ => "(unsupported)", + }; + let chunks = split_message(text, MAX_MESSAGE_LEN); + + for chunk in &chunks { + let param = serde_json::json!({ "content": chunk }).to_string(); + let body = serde_json::json!({ + "robotCode": self.robot_code, + "userIds": user_ids, + "msgKey": msg_key, + "msgParam": param, + }); + + let resp = self + .client + .post(format!("{API_BASE}/v1.0/robot/oToMessages/batchSend")) + .header("x-acs-dingtalk-access-token", &token) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let err_body = resp.text().await.unwrap_or_default(); + return Err(format!("DingTalk batchSend error {status}: {err_body}").into()); + } + + if chunks.len() > 1 { + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + Ok(()) + } +} + +#[async_trait] +impl ChannelAdapter for DingTalkStreamAdapter { + fn name(&self) -> &str { + "dingtalk_stream" + } + + fn channel_type(&self) -> ChannelType { + ChannelType::Custom("dingtalk_stream".to_string()) + } + + async fn start( + &self, + ) -> Result + Send>>, Box> + { + let (tx, rx) = mpsc::channel::(256); + let app_key = self.app_key.clone(); + let app_secret = self.app_secret.clone(); + let client = self.client.clone(); + let token_cache = Arc::clone(&self.token_cache); + let mut shutdown_rx = self.shutdown_rx.clone(); + + info!("DingTalk Stream adapter starting WebSocket connection"); + + tokio::spawn(async move { + let mut attempt: u32 = 0; + + loop { + if *shutdown_rx.borrow() { + info!("DingTalk Stream: shutdown requested"); + break; + } + + // 1. Get access token + let token = match get_access_token(&client, &app_key, &app_secret, &token_cache) + .await + { + Ok(t) => t, + Err(e) => { + warn!("DingTalk Stream: token fetch failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + // 2. Get WebSocket endpoint + let ws_url = + match get_ws_endpoint(&client, &app_key, &app_secret, &token).await { + Ok(u) => u, + Err(e) => { + warn!("DingTalk Stream: endpoint fetch failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + info!( + "DingTalk Stream: connecting to {}...", + &ws_url[..ws_url.len().min(60)] + ); + + // 3. Connect + let ws_stream = match connect_async(&ws_url).await { + Ok((ws, _)) => ws, + Err(e) => { + warn!("DingTalk Stream: WS connect failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + info!("DingTalk Stream: connected"); + attempt = 0; + let (mut sink, mut source) = ws_stream.split(); + + // 4. Message loop + loop { + tokio::select! { + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + info!("DingTalk Stream: graceful shutdown"); + return; + } + } + msg = source.next() => { + match msg { + None => { warn!("DingTalk Stream: connection closed"); break; } + Some(Err(e)) => { warn!("DingTalk Stream: WS error: {e}"); break; } + Some(Ok(Message::Text(text))) => { + handle_frame(&text, &mut sink, &tx).await; + } + Some(Ok(Message::Ping(d))) => { let _ = sink.send(Message::Pong(d)).await; } + Some(Ok(Message::Close(_))) => { info!("DingTalk Stream: close frame"); break; } + _ => {} + } + } + } + } + + // Reconnect + attempt += 1; + let delay = backoff(attempt); + info!("DingTalk Stream: reconnecting in {delay:?}"); + tokio::time::sleep(delay).await; + } + }); + + Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))) + } + + async fn send( + &self, + user: &ChannelUser, + content: ChannelContent, + ) -> Result<(), Box> { + let uid = &user.platform_id; + if uid.is_empty() { + return Err("DingTalk Stream: no platform_id to reply to".into()); + } + self.send_to_ids(&[uid.as_str()], content).await + } + + async fn send_typing(&self, _user: &ChannelUser) -> Result<(), Box> { + Ok(()) + } + + async fn stop(&self) -> Result<(), Box> { + let _ = self.shutdown_tx.send(true); + Ok(()) + } +} + +// ─── Token helpers ─────────────────────────────────────────────────────────── + +struct TokenCache { + token: String, + expire_at: u64, +} + +impl Default for TokenCache { + fn default() -> Self { + Self { + token: String::new(), + expire_at: 0, + } + } +} + +async fn get_access_token( + http: &reqwest::Client, + app_key: &str, + app_secret: &str, + cache: &Arc>, +) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + { + let c = cache.lock().unwrap(); + if !c.token.is_empty() && c.expire_at > now + 300 { + return Ok(c.token.clone()); + } + } + + let resp: serde_json::Value = http + .post(format!("{API_BASE}/v1.0/oauth2/accessToken")) + .json(&serde_json::json!({ "appKey": app_key, "appSecret": app_secret })) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let token = resp["accessToken"] + .as_str() + .ok_or("missing accessToken")? + .to_string(); + let expire_in = resp["expireIn"].as_u64().unwrap_or(7200); + { + let mut c = cache.lock().unwrap(); + c.token = token.clone(); + c.expire_at = now + expire_in; + } + Ok(token) +} + +// ─── Gateway / WebSocket helpers ───────────────────────────────────────────── + +#[derive(Serialize)] +struct OpenConnectionRequest<'a> { + #[serde(rename = "clientId")] + client_id: &'a str, + #[serde(rename = "clientSecret")] + client_secret: &'a str, + subscriptions: Vec, + ua: &'a str, + #[serde(rename = "localIp")] + local_ip: &'a str, +} + +#[derive(Serialize)] +struct SubItem { + #[serde(rename = "type")] + sub_type: String, + topic: String, +} + +#[derive(Deserialize)] +struct OpenConnectionResponse { + endpoint: String, + ticket: String, +} + +async fn get_ws_endpoint( + http: &reqwest::Client, + app_key: &str, + app_secret: &str, + token: &str, +) -> Result> { + let body = OpenConnectionRequest { + client_id: app_key, + client_secret: app_secret, + subscriptions: vec![SubItem { + sub_type: "CALLBACK".to_string(), + topic: "/v1.0/im/bot/messages/get".to_string(), + }], + ua: "openfang/0.3", + local_ip: "", + }; + let resp: OpenConnectionResponse = http + .post(format!("{API_BASE}/v1.0/gateway/connections/open")) + .header("x-acs-dingtalk-access-token", token) + .json(&body) + .send() + .await? + .error_for_status()? + .json() + .await?; + let sep = if resp.endpoint.contains('?') { "&" } else { "?" }; + Ok(format!("{}{}ticket={}", resp.endpoint, sep, resp.ticket)) +} + +// ─── Frame handling ────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct ProtoFrame { + #[serde(rename = "type")] + msg_type: String, + headers: ProtoHeaders, + #[serde(default)] + data: serde_json::Value, +} + +#[derive(Deserialize)] +struct ProtoHeaders { + #[serde(rename = "messageId", default)] + message_id: String, + #[serde(default)] + topic: String, +} + +#[derive(Serialize)] +struct AckReply { + code: u32, + headers: AckHeaders, + message: String, + data: String, +} + +#[derive(Serialize)] +struct AckHeaders { + #[serde(rename = "contentType")] + content_type: String, + #[serde(rename = "messageId")] + message_id: String, + topic: String, +} + +fn make_ack(message_id: &str, topic: &str) -> String { + serde_json::to_string(&AckReply { + code: 200, + headers: AckHeaders { + content_type: "application/json".to_string(), + message_id: message_id.to_string(), + topic: topic.to_string(), + }, + message: "OK".to_string(), + data: String::new(), + }) + .unwrap_or_default() +} + +#[derive(Deserialize)] +struct CallbackPayload { + #[serde(rename = "msgtype", default)] + msg_type: String, + #[serde(default)] + text: Option, + #[serde(rename = "senderStaffId", default)] + sender_staff_id: String, + #[serde(rename = "senderId", default)] + sender_id: String, + #[serde(rename = "senderNick", default)] + sender_nick: String, + #[serde(rename = "conversationId", default)] + conversation_id: String, + #[serde(rename = "conversationType", default)] + conversation_type: String, + #[serde(rename = "messageId", default)] + message_id: String, +} + +#[derive(Deserialize)] +struct TextContent { + content: String, +} + +async fn handle_frame(text: &str, sink: &mut S, tx: &mpsc::Sender) +where + S: SinkExt + Unpin, + >::Error: std::fmt::Display, +{ + let frame: ProtoFrame = match serde_json::from_str(text) { + Ok(f) => f, + Err(e) => { + warn!("DingTalk Stream: bad frame: {e}"); + return; + } + }; + + let mid = &frame.headers.message_id; + let topic = &frame.headers.topic; + + match frame.msg_type.as_str() { + "SYSTEM" if topic == "ping" => { + let _ = sink.send(Message::Text(make_ack(mid, "pong"))).await; + } + "CALLBACK" | "EVENT" => { + let data_str = frame.data.to_string(); + // Try direct parse, then try unwrapping double-encoded string + let cb: Option = serde_json::from_str(&data_str) + .ok() + .or_else(|| { + serde_json::from_str::(&data_str) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + }); + + if let Some(cb) = cb { + if cb.msg_type == "text" { + if let Some(ref tc) = cb.text { + let trimmed = tc.content.trim().to_string(); + if !trimmed.is_empty() { + let content = if trimmed.starts_with('/') { + let parts: Vec<&str> = trimmed.splitn(2, ' ').collect(); + let cmd = parts[0].trim_start_matches('/'); + let args: Vec = parts + .get(1) + .map(|a| { + a.split_whitespace().map(String::from).collect() + }) + .unwrap_or_default(); + ChannelContent::Command { + name: cmd.to_string(), + args, + } + } else { + ChannelContent::Text(trimmed) + }; + + let mut meta = HashMap::new(); + meta.insert( + "conversation_id".to_string(), + serde_json::Value::String(cb.conversation_id), + ); + + let uid = if cb.sender_staff_id.is_empty() { + cb.sender_id + } else { + cb.sender_staff_id + }; + + let msg = ChannelMessage { + channel: ChannelType::Custom("dingtalk_stream".to_string()), + platform_message_id: cb.message_id, + sender: ChannelUser { + platform_id: uid, + display_name: cb.sender_nick, + openfang_user: None, + }, + content, + target_agent: None, + timestamp: Utc::now(), + is_group: cb.conversation_type == "2", + thread_id: None, + metadata: meta, + }; + + if tx.send(msg).await.is_err() { + error!("DingTalk Stream: channel receiver dropped"); + } + } + } + } + } + + let _ = sink.send(Message::Text(make_ack(mid, topic))).await; + } + _ => { + let _ = sink.send(Message::Text(make_ack(mid, topic))).await; + } + } +} + +fn backoff(attempt: u32) -> Duration { + let ms = (1000u64 * 2u64.saturating_pow(attempt.min(6))).min(60_000); + Duration::from_millis(ms) +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn adapter_creation() { + let a = DingTalkStreamAdapter::new("k".into(), "s".into(), "r".into()); + assert_eq!(a.name(), "dingtalk_stream"); + assert_eq!( + a.channel_type(), + ChannelType::Custom("dingtalk_stream".to_string()) + ); + } + + #[test] + fn backoff_doubles() { + assert_eq!(backoff(0), Duration::from_millis(1000)); + assert_eq!(backoff(1), Duration::from_millis(2000)); + assert_eq!(backoff(2), Duration::from_millis(4000)); + } + + #[test] + fn backoff_capped() { + assert_eq!(backoff(10), Duration::from_millis(60_000)); + assert_eq!(backoff(20), Duration::from_millis(60_000)); + } + + #[test] + fn make_ack_valid_json() { + let ack = make_ack("msg1", "topic1"); + let v: serde_json::Value = serde_json::from_str(&ack).unwrap(); + assert_eq!(v["code"], 200); + assert_eq!(v["headers"]["messageId"], "msg1"); + } +} diff --git a/crates/openfang-channels/src/lib.rs b/crates/openfang-channels/src/lib.rs index 978c202e7..97db72e1a 100644 --- a/crates/openfang-channels/src/lib.rs +++ b/crates/openfang-channels/src/lib.rs @@ -43,6 +43,7 @@ pub mod twist; pub mod webex; // Wave 5 — Niche & differentiating channels pub mod dingtalk; +pub mod dingtalk_stream; pub mod discourse; pub mod gitter; pub mod gotify; diff --git a/crates/openfang-cli/src/tui/screens/channels.rs b/crates/openfang-cli/src/tui/screens/channels.rs index 8bcf0508a..aa51b432e 100644 --- a/crates/openfang-cli/src/tui/screens/channels.rs +++ b/crates/openfang-cli/src/tui/screens/channels.rs @@ -200,7 +200,14 @@ const CHANNEL_DEFS: &[ChannelDef] = &[ display_name: "DingTalk", category: "Enterprise", env_vars: &["DINGTALK_ACCESS_TOKEN", "DINGTALK_SECRET"], - description: "DingTalk Robot API adapter", + description: "DingTalk Robot API adapter (webhook mode)", + }, + ChannelDef { + name: "dingtalk_stream", + display_name: "DingTalk Stream", + category: "Enterprise", + env_vars: &["DINGTALK_APP_KEY", "DINGTALK_APP_SECRET", "DINGTALK_ROBOT_CODE"], + description: "DingTalk Stream Mode (WebSocket long-connection)", }, ChannelDef { name: "pumble", diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 617c09bfc..0bd5b061f 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1507,8 +1507,10 @@ pub struct ChannelsConfig { // Wave 5 — Niche & differentiating channels /// Mumble text chat configuration (None = disabled). pub mumble: Option, - /// DingTalk robot configuration (None = disabled). + /// DingTalk robot configuration — webhook mode (None = disabled). pub dingtalk: Option, + /// DingTalk Stream mode — long-lived WebSocket (None = disabled). + pub dingtalk_stream: Option, /// Discourse forum configuration (None = disabled). pub discourse: Option, /// Gitter streaming configuration (None = disabled). @@ -2566,6 +2568,39 @@ impl Default for DingTalkConfig { } } +/// DingTalk Stream channel adapter configuration. +/// +/// Uses the DingTalk Stream Mode (WebSocket long-connection) instead of +/// the legacy webhook approach. Requires an Enterprise Internal App with +/// Stream Mode enabled in the DingTalk Open Platform console. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct DingTalkStreamConfig { + /// Env var holding the App Key (client_id). + pub app_key_env: String, + /// Env var holding the App Secret (client_secret). + pub app_secret_env: String, + /// Robot code for outbound batchSend (often same as app_key). + pub robot_code_env: String, + /// Default agent name to route messages to. + pub default_agent: Option, + /// Per-channel behavior overrides. + #[serde(default)] + pub overrides: ChannelOverrides, +} + +impl Default for DingTalkStreamConfig { + fn default() -> Self { + Self { + app_key_env: "DINGTALK_APP_KEY".to_string(), + app_secret_env: "DINGTALK_APP_SECRET".to_string(), + robot_code_env: "DINGTALK_ROBOT_CODE".to_string(), + default_agent: None, + overrides: ChannelOverrides::default(), + } + } +} + /// Discourse forum channel adapter configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -3081,6 +3116,26 @@ impl KernelConfig { )); } } + if let Some(ref ds) = self.channels.dingtalk_stream { + if std::env::var(&ds.app_key_env) + .unwrap_or_default() + .is_empty() + { + warnings.push(format!( + "DingTalk Stream configured but {} is not set", + ds.app_key_env + )); + } + if std::env::var(&ds.app_secret_env) + .unwrap_or_default() + .is_empty() + { + warnings.push(format!( + "DingTalk Stream configured but {} is not set", + ds.app_secret_env + )); + } + } if let Some(ref dc) = self.channels.discourse { if std::env::var(&dc.api_key_env) .unwrap_or_default()