Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ nonstream-keepalive-interval: 0
# - name: "openrouter" # The name of the provider; it will be used in the user agent and other places.
# prefix: "test" # optional: require calls like "test/kimi-k2" to target this provider's credentials
# base-url: "https://openrouter.ai/api/v1" # The base URL of the provider.
# # force-upstream-stream: true # optional: always call upstream with stream=true and aggregate SSE for downstream non-stream requests
# headers:
# X-Custom-Header: "custom-value"
# api-key-entries:
Expand Down
4 changes: 4 additions & 0 deletions internal/api/handlers/management/config_lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ func (h *Handler) PatchOpenAICompat(c *gin.Context) {
Name *string `json:"name"`
Prefix *string `json:"prefix"`
BaseURL *string `json:"base-url"`
ForceUpstreamStream *bool `json:"force-upstream-stream"`
APIKeyEntries *[]config.OpenAICompatibilityAPIKey `json:"api-key-entries"`
Models *[]config.OpenAICompatibilityModel `json:"models"`
Headers *map[string]string `json:"headers"`
Expand Down Expand Up @@ -445,6 +446,9 @@ func (h *Handler) PatchOpenAICompat(c *gin.Context) {
}
entry.BaseURL = trimmed
}
if body.Value.ForceUpstreamStream != nil {
entry.ForceUpstreamStream = *body.Value.ForceUpstreamStream
}
if body.Value.APIKeyEntries != nil {
entry.APIKeyEntries = append([]config.OpenAICompatibilityAPIKey(nil), (*body.Value.APIKeyEntries)...)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ type OpenAICompatibility struct {
// BaseURL is the base URL for the external OpenAI-compatible API endpoint.
BaseURL string `yaml:"base-url" json:"base-url"`

// ForceUpstreamStream forces upstream stream=true for non-stream downstream requests.
ForceUpstreamStream bool `yaml:"force-upstream-stream,omitempty" json:"force-upstream-stream,omitempty"`

// APIKeyEntries defines API keys with optional per-key proxy configuration.
APIKeyEntries []OpenAICompatibilityAPIKey `yaml:"api-key-entries,omitempty" json:"api-key-entries,omitempty"`

Expand Down
32 changes: 28 additions & 4 deletions internal/runtime/executor/openai_compat_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,20 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
originalPayloadSource = opts.OriginalRequest
}
originalPayload := originalPayloadSource
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, opts.Stream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, opts.Stream)
forceUpstreamStream := e.shouldForceUpstreamStream(auth) && opts.Alt != "responses/compact"
upstreamStream := forceUpstreamStream
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, upstreamStream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, upstreamStream)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
if opts.Alt == "responses/compact" {
if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
translated = updated
}
} else if forceUpstreamStream {
if updated, errSet := sjson.SetBytes(translated, "stream", true); errSet == nil {
translated = updated
}
}
Comment on lines +97 to 110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for setting the upstream stream parameter seems to have some redundancy. The upstreamStream variable is passed to sdktranslator.TranslateRequest, which should handle setting the stream property in the request payload. The ExecuteStream function already relies on this behavior. The subsequent else if forceUpstreamStream block, which also sets stream: true, appears to be redundant and can be removed for clarity. Additionally, the upstreamStream variable is an alias for forceUpstreamStream in this context and can be removed to simplify the code.

 	originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, forceUpstreamStream)
 	translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, forceUpstreamStream)
 	requestedModel := payloadRequestedModel(opts, req.Model)
 	translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
 	if opts.Alt == "responses/compact" {
 		if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
 			translated = updated
 		}
 	}


translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier())
Expand All @@ -118,6 +124,10 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
}
httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat")
if forceUpstreamStream {
httpReq.Header.Set("Accept", "text/event-stream")
httpReq.Header.Set("Cache-Control", "no-cache")
}
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
Expand Down Expand Up @@ -166,12 +176,21 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, body)
reporter.publish(ctx, parseOpenAIUsage(body))
bodyForTranslation := body
usageDetail := parseOpenAIUsage(body)
if forceUpstreamStream {
bodyForTranslation, usageDetail, err = aggregateOpenAIChatCompletionSSE(body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
}
reporter.publish(ctx, usageDetail)
// Ensure we at least record the request even if upstream doesn't return usage
reporter.ensurePublished(ctx)
// Translate response back to source format when needed
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, body, &param)
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bodyForTranslation, &param)
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
return resp, nil
}
Expand Down Expand Up @@ -374,6 +393,11 @@ func (e *OpenAICompatExecutor) resolveCompatConfig(auth *cliproxyauth.Auth) *con
return nil
}

// shouldForceUpstreamStream reports whether the compatibility provider
// resolved for the given auth has opted in (via force-upstream-stream) to
// always calling the upstream with stream=true, even for non-stream
// downstream requests.
func (e *OpenAICompatExecutor) shouldForceUpstreamStream(auth *cliproxyauth.Auth) bool {
	if compat := e.resolveCompatConfig(auth); compat != nil {
		return compat.ForceUpstreamStream
	}
	return false
}

func (e *OpenAICompatExecutor) overrideModel(payload []byte, model string) []byte {
if len(payload) == 0 || model == "" {
return payload
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package executor

import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
"github.com/tidwall/gjson"
)

// TestOpenAICompatExecutor_ForceUpstreamStreamAggregatesReasoningAndContent
// verifies that when force-upstream-stream is enabled, a non-stream downstream
// request is sent upstream with stream=true and an SSE Accept header, and that
// the streamed reasoning_content/content deltas plus usage are aggregated back
// into a single non-stream chat.completion JSON response.
func TestOpenAICompatExecutor_ForceUpstreamStreamAggregatesReasoningAndContent(t *testing.T) {
	var gotAccept string
	var gotBody []byte
	// Fake upstream: records the request headers/body, then emits three SSE
	// chunks (one reasoning delta, two content deltas — the last carrying
	// usage) followed by the [DONE] sentinel.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		gotAccept = r.Header.Get("Accept")
		body, _ := io.ReadAll(r.Body)
		gotBody = body
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"created\":1710000000,\"model\":\"glm-5\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"r1\"}}]}\n\n"))
		_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"created\":1710000000,\"model\":\"glm-5\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi \"}}]}\n\n"))
		_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"created\":1710000000,\"model\":\"glm-5\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"there\"}}],\"usage\":{\"prompt_tokens\":1,\"completion_tokens\":2,\"total_tokens\":3}}\n\n"))
		_, _ = w.Write([]byte("data: [DONE]\n\n"))
	}))
	defer server.Close()

	// Provider config with ForceUpstreamStream enabled for the "iamai" entry.
	cfg := &config.Config{OpenAICompatibility: []config.OpenAICompatibility{{
		Name:                "iamai",
		BaseURL:             server.URL + "/v1",
		ForceUpstreamStream: true,
		Models: []config.OpenAICompatibilityModel{{
			Name:  "glm-5",
			Alias: "glm-5",
		}},
	}}}

	executor := NewOpenAICompatExecutor("openai-compatibility", cfg)
	// compat_name ties the auth back to the config entry above so the
	// executor can resolve the force-upstream-stream flag.
	auth := &cliproxyauth.Auth{Attributes: map[string]string{
		"base_url":    server.URL + "/v1",
		"api_key":     "test",
		"compat_name": "iamai",
	}}
	payload := []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`)

	// Downstream request is explicitly non-stream (Stream: false).
	resp, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
		Model:   "glm-5",
		Payload: payload,
	}, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("openai"),
		Stream:       false,
	})
	if err != nil {
		t.Fatalf("Execute error: %v", err)
	}
	// Upstream call must advertise SSE and carry stream=true in the payload.
	if gotAccept != "text/event-stream" {
		t.Fatalf("expected Accept text/event-stream, got %q", gotAccept)
	}
	if !gjson.GetBytes(gotBody, "stream").Bool() {
		t.Fatalf("expected upstream payload to include stream=true")
	}
	if !gjson.ValidBytes(resp.Payload) {
		t.Fatalf("expected valid JSON response, got: %s", string(resp.Payload))
	}
	// Content deltas "hi " + "there" must be concatenated in order.
	if gjson.GetBytes(resp.Payload, "choices.0.message.content").String() != "hi there" {
		t.Fatalf("content mismatch: %s", gjson.GetBytes(resp.Payload, "choices.0.message.content").String())
	}
	// Reasoning deltas must be preserved on the aggregated message.
	if gjson.GetBytes(resp.Payload, "choices.0.message.reasoning_content").String() != "r1" {
		t.Fatalf("reasoning mismatch: %s", gjson.GetBytes(resp.Payload, "choices.0.message.reasoning_content").String())
	}
	// No tool calls in the stream, so the aggregated finish_reason is "stop".
	if gjson.GetBytes(resp.Payload, "choices.0.finish_reason").String() != "stop" {
		t.Fatalf("expected finish_reason stop, got %s", gjson.GetBytes(resp.Payload, "choices.0.finish_reason").String())
	}
	// Usage from the final chunk must survive aggregation.
	if gjson.GetBytes(resp.Payload, "usage.prompt_tokens").Int() != 1 {
		t.Fatalf("expected usage prompt_tokens")
	}
}

// TestOpenAICompatExecutor_ForceUpstreamStream_ToolCallsOnlyFinishReasonIsToolCalls
// verifies that when the forced upstream stream carries only tool-call deltas
// (no content), the aggregator stitches the split function arguments back
// together and reports finish_reason "tool_calls".
func TestOpenAICompatExecutor_ForceUpstreamStream_ToolCallsOnlyFinishReasonIsToolCalls(t *testing.T) {
	// Fake upstream: one tool call whose JSON arguments are split across two
	// SSE chunks ('{"path": "' then '/tmp/test"}'), then [DONE].
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-2\",\"object\":\"chat.completion.chunk\",\"created\":1710000001,\"model\":\"glm-5\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"read\",\"arguments\":\"{\\\"path\\\": \\\"\"}}]}}]}\n\n"))
		_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-2\",\"object\":\"chat.completion.chunk\",\"created\":1710000001,\"model\":\"glm-5\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"/tmp/test\\\"}\"}}]}}]}\n\n"))
		_, _ = w.Write([]byte("data: [DONE]\n\n"))
	}))
	defer server.Close()

	// Provider config with ForceUpstreamStream enabled.
	cfg := &config.Config{OpenAICompatibility: []config.OpenAICompatibility{{
		Name:                "iamai",
		BaseURL:             server.URL + "/v1",
		ForceUpstreamStream: true,
		Models: []config.OpenAICompatibilityModel{{
			Name:  "glm-5",
			Alias: "glm-5",
		}},
	}}}

	executor := NewOpenAICompatExecutor("openai-compatibility", cfg)
	auth := &cliproxyauth.Auth{Attributes: map[string]string{
		"base_url":    server.URL + "/v1",
		"api_key":     "test",
		"compat_name": "iamai",
	}}
	payload := []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`)

	// Downstream request is non-stream; aggregation happens inside Execute.
	resp, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
		Model:   "glm-5",
		Payload: payload,
	}, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("openai"),
		Stream:       false,
	})
	if err != nil {
		t.Fatalf("Execute error: %v", err)
	}
	if !gjson.ValidBytes(resp.Payload) {
		t.Fatalf("expected valid JSON response")
	}
	// Both deltas shared tool_calls index 0, so exactly one call results.
	calls := gjson.GetBytes(resp.Payload, "choices.0.message.tool_calls")
	if !calls.Exists() || len(calls.Array()) != 1 {
		t.Fatalf("expected one tool_call, got: %s", calls.String())
	}
	if gjson.GetBytes(resp.Payload, "choices.0.message.tool_calls.0.function.name").String() != "read" {
		t.Fatalf("tool_call name mismatch")
	}
	// The two argument fragments must concatenate into valid JSON.
	if gjson.GetBytes(resp.Payload, "choices.0.message.tool_calls.0.function.arguments").String() != `{"path": "/tmp/test"}` {
		t.Fatalf("tool_call arguments mismatch: %s", gjson.GetBytes(resp.Payload, "choices.0.message.tool_calls.0.function.arguments").String())
	}
	// A tool-calls-only stream must finish with "tool_calls", not "stop".
	if gjson.GetBytes(resp.Payload, "choices.0.finish_reason").String() != "tool_calls" {
		t.Fatalf("expected finish_reason tool_calls, got: %s", gjson.GetBytes(resp.Payload, "choices.0.finish_reason").String())
	}
}

// TestOpenAICompatExecutor_DefaultBehaviorUnchanged verifies the regression
// case: with ForceUpstreamStream left at its zero value (false), a non-stream
// request is sent upstream without any "stream" field and the plain JSON
// response passes through unmodified.
func TestOpenAICompatExecutor_DefaultBehaviorUnchanged(t *testing.T) {
	var gotBody []byte
	// Fake upstream: records the request body and returns an ordinary
	// (non-SSE) chat.completion JSON object.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		gotBody = body
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`{"id":"chatcmpl-3","object":"chat.completion","choices":[{"index":0,"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}]}`))
	}))
	defer server.Close()

	// Note: no ForceUpstreamStream here — defaults to false.
	cfg := &config.Config{OpenAICompatibility: []config.OpenAICompatibility{{
		Name:    "iamai",
		BaseURL: server.URL + "/v1",
		Models: []config.OpenAICompatibilityModel{{
			Name:  "glm-5",
			Alias: "glm-5",
		}},
	}}}

	executor := NewOpenAICompatExecutor("openai-compatibility", cfg)
	auth := &cliproxyauth.Auth{Attributes: map[string]string{
		"base_url":    server.URL + "/v1",
		"api_key":     "test",
		"compat_name": "iamai",
	}}
	payload := []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`)

	// Bound the test in case the executor hangs waiting on the upstream.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	resp, err := executor.Execute(ctx, auth, cliproxyexecutor.Request{
		Model:   "glm-5",
		Payload: payload,
	}, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("openai"),
		Stream:       false,
	})
	if err != nil {
		t.Fatalf("Execute error: %v", err)
	}
	// Without the flag, the upstream payload must not gain a "stream" field.
	if gjson.GetBytes(gotBody, "stream").Exists() {
		t.Fatalf("did not expect stream=true in payload")
	}
	if !gjson.ValidBytes(resp.Payload) {
		t.Fatalf("expected valid JSON response")
	}
}
Loading
Loading