Skip to content

Commit 1905dc5

Browse files
committed
Rewrite streaming logic to buffer partial chunks
The OpenAI API properly returns complete JSON in each chunk; however, other APIs don't. For those cases, we add a buffer that collects partial chunks and attempts to decode once the accumulated data becomes valid JSON.
1 parent 1478dc0 commit 1905dc5

File tree

1 file changed

+62
-14
lines changed

1 file changed

+62
-14
lines changed

lib/ex_openai/streaming_client.ex

+62-14
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ defmodule ExOpenAI.StreamingClient do
4141
end
4242

4343
# GenServer entry point: seeds the state with the receiver process, the
# response-conversion callback, and an empty buffer used to accumulate
# partial streaming chunks until they form valid JSON.
def init(stream_to: pid, convert_response_fx: fx) do
  initial_state = %{
    stream_to: pid,
    convert_response_fx: fx,
    buffer: ""
  }

  {:ok, initial_state}
end
4646

4747
@doc """
@@ -57,6 +57,43 @@ defmodule ExOpenAI.StreamingClient do
5757
callback_fx.(data)
5858
end
5959

60+
# Walks the "data:"-split segments of the stream buffer, forwarding every
# segment that decodes as a complete JSON document and carrying any
# incomplete trailing text forward.
#
# Returns `{remaining_buffer, state}`, where `remaining_buffer` holds text
# that did not yet decode as JSON (to be retried once the next chunk
# arrives).
defp parse_lines(lines, state) do
  Enum.reduce(lines, {"", state}, fn line, {pending, st} ->
    # Re-attach whatever was left over from the previous segment before
    # attempting to decode.
    candidate = String.trim(pending <> line)

    cond do
      # SSE sentinel marking the end of the stream.
      candidate == "[DONE]" ->
        Logger.debug("Received [DONE]")
        forward_response(st.stream_to, :finish)
        {"", st}

      # Blank separator lines (or pure whitespace) carry no payload.
      candidate == "" ->
        {"", st}

      true ->
        case Jason.decode(candidate) do
          {:ok, decoded} ->
            # Complete JSON: convert, forward, and clear the carry-over.
            converted = st.convert_response_fx.({:ok, decoded})
            forward_response(st.stream_to, {:data, converted})
            {"", st}

          {:error, _} ->
            # Not valid JSON yet: keep the whole candidate as the new
            # partial buffer for the next pass.
            {candidate, st}
        end
    end
  end)
end
96+
6097
def handle_chunk(
6198
chunk,
6299
%{stream_to: pid_or_fx, convert_response_fx: convert_fx}
@@ -103,20 +140,31 @@ defmodule ExOpenAI.StreamingClient do
103140
{:noreply, state}
104141
end
105142

106-
def handle_info(
107-
%HTTPoison.AsyncChunk{chunk: chunk},
108-
state
109-
) do
110-
chunk
111-
|> String.trim()
112-
|> String.split("data:")
113-
|> Enum.map(&String.trim/1)
114-
|> Enum.filter(&(&1 != ""))
115-
|> Enum.each(fn subchunk ->
116-
handle_chunk(subchunk, state)
117-
end)
143+
# def handle_info(%HTTPoison.AsyncChunk{chunk: "data: " <> chunk_data}, state) do
144+
# Logger.debug("Received AsyncChunk DATA: #{inspect(chunk_data)}")
145+
# end
118146

119-
{:noreply, state}
147+
# Buffers incoming HTTP chunks and forwards every complete SSE event.
#
# Each chunk is appended to the accumulated buffer, split on the SSE
# "data: " prefix, and handed to `parse_lines/2`, which forwards every
# segment that decodes as complete JSON and hands back whatever partial
# tail must wait for the next chunk.
def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, state) do
  Logger.debug("Received AsyncChunk (partial): #{inspect(chunk)}")

  # Prepend leftover partial data carried over from the previous chunk.
  new_buffer = state.buffer <> chunk

  # "data: " is a fixed literal, so a plain binary split is equivalent to
  # the previous ~r/data: / regex split and avoids compiling a regex on
  # every chunk.
  # NOTE(review): events emitted as "data:" without a trailing space would
  # not be split here — confirm upstream servers always include the space.
  lines = String.split(new_buffer, "data: ")

  # Forward complete events; keep whatever is still incomplete JSON.
  {remaining_buffer, state_after_parse} = parse_lines(lines, state)

  {:noreply, %{state_after_parse | buffer: remaining_buffer}}
end
121169

122170
def handle_info(%HTTPoison.Error{reason: reason}, state) do

0 commit comments

Comments
 (0)