diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 15e3620..d0c4e14 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: livebook: - image: ghcr.io/livebook-dev/livebook:0.12.1 + image: ghcr.io/livebook-dev/livebook:0.13.2 environment: # secrets - values are set in '.env'. It's not in version control, but look at 'env' LIVEBOOK_PASSWORD: ${LIVEBOOK_PASSWORD} diff --git a/lib/openai_ex.ex b/lib/openai_ex.ex index a6eddcc..77d0830 100644 --- a/lib/openai_ex.ex +++ b/lib/openai_ex.ex @@ -12,6 +12,7 @@ defmodule OpenaiEx do beta: nil, base_url: "https://api.openai.com/v1", receive_timeout: 15_000, + stream_timeout: :infinity, finch_name: OpenaiEx.Finch, _ep_path_mapping: &OpenaiEx._identity/1, _http_headers: nil @@ -109,8 +110,13 @@ defmodule OpenaiEx do openai |> Map.put(:base_url, base_url) end - def with_receive_timeout(openai = %OpenaiEx{}, receive_timeout) do - openai |> Map.put(:receive_timeout, receive_timeout) + def with_receive_timeout(openai = %OpenaiEx{}, timeout) do + openai |> Map.put(:receive_timeout, timeout) + end + + def with_stream_timeout(openai = %OpenaiEx{}, timeout) + when is_integer(timeout) and timeout > 0 do + openai |> Map.put(:stream_timeout, timeout) end def with_finch_name(openai = %OpenaiEx{}, finch_name) do diff --git a/lib/openai_ex/exception.ex b/lib/openai_ex/exception.ex new file mode 100644 index 0000000..9c09275 --- /dev/null +++ b/lib/openai_ex/exception.ex @@ -0,0 +1,13 @@ +defmodule OpenaiEx.Exception do + defexception [:type, :message] + + @impl true + def exception({type, message}) do + %__MODULE__{type: type, message: to_string(message)} + end + + @impl true + def message(%__MODULE__{type: type, message: message}) do + "OpenaiEx Exception (#{type}): #{message}" + end +end diff --git a/lib/openai_ex/http_sse.ex b/lib/openai_ex/http_sse.ex index 5ba0064..4e5283c 100644 --- a/lib/openai_ex/http_sse.ex +++ b/lib/openai_ex/http_sse.ex @@ -11,12 +11,13 @@ defmodule OpenaiEx.HttpSse do def post(openai = %OpenaiEx{}, url, json: json) do me = self() ref = make_ref() - task = Task.async(fn -> post_sse(openai, url, json, me, ref) end) + task = Task.async(fn -> finch_stream(openai, url, json, me, ref) end) status = receive(do: ({:chunk, {:status, status}, ^ref} -> status)) headers = receive(do: ({:chunk, {:headers, headers}, ^ref} -> headers)) if status in 200..299 do - body_stream = Stream.resource(fn -> {"", ref} end, &next_sse/1, fn _ -> :ok end) + stream_receiver = create_stream_receiver(ref, openai.stream_timeout, task) + body_stream = Stream.resource(&init_stream/0, stream_receiver, &end_stream/1) %{status: status, headers: headers, body_stream: body_stream, task_pid: task.pid} else error = extract_error(ref, "") @@ -28,15 +29,20 @@ defmodule OpenaiEx.HttpSse do send(task_pid, :cancel_request) end - defp post_sse(openai = %OpenaiEx{}, url, json, me, ref) do - request = OpenaiEx.Http.build_post(openai, url, json: json) - on_chunk = create_chunk_handler(me, ref) + defp finch_stream(openai = %OpenaiEx{}, url, json, me, ref) do + request = Http.build_post(openai, url, json: json) + send_me_chunk = create_chunk_sender(me, ref) options = Http.request_options(openai) - request |> Finch.stream(Map.get(openai, :finch_name), nil, on_chunk, options) - send(me, {:done, ref}) + + try do + request |> Finch.stream(Map.get(openai, :finch_name), nil, send_me_chunk, options) + send(me, {:done, ref}) + catch + :throw, :cancel_request -> {:exception, :cancel_request} + end end - defp create_chunk_handler(me, ref) do + defp create_chunk_sender(me, ref) do fn chunk, _acc -> receive do :cancel_request -> @@ -48,28 +54,40 @@ defmodule OpenaiEx.HttpSse do end end - defp next_sse({acc, ref}) do - receive do - {:chunk, {:data, evt_data}, ^ref} -> - {events, next_acc} = extract_events(evt_data, acc) - {[events], {next_acc, ref}} - - # some 3rd party providers seem to be ending the stream with eof, - # rather than 2 line terminators. Hopefully those will be fixed and this - # can be removed in the future - {:done, ^ref} when acc == "data: [DONE]" -> - {:halt, {acc, ref}} - - {:done, ^ref} -> - if acc != "", do: Logger.error("Residue!: #{acc}") - {:halt, {acc, ref}} - - {:canceled, ^ref} -> - Logger.info("Request canceled by user") - {:halt, {acc, ref}} + defp init_stream, do: "" + + defp create_stream_receiver(ref, timeout, task) do + fn acc when is_binary(acc) -> + receive do + {:chunk, {:data, evt_data}, ^ref} -> + {events, next_acc} = extract_events(evt_data, acc) + {[events], next_acc} + + # some 3rd party providers seem to be ending the stream with eof, + # rather than 2 line terminators. Hopefully those will be fixed and this + # can be removed in the future + {:done, ^ref} when acc == "data: [DONE]" -> + Logger.warning("\"data: [DONE]\" should be followed by 2 line terminators") + {:halt, acc} + + {:done, ^ref} -> + {:halt, acc} + + {:canceled, ^ref} -> + Logger.info("Request canceled by user") + {:halt, {:exception, :canceled}} + after + timeout -> + Logger.warning("Stream timeout after #{timeout}ms") + Task.shutdown(task) + {:halt, {:exception, :timeout}} + end end end + defp end_stream({:exception, reason}), do: raise(OpenaiEx.Exception, {:sse_exception, reason}) + defp end_stream(_), do: :ok + @double_eol ~r/(\r?\n|\r){2}/ @double_eol_eos ~r/(\r?\n|\r){2}$/ diff --git a/mix.exs b/mix.exs index 008af8f..53d887e 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule OpenaiEx.MixProject do use Mix.Project - @version "0.6.5" + @version "0.7.0" @description "Community maintained OpenAI API Elixir client for Livebook" @source_url "https://github.com/restlessronin/openai_ex" @@ -35,7 +35,7 @@ defmodule OpenaiEx.MixProject do {:jason, "~> 1.4"}, {:multipart, "~> 0.4"}, {:ex_doc, ">= 0.0.0", only: :docs}, - {:credo, "~> 1.6", only: :dev, runtime: false} + {:credo, "~> 1.7", only: :dev, runtime: false} ] end diff --git a/mix.lock b/mix.lock index 8e29d9e..6d0286c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,9 +1,9 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, - "credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"}, + "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "hpax": {:hex, :hpax, "0.2.0", "5a58219adcb75977b2edce5eb22051de9362f08236220c9e859a47111c194ff5", [:mix], [], "hexpm", "bea06558cdae85bed075e6c036993d43cd54d447f76d8190a8db0dc5893fa2f1"}, @@ -12,9 +12,9 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"}, + "mint": {:hex, :mint, "1.6.1", "065e8a5bc9bbd46a41099dfea3e0656436c5cbcb6e741c80bd2bad5cd872446f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4fc518dcc191d02f433393a72a7ba3f6f94b101d094cb6bf532ea54c89423780"}, "multipart": {:hex, :multipart, "0.4.0", "634880a2148d4555d050963373d0e3bbb44a55b2badd87fa8623166172e9cda0", [:mix], [{:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm", "3c5604bc2fb17b3137e5d2abdf5dacc2647e60c5cc6634b102cf1aef75a06f0a"}, - "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, diff --git a/notebooks/cleanup.livemd b/notebooks/cleanup.livemd index 7d25de4..5c1043e 100644 --- a/notebooks/cleanup.livemd +++ b/notebooks/cleanup.livemd @@ -2,8 +2,8 @@ ```elixir Mix.install([ - {:openai_ex, "~> 0.6.5"}, - {:kino, "~> 0.12.3"} + {:openai_ex, "~> 0.7.0"}, + {:kino, "~> 0.13.1"} ]) ``` diff --git a/notebooks/completions.livemd b/notebooks/completions.livemd index 7a996c3..14bd1f9 100644 --- a/notebooks/completions.livemd +++ b/notebooks/completions.livemd @@ -4,8 +4,8 @@ ```elixir Mix.install([ - {:openai_ex, "~> 0.6.5"}, - {:kino, "~> 0.12.3"} + {:openai_ex, "~> 0.7.0"}, + {:kino, "~> 0.13.1"} ]) alias OpenaiEx diff --git a/notebooks/dlai_orderbot.livemd b/notebooks/dlai_orderbot.livemd index 61ee53f..36b802e 100644 --- a/notebooks/dlai_orderbot.livemd +++ b/notebooks/dlai_orderbot.livemd @@ -2,7 +2,7 @@ ```elixir Mix.install([ - {:openai_ex, "~> 0.6.5"}, + {:openai_ex, "~> 0.7.0"}, {:kino, "~> 0.11.0"} ]) diff --git a/notebooks/images.livemd b/notebooks/images.livemd index 2eafdfb..0e574c3 100644 --- a/notebooks/images.livemd +++ b/notebooks/images.livemd @@ -5,7 +5,7 @@ ```elixir Mix.install([ {:openai_ex, path: Path.join(__DIR__, "..")}, - {:kino, "~> 0.12.3"} + {:kino, "~> 0.13.1"} ]) alias OpenaiEx diff --git a/notebooks/streaming_orderbot.livemd b/notebooks/streaming_orderbot.livemd index 0eb333f..9b7fa0b 100644 --- a/notebooks/streaming_orderbot.livemd +++ b/notebooks/streaming_orderbot.livemd @@ -2,8 +2,8 @@ ```elixir Mix.install([ - {:openai_ex, "~> 0.6.5"}, - {:kino, "~> 0.12.3"} + {:openai_ex, "~> 0.7.0"}, + {:kino, "~> 0.13.1"} ]) alias OpenaiEx @@ -27,7 +27,6 @@ openai = System.fetch_env!("LB_OPENAI_API_KEY") |> OpenaiEx.new() ```elixir defmodule OpenaiEx.Notebooks.StreamingOrderbot do alias OpenaiEx - alias OpenaiEx.Chat.Completions require Logger def set_task_pid(task_pid) do @@ -54,19 +53,37 @@ defmodule OpenaiEx.Notebooks.StreamingOrderbot do def stream_to_completions(%{body_stream: body_stream}) do body_stream |> Stream.flat_map(& &1) - |> Stream.map(fn %{data: d} -> d |> Map.get("choices") |> Enum.at(0) |> Map.get("delta") end) + |> Stream.map(fn %{data: d} -> d |> Map.get("choices") |> List.first() |> Map.get("delta") end) |> Stream.filter(fn map -> map |> Map.has_key?("content") end) |> Stream.map(fn map -> map |> Map.get("content") end) end def stream_completion_to_frame(stream, frame) do - stream - |> stream_to_completions() - |> Enum.reduce("", fn token, text -> - next = text <> token - Kino.Frame.render(frame, Kino.Text.new(next)) - next - end) + try do + result = + stream + |> stream_to_completions() + |> Enum.reduce("", fn token, text -> + next = text <> token + Kino.Frame.render(frame, Kino.Text.new(next)) + next + end) + + {:ok, result} + rescue + e in OpenaiEx.Exception -> + case e do + %{type: :sse_exception, message: "canceled"} -> + message = "Request was canceled." + Kino.Frame.render(frame, Kino.Text.new(message)) + {:canceled, message} + + _ -> + message = "An error occurred: #{e.message}" + Kino.Frame.render(frame, Kino.Text.new(message)) + {:error, message} + end + end end def create_orderbot(openai = %OpenaiEx{}, context) do @@ -81,11 +98,11 @@ defmodule OpenaiEx.Notebooks.StreamingOrderbot do |> Kino.render() stream_o = openai |> get_stream(context) - bot_says = stream_o |> stream_completion_to_frame(last_frame) + {status, bot_says} = stream_o |> stream_completion_to_frame(last_frame) Kino.listen( form, - context ++ [ChatMessage.assistant(bot_says)], + context ++ if(status == :ok, do: [ChatMessage.assistant(bot_says)], else: []), fn %{data: %{prompt: you_say}}, history -> Kino.Frame.render(last_frame, Kino.Text.new("")) Kino.Frame.append(chat_frame, Kino.Text.new(List.last(history).content)) @@ -93,8 +110,12 @@ defmodule OpenaiEx.Notebooks.StreamingOrderbot do stream = openai |> get_stream(history ++ [ChatMessage.user(you_say)]) set_task_pid(stream.task_pid) - bot_says = stream |> stream_completion_to_frame(last_frame) - {:cont, history ++ [ChatMessage.user(you_say), ChatMessage.assistant(bot_says)]} + {status, bot_says} = stream |> stream_completion_to_frame(last_frame) + + case status do + :ok -> {:cont, history ++ [ChatMessage.user(you_say), ChatMessage.assistant(bot_says)]} + _ -> {:cont, history} + end end ) @@ -150,5 +171,3 @@ context = [ ```elixir openai |> StreamingOrderbot.create_orderbot(context) ``` - - diff --git a/notebooks/userguide.livemd b/notebooks/userguide.livemd index 6ada4d9..4ed6749 100644 --- a/notebooks/userguide.livemd +++ b/notebooks/userguide.livemd @@ -2,9 +2,9 @@ ```elixir Mix.install([ - {:openai_ex, "~> 0.6.5"}, + {:openai_ex, "~> 0.7.0"}, # {:openai_ex, path: Path.join(__DIR__, "..")}, - {:kino, "~> 0.12.3"} + {:kino, "~> 0.13.1"} ]) ``` @@ -43,7 +43,7 @@ Add the following code to the first connection cell: ```elixir Mix.install( [ - {:openai_ex, "~> 0.6.5"} + {:openai_ex, "~> 0.7.0"} ] ) ``` @@ -57,7 +57,7 @@ Add the following to your mix.exs file: ```elixir def deps do [ - {:openai_ex, "~> 0.6.5"} + {:openai_ex, "~> 0.7.0"} ] end ``` @@ -222,6 +222,8 @@ IO.puts(inspect(chat_stream.task_pid)) chat_stream.body_stream |> Stream.flat_map(& &1) |> Enum.each(fn x -> IO.puts(inspect(x)) end) ``` +### Canceling a streaming request + The `chat_stream.task_pid` can be used in conjunction with `OpenaiEx.HttpSse.cancel_request/1` to cancel an ongoing request. You need to check the return `chat_stream.status` field. In case the status is *not* 2XX, the `body_stream` and `task_pid` fields are not available. Instead, an `error` field will be returned. @@ -235,6 +237,82 @@ err_resp = openai |> Chat.Completions.create(bad_req, stream: true) For a detailed example of the use of the streaming `ChatCompletion` API, **including how to cancel an ongoing request**, check out [Streaming Orderbot](https://hexdocs.pm/openai_ex/streaming_orderbot.html), the streaming equivalent of the prior example. +### Stream Timeout + +While OpenAI's official API implementation typically doesn't require explicit timeout handling for streams, some third-party implementations of the OpenAI API may benefit from custom timeout settings. OpenaiEx provides a way to set a stream-specific timeout to handle these cases. + +You can set a stream-specific timeout using the `with_stream_timeout` function: + +```elixir +# Set a stream timeout of 30 seconds +openai_with_timeout = openai |> OpenaiEx.with_stream_timeout(30_000) +``` + +This is particularly useful when working with third-party OpenAI API implementations that may have different performance characteristics than the official API. + +### Exception Handling for Streams + +When working with streams, it's important to handle potential exceptions that may occur during stream processing. OpenaiEx uses a custom exception type for stream-related errors. Here's how you can handle these exceptions: + +```elixir +alias OpenaiEx.Exception + +process_stream = fn openai, request -> + response = Chat.Completions.create(openai, request, stream: true) + + try do + response.body_stream + |> Stream.flat_map(& &1) + |> Enum.each(fn chunk -> + # Process each chunk here + IO.inspect(chunk) + end) + rescue + e in OpenaiEx.Exception -> + case e do + %{type: :sse_exception, message: "canceled"} -> + IO.puts("Stream was canceled") + {:error, :canceled, e.message} + %{type: :sse_exception, message: "timeout"} -> + IO.puts("Timeout on SSE stream") + {:error, :timeout, e.message} + _ -> + IO.puts("Unknown error occurred") + {:error, :unknown, e.message} + end + e -> + IO.puts("An unexpected error occurred") + {:error, :unexpected, Exception.message(e)} + end +end + +# Usage +chat_req = Chat.Completions.new( + model: "gpt-3.5-turbo", + messages: [ChatMessage.user("Tell me a short story about a brave knight")], + max_tokens: 500 +) + +# Use the OpenaiEx struct with custom stream timeout +result = process_stream.(openai_with_timeout, chat_req) +case result do + {:error, type, message} -> + IO.puts("Error type: #{type}") + IO.puts("Error message: #{message}") + _ -> + IO.puts("Stream processed successfully") +end +``` + +In this example, we define a `process_stream` function that handles different types of stream exceptions: + +* `:canceled`: The stream was canceled. We return an error tuple. +* `:timeout`: The stream timed out. We return an error tuple. +* Any other `OpenaiEx.Exception`: We treat it as an unknown error. +* Any other exception: We treat it as an unexpected error. + +This approach allows you to gracefully handle different types of stream-related errors and take appropriate actions. + For more information on generating chat completions, see the [OpenAI API Chat Completions reference](https://platform.openai.com/docs/api-reference/chat). ### Function(Tool) Calling @@ -1179,5 +1257,3 @@ Use [`Run.create_and_run()`](https://platform.openai.com/docs/api-reference/runs ```elixir ``` - -