diff --git a/src-tauri/src/proxy/providers/streaming.rs b/src-tauri/src/proxy/providers/streaming.rs index 65a10260e..f6c70d919 100644 --- a/src-tauri/src/proxy/providers/streaming.rs +++ b/src-tauri/src/proxy/providers/streaming.rs @@ -2,7 +2,7 @@ //! //! 实现 OpenAI SSE → Anthropic SSE 格式转换 -use crate::proxy::sse::strip_sse_field; +use crate::proxy::sse::{strip_sse_field, take_sse_block}; use bytes::Bytes; use futures::stream::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; @@ -110,10 +110,7 @@ pub fn create_anthropic_sse_stream( let text = String::from_utf8_lossy(&bytes); buffer.push_str(&text); - while let Some(pos) = buffer.find("\n\n") { - let line = buffer[..pos].to_string(); - buffer = buffer[pos + 2..].to_string(); - + while let Some(line) = take_sse_block(&mut buffer) { if line.trim().is_empty() { continue; } diff --git a/src-tauri/src/proxy/providers/streaming_responses.rs b/src-tauri/src/proxy/providers/streaming_responses.rs index ea9274ff8..02ec7a661 100644 --- a/src-tauri/src/proxy/providers/streaming_responses.rs +++ b/src-tauri/src/proxy/providers/streaming_responses.rs @@ -9,7 +9,7 @@ //! 与 Chat Completions 的 delta chunk 模型完全不同,需要独立的状态机处理。 use super::transform_responses::{build_anthropic_usage_from_responses, map_responses_stop_reason}; -use crate::proxy::sse::strip_sse_field; +use crate::proxy::sse::{strip_sse_field, take_sse_block}; use bytes::Bytes; use futures::stream::{Stream, StreamExt}; use serde_json::{json, Value}; @@ -121,11 +121,8 @@ pub fn create_anthropic_sse_stream_from_responses(line: &'a str, field: &str) -> Option<&'a str> .or_else(|| line.strip_prefix(&format!("{field}:"))) } +pub(crate) fn take_sse_block(buffer: &mut String) -> Option { + let lf = buffer.find("\n\n"); + let crlf = buffer.find("\r\n\r\n"); + let (pos, delim_len) = match (lf, crlf) { + (Some(lf_pos), Some(crlf_pos)) => { + if lf_pos < crlf_pos { + (lf_pos, 2) + } else { + (crlf_pos, 4) + } + } + (Some(lf_pos), None) => (lf_pos, 2), + (None, Some(crlf_pos)) => (crlf_pos, 4), + (None, None) => return None, + }; + + let mut block = buffer[..pos].to_string(); + *buffer = buffer[pos + delim_len..].to_string(); + + if block.contains('\r') { + block = block.replace("\r\n", "\n"); + block = block.replace('\r', "\n"); + } + + Some(block) +} + #[cfg(test)] mod tests { - use super::strip_sse_field; + use super::{strip_sse_field, take_sse_block}; #[test] fn strip_sse_field_accepts_optional_space() { @@ -28,4 +55,20 @@ mod tests { ); assert_eq!(strip_sse_field("id:1", "data"), None); } + + #[test] + fn take_sse_block_handles_crlf_delimiter() { + let mut buffer = "data: {\"ok\":true}\r\n\r\nrest".to_string(); + let block = take_sse_block(&mut buffer).expect("block"); + assert_eq!(block, "data: {\"ok\":true}"); + assert_eq!(buffer, "rest"); + } + + #[test] + fn take_sse_block_handles_lf_delimiter() { + let mut buffer = "event: ping\n\ndata:1".to_string(); + let block = take_sse_block(&mut buffer).expect("block"); + assert_eq!(block, "event: ping"); + assert_eq!(buffer, "data:1"); + } }