Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 97 additions & 20 deletions lib/request/response-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,94 @@ import type { SSEEventData } from "../types.js";
/**
* Parse SSE stream to extract final response
* @param sseText - Complete SSE stream text
* @returns Final response object or null if not found
* @returns Final response object, last response-like, or last event
*/
function parseSseStream(sseText: string): unknown | null {
const lines = sseText.split('\n');
function parseSseStream(sseText: string): {
finalResponse?: unknown;
lastEvent?: unknown;
lastResponseLike?: unknown;
} {
const lines = sseText.split(/\r?\n/);
let pendingData: string[] = [];
let lastEvent: unknown;
let lastResponseLike: unknown;
let finalResponse: unknown;

/**
* Process a parsed event, updating tracking variables
*/
const processEvent = (parsed: SSEEventData) => {
lastEvent = parsed;
const parsedAny = parsed as unknown as {
type?: string;
response?: unknown;
};
if (
parsedAny &&
typeof parsedAny === "object" &&
"response" in parsedAny
) {
if (parsedAny.response !== undefined) {
lastResponseLike = parsedAny.response;
}
if (
parsedAny.type === "response.done" ||
parsedAny.type === "response.completed"
) {
finalResponse = parsedAny.response;
}
}
};

/**
* Try to parse accumulated data. Returns true if successful.
*/
const tryFlush = (): boolean => {
if (pendingData.length === 0) return false;
const data = pendingData.join("\n");
try {
const parsed = JSON.parse(data) as SSEEventData;
pendingData = [];
processEvent(parsed);
return true;
} catch {
return false;
}
};

for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.substring(6)) as SSEEventData;
// Empty line = SSE event delimiter, flush any pending data
if (line === "") {
tryFlush();
pendingData = []; // Clear any unparseable garbage
continue;
}

// Look for response.done event with final data
if (data.type === 'response.done' || data.type === 'response.completed') {
return data.response;
// Accept "data:" with or without a space after colon
if (line.startsWith("data:")) {
const content = line.replace(/^data:\s?/, "");
pendingData.push(content);

// Optimistic parse: try to parse immediately (common case: each line is complete JSON)
if (!tryFlush()) {
// If combined parse failed, try parsing just this line alone
// This handles the case where previous lines were garbage
try {
const parsed = JSON.parse(content) as SSEEventData;
pendingData = []; // Discard accumulated garbage
processEvent(parsed);
} catch {
// Keep accumulating - might be multiline JSON
}
} catch (e) {
// Skip malformed JSON
}
continue;
}
}

return null;
// Final flush for any remaining data
tryFlush();

return { finalResponse, lastEvent, lastResponseLike };
}

/**
Expand Down Expand Up @@ -54,13 +121,17 @@ export async function convertSseToJson(response: Response, headers: Headers): Pr
}

// Parse SSE events to extract the final response
const finalResponse = parseSseStream(fullText);

if (!finalResponse) {
console.error('[openai-codex-plugin] Could not find final response in SSE stream');
logRequest("stream-error", { error: "No response.done event found" });
const parsed = parseSseStream(fullText);
const responsePayload =
parsed.finalResponse ?? parsed.lastResponseLike ?? parsed.lastEvent;

if (!responsePayload) {
console.error("[openai-codex-plugin] Could not find JSON in SSE stream");
logRequest("stream-error", {
error: "No JSON events found in SSE stream",
});

// Return original stream if we can't parse
// Return original stream if we can't parse anything
return new Response(fullText, {
status: response.status,
statusText: response.statusText,
Expand All @@ -70,9 +141,15 @@ export async function convertSseToJson(response: Response, headers: Headers): Pr

// Return as plain JSON (not SSE)
const jsonHeaders = new Headers(headers);
jsonHeaders.set('content-type', 'application/json; charset=utf-8');
jsonHeaders.set("content-type", "application/json; charset=utf-8");

if (!parsed.finalResponse) {
logRequest("stream-warning", {
warning: "No final response event; using last JSON event",
});
}

return new Response(JSON.stringify(finalResponse), {
return new Response(JSON.stringify(responsePayload), {
status: response.status,
statusText: response.statusText,
headers: jsonHeaders,
Expand Down
31 changes: 28 additions & 3 deletions test/response-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}}
expect(body).toEqual({ id: 'resp_456', output: 'done' });
});

it('should return original text if no final response found', async () => {
it('should return last JSON event if no final response found', async () => {
const sseContent = `data: {"type":"response.started"}
data: {"type":"chunk","delta":"text"}
`;
const response = new Response(sseContent);
const headers = new Headers();

const result = await convertSseToJson(response, headers);
const text = await result.text();
const body = await result.json();

expect(text).toBe(sseContent);
// Falls back to last event when no response.done/response.completed
expect(body).toEqual({ type: 'chunk', delta: 'text' });
});

it('should skip malformed JSON in SSE stream', async () => {
Expand Down Expand Up @@ -110,5 +111,29 @@ data: {"type":"response.done","response":{"id":"resp_789"}}
expect(result.status).toBe(200);
expect(result.statusText).toBe('OK');
});

it('should parse data lines without a space after colon', async () => {
const sseContent = `data:{"type":"response.done","response":{"id":"resp_999","output":"ok"}}`;
const response = new Response(sseContent);
const headers = new Headers();

const result = await convertSseToJson(response, headers);
const body = await result.json();

expect(body).toEqual({ id: 'resp_999', output: 'ok' });
});

it('should join multiline data fields into a single JSON event', async () => {
const sseContent = `data: {"type":"response.done",
data: "response":{"id":"resp_abc","output":"test"}}
`;
const response = new Response(sseContent);
const headers = new Headers();

const result = await convertSseToJson(response, headers);
const body = await result.json();

expect(body).toEqual({ id: 'resp_abc', output: 'test' });
});
});
});