diff --git a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts
index 3e9d0ddb4..55c8d0de8 100644
--- a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts
+++ b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.streamRetry.test.ts
@@ -54,6 +54,13 @@ vi.mock('../openai/parseResponsesStream.js', () => ({
   parseErrorResponse: vi.fn((_status: number, body: string) => new Error(body)),
 }));
 
+const delayMock = vi.hoisted(() => vi.fn());
+
+vi.mock('../../utils/delay.js', () => ({
+  delay: delayMock,
+  createAbortError: vi.fn(() => new Error('Aborted')),
+}));
+
 const fetchMock = vi.hoisted(() => vi.fn());
 
 describe('OpenAIResponsesProvider stream retry behavior', () => {
@@ -61,6 +68,7 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
     vi.clearAllMocks();
     mockSettingsService.getSettings.mockResolvedValue({});
     vi.stubGlobal('fetch', fetchMock);
+    delayMock.mockResolvedValue(undefined);
   });
 
   afterEach(() => {
@@ -110,4 +118,165 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
     expect(texts).toContain('partial');
     expect(texts).toContain('ok');
   });
+
+  it('waits with exponential backoff between stream retries', async () => {
+    parseResponsesStreamMock
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield { speaker: 'ai', blocks: [{ type: 'text', text: 'success' }] };
+      });
+
+    fetchMock.mockResolvedValue({ ok: true, body: {} });
+
+    retryWithBackoffMock.mockImplementation(
+      async (fn: () => Promise<unknown>) => fn(),
+    );
+
+    const provider = new OpenAIResponsesProvider('test-key');
+
+    const generator = provider.generateChatCompletion(
+      createProviderCallOptions({
+        providerName: provider.name,
+        contents: [
+          {
+            speaker: 'human',
+            blocks: [{ type: 'text', text: 'hello' }],
+          },
+        ] as IContent[],
+      }),
+    );
+
+    const chunks: IContent[] = [];
+    for await (const chunk of generator) {
+      chunks.push(chunk);
+    }
+
+    expect(fetchMock).toHaveBeenCalledTimes(3);
+    expect(delayMock).toHaveBeenCalledTimes(2);
+
+    const firstDelay = delayMock.mock.calls[0]?.[0] as number;
+    const secondDelay = delayMock.mock.calls[1]?.[0] as number;
+
+    expect(firstDelay).toBeGreaterThan(0);
+    expect(secondDelay).toBeGreaterThan(firstDelay);
+  });
+
+  it('uses higher maxStreamingAttempts for Codex mode', async () => {
+    parseResponsesStreamMock
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield { speaker: 'ai', blocks: [{ type: 'text', text: 'success' }] };
+      });
+
+    fetchMock.mockResolvedValue({ ok: true, body: {} });
+
+    retryWithBackoffMock.mockImplementation(
+      async (fn: () => Promise<unknown>) => fn(),
+    );
+
+    const mockOAuthManager = {
+      getOAuthToken: vi.fn().mockResolvedValue({
+        access_token: 'test-token',
+        account_id: 'test-account-id',
+        expiry: Date.now() + 3600000,
+        token_type: 'Bearer',
+      }),
+    };
+
+    const provider = new OpenAIResponsesProvider(
+      'test-key',
+      'https://chatgpt.com/backend-api/codex',
+      undefined,
+      mockOAuthManager as never,
+    );
+
+    const generator = provider.generateChatCompletion(
+      createProviderCallOptions({
+        providerName: provider.name,
+        contents: [
+          {
+            speaker: 'human',
+            blocks: [{ type: 'text', text: 'hello' }],
+          },
+        ] as IContent[],
+      }),
+    );
+
+    const chunks: IContent[] = [];
+    for await (const chunk of generator) {
+      chunks.push(chunk);
+    }
+
+    expect(fetchMock).toHaveBeenCalledTimes(5);
+  });
+
+  it('fails after maxStreamingAttempts for regular mode', async () => {
+    parseResponsesStreamMock
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      })
+      .mockImplementationOnce(async function* () {
+        yield;
+        throw new Error('terminated');
+      });
+
+    fetchMock.mockResolvedValue({ ok: true, body: {} });
+
+    retryWithBackoffMock.mockImplementation(
+      async (fn: () => Promise<unknown>) => fn(),
+    );
+
+    const provider = new OpenAIResponsesProvider('test-key');
+
+    const generator = provider.generateChatCompletion(
+      createProviderCallOptions({
+        providerName: provider.name,
+        contents: [
+          {
+            speaker: 'human',
+            blocks: [{ type: 'text', text: 'hello' }],
+          },
+        ] as IContent[],
+      }),
+    );
+
+    await expect(async () => {
+      for await (const _chunk of generator) {
+        // Should throw before yielding
+      }
+    }).rejects.toThrow('terminated');
+
+    expect(fetchMock).toHaveBeenCalledTimes(4);
+  });
 });
diff --git a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
index 1fe49ce44..d20df48ee 100644
--- a/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
+++ b/packages/core/src/providers/openai-responses/OpenAIResponsesProvider.ts
@@ -64,6 +64,11 @@ import {
   getErrorStatus,
   isNetworkTransientError,
 } from '../../utils/retry.js';
+import { delay } from '../../utils/delay.js';
+
+// Stream retry constants for exponential backoff
+const STREAM_RETRY_INITIAL_DELAY_MS = 5000; // 5 seconds
+const STREAM_RETRY_MAX_DELAY_MS = 30000; // 30 seconds
 
 export class OpenAIResponsesProvider extends BaseProvider {
   private logger: DebugLogger;
@@ -704,8 +709,10 @@ export class OpenAIResponsesProvider extends BaseProvider {
       // @plan PLAN-20251215-issue868: Retry responses streaming end-to-end
       // Retry must encompass both the initial fetch and the subsequent stream
       // consumption, because transient network failures can occur mid-stream.
-      const maxStreamingAttempts = 2;
+      // @issue #1187: Different maxStreamingAttempts for Codex vs regular mode
+      const maxStreamingAttempts = isCodex ? 5 : 4;
       let streamingAttempt = 0;
+      let currentDelay = STREAM_RETRY_INITIAL_DELAY_MS;
 
       while (streamingAttempt < maxStreamingAttempts) {
         streamingAttempt++;
@@ -744,13 +751,19 @@
       const canRetryStream = this.shouldRetryOnError(error);
       this.logger.debug(
         () =>
-          `Responses stream error on attempt ${streamingAttempt}/${maxStreamingAttempts}: ${String(error)}`,
+          `Stream retry attempt ${streamingAttempt}/${maxStreamingAttempts}: Transient error detected, delay ${currentDelay}ms before retry. Error: ${String(error)}`,
       );
 
       if (!canRetryStream || streamingAttempt >= maxStreamingAttempts) {
         throw error;
       }
 
+      // @issue #1187: Add exponential backoff with jitter between stream retries
+      const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1);
+      const delayWithJitter = Math.max(0, currentDelay + jitter);
+      await delay(delayWithJitter);
+      currentDelay = Math.min(STREAM_RETRY_MAX_DELAY_MS, currentDelay * 2);
+
       // Retry by restarting the request from the beginning.
       // NOTE: This can re-yield partial content from a previous attempt.
       continue;
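
Note: the sketch below is not part of the diff; it restates the backoff arithmetic the new retry loop performs so the schedule is easier to review. The helper name `nextBackoff` is introduced here for illustration only; the constants and the jitter formula are taken directly from the change above.

```ts
// Sketch of the backoff schedule the retry loop implements, as a pure helper.
const STREAM_RETRY_INITIAL_DELAY_MS = 5000; // 5 seconds
const STREAM_RETRY_MAX_DELAY_MS = 30000; // 30 seconds

function nextBackoff(currentDelayMs: number): {
  waitMs: number; // actual wait for this retry, with +/-30% jitter applied
  nextDelayMs: number; // base delay to use for the following retry
} {
  // Jitter is a uniform value in [-30%, +30%] of the current base delay.
  const jitter = currentDelayMs * 0.3 * (Math.random() * 2 - 1);
  const waitMs = Math.max(0, currentDelayMs + jitter);
  // The base delay doubles after each retry, capped at 30 seconds.
  const nextDelayMs = Math.min(STREAM_RETRY_MAX_DELAY_MS, currentDelayMs * 2);
  return { waitMs, nextDelayMs };
}

// Base delays before jitter: 5000 -> 10000 -> 20000 -> 30000 (capped).
// Regular mode (4 attempts) waits at most three times between attempts;
// Codex mode (5 attempts) can wait a fourth time at the 30s cap.
```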
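A second, equally non-normative sketch condenses the test-side pattern used above: `vi.hoisted` creates the `delayMock` spy before the hoisted `vi.mock` factory for `../../utils/delay.js` runs, so the backoff waits resolve immediately and the per-retry delay arguments can be asserted. The mocked module shape (`delay`, `createAbortError`) mirrors the diff; the test body here is a stand-in, not the provider tests themselves.

```ts
import { describe, expect, it, vi } from 'vitest';

// vi.hoisted runs before the hoisted vi.mock factory, so the factory can
// safely close over the spy it returns.
const delayMock = vi.hoisted(() => vi.fn());

vi.mock('../../utils/delay.js', () => ({
  delay: delayMock,
  createAbortError: vi.fn(() => new Error('Aborted')),
}));

describe('backoff assertions without real waiting', () => {
  it('exposes the requested delays to the test', async () => {
    delayMock.mockResolvedValue(undefined); // resolve instantly instead of sleeping

    // Stand-in for the code under test, which awaits delay(ms) between retries.
    await delayMock(5000);
    await delayMock(10000);

    const firstDelay = delayMock.mock.calls[0]?.[0] as number;
    const secondDelay = delayMock.mock.calls[1]?.[0] as number;
    expect(secondDelay).toBeGreaterThan(firstDelay);
  });
});
```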