@@ -54,13 +54,21 @@ vi.mock('../openai/parseResponsesStream.js', () => ({
parseErrorResponse: vi.fn((_status: number, body: string) => new Error(body)),
}));

const delayMock = vi.hoisted(() => vi.fn());

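// delay is mocked so backoff waits resolve immediately and the tests can assert on the requested durations.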
vi.mock('../../utils/delay.js', () => ({
delay: delayMock,
createAbortError: vi.fn(() => new Error('Aborted')),
}));

const fetchMock = vi.hoisted(() => vi.fn());

describe('OpenAIResponsesProvider stream retry behavior', () => {
beforeEach(() => {
vi.clearAllMocks();
mockSettingsService.getSettings.mockResolvedValue({});
vi.stubGlobal('fetch', fetchMock);
delayMock.mockResolvedValue(undefined);
});

afterEach(() => {
@@ -110,4 +118,165 @@ describe('OpenAIResponsesProvider stream retry behavior', () => {
expect(texts).toContain('partial');
expect(texts).toContain('ok');
});

it('waits with exponential backoff between stream retries', async () => {
parseResponsesStreamMock
.mockImplementationOnce(async function* () {
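        // Yield one (empty) chunk so the stream has started, then fail mid-stream.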
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield { speaker: 'ai', blocks: [{ type: 'text', text: 'success' }] };
});

fetchMock.mockResolvedValue({ ok: true, body: {} });

retryWithBackoffMock.mockImplementation(
async (fn: () => Promise<unknown>) => fn(),
);

const provider = new OpenAIResponsesProvider('test-key');

const generator = provider.generateChatCompletion(
createProviderCallOptions({
providerName: provider.name,
contents: [
{
speaker: 'human',
blocks: [{ type: 'text', text: 'hello' }],
},
] as IContent[],
}),
);

const chunks: IContent[] = [];
for await (const chunk of generator) {
chunks.push(chunk);
}

expect(fetchMock).toHaveBeenCalledTimes(3);
expect(delayMock).toHaveBeenCalledTimes(2);

const firstDelay = delayMock.mock.calls[0]?.[0] as number;
const secondDelay = delayMock.mock.calls[1]?.[0] as number;
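    // Base delays double (5s to 10s); even with ±30% jitter the ranges [3.5s, 6.5s] and [7s, 13s] cannot overlap.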

expect(firstDelay).toBeGreaterThan(0);
expect(secondDelay).toBeGreaterThan(firstDelay);
});

it('uses higher maxStreamingAttempts for Codex mode', async () => {
parseResponsesStreamMock
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield { speaker: 'ai', blocks: [{ type: 'text', text: 'success' }] };
});

fetchMock.mockResolvedValue({ ok: true, body: {} });

retryWithBackoffMock.mockImplementation(
async (fn: () => Promise<unknown>) => fn(),
);

const mockOAuthManager = {
getOAuthToken: vi.fn().mockResolvedValue({
access_token: 'test-token',
account_id: 'test-account-id',
expiry: Date.now() + 3600000,
token_type: 'Bearer',
}),
};

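    // The Codex backend base URL together with an OAuth manager puts the provider in Codex mode (5 streaming attempts).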
const provider = new OpenAIResponsesProvider(
'test-key',
'https://chatgpt.com/backend-api/codex',
undefined,
mockOAuthManager as never,
);

const generator = provider.generateChatCompletion(
createProviderCallOptions({
providerName: provider.name,
contents: [
{
speaker: 'human',
blocks: [{ type: 'text', text: 'hello' }],
},
] as IContent[],
}),
);

const chunks: IContent[] = [];
for await (const chunk of generator) {
chunks.push(chunk);
}

expect(fetchMock).toHaveBeenCalledTimes(5);
});

it('fails after maxStreamingAttempts for regular mode', async () => {
parseResponsesStreamMock
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
})
.mockImplementationOnce(async function* () {
yield;
throw new Error('terminated');
});
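    // Four failing attempts exhaust the regular-mode budget of maxStreamingAttempts = 4, so no fifth fetch is made.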

fetchMock.mockResolvedValue({ ok: true, body: {} });

retryWithBackoffMock.mockImplementation(
async (fn: () => Promise<unknown>) => fn(),
);

const provider = new OpenAIResponsesProvider('test-key');

const generator = provider.generateChatCompletion(
createProviderCallOptions({
providerName: provider.name,
contents: [
{
speaker: 'human',
blocks: [{ type: 'text', text: 'hello' }],
},
] as IContent[],
}),
);

await expect(async () => {
for await (const _chunk of generator) {
// Should throw before yielding
}
}).rejects.toThrow('terminated');

expect(fetchMock).toHaveBeenCalledTimes(4);
});
});
@@ -64,6 +64,11 @@ import {
getErrorStatus,
isNetworkTransientError,
} from '../../utils/retry.js';
import { delay } from '../../utils/delay.js';

// Stream retry constants for exponential backoff
const STREAM_RETRY_INITIAL_DELAY_MS = 5000; // 5 seconds
const STREAM_RETRY_MAX_DELAY_MS = 30000; // 30 seconds
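// Doubling from the initial delay gives a pre-jitter schedule of 5s, 10s, 20s, then a 30s cap.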

export class OpenAIResponsesProvider extends BaseProvider {
private logger: DebugLogger;
@@ -704,8 +709,10 @@
// @plan PLAN-20251215-issue868: Retry responses streaming end-to-end
// Retry must encompass both the initial fetch and the subsequent stream
// consumption, because transient network failures can occur mid-stream.
-    const maxStreamingAttempts = 2;
+    // @issue #1187: Different maxStreamingAttempts for Codex vs regular mode
+    const maxStreamingAttempts = isCodex ? 5 : 4;
let streamingAttempt = 0;
let currentDelay = STREAM_RETRY_INITIAL_DELAY_MS;

while (streamingAttempt < maxStreamingAttempts) {
streamingAttempt++;
@@ -744,13 +751,19 @@
const canRetryStream = this.shouldRetryOnError(error);
this.logger.debug(
() =>
-          `Responses stream error on attempt ${streamingAttempt}/${maxStreamingAttempts}: ${String(error)}`,
+          `Stream retry attempt ${streamingAttempt}/${maxStreamingAttempts}: Transient error detected, delay ${currentDelay}ms before retry. Error: ${String(error)}`,
);

if (!canRetryStream || streamingAttempt >= maxStreamingAttempts) {
throw error;
}

// @issue #1187: Add exponential backoff with jitter between stream retries
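      // Math.random() * 2 - 1 is uniform in [-1, 1], so the jitter spans ±30% of the current delay.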
const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1);
const delayWithJitter = Math.max(0, currentDelay + jitter);
await delay(delayWithJitter);
currentDelay = Math.min(STREAM_RETRY_MAX_DELAY_MS, currentDelay * 2);

// Retry by restarting the request from the beginning.
// NOTE: This can re-yield partial content from a previous attempt.
continue;