diff --git a/packages/core/src/providers/anthropic/AnthropicProvider.streamRetry.test.ts b/packages/core/src/providers/anthropic/AnthropicProvider.streamRetry.test.ts
new file mode 100644
index 000000000..b37faacdc
--- /dev/null
+++ b/packages/core/src/providers/anthropic/AnthropicProvider.streamRetry.test.ts
@@ -0,0 +1,474 @@
+/**
+ * @license
+ * Copyright 2025 Vybestack LLC
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
+import { AnthropicProvider } from './AnthropicProvider.js';
+import { type IContent } from '../../services/history/IContent.js';
+import { createProviderCallOptions } from '../../test-utils/providerCallOptions.js';
+
+const retryWithBackoffMock = vi.hoisted(() => vi.fn());
+
+vi.mock('../../utils/retry.js', () => ({
+  retryWithBackoff: retryWithBackoffMock,
+  getErrorStatus: vi.fn((error: unknown) => {
+    if (error && typeof error === 'object' && 'status' in error) {
+      return (error as { status: number }).status;
+    }
+    return undefined;
+  }),
+  isNetworkTransientError: vi.fn((error: unknown) =>
+    Boolean(
+      error &&
+        typeof error === 'object' &&
+        'message' in error &&
+        String((error as { message?: unknown }).message).includes('terminated'),
+    ),
+  ),
+}));
+
+const mockSettingsService = vi.hoisted(() => ({
+  set: vi.fn(),
+  get: vi.fn(),
+  setProviderSetting: vi.fn(),
+  getProviderSettings: vi.fn().mockReturnValue({}),
+  getSettings: vi.fn(),
+  updateSettings: vi.fn(),
+  getAllGlobalSettings: vi.fn().mockReturnValue({}),
+}));
+
+vi.mock('../../settings/settingsServiceInstance.js', () => ({
+  getSettingsService: () => mockSettingsService,
+}));
+
+vi.mock('../../core/prompts.js', () => ({
+  getCoreSystemPromptAsync: vi.fn().mockResolvedValue('system prompt'),
+}));
+
+const delayMock = vi.hoisted(() => vi.fn());
+
+vi.mock('../../utils/delay.js', () => ({
+  delay: delayMock,
+  createAbortError: vi.fn(() => new Error('Aborted')),
+}));
+
+const mockAnthropicClient = vi.hoisted(() => ({
+  messages: {
+    create: vi.fn(),
+  },
+}));
+
+// Mock the @anthropic-ai/sdk module
+vi.mock('@anthropic-ai/sdk', () => ({
+  default: vi.fn(() => mockAnthropicClient),
+}));
+
+describe('AnthropicProvider stream retry behavior', () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+    mockSettingsService.getSettings.mockResolvedValue({});
+    delayMock.mockResolvedValue(undefined);
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  it('retries when streaming terminates after a successful API call', async () => {
+    // First stream: yields partial content then terminates
+    const firstStream = (async function* () {
+      yield {
+        type: 'message_start',
+        message: { usage: { input_tokens: 10, output_tokens: 0 } },
+      };
+      yield {
+        type: 'content_block_start',
+        index: 0,
+        content_block: { type: 'text', text: '' },
+      };
+      yield {
+        type: 'content_block_delta',
+        index: 0,
+        delta: { type: 'text_delta', text: 'partial' },
+      };
+      throw new Error('terminated');
+    })();
+
+    // Second stream: completes successfully
+    const secondStream = (async function* () {
+      yield {
+        type: 'message_start',
+        message: { usage: { input_tokens: 10, output_tokens: 0 } },
+      };
+      yield {
+        type: 'content_block_start',
+        index: 0,
+        content_block: { type: 'text', text: '' },
+      };
+      yield {
+        type: 'content_block_delta',
+        index: 0,
+        delta: { type: 'text_delta', text: 'success' },
+      };
+      yield { type: 'content_block_stop', index: 0 };
+      yield {
+        type: 'message_delta',
+        delta: { stop_reason: 'end_turn' },
+        usage: { output_tokens: 5 },
+
}; + })(); + + let callCount = 0; + mockAnthropicClient.messages.create.mockImplementation(() => { + const stream = callCount === 0 ? firstStream : secondStream; + callCount++; + return { + withResponse: () => + Promise.resolve({ + data: stream, + response: { headers: new Headers() }, + }), + }; + }); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + }), + ); + + const chunks: IContent[] = []; + for await (const chunk of generator) { + chunks.push(chunk); + } + + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(2); + const texts = chunks + .flatMap((c) => c.blocks) + .filter((b) => b.type === 'text') + .map((b) => ('text' in b ? b.text : '')) + .join(''); + expect(texts).toContain('partial'); + expect(texts).toContain('success'); + }); + + it('waits with exponential backoff between stream retries', async () => { + const createErrorStream = () => + (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + throw new Error('terminated'); + })(); + + const successStream = (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + yield { + type: 'content_block_start', + index: 0, + content_block: { type: 'text', text: '' }, + }; + yield { + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'success' }, + }; + yield { type: 'content_block_stop', index: 0 }; + yield { + type: 'message_delta', + delta: { stop_reason: 'end_turn' }, + usage: { output_tokens: 5 }, + }; + })(); + + let callCount = 0; + mockAnthropicClient.messages.create.mockImplementation(() => { + const stream = callCount < 2 ? 
createErrorStream() : successStream; + callCount++; + return { + withResponse: () => + Promise.resolve({ + data: stream, + response: { headers: new Headers() }, + }), + }; + }); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + ephemerals: { + retrywait: 3000, // Override default delay + }, + }), + ); + + const chunks: IContent[] = []; + for await (const chunk of generator) { + chunks.push(chunk); + } + + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(3); + expect(delayMock).toHaveBeenCalledTimes(2); + + const firstDelay = delayMock.mock.calls[0]?.[0] as number; + const secondDelay = delayMock.mock.calls[1]?.[0] as number; + + expect(firstDelay).toBeGreaterThan(0); + expect(secondDelay).toBeGreaterThan(firstDelay); + // Verify initial delay respects ephemeral setting (~3000ms with jitter) + expect(firstDelay).toBeGreaterThan(2100); + expect(firstDelay).toBeLessThan(3900); + }); + + it('fails after max attempts with error thrown', async () => { + const createErrorStream = () => + (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + throw new Error('terminated'); + })(); + + mockAnthropicClient.messages.create.mockImplementation(() => ({ + withResponse: () => + Promise.resolve({ + data: createErrorStream(), + response: { headers: new Headers() }, + }), + })); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + }), + ); + + await expect(async () => { + for await (const _chunk of generator) { + // Should throw before yielding all chunks + } + }).rejects.toThrow('terminated'); + + // Default maxAttempts from getRetryConfig is 6 + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(6); + }); + + it('throws non-retryable errors immediately', async () => { + const createErrorStream = () => + (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + throw new Error('bad request'); + })(); + + mockAnthropicClient.messages.create.mockImplementation(() => ({ + withResponse: () => + Promise.resolve({ + data: createErrorStream(), + response: { headers: new Headers() }, + }), + })); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + }), + ); + + await expect(async () => { + for await (const _chunk of generator) { + // Should throw immediately + } + }).rejects.toThrow('bad request'); + + // Should not retry non-retryable errors + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(1); + }); + + it('respects retries ephemeral setting', async () => { + const createErrorStream = () => + 
(async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + throw new Error('terminated'); + })(); + + mockAnthropicClient.messages.create.mockImplementation(() => ({ + withResponse: () => + Promise.resolve({ + data: createErrorStream(), + response: { headers: new Headers() }, + }), + })); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + ephemerals: { + retries: 2, // Override default + }, + }), + ); + + await expect(async () => { + for await (const _chunk of generator) { + // Should throw after 2 attempts + } + }).rejects.toThrow('terminated'); + + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(2); + }); + + it('respects retrywait ephemeral setting', async () => { + const createErrorStream = () => + (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + throw new Error('terminated'); + })(); + + const successStream = (async function* () { + yield { + type: 'message_start', + message: { usage: { input_tokens: 10, output_tokens: 0 } }, + }; + yield { + type: 'content_block_start', + index: 0, + content_block: { type: 'text', text: '' }, + }; + yield { + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'success' }, + }; + yield { type: 'content_block_stop', index: 0 }; + yield { + type: 'message_delta', + delta: { stop_reason: 'end_turn' }, + usage: { output_tokens: 5 }, + }; + })(); + + let callCount = 0; + mockAnthropicClient.messages.create.mockImplementation(() => { + const stream = callCount === 0 ? 
createErrorStream() : successStream; + callCount++; + return { + withResponse: () => + Promise.resolve({ + data: stream, + response: { headers: new Headers() }, + }), + }; + }); + + retryWithBackoffMock.mockImplementation( + async (fn: () => Promise) => fn(), + ); + + const provider = new AnthropicProvider('test-key'); + + const generator = provider.generateChatCompletion( + createProviderCallOptions({ + providerName: provider.name, + contents: [ + { + speaker: 'human', + blocks: [{ type: 'text', text: 'hello' }], + }, + ] as IContent[], + ephemerals: { + retrywait: 1500, // Override default 4000ms + }, + }), + ); + + const chunks: IContent[] = []; + for await (const chunk of generator) { + chunks.push(chunk); + } + + expect(mockAnthropicClient.messages.create).toHaveBeenCalledTimes(2); + expect(delayMock).toHaveBeenCalledTimes(1); + + const delay = delayMock.mock.calls[0]?.[0] as number; + // Should be ~1500ms with jitter (between 1050-1950) + expect(delay).toBeGreaterThan(1050); + expect(delay).toBeLessThan(1950); + }); +}); diff --git a/packages/core/src/providers/anthropic/AnthropicProvider.ts b/packages/core/src/providers/anthropic/AnthropicProvider.ts index e536cea0f..fa8891aed 100644 --- a/packages/core/src/providers/anthropic/AnthropicProvider.ts +++ b/packages/core/src/providers/anthropic/AnthropicProvider.ts @@ -52,6 +52,7 @@ import { getErrorStatus, isNetworkTransientError, } from '../../utils/retry.js'; +import { delay } from '../../utils/delay.js'; import { getSettingsService } from '../../settings/settingsServiceInstance.js'; import { shouldDumpSDKContext, @@ -1940,259 +1941,314 @@ ${block.code} } if (streamingEnabled) { - // Handle streaming response - response is already a Stream when streaming is enabled - const stream = - response as unknown as AsyncIterable; - let currentToolCall: - | { id: string; name: string; input: string } - | undefined; - let currentThinkingBlock: - | { thinking: string; signature?: string } - | undefined; + // Handle streaming response with retry loop for transient network errors + // Similar to OpenAIResponsesProvider, we wrap the entire stream consumption + // in a retry loop to handle mid-stream disconnections (issue #1228) + // Use retry config from ephemeral settings + let streamingAttempt = 0; + let currentDelay = initialDelayMs; + const streamRetryMaxDelayMs = 30000; + const streamingLogger = this.getStreamingLogger(); + + while (streamingAttempt < maxAttempts) { + streamingAttempt++; + + // If this is a retry, make a fresh API call to get a new stream + if (streamingAttempt > 1) { + streamingLogger.debug( + () => + `Stream retry attempt ${streamingAttempt}/${maxAttempts}: Making fresh API call`, + ); + const retryResult = await retryWithBackoff(apiCallWithResponse, { + maxAttempts, + initialDelayMs, + shouldRetryOnError: this.shouldRetryAnthropicResponse.bind(this), + trackThrottleWaitTime: this.throttleTracker, + onPersistent429: onPersistent429Callback, + }); + response = retryResult.data; + } + + const stream = + response as unknown as AsyncIterable; + let currentToolCall: + | { id: string; name: string; input: string } + | undefined; + let currentThinkingBlock: + | { thinking: string; signature?: string } + | undefined; - this.getStreamingLogger().debug(() => 'Processing streaming response'); + streamingLogger.debug(() => 'Processing streaming response'); - try { - for await (const chunk of stream) { - if (chunk.type === 'message_start') { - // Extract cache metrics from message_start event - const usage = ( - chunk as unknown as { - 
message?: { - usage?: { - input_tokens?: number; - output_tokens?: number; - cache_read_input_tokens?: number; - cache_creation_input_tokens?: number; + try { + for await (const chunk of stream) { + if (chunk.type === 'message_start') { + // Extract cache metrics from message_start event + const usage = ( + chunk as unknown as { + message?: { + usage?: { + input_tokens?: number; + output_tokens?: number; + cache_read_input_tokens?: number; + cache_creation_input_tokens?: number; + }; }; + } + ).message?.usage; + if (usage) { + const cacheRead = usage.cache_read_input_tokens ?? 0; + const cacheCreation = usage.cache_creation_input_tokens ?? 0; + + cacheLogger.debug( + () => + `[AnthropicProvider streaming] Emitting usage metadata: cacheRead=${cacheRead}, cacheCreation=${cacheCreation}, raw values: cache_read_input_tokens=${usage.cache_read_input_tokens}, cache_creation_input_tokens=${usage.cache_creation_input_tokens}`, + ); + + if (cacheRead > 0 || cacheCreation > 0) { + cacheLogger.debug(() => { + const hitRate = + cacheRead + (usage.input_tokens ?? 0) > 0 + ? (cacheRead / + (cacheRead + (usage.input_tokens ?? 0))) * + 100 + : 0; + return `Cache metrics: read=${cacheRead}, creation=${cacheCreation}, hit_rate=${hitRate.toFixed(1)}%`; + }); + } + + yield { + speaker: 'ai', + blocks: [], + metadata: { + usage: { + promptTokens: usage.input_tokens ?? 0, + completionTokens: usage.output_tokens ?? 0, + totalTokens: + (usage.input_tokens ?? 0) + (usage.output_tokens ?? 0), + cache_read_input_tokens: cacheRead, + cache_creation_input_tokens: cacheCreation, + }, + }, + } as IContent; + } + } else if (chunk.type === 'content_block_start') { + if (chunk.content_block.type === 'tool_use') { + const toolBlock = chunk.content_block as ToolUseBlock; + this.getStreamingLogger().debug( + () => `Starting tool use: ${toolBlock.name}`, + ); + currentToolCall = { + id: toolBlock.id, + name: this.unprefixToolName(toolBlock.name, isOAuth), + input: '', + }; + } else if (chunk.content_block.type === 'thinking') { + this.getStreamingLogger().debug( + () => 'Starting thinking block', + ); + currentThinkingBlock = { + thinking: '', + signature: chunk.content_block.signature, + }; + } + } else if (chunk.type === 'content_block_delta') { + if (chunk.delta.type === 'text_delta') { + const textDelta = chunk.delta as TextDelta; + this.getStreamingLogger().debug( + () => `Received text delta: ${textDelta.text.length} chars`, + ); + // Emit text immediately as IContent + yield { + speaker: 'ai', + blocks: [{ type: 'text', text: textDelta.text }], + } as IContent; + } else if ( + chunk.delta.type === 'input_json_delta' && + currentToolCall + ) { + const jsonDelta = chunk.delta as InputJSONDelta; + currentToolCall.input += jsonDelta.partial_json; + + // Check for double-escaping patterns + logDoubleEscapingInChunk( + jsonDelta.partial_json, + currentToolCall.name, + 'anthropic', + ); + } else if ( + chunk.delta.type === 'thinking_delta' && + currentThinkingBlock + ) { + const thinkingDelta = chunk.delta as { + type: 'thinking_delta'; + thinking: string; }; + currentThinkingBlock.thinking += thinkingDelta.thinking; + this.getStreamingLogger().debug( + () => + `Thinking delta chunk (${thinkingDelta.thinking.length} chars): ${thinkingDelta.thinking}`, + ); + } else if ( + chunk.delta.type === 'signature_delta' && + currentThinkingBlock + ) { + // Signature delta is sent just before content_block_stop for thinking blocks + // This contains the cryptographic signature for the thinking block + const signatureDelta = chunk.delta 
as { + type: 'signature_delta'; + signature: string; + }; + this.getStreamingLogger().debug( + () => + `Received signature_delta: ${signatureDelta.signature.substring(0, 50)}...`, + ); + currentThinkingBlock.signature = signatureDelta.signature; } - ).message?.usage; - if (usage) { + } else if (chunk.type === 'content_block_stop') { + if (currentToolCall) { + const activeToolCall = currentToolCall; + this.getStreamingLogger().debug( + () => `Completed tool use: ${activeToolCall.name}`, + ); + // Process tool parameters with double-escape handling + let processedParameters = processToolParameters( + activeToolCall.input, + activeToolCall.name, + 'anthropic', + ); + + // Apply schema-aware type coercion to fix LLM type errors (issue #1146) + // Look up the tool schema from the tools passed to this request + const toolSchema = this.findToolSchema( + tools, + activeToolCall.name, + isOAuth, + ); + if ( + toolSchema && + processedParameters && + typeof processedParameters === 'object' + ) { + processedParameters = coerceParametersToSchema( + processedParameters, + toolSchema, + ); + } + + yield { + speaker: 'ai', + blocks: [ + { + type: 'tool_call', + id: this.normalizeToHistoryToolId(activeToolCall.id), + name: activeToolCall.name, + parameters: processedParameters, + }, + ], + } as IContent; + currentToolCall = undefined; + } else if (currentThinkingBlock) { + const activeThinkingBlock = currentThinkingBlock; + this.getStreamingLogger().debug( + () => + `Completed thinking block: ${activeThinkingBlock.thinking.length} chars`, + ); + this.getStreamingLogger().debug( + () => + `Thinking block content: ${activeThinkingBlock.thinking}`, + ); + + // Extract signature from content_block if present + const contentBlock = ( + chunk as unknown as { + content_block?: { + type: string; + thinking?: string; + signature?: string; + }; + } + ).content_block; + if (contentBlock?.signature) { + activeThinkingBlock.signature = contentBlock.signature; + } + + yield { + speaker: 'ai', + blocks: [ + { + type: 'thinking', + thought: activeThinkingBlock.thinking, + sourceField: 'thinking', + signature: activeThinkingBlock.signature, + } as ThinkingBlock, + ], + } as IContent; + currentThinkingBlock = undefined; + } + } else if (chunk.type === 'message_delta' && chunk.usage) { + // Emit usage metadata including cache fields + const usage = chunk.usage as { + input_tokens: number; + output_tokens: number; + cache_read_input_tokens?: number; + cache_creation_input_tokens?: number; + }; + const cacheRead = usage.cache_read_input_tokens ?? 0; const cacheCreation = usage.cache_creation_input_tokens ?? 0; - cacheLogger.debug( + this.getStreamingLogger().debug( () => - `[AnthropicProvider streaming] Emitting usage metadata: cacheRead=${cacheRead}, cacheCreation=${cacheCreation}, raw values: cache_read_input_tokens=${usage.cache_read_input_tokens}, cache_creation_input_tokens=${usage.cache_creation_input_tokens}`, + `Received usage metadata from message_delta: promptTokens=${usage.input_tokens || 0}, completionTokens=${usage.output_tokens || 0}, cacheRead=${cacheRead}, cacheCreation=${cacheCreation}`, ); - if (cacheRead > 0 || cacheCreation > 0) { - cacheLogger.debug(() => { - const hitRate = - cacheRead + (usage.input_tokens ?? 0) > 0 - ? (cacheRead / (cacheRead + (usage.input_tokens ?? 0))) * - 100 - : 0; - return `Cache metrics: read=${cacheRead}, creation=${cacheCreation}, hit_rate=${hitRate.toFixed(1)}%`; - }); - } - yield { speaker: 'ai', blocks: [], metadata: { usage: { - promptTokens: usage.input_tokens ?? 
0, - completionTokens: usage.output_tokens ?? 0, + promptTokens: usage.input_tokens || 0, + completionTokens: usage.output_tokens || 0, totalTokens: - (usage.input_tokens ?? 0) + (usage.output_tokens ?? 0), + (usage.input_tokens || 0) + (usage.output_tokens || 0), cache_read_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, }, }, } as IContent; } - } else if (chunk.type === 'content_block_start') { - if (chunk.content_block.type === 'tool_use') { - const toolBlock = chunk.content_block as ToolUseBlock; - this.getStreamingLogger().debug( - () => `Starting tool use: ${toolBlock.name}`, - ); - currentToolCall = { - id: toolBlock.id, - name: this.unprefixToolName(toolBlock.name, isOAuth), - input: '', - }; - } else if (chunk.content_block.type === 'thinking') { - this.getStreamingLogger().debug(() => 'Starting thinking block'); - currentThinkingBlock = { - thinking: '', - signature: chunk.content_block.signature, - }; - } - } else if (chunk.type === 'content_block_delta') { - if (chunk.delta.type === 'text_delta') { - const textDelta = chunk.delta as TextDelta; - this.getStreamingLogger().debug( - () => `Received text delta: ${textDelta.text.length} chars`, - ); - // Emit text immediately as IContent - yield { - speaker: 'ai', - blocks: [{ type: 'text', text: textDelta.text }], - } as IContent; - } else if ( - chunk.delta.type === 'input_json_delta' && - currentToolCall - ) { - const jsonDelta = chunk.delta as InputJSONDelta; - currentToolCall.input += jsonDelta.partial_json; - - // Check for double-escaping patterns - logDoubleEscapingInChunk( - jsonDelta.partial_json, - currentToolCall.name, - 'anthropic', - ); - } else if ( - chunk.delta.type === 'thinking_delta' && - currentThinkingBlock - ) { - const thinkingDelta = chunk.delta as { - type: 'thinking_delta'; - thinking: string; - }; - currentThinkingBlock.thinking += thinkingDelta.thinking; - this.getStreamingLogger().debug( - () => - `Thinking delta chunk (${thinkingDelta.thinking.length} chars): ${thinkingDelta.thinking}`, - ); - } else if ( - chunk.delta.type === 'signature_delta' && - currentThinkingBlock - ) { - // Signature delta is sent just before content_block_stop for thinking blocks - // This contains the cryptographic signature for the thinking block - const signatureDelta = chunk.delta as { - type: 'signature_delta'; - signature: string; - }; - this.getStreamingLogger().debug( - () => - `Received signature_delta: ${signatureDelta.signature.substring(0, 50)}...`, - ); - currentThinkingBlock.signature = signatureDelta.signature; - } - } else if (chunk.type === 'content_block_stop') { - if (currentToolCall) { - const activeToolCall = currentToolCall; - this.getStreamingLogger().debug( - () => `Completed tool use: ${activeToolCall.name}`, - ); - // Process tool parameters with double-escape handling - let processedParameters = processToolParameters( - activeToolCall.input, - activeToolCall.name, - 'anthropic', - ); - - // Apply schema-aware type coercion to fix LLM type errors (issue #1146) - // Look up the tool schema from the tools passed to this request - const toolSchema = this.findToolSchema( - tools, - activeToolCall.name, - isOAuth, - ); - if ( - toolSchema && - processedParameters && - typeof processedParameters === 'object' - ) { - processedParameters = coerceParametersToSchema( - processedParameters, - toolSchema, - ); - } - - yield { - speaker: 'ai', - blocks: [ - { - type: 'tool_call', - id: this.normalizeToHistoryToolId(activeToolCall.id), - name: activeToolCall.name, - parameters: 
processedParameters, - }, - ], - } as IContent; - currentToolCall = undefined; - } else if (currentThinkingBlock) { - const activeThinkingBlock = currentThinkingBlock; - this.getStreamingLogger().debug( - () => - `Completed thinking block: ${activeThinkingBlock.thinking.length} chars`, - ); - this.getStreamingLogger().debug( - () => `Thinking block content: ${activeThinkingBlock.thinking}`, - ); - - // Extract signature from content_block if present - const contentBlock = ( - chunk as unknown as { - content_block?: { - type: string; - thinking?: string; - signature?: string; - }; - } - ).content_block; - if (contentBlock?.signature) { - activeThinkingBlock.signature = contentBlock.signature; - } - - yield { - speaker: 'ai', - blocks: [ - { - type: 'thinking', - thought: activeThinkingBlock.thinking, - sourceField: 'thinking', - signature: activeThinkingBlock.signature, - } as ThinkingBlock, - ], - } as IContent; - currentThinkingBlock = undefined; - } - } else if (chunk.type === 'message_delta' && chunk.usage) { - // Emit usage metadata including cache fields - const usage = chunk.usage as { - input_tokens: number; - output_tokens: number; - cache_read_input_tokens?: number; - cache_creation_input_tokens?: number; - }; - - const cacheRead = usage.cache_read_input_tokens ?? 0; - const cacheCreation = usage.cache_creation_input_tokens ?? 0; + } + // Stream completed successfully, return + return; + } catch (error) { + // Check if error is retryable and attempts remain + const canRetryStream = isNetworkTransientError(error); + streamingLogger.debug( + () => + `Stream attempt ${streamingAttempt}/${maxAttempts} error: ${error}`, + ); - this.getStreamingLogger().debug( + if (!canRetryStream || streamingAttempt >= maxAttempts) { + streamingLogger.debug( () => - `Received usage metadata from message_delta: promptTokens=${usage.input_tokens || 0}, completionTokens=${usage.output_tokens || 0}, cacheRead=${cacheRead}, cacheCreation=${cacheCreation}`, + `Stream error not retryable or max attempts reached, throwing: ${error}`, ); - - yield { - speaker: 'ai', - blocks: [], - metadata: { - usage: { - promptTokens: usage.input_tokens || 0, - completionTokens: usage.output_tokens || 0, - totalTokens: - (usage.input_tokens || 0) + (usage.output_tokens || 0), - cache_read_input_tokens: cacheRead, - cache_creation_input_tokens: cacheCreation, - }, - }, - } as IContent; + throw error; } + + // Wait with exponential backoff + jitter before retrying + const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1); + const delayWithJitter = Math.max(0, currentDelay + jitter); + streamingLogger.debug( + () => + `Stream retry attempt ${streamingAttempt}/${maxAttempts}: Transient error detected, waiting ${Math.round(delayWithJitter)}ms before retry`, + ); + await delay(delayWithJitter); + currentDelay = Math.min(streamRetryMaxDelayMs, currentDelay * 2); + + // Loop continues to retry } - } catch (error) { - // Streaming errors should be propagated for retry logic - this.getStreamingLogger().debug( - () => `Streaming iteration error: ${error}`, - ); - throw error; } } else { // Handle non-streaming response diff --git a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts index 55c8d0de8..8debb5ce3 100644 --- a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts +++ b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts @@ 
-150,6 +150,9 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
             blocks: [{ type: 'text', text: 'hello' }],
           },
         ] as IContent[],
+        ephemerals: {
+          retrywait: 2000, // Override default delay
+        },
       }),
     );
 
@@ -166,9 +169,12 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
 
     expect(firstDelay).toBeGreaterThan(0);
     expect(secondDelay).toBeGreaterThan(firstDelay);
+    // Verify initial delay respects ephemeral setting (~2000ms with jitter)
+    expect(firstDelay).toBeGreaterThan(1400);
+    expect(firstDelay).toBeLessThan(2600);
   });
 
-  it('uses higher maxStreamingAttempts for Codex mode', async () => {
+  it('uses higher maxStreamingAttempts for Codex mode when ephemerals not set', async () => {
     parseResponsesStreamMock
       .mockImplementationOnce(async function* () {
         yield;
@@ -229,6 +235,7 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
       chunks.push(chunk);
     }
 
+    // Codex defaults to 5 attempts when no ephemeral retries setting
     expect(fetchMock).toHaveBeenCalledTimes(5);
   });
 
@@ -277,6 +284,97 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
       }
     }).rejects.toThrow('terminated');
 
+    // Regular mode defaults to 4 attempts
     expect(fetchMock).toHaveBeenCalledTimes(4);
   });
+
+  it('respects retries ephemeral setting', async () => {
+    parseResponsesStreamMock
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      });
+
+    fetchMock.mockResolvedValue({ ok: true, body: {} });
+
+    retryWithBackoffMock.mockImplementation(
+      async (fn: () => Promise) => fn(),
+    );
+
+    const provider = new OpenAIResponsesProvider('test-key');
+
+    const generator = provider.generateChatCompletion(
+      createProviderCallOptions({
+        providerName: provider.name,
+        contents: [
+          {
+            speaker: 'human',
+            blocks: [{ type: 'text', text: 'hello' }],
+          },
+        ] as IContent[],
+        ephemerals: {
+          retries: 2, // Override default
+        },
+      }),
+    );
+
+    await expect(async () => {
+      for await (const _chunk of generator) {
+        // Should throw after 2 attempts
+      }
+    }).rejects.toThrow('terminated');
+
+    expect(fetchMock).toHaveBeenCalledTimes(2);
+  });
+
+  it('respects retrywait ephemeral setting', async () => {
+    parseResponsesStreamMock
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield { speaker: 'ai', blocks: [{ type: 'text', text: 'success' }] };
+      });
+
+    fetchMock.mockResolvedValue({ ok: true, body: {} });
+
+    retryWithBackoffMock.mockImplementation(
+      async (fn: () => Promise) => fn(),
+    );
+
+    const provider = new OpenAIResponsesProvider('test-key');
+
+    const generator = provider.generateChatCompletion(
+      createProviderCallOptions({
+        providerName: provider.name,
+        contents: [
+          {
+            speaker: 'human',
+            blocks: [{ type: 'text', text: 'hello' }],
+          },
+        ] as IContent[],
+        ephemerals: {
+          retrywait: 1000, // Override default 5000ms
+        },
+      }),
+    );
+
+    const chunks: IContent[] = [];
+    for await (const chunk of generator) {
+      chunks.push(chunk);
+    }
+
+    expect(fetchMock).toHaveBeenCalledTimes(2);
+    expect(delayMock).toHaveBeenCalledTimes(1);
+
+    const delay = delayMock.mock.calls[0]?.[0] as number;
+    // Should be ~1000ms with jitter (between 700-1300)
+    expect(delay).toBeGreaterThan(700);
+    expect(delay).toBeLessThan(1300);
+  });
 });
diff --git a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
index d20df48ee..d004fbcbe 100644
--- a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
+++ b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
@@ -66,10 +66,6 @@ import {
 } from '../../utils/retry.js';
 import { delay } from '../../utils/delay.js';
 
-// Stream retry constants for exponential backoff
-const STREAM_RETRY_INITIAL_DELAY_MS = 5000; // 5 seconds
-const STREAM_RETRY_MAX_DELAY_MS = 30000; // 30 seconds
-
 export class OpenAIResponsesProvider extends BaseProvider {
   private logger: DebugLogger;
   private _isCodexMode: boolean;
@@ -710,9 +706,18 @@ export class OpenAIResponsesProvider extends BaseProvider {
     // Retry must encompass both the initial fetch and the subsequent stream
     // consumption, because transient network failures can occur mid-stream.
     // @issue #1187: Different maxStreamingAttempts for Codex vs regular mode
-    const maxStreamingAttempts = isCodex ? 5 : 4;
+    // Read retry configuration from ephemeral settings
+    const ephemeralRetries = options.invocation?.ephemerals?.['retries'] as
+      | number
+      | undefined;
+    const ephemeralRetryWait = options.invocation?.ephemerals?.['retrywait'] as
+      | number
+      | undefined;
+    const maxStreamingAttempts = ephemeralRetries ?? (isCodex ? 5 : 4);
+    const streamRetryInitialDelayMs = ephemeralRetryWait ?? 5000;
+    const streamRetryMaxDelayMs = 30000;
     let streamingAttempt = 0;
-    let currentDelay = STREAM_RETRY_INITIAL_DELAY_MS;
+    let currentDelay = streamRetryInitialDelayMs;
 
     while (streamingAttempt < maxStreamingAttempts) {
       streamingAttempt++;
@@ -726,6 +731,8 @@ export class OpenAIResponsesProvider extends BaseProvider {
         }),
         {
           shouldRetryOnError: this.shouldRetryOnError.bind(this),
+          maxAttempts: maxStreamingAttempts,
+          initialDelayMs: streamRetryInitialDelayMs,
         },
       );
 
@@ -762,7 +769,7 @@ export class OpenAIResponsesProvider extends BaseProvider {
       const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1);
       const delayWithJitter = Math.max(0, currentDelay + jitter);
       await delay(delayWithJitter);
-      currentDelay = Math.min(STREAM_RETRY_MAX_DELAY_MS, currentDelay * 2);
+      currentDelay = Math.min(streamRetryMaxDelayMs, currentDelay * 2);
 
       // Retry by restarting the request from the beginning.
       // NOTE: This can re-yield partial content from a previous attempt.
diff --git a/packages/core/src/test-utils/providerCallOptions.ts b/packages/core/src/test-utils/providerCallOptions.ts
index 8414f9918..91561c304 100644
--- a/packages/core/src/test-utils/providerCallOptions.ts
+++ b/packages/core/src/test-utils/providerCallOptions.ts
@@ -42,6 +42,7 @@ export interface ProviderCallOptionsInit {
   runtimeId?: string;
   runtimeMetadata?: Record;
   invocation?: RuntimeInvocationContext;
+  ephemerals?: Record;
 }
 
 function applySettingsOverrides(
@@ -69,9 +70,11 @@ function applySettingsOverrides(
 function buildEphemeralsSnapshot(
   providerName: string,
   settings: SettingsService,
+  overrides?: Record,
 ): Record {
   const snapshot: Record = {
     ...settings.getAllGlobalSettings(),
+    ...(overrides ?? {}),
   };
 
   snapshot[providerName] = {
@@ -161,7 +164,11 @@ function ensureInvocation(
     return init.invocation;
   }
 
-  const ephemeralsSnapshot = buildEphemeralsSnapshot(providerName, settings);
+  const ephemeralsSnapshot = buildEphemeralsSnapshot(
+    providerName,
+    settings,
+    init.ephemerals,
+  );
   const userMemorySnapshot =
     typeof init.userMemory === 'string' ? init.userMemory : undefined;
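
Note (illustrative sketch, not part of the patch): both providers now follow the same stream-retry shape — consume the stream, and on a transient mid-stream failure wait with exponential backoff plus roughly +/-30% jitter (capped at 30s) before reopening the stream, honoring the `retries` and `retrywait` ephemerals. The helper below is a minimal sketch of that loop; the names `openStream` and `isTransient` are hypothetical stand-ins for the provider-specific API call and the transient-error check.

async function* streamWithRetry<T>(
  openStream: () => Promise<AsyncIterable<T>>,
  isTransient: (error: unknown) => boolean,
  opts: { retries?: number; retrywait?: number } = {},
): AsyncGenerator<T> {
  const maxAttempts = opts.retries ?? 4; // `retries` ephemeral, else provider default
  let currentDelay = opts.retrywait ?? 5000; // `retrywait` ephemeral, else provider default
  const maxDelayMs = 30000;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      for await (const chunk of await openStream()) {
        yield chunk; // NOTE: a retry can re-yield partial content from a prior attempt
      }
      return; // stream completed cleanly
    } catch (error) {
      // Non-transient errors and exhausted attempts propagate immediately
      if (!isTransient(error) || attempt >= maxAttempts) throw error;
      // Exponential backoff with +/-30% jitter, capped at maxDelayMs
      const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1);
      await new Promise((r) => setTimeout(r, Math.max(0, currentDelay + jitter)));
      currentDelay = Math.min(maxDelayMs, currentDelay * 2);
    }
  }
}

The +/-30% jitter is what the test bounds encode: a `retrywait` of 1000 yields a first delay in (700, 1300), 2000 yields (1400, 2600), and 3000 yields (2100, 3900).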