Skip to content

Commit

Permalink
handle incomplete chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
restlessronin committed Aug 20, 2023
1 parent 31667eb commit f3c3a32
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions lib/openai_ex/http_sse.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule OpenaiEx.HttpSse do
@moduledoc false
require Logger

# based on
# https://gist.github.com/zachallaun/88aed2a0cef0aed6d68dcc7c12531649
Expand All @@ -21,31 +22,49 @@ defmodule OpenaiEx.HttpSse do
_status = receive(do: ({:chunk, {:status, status}, ^ref} -> status))
_headers = receive(do: ({:chunk, {:headers, headers}, ^ref} -> headers))

Stream.resource(fn -> {ref, task} end, &next_sse/1, fn {_ref, task} ->
Stream.resource(fn -> {"", ref, task} end, &next_sse/1, fn {_data, _ref, task} ->
Task.shutdown(task)
end)
end

@doc false
# Pulls the next batch of SSE events from the streaming request task.
#
# Stream.resource state is `{acc, ref, task}`:
#   * `acc`  - buffer holding any partial event left over from the previous chunk
#   * `ref`  - reference tagging messages sent by the request task
#   * `task` - the task handle (shut down in Stream.resource's after-fun)
#
# On a `:data` chunk we tokenize the chunk together with the buffered
# partial and emit the complete events; on `:done` we halt the stream.
defp next_sse({acc, ref, task}) do
  receive do
    {:chunk, {:data, evt_data}, ^ref} ->
      {tokens, next_acc} = tokenize_data(evt_data, acc)
      # NOTE: the token list is emitted as a single stream element,
      # mirroring the shape downstream consumers expect.
      {[tokens], {next_acc, ref, task}}

    {:done, ^ref} ->
      {:halt, {acc, ref, task}}
  end
end

@doc false
defp to_sse_data(value) do
String.split(value, "\n\n", trim: true)
|> Enum.map(fn line ->
[field | rest] = String.split(line, ": ", parts: 2)
value = Enum.join(rest, "") |> String.replace_prefix(" ", "")

case field do
"data" -> %{data: value}
end
end)
|> Enum.filter(fn %{data: data} -> data != "[DONE]" end)
|> Enum.map(fn %{data: data} -> %{data: Jason.decode!(data)} end)
# Splits raw SSE bytes into complete events, buffering any trailing partial.
#
# `evt_data` is the newly received chunk and `acc` is the unconsumed tail
# carried over from the previous chunk. Complete events are terminated by a
# blank line (`"\n\n"`); whatever follows the last terminator becomes the
# new buffer.
#
# Returns `{tokens, rest}` where `tokens` is a list of `%{data: decoded}`
# maps (the `"[DONE]"` sentinel is dropped before decoding) and `rest` is
# the partial event to pass back in on the next call.
defp tokenize_data(evt_data, acc) do
  # Search the concatenation, not just the new chunk: the "\n\n" terminator
  # can straddle the chunk boundary (e.g. `acc` ends with "\n" and
  # `evt_data` starts with "\n"), which a check on `evt_data` alone misses.
  combined = acc <> evt_data

  if String.contains?(combined, "\n\n") do
    {remaining, complete_events} =
      combined
      |> String.split("\n\n")
      |> List.pop_at(-1)

    tokens =
      complete_events
      |> Enum.map(&extract_token/1)
      |> Enum.filter(fn %{data: data} -> data != "[DONE]" end)
      |> Enum.map(fn %{data: data} -> %{data: Jason.decode!(data)} end)

    {tokens, remaining}
  else
    {[], combined}
  end
end

@doc false
# Parses a single SSE field line of the form `"data: <payload>"` into a
# `%{data: payload}` map. Lines with any other field name do not match the
# single case clause and raise, surfacing unexpected input loudly.
defp extract_token(line) do
  case String.split(line, ": ", parts: 2) do
    ["data" | payload] ->
      value =
        payload
        |> Enum.join()
        |> String.replace_prefix(" ", "")

      %{data: value}
  end
end
end

0 comments on commit f3c3a32

Please sign in to comment.