diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index e52c35e..8c647e1 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -4,27 +4,94 @@ import type { SSEEventData } from "../types.js"; /** * Parse SSE stream to extract final response * @param sseText - Complete SSE stream text - * @returns Final response object or null if not found + * @returns Final response object, last response-like, or last event */ -function parseSseStream(sseText: string): unknown | null { - const lines = sseText.split('\n'); +function parseSseStream(sseText: string): { + finalResponse?: unknown; + lastEvent?: unknown; + lastResponseLike?: unknown; +} { + const lines = sseText.split(/\r?\n/); + let pendingData: string[] = []; + let lastEvent: unknown; + let lastResponseLike: unknown; + let finalResponse: unknown; + + /** + * Process a parsed event, updating tracking variables + */ + const processEvent = (parsed: SSEEventData) => { + lastEvent = parsed; + const parsedAny = parsed as unknown as { + type?: string; + response?: unknown; + }; + if ( + parsedAny && + typeof parsedAny === "object" && + "response" in parsedAny + ) { + if (parsedAny.response !== undefined) { + lastResponseLike = parsedAny.response; + } + if ( + parsedAny.type === "response.done" || + parsedAny.type === "response.completed" + ) { + finalResponse = parsedAny.response; + } + } + }; + + /** + * Try to parse accumulated data. Returns true if successful. + */ + const tryFlush = (): boolean => { + if (pendingData.length === 0) return false; + const data = pendingData.join("\n"); + try { + const parsed = JSON.parse(data) as SSEEventData; + pendingData = []; + processEvent(parsed); + return true; + } catch { + return false; + } + }; for (const line of lines) { - if (line.startsWith('data: ')) { - try { - const data = JSON.parse(line.substring(6)) as SSEEventData; + // Empty line = SSE event delimiter, flush any pending data + if (line === "") { + tryFlush(); + pendingData = []; // Clear any unparseable garbage + continue; + } - // Look for response.done event with final data - if (data.type === 'response.done' || data.type === 'response.completed') { - return data.response; + // Accept "data:" with or without a space after colon + if (line.startsWith("data:")) { + const content = line.replace(/^data:\s?/, ""); + pendingData.push(content); + + // Optimistic parse: try to parse immediately (common case: each line is complete JSON) + if (!tryFlush()) { + // If combined parse failed, try parsing just this line alone + // This handles the case where previous lines were garbage + try { + const parsed = JSON.parse(content) as SSEEventData; + pendingData = []; // Discard accumulated garbage + processEvent(parsed); + } catch { + // Keep accumulating - might be multiline JSON } - } catch (e) { - // Skip malformed JSON } + continue; } } - return null; + // Final flush for any remaining data + tryFlush(); + + return { finalResponse, lastEvent, lastResponseLike }; } /** @@ -54,13 +121,17 @@ export async function convertSseToJson(response: Response, headers: Headers): Pr } // Parse SSE events to extract the final response - const finalResponse = parseSseStream(fullText); - - if (!finalResponse) { - console.error('[openai-codex-plugin] Could not find final response in SSE stream'); - logRequest("stream-error", { error: "No response.done event found" }); + const parsed = parseSseStream(fullText); + const responsePayload = + parsed.finalResponse ?? parsed.lastResponseLike ?? parsed.lastEvent; + + if (!responsePayload) { + console.error("[openai-codex-plugin] Could not find JSON in SSE stream"); + logRequest("stream-error", { + error: "No JSON events found in SSE stream", + }); - // Return original stream if we can't parse + // Return original stream if we can't parse anything return new Response(fullText, { status: response.status, statusText: response.statusText, @@ -70,9 +141,15 @@ export async function convertSseToJson(response: Response, headers: Headers): Pr // Return as plain JSON (not SSE) const jsonHeaders = new Headers(headers); - jsonHeaders.set('content-type', 'application/json; charset=utf-8'); + jsonHeaders.set("content-type", "application/json; charset=utf-8"); + + if (!parsed.finalResponse) { + logRequest("stream-warning", { + warning: "No final response event; using last JSON event", + }); + } - return new Response(JSON.stringify(finalResponse), { + return new Response(JSON.stringify(responsePayload), { status: response.status, statusText: response.statusText, headers: jsonHeaders, diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index df117e8..014e7b5 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -61,7 +61,7 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body).toEqual({ id: 'resp_456', output: 'done' }); }); - it('should return original text if no final response found', async () => { + it('should return last JSON event if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} `; @@ -69,9 +69,10 @@ data: {"type":"chunk","delta":"text"} const headers = new Headers(); const result = await convertSseToJson(response, headers); - const text = await result.text(); + const body = await result.json(); - expect(text).toBe(sseContent); + // Falls back to last event when no response.done/response.completed + expect(body).toEqual({ type: 'chunk', delta: 'text' }); }); it('should skip malformed JSON in SSE stream', async () => { @@ -110,5 +111,29 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(result.status).toBe(200); expect(result.statusText).toBe('OK'); }); + + it('should parse data lines without a space after colon', async () => { + const sseContent = `data:{"type":"response.done","response":{"id":"resp_999","output":"ok"}}`; + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json(); + + expect(body).toEqual({ id: 'resp_999', output: 'ok' }); + }); + + it('should join multiline data fields into a single JSON event', async () => { + const sseContent = `data: {"type":"response.done", +data: "response":{"id":"resp_abc","output":"test"}} +`; + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json(); + + expect(body).toEqual({ id: 'resp_abc', output: 'test' }); + }); }); });