Skip to content

Commit

Permalink
Merge pull request #96 from restlessronin/issue-88/sse-timeout
Browse files Browse the repository at this point in the history
Implement timeout for SSE streams, add raising of exceptions for cancellation and timeouts
  • Loading branch information
restlessronin authored Jun 30, 2024
2 parents 6da0f78 + 49a51b3 commit 45b43a2
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
livebook:
image: ghcr.io/livebook-dev/livebook:0.12.1
image: ghcr.io/livebook-dev/livebook:0.13.2
environment:
# secrets - values are set in '.env'. It's not in version control, but look at 'env'
LIVEBOOK_PASSWORD: ${LIVEBOOK_PASSWORD}
Expand Down
10 changes: 8 additions & 2 deletions lib/openai_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule OpenaiEx do
beta: nil,
base_url: "https://api.openai.com/v1",
receive_timeout: 15_000,
stream_timeout: :infinity,
finch_name: OpenaiEx.Finch,
_ep_path_mapping: &OpenaiEx._identity/1,
_http_headers: nil
Expand Down Expand Up @@ -109,8 +110,13 @@ defmodule OpenaiEx do
openai |> Map.put(:base_url, base_url)
end

def with_receive_timeout(openai = %OpenaiEx{}, receive_timeout) do
openai |> Map.put(:receive_timeout, receive_timeout)
def with_receive_timeout(openai = %OpenaiEx{}, timeout) do
openai |> Map.put(:receive_timeout, timeout)
end

def with_stream_timeout(openai = %OpenaiEx{}, timeout)
when is_integer(timeout) and timeout > 0 do
openai |> Map.put(:stream_timeout, timeout)
end

def with_finch_name(openai = %OpenaiEx{}, finch_name) do
Expand Down
13 changes: 13 additions & 0 deletions lib/openai_ex/exception.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule OpenaiEx.Exception do
defexception [:type, :message]

@impl true
def exception({type, message}) do
%__MODULE__{type: type, message: to_string(message)}
end

@impl true
def message(%__MODULE__{type: type, message: message}) do
"OpenaiEx Exception (#{type}): #{message}"
end
end
72 changes: 45 additions & 27 deletions lib/openai_ex/http_sse.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ defmodule OpenaiEx.HttpSse do
def post(openai = %OpenaiEx{}, url, json: json) do
me = self()
ref = make_ref()
task = Task.async(fn -> post_sse(openai, url, json, me, ref) end)
task = Task.async(fn -> finch_stream(openai, url, json, me, ref) end)
status = receive(do: ({:chunk, {:status, status}, ^ref} -> status))
headers = receive(do: ({:chunk, {:headers, headers}, ^ref} -> headers))

if status in 200..299 do
body_stream = Stream.resource(fn -> {"", ref} end, &next_sse/1, fn _ -> :ok end)
stream_receiver = create_stream_receiver(ref, openai.stream_timeout, task)
body_stream = Stream.resource(&init_stream/0, stream_receiver, &end_stream/1)
%{status: status, headers: headers, body_stream: body_stream, task_pid: task.pid}
else
error = extract_error(ref, "")
Expand All @@ -28,15 +29,20 @@ defmodule OpenaiEx.HttpSse do
send(task_pid, :cancel_request)
end

defp post_sse(openai = %OpenaiEx{}, url, json, me, ref) do
request = OpenaiEx.Http.build_post(openai, url, json: json)
on_chunk = create_chunk_handler(me, ref)
defp finch_stream(openai = %OpenaiEx{}, url, json, me, ref) do
request = Http.build_post(openai, url, json: json)
send_me_chunk = create_chunk_sender(me, ref)
options = Http.request_options(openai)
request |> Finch.stream(Map.get(openai, :finch_name), nil, on_chunk, options)
send(me, {:done, ref})

try do
request |> Finch.stream(Map.get(openai, :finch_name), nil, send_me_chunk, options)
send(me, {:done, ref})
catch
:throw, :cancel_request -> {:exception, :cancel_request}
end
end

defp create_chunk_handler(me, ref) do
defp create_chunk_sender(me, ref) do
fn chunk, _acc ->
receive do
:cancel_request ->
Expand All @@ -48,28 +54,40 @@ defmodule OpenaiEx.HttpSse do
end
end

defp next_sse({acc, ref}) do
receive do
{:chunk, {:data, evt_data}, ^ref} ->
{events, next_acc} = extract_events(evt_data, acc)
{[events], {next_acc, ref}}

# some 3rd party providers seem to be ending the stream with eof,
# rather than 2 line terminators. Hopefully those will be fixed and this
# can be removed in the future
{:done, ^ref} when acc == "data: [DONE]" ->
{:halt, {acc, ref}}

{:done, ^ref} ->
if acc != "", do: Logger.error("Residue!: #{acc}")
{:halt, {acc, ref}}

{:canceled, ^ref} ->
Logger.info("Request canceled by user")
{:halt, {acc, ref}}
defp init_stream, do: ""

defp create_stream_receiver(ref, timeout, task) do
fn acc when is_binary(acc) ->
receive do
{:chunk, {:data, evt_data}, ^ref} ->
{events, next_acc} = extract_events(evt_data, acc)
{[events], next_acc}

# some 3rd party providers seem to be ending the stream with eof,
# rather than 2 line terminators. Hopefully those will be fixed and this
# can be removed in the future
{:done, ^ref} when acc == "data: [DONE]" ->
Logger.warning("\"data: [DONE]\" should be followed by 2 line terminators")
{:halt, acc}

{:done, ^ref} ->
{:halt, acc}

{:canceled, ^ref} ->
Logger.info("Request canceled by user")
{:halt, {:exception, :canceled}}
after
timeout ->
Logger.warning("Stream timeout after #{timeout}ms")
Task.shutdown(task)
{:halt, {:exception, :timeout}}
end
end
end

defp end_stream({:exception, reason}), do: raise(OpenaiEx.Exception, {:sse_exception, reason})
defp end_stream(_), do: :ok

@double_eol ~r/(\r?\n|\r){2}/
@double_eol_eos ~r/(\r?\n|\r){2}$/

Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule OpenaiEx.MixProject do
use Mix.Project

@version "0.6.5"
@version "0.7.0"
@description "Community maintained OpenAI API Elixir client for Livebook"
@source_url "https://github.com/restlessronin/openai_ex"

Expand Down Expand Up @@ -35,7 +35,7 @@ defmodule OpenaiEx.MixProject do
{:jason, "~> 1.4"},
{:multipart, "~> 0.4"},
{:ex_doc, ">= 0.0.0", only: :docs},
{:credo, "~> 1.6", only: :dev, runtime: false}
{:credo, "~> 1.7", only: :dev, runtime: false}
]
end

Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
%{
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
"credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"},
"credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"},
"ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
"hpax": {:hex, :hpax, "0.2.0", "5a58219adcb75977b2edce5eb22051de9362f08236220c9e859a47111c194ff5", [:mix], [], "hexpm", "bea06558cdae85bed075e6c036993d43cd54d447f76d8190a8db0dc5893fa2f1"},
Expand All @@ -12,9 +12,9 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"},
"mint": {:hex, :mint, "1.6.1", "065e8a5bc9bbd46a41099dfea3e0656436c5cbcb6e741c80bd2bad5cd872446f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4fc518dcc191d02f433393a72a7ba3f6f94b101d094cb6bf532ea54c89423780"},
"multipart": {:hex, :multipart, "0.4.0", "634880a2148d4555d050963373d0e3bbb44a55b2badd87fa8623166172e9cda0", [:mix], [{:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm", "3c5604bc2fb17b3137e5d2abdf5dacc2647e60c5cc6634b102cf1aef75a06f0a"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
Expand Down
4 changes: 2 additions & 2 deletions notebooks/cleanup.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

```elixir
Mix.install([
{:openai_ex, "~> 0.6.5"},
{:kino, "~> 0.12.3"}
{:openai_ex, "~> 0.7.0"},
{:kino, "~> 0.13.1"}
])
```

Expand Down
4 changes: 2 additions & 2 deletions notebooks/completions.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

```elixir
Mix.install([
{:openai_ex, "~> 0.6.5"},
{:kino, "~> 0.12.3"}
{:openai_ex, "~> 0.7.0"},
{:kino, "~> 0.13.1"}
])

alias OpenaiEx
Expand Down
2 changes: 1 addition & 1 deletion notebooks/dlai_orderbot.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

```elixir
Mix.install([
{:openai_ex, "~> 0.6.5"},
{:openai_ex, "~> 0.7.0"},
{:kino, "~> 0.11.0"}
])

Expand Down
2 changes: 1 addition & 1 deletion notebooks/images.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
```elixir
Mix.install([
{:openai_ex, path: Path.join(__DIR__, "..")},
{:kino, "~> 0.12.3"}
{:kino, "~> 0.13.1"}
])

alias OpenaiEx
Expand Down
53 changes: 36 additions & 17 deletions notebooks/streaming_orderbot.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

```elixir
Mix.install([
{:openai_ex, "~> 0.6.5"},
{:kino, "~> 0.12.3"}
{:openai_ex, "~> 0.7.0"},
{:kino, "~> 0.13.1"}
])

alias OpenaiEx
Expand All @@ -27,7 +27,6 @@ openai = System.fetch_env!("LB_OPENAI_API_KEY") |> OpenaiEx.new()
```elixir
defmodule OpenaiEx.Notebooks.StreamingOrderbot do
alias OpenaiEx
alias OpenaiEx.Chat.Completions
require Logger

def set_task_pid(task_pid) do
Expand All @@ -54,19 +53,37 @@ defmodule OpenaiEx.Notebooks.StreamingOrderbot do
def stream_to_completions(%{body_stream: body_stream}) do
body_stream
|> Stream.flat_map(& &1)
|> Stream.map(fn %{data: d} -> d |> Map.get("choices") |> Enum.at(0) |> Map.get("delta") end)
|> Stream.map(fn %{data: d} -> d |> Map.get("choices") |> List.first() |> Map.get("delta") end)
|> Stream.filter(fn map -> map |> Map.has_key?("content") end)
|> Stream.map(fn map -> map |> Map.get("content") end)
end

def stream_completion_to_frame(stream, frame) do
stream
|> stream_to_completions()
|> Enum.reduce("", fn token, text ->
next = text <> token
Kino.Frame.render(frame, Kino.Text.new(next))
next
end)
try do
result =
stream
|> stream_to_completions()
|> Enum.reduce("", fn token, text ->
next = text <> token
Kino.Frame.render(frame, Kino.Text.new(next))
next
end)

{:ok, result}
rescue
e in OpenaiEx.Exception ->
case e do
%{type: :sse_exception, message: "canceled"} ->
message = "Request was canceled."
Kino.Frame.render(frame, Kino.Text.new(message))
{:canceled, message}

_ ->
message = "An error occurred: #{e.message}"
Kino.Frame.render(frame, Kino.Text.new(message))
{:error, message}
end
end
end

def create_orderbot(openai = %OpenaiEx{}, context) do
Expand All @@ -81,20 +98,24 @@ defmodule OpenaiEx.Notebooks.StreamingOrderbot do
|> Kino.render()

stream_o = openai |> get_stream(context)
bot_says = stream_o |> stream_completion_to_frame(last_frame)
{status, bot_says} = stream_o |> stream_completion_to_frame(last_frame)

Kino.listen(
form,
context ++ [ChatMessage.assistant(bot_says)],
context ++ if(status == :ok, do: [ChatMessage.assistant(bot_says)], else: []),
fn %{data: %{prompt: you_say}}, history ->
Kino.Frame.render(last_frame, Kino.Text.new(""))
Kino.Frame.append(chat_frame, Kino.Text.new(List.last(history).content))
Kino.Frame.append(chat_frame, Kino.Markdown.new("**You** #{you_say}"))

stream = openai |> get_stream(history ++ [ChatMessage.user(you_say)])
set_task_pid(stream.task_pid)
bot_says = stream |> stream_completion_to_frame(last_frame)
{:cont, history ++ [ChatMessage.user(you_say), ChatMessage.assistant(bot_says)]}
{status, bot_says} = stream |> stream_completion_to_frame(last_frame)

case status do
:ok -> {:cont, history ++ [ChatMessage.user(you_say), ChatMessage.assistant(bot_says)]}
_ -> {:cont, history}
end
end
)

Expand Down Expand Up @@ -150,5 +171,3 @@ context = [
```elixir
openai |> StreamingOrderbot.create_orderbot(context)
```

<!-- livebook:{"offset":4437,"stamp":{"token":"XCP.Iy1xAMVioeT-9KvIvaXPp4xWkNvzlDsHezN22TKU4QXLwrIWP_72QVzVCOfaN9WH62y_SuTNi7_Dy3CUN253EXroFQBEgESzHXBGKvtylahgkk1j49u-jLU","version":2}} -->
Loading

0 comments on commit 45b43a2

Please sign in to comment.