From 7b2f0b7ee2c5c1e1a0042aaadb7599277c1ad073 Mon Sep 17 00:00:00 2001 From: Mo Li <121499347+lijirou12@users.noreply.github.com> Date: Thu, 26 Feb 2026 21:05:07 +0800 Subject: [PATCH] fix: normalize SOCKS proxy scheme for app chat requests --- app/api/v1/admin_api/config.py | 6 +- app/api/v1/chat.py | 133 ++++++++++++++++++----- app/services/grok/services/chat.py | 16 ++- app/services/grok/services/image.py | 23 ++++ app/services/grok/services/image_edit.py | 35 +++--- app/services/grok/services/model.py | 12 ++ app/services/grok/utils/retry.py | 22 +++- app/services/reverse/app_chat.py | 42 ++++++- app/static/admin/js/config.js | 24 ++++ config.defaults.toml | 11 ++ readme.md | 5 +- 11 files changed, 275 insertions(+), 54 deletions(-) diff --git a/app/api/v1/admin_api/config.py b/app/api/v1/admin_api/config.py index f843b76b4..f0a9c2d0b 100644 --- a/app/api/v1/admin_api/config.py +++ b/app/api/v1/admin_api/config.py @@ -4,7 +4,7 @@ from app.core.auth import verify_app_key from app.core.config import config -from app.core.storage import get_storage, LocalStorage, RedisStorage, SQLStorage +from app.core.storage import get_storage as resolve_storage, LocalStorage, RedisStorage, SQLStorage router = APIRouter() @@ -33,11 +33,11 @@ async def update_config(data: dict): @router.get("/storage", dependencies=[Depends(verify_app_key)]) -async def get_storage(): +async def get_storage_mode(): """获取当前存储模式""" storage_type = os.getenv("SERVER_STORAGE_TYPE", "").lower() if not storage_type: - storage = get_storage() + storage = resolve_storage() if isinstance(storage, LocalStorage): storage_type = "local" elif isinstance(storage, RedisStorage): diff --git a/app/api/v1/chat.py b/app/api/v1/chat.py index ace5e701d..b87e1447c 100644 --- a/app/api/v1/chat.py +++ b/app/api/v1/chat.py @@ -2,7 +2,7 @@ Chat Completions API 路由 """ -from typing import Any, Dict, List, Optional, Union +from typing import Any, AsyncGenerator, AsyncIterable, Dict, List, Optional, Union import base64 import binascii import time @@ -11,6 +11,7 @@ from fastapi import APIRouter from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field +import orjson from app.services.grok.services.chat import ChatService from app.services.grok.services.image import ImageGenerationService @@ -78,6 +79,7 @@ class ChatCompletionRequest(BaseModel): "1024x1792", "1024x1024", } +SUPERIMAGE_MODEL_ID = "grok-superimage-1.0" def _validate_media_input(value: str, field_name: str, param: str): @@ -165,6 +167,73 @@ def _image_field(response_format: str) -> str: return "url" return "b64_json" + +def _superimage_server_image_config() -> ImageConfig: + """Load server-side image generation parameters for grok-superimage-1.0.""" + n = int(get_config("superimage.n", 1) or 1) + size = str(get_config("superimage.size", "1024x1024") or "1024x1024") + response_format = str( + get_config("superimage.response_format", get_config("app.image_format") or "url") + or "url" + ) + return ImageConfig(n=n, size=size, response_format=response_format) + + +async def _safe_sse_stream(stream: AsyncIterable[str]) -> AsyncGenerator[str, None]: + """Ensure streaming endpoints return SSE error payloads instead of transport-level 5xx breaks.""" + try: + async for chunk in stream: + yield chunk + except AppException as e: + payload = { + "error": { + "message": e.message, + "type": e.error_type, + "code": e.code, + } + } + yield f"event: error\ndata: {orjson.dumps(payload).decode()}\n\n" + yield "data: [DONE]\n\n" + except Exception as e: + payload = { + "error": { + "message": str(e) or "stream_error", + "type": "server_error", + "code": "stream_error", + } + } + yield f"event: error\ndata: {orjson.dumps(payload).decode()}\n\n" + yield "data: [DONE]\n\n" + + +def _streaming_error_response(exc: Exception) -> StreamingResponse: + if isinstance(exc, AppException): + payload = { + "error": { + "message": exc.message, + "type": exc.error_type, + "code": exc.code, + } + } + else: + payload = { + "error": { + "message": str(exc) or "stream_error", + "type": "server_error", + "code": "stream_error", + } + } + + async def _one_shot_error() -> AsyncGenerator[str, None]: + yield f"event: error\ndata: {orjson.dumps(payload).decode()}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + _one_shot_error(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, + ) + def _validate_image_config(image_conf: ImageConfig, *, stream: bool): n = image_conf.n or 1 if n < 1 or n > 10: @@ -514,7 +583,7 @@ def validate_request(request: ChatCompletionRequest): param="messages", code="empty_prompt", ) - image_conf = request.image_config or ImageConfig() + image_conf = _superimage_server_image_config() if request.model == SUPERIMAGE_MODEL_ID else (request.image_config or ImageConfig()) n = image_conf.n or 1 if not (1 <= n <= 10): raise ValidationException( @@ -669,7 +738,7 @@ async def chat_completions(request: ChatCompletionRequest): if result.stream: return StreamingResponse( - result.data, + _safe_sse_stream(result.data), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, ) @@ -685,7 +754,7 @@ async def chat_completions(request: ChatCompletionRequest): is_stream = ( request.stream if request.stream is not None else get_config("app.stream") ) - image_conf = request.image_config or ImageConfig() + image_conf = _superimage_server_image_config() if request.model == SUPERIMAGE_MODEL_ID else (request.image_config or ImageConfig()) _validate_image_config(image_conf, stream=bool(is_stream)) response_format = _resolve_image_format(image_conf.response_format) response_field = _image_field(response_format) @@ -732,7 +801,7 @@ async def chat_completions(request: ChatCompletionRequest): if result.stream: return StreamingResponse( - result.data, + _safe_sse_stream(result.data), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, ) @@ -747,34 +816,44 @@ async def chat_completions(request: ChatCompletionRequest): # 提取视频配置 (默认值在 Pydantic 模型中处理) v_conf = request.video_config or VideoConfig() - result = await VideoService.completions( - model=request.model, - messages=[msg.model_dump() for msg in request.messages], - stream=request.stream, - reasoning_effort=request.reasoning_effort, - aspect_ratio=v_conf.aspect_ratio, - video_length=v_conf.video_length, - resolution=v_conf.resolution_name, - preset=v_conf.preset, - ) + try: + result = await VideoService.completions( + model=request.model, + messages=[msg.model_dump() for msg in request.messages], + stream=request.stream, + reasoning_effort=request.reasoning_effort, + aspect_ratio=v_conf.aspect_ratio, + video_length=v_conf.video_length, + resolution=v_conf.resolution_name, + preset=v_conf.preset, + ) + except Exception as e: + if request.stream is not False: + return _streaming_error_response(e) + raise else: - result = await ChatService.completions( - model=request.model, - messages=[msg.model_dump() for msg in request.messages], - stream=request.stream, - reasoning_effort=request.reasoning_effort, - temperature=request.temperature, - top_p=request.top_p, - tools=request.tools, - tool_choice=request.tool_choice, - parallel_tool_calls=request.parallel_tool_calls, - ) + try: + result = await ChatService.completions( + model=request.model, + messages=[msg.model_dump() for msg in request.messages], + stream=request.stream, + reasoning_effort=request.reasoning_effort, + temperature=request.temperature, + top_p=request.top_p, + tools=request.tools, + tool_choice=request.tool_choice, + parallel_tool_calls=request.parallel_tool_calls, + ) + except Exception as e: + if request.stream is not False: + return _streaming_error_response(e) + raise if isinstance(result, dict): return JSONResponse(content=result) else: return StreamingResponse( - result, + _safe_sse_stream(result), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, ) diff --git a/app/services/grok/services/chat.py b/app/services/grok/services/chat.py index b8166793c..673cf0708 100644 --- a/app/services/grok/services/chat.py +++ b/app/services/grok/services/chat.py @@ -22,7 +22,7 @@ from app.services.grok.services.model import ModelService from app.services.grok.utils.upload import UploadService from app.services.grok.utils import process as proc_base -from app.services.grok.utils.retry import pick_token, rate_limited +from app.services.grok.utils.retry import pick_token, rate_limited, transient_upstream from app.services.reverse.app_chat import AppChatReverse from app.services.reverse.utils.session import ResettableSession from app.services.grok.utils.stream import wrap_stream_with_usage @@ -481,6 +481,20 @@ async def completions( ) continue + if transient_upstream(e): + has_alternative_token = False + for pool_name in ModelService.pool_candidates_for_model(model): + if token_mgr.get_token(pool_name, exclude=tried_tokens): + has_alternative_token = True + break + if not has_alternative_token: + raise + logger.warning( + f"Transient upstream error for token {token[:10]}..., " + f"trying next token (attempt {attempt + 1}/{max_token_retries}): {e}" + ) + continue + # 非 429 错误,不换 token,直接抛出 raise diff --git a/app/services/grok/services/image.py b/app/services/grok/services/image.py index ebcbc0d54..d7e0771ca 100644 --- a/app/services/grok/services/image.py +++ b/app/services/grok/services/image.py @@ -436,6 +436,7 @@ def _sse(self, event: str, data: dict) -> str: async def process(self, response: AsyncIterable[dict]) -> AsyncGenerator[str, None]: images: Dict[str, Dict] = {} + emitted_chat_chunk = False async for item in response: if item.get("type") == "error": @@ -475,6 +476,9 @@ async def process(self, response: AsyncIterable[dict]) -> AsyncGenerator[str, No continue if item.get("stage") != "final": + # Chat Completions image stream should only expose final results. + if self.chat_format: + continue if image_id not in self._initial_sent: self._initial_sent.add(image_id) stage = item.get("stage") or "preview" @@ -515,6 +519,7 @@ async def process(self, response: AsyncIterable[dict]) -> AsyncGenerator[str, No if not self._id_generated: self._response_id = make_response_id() self._id_generated = True + emitted_chat_chunk = True yield self._sse( "chat.completion.chunk", make_chat_chunk( @@ -593,6 +598,7 @@ async def process(self, response: AsyncIterable[dict]) -> AsyncGenerator[str, No if self.chat_format: # OpenAI ChatCompletion chunk format + emitted_chat_chunk = True yield self._sse( "chat.completion.chunk", make_chat_chunk( @@ -624,6 +630,23 @@ async def process(self, response: AsyncIterable[dict]) -> AsyncGenerator[str, No }, ) + if self.chat_format: + if not self._id_generated: + self._response_id = make_response_id() + self._id_generated = True + if not emitted_chat_chunk: + yield self._sse( + "chat.completion.chunk", + make_chat_chunk( + self._response_id, + self.model, + "", + index=0, + is_final=True, + ), + ) + yield "data: [DONE]\n\n" + class ImageWSCollectProcessor(ImageWSBaseProcessor): """WebSocket image non-stream processor.""" diff --git a/app/services/grok/services/image_edit.py b/app/services/grok/services/image_edit.py index 96e48e4bb..fe0ad3539 100644 --- a/app/services/grok/services/image_edit.py +++ b/app/services/grok/services/image_edit.py @@ -323,6 +323,7 @@ async def process( ) -> AsyncGenerator[str, None]: """Process stream response.""" final_images = [] + emitted_chat_chunk = False idle_timeout = get_config("image.stream_timeout") try: @@ -347,21 +348,7 @@ async def process( out_index = 0 if self.n == 1 else image_index - if self.chat_format: - # OpenAI ChatCompletion chunk format for partial - if not self._id_generated: - self._response_id = make_response_id() - self._id_generated = True - yield self._sse( - "chat.completion.chunk", - make_chat_chunk( - self._response_id, - self.model, - "", - index=out_index, - ), - ) - else: + if not self.chat_format: yield self._sse( "image_generation.partial_image", { @@ -421,6 +408,7 @@ async def process( if self.chat_format: # OpenAI ChatCompletion chunk format + emitted_chat_chunk = True yield self._sse( "chat.completion.chunk", make_chat_chunk( @@ -450,6 +438,23 @@ async def process( }, }, ) + + if self.chat_format: + if not self._id_generated: + self._response_id = make_response_id() + self._id_generated = True + if not emitted_chat_chunk: + yield self._sse( + "chat.completion.chunk", + make_chat_chunk( + self._response_id, + self.model, + "", + index=0, + is_final=True, + ), + ) + yield "data: [DONE]\n\n" except asyncio.CancelledError: logger.debug("Image stream cancelled by client") except StreamIdleTimeoutError as e: diff --git a/app/services/grok/services/model.py b/app/services/grok/services/model.py index 7ff0fe190..a77f8de76 100644 --- a/app/services/grok/services/model.py +++ b/app/services/grok/services/model.py @@ -174,6 +174,18 @@ class ModelService: is_image_edit=False, is_video=False, ), + ModelInfo( + model_id="grok-superimage-1.0", + grok_model="grok-3", + model_mode="MODEL_MODE_FAST", + tier=Tier.BASIC, + cost=Cost.HIGH, + display_name="Grok SuperImage", + description="Imagine waterfall image generation model for chat completions", + is_image=True, + is_image_edit=False, + is_video=False, + ), ModelInfo( model_id="grok-imagine-1.0", grok_model="grok-3", diff --git a/app/services/grok/utils/retry.py b/app/services/grok/utils/retry.py index e0b1edb5b..f65c43ac3 100644 --- a/app/services/grok/utils/retry.py +++ b/app/services/grok/utils/retry.py @@ -42,4 +42,24 @@ def rate_limited(error: Exception) -> bool: return status == 429 or code == "rate_limit_exceeded" -__all__ = ["pick_token", "rate_limited"] +def transient_upstream(error: Exception) -> bool: + """Whether error is likely transient and safe to retry with another token.""" + if not isinstance(error, UpstreamException): + return False + details = error.details or {} + status = details.get("status") + err = str(details.get("error") or error).lower() + transient_status = {408, 500, 502, 503, 504} + if status in transient_status: + return True + timeout_markers = ( + "timed out", + "timeout", + "connection reset", + "temporarily unavailable", + "http2", + ) + return any(marker in err for marker in timeout_markers) + + +__all__ = ["pick_token", "rate_limited", "transient_upstream"] diff --git a/app/services/reverse/app_chat.py b/app/services/reverse/app_chat.py index 80a0c8148..227992a5c 100644 --- a/app/services/reverse/app_chat.py +++ b/app/services/reverse/app_chat.py @@ -4,6 +4,7 @@ import orjson from typing import Any, Dict, List, Optional +from urllib.parse import urlparse from curl_cffi.requests import AsyncSession from app.core.logger import logger @@ -16,6 +17,19 @@ CHAT_API = "https://grok.com/rest/app-chat/conversations/new" +def _normalize_chat_proxy(proxy_url: str) -> str: + """Normalize proxy URL for curl-cffi app-chat requests.""" + if not proxy_url: + return proxy_url + parsed = urlparse(proxy_url) + scheme = parsed.scheme.lower() + if scheme == "socks5": + return proxy_url.replace("socks5://", "socks5h://", 1) + if scheme == "socks4": + return proxy_url.replace("socks4://", "socks4a://", 1) + return proxy_url + + class AppChatReverse: """/rest/app-chat/conversations/new reverse interface.""" @@ -102,7 +116,21 @@ async def request( try: # Get proxies base_proxy = get_config("proxy.base_proxy_url") - proxies = {"http": base_proxy, "https": base_proxy} if base_proxy else None + proxy = None + proxies = None + if base_proxy: + normalized_proxy = _normalize_chat_proxy(base_proxy) + scheme = urlparse(normalized_proxy).scheme.lower() + if scheme.startswith("socks"): + # curl_cffi 对 SOCKS 代理优先使用 proxy 参数,避免被按 HTTP CONNECT 处理 + proxy = normalized_proxy + else: + proxies = {"http": normalized_proxy, "https": normalized_proxy} + logger.info( + f"AppChatReverse proxy enabled: scheme={scheme}, target={normalized_proxy}" + ) + else: + logger.warning("AppChatReverse proxy is empty, request will use direct network") # Build headers headers = build_headers( @@ -123,11 +151,12 @@ async def request( ) # Curl Config - timeout = max( - float(get_config("chat.timeout") or 0), - float(get_config("video.timeout") or 0), - float(get_config("image.timeout") or 0), - ) + timeout = float(get_config("chat.timeout") or 0) + if timeout <= 0: + timeout = max( + float(get_config("video.timeout") or 0), + float(get_config("image.timeout") or 0), + ) browser = get_config("proxy.browser") async def _do_request(): @@ -137,6 +166,7 @@ async def _do_request(): data=orjson.dumps(payload), timeout=timeout, stream=True, + proxy=proxy, proxies=proxies, impersonate=browser, ) diff --git a/app/static/admin/js/config.js b/app/static/admin/js/config.js index 08ac5ec23..66b12ee81 100644 --- a/app/static/admin/js/config.js +++ b/app/static/admin/js/config.js @@ -100,6 +100,14 @@ const LOCALE_MAP = { }, + "superimage": { + "label": "SuperImage 配置", + "n": { title: "生成数量", desc: "仅用于 grok-superimage-1.0 的服务端统一生成数量(1-10)。" }, + "size": { title: "图片尺寸", desc: "仅用于 grok-superimage-1.0 的服务端统一尺寸。" }, + "response_format": { title: "响应格式", desc: "仅用于 grok-superimage-1.0 的服务端统一返回格式。" } + }, + + "asset": { "label": "资产配置", "upload_concurrent": { title: "上传并发", desc: "上传接口的最大并发数。推荐 30。" }, @@ -412,6 +420,22 @@ function buildFieldCard(section, key, val) { { val: 'url', text: 'URL' } ]); } + else if (section === 'superimage' && key === 'size') { + built = buildSelectInput(section, key, val, [ + { val: '1024x1024', text: '1024x1024 (1:1)' }, + { val: '1280x720', text: '1280x720 (16:9)' }, + { val: '720x1280', text: '720x1280 (9:16)' }, + { val: '1792x1024', text: '1792x1024 (3:2)' }, + { val: '1024x1792', text: '1024x1792 (2:3)' } + ]); + } + else if (section === 'superimage' && key === 'response_format') { + built = buildSelectInput(section, key, val, [ + { val: 'url', text: 'URL' }, + { val: 'b64_json', text: 'B64 JSON' }, + { val: 'base64', text: 'Base64' } + ]); + } else if (Array.isArray(val) || typeof val === 'object') { built = buildJsonInput(section, key, val); } diff --git a/config.defaults.toml b/config.defaults.toml index d077b6af3..6f2017946 100644 --- a/config.defaults.toml +++ b/config.defaults.toml @@ -110,6 +110,17 @@ medium_min_bytes = 30000 # 判定为最终图的最小字节数 final_min_bytes = 100000 + +# ==================== SuperImage 配置 ==================== +[superimage] +# 仅对 grok-superimage-1.0 生效,由服务端统一控制,不使用客户端 image_config +n = 1 +# 图片尺寸:1280x720 / 720x1280 / 1792x1024 / 1024x1792 / 1024x1024 +size = "1024x1024" +# 响应格式:url / b64_json / base64 +response_format = "url" + + # ==================== 视频配置 ==================== [video] # Reverse 接口并发上限 diff --git a/readme.md b/readme.md index e2953daef..059c3772f 100644 --- a/readme.md +++ b/readme.md @@ -109,6 +109,7 @@ docker compose up -d | `grok-4.1-thinking` | 4 | Basic/Super | 支持 | 支持 | - | | `grok-4.20-beta` | 1 | Basic/Super | 支持 | 支持 | - | | `grok-imagine-1.0` | - | Basic/Super | - | 支持 | - | +| `grok-superimage-1.0` | - | Basic/Super | - | 支持 | - | | `grok-imagine-1.0-edit` | - | Basic/Super | - | 支持 | - | | `grok-imagine-1.0-video` | - | Basic/Super | - | - | 支持 | @@ -151,7 +152,7 @@ curl http://localhost:8000/v1/chat/completions \ | └─`video_length` | integer | 视频时长 (秒) | `6`, `10`, `15` | | └─`resolution_name` | string | 分辨率 | `480p`, `720p` | | └─`preset` | string | 风格预设 | `fun`, `normal`, `spicy`, `custom` | -| `image_config` | object | **图片模型专用配置对象** | 支持:`grok-imagine-1.0` / `grok-imagine-1.0-edit` | +| `image_config` | object | **图片模型专用配置对象** | 支持:`grok-imagine-1.0` / `grok-superimage-1.0` / `grok-imagine-1.0-edit` | | └─`n` | integer | 生成数量 | `1` ~ `10` | | └─`size` | string | 图片尺寸 | `1280x720`, `720x1280`, `1792x1024`, `1024x1792`, `1024x1024` | | └─`response_format` | string | 响应格式 | `url`, `b64_json`, `base64` | @@ -177,6 +178,8 @@ curl http://localhost:8000/v1/chat/completions \ - `image_url/input_audio/file` 仅支持 URL 或 Data URI(`data:;base64,...`),裸 base64 会报错。 - `reasoning_effort`:`none` 表示不输出思考,其他值都会输出思考内容。 - 工具调用为**提示词模拟 + 客户端执行回填**:模型通过 `{...}` 输出调用请求,服务端解析为 `tool_calls`;不执行工具。 +- `grok-superimage-1.0` 与瀑布流 imagine 生成链路一致,可直接通过 `/v1/chat/completions` 调用;其 `n/size/response_format` 由服务端 `[superimage]` 统一控制。 +- `grok-superimage-1.0` 在 `/v1/chat/completions` 的流式输出仅返回最终成图,不返回中间预览图。 - `grok-imagine-1.0-edit` 必须提供图片,多图默认取**最后 3 张**与最后一个文本。 - `grok-imagine-1.0-video` 支持文生视频与图生视频(通过 `image_url` 传参考图,**仅取第 1 张**)。 - 除上述外的其他参数将自动丢弃并忽略。