Streaming OpenAI data with Elixir
Parinda Darden・Dec 25, 2022・3 min read
An OpenAI request can take more than a minute to return a response. To avoid the request timeouts you'll run into on most hosting services, you can stream the response back as it's generated; here we'll do that with `Finch.stream/5`.
For this example, we'll make a call to OpenAI's completions endpoint.
```elixir
defmodule MyApp.OpenAI do
  @moduledoc """
  Handles OpenAI API requests and responses.
  """

  @endpoint "api.openai.com"

  @doc """
  Returns the full response from the OpenAI stream given the input and options.

  Takes the following options:

    - `model`
    - `max_tokens`
    - `temperature`
    - `consume_func`, a function that processes the streamed response
  """
  @spec generate_completion(String.t(), list()) :: {:ok, list()} | {:error, atom()}
  def generate_completion(_input, opts \\ [])
  def generate_completion("", _opts), do: {:error, :missing_input}

  def generate_completion(input, opts) do
    body =
      %{
        "model" => Keyword.get(opts, :model, "text-davinci-003"),
        "prompt" => input,
        "max_tokens" => Keyword.get(opts, :max_tokens, 250),
        "temperature" => Keyword.get(opts, :temperature, 0.7),
        "stream" => true
      }
      |> encode_body()

    url = url("/v1/completions")

    consume_func =
      Keyword.get(opts, :consume_func, fn
        {:status, value}, {_, headers, body} -> {value, headers, body}
        {:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
        {:data, value}, {status, headers, body} -> {status, headers, [value | body]}
      end)

    Finch.build(:post, url, headers(), body)
    |> Finch.stream(MyApp.Finch, {nil, [], []}, consume_func)
    |> handle_stream(:generate_completion)
  end

  defp url(path) do
    "https://" <> @endpoint <> path
  end

  defp handle_stream({:ok, {200, _headers, body}}, _event) do
    {:ok, body |> Enum.reverse()}
  end

  defp handle_stream({:ok, {status, headers, body}}, event) do
    IO.inspect(status, label: "status")
    IO.inspect(headers, label: "headers")
    IO.inspect(body, label: "body")
    {:error, event}
  end

  defp handle_stream({:error, error}, event) do
    IO.inspect(error, label: "error")
    {:error, event}
  end

  defp encode_body(body), do: Jason.encode!(body)

  defp headers do
    [{"Authorization", "Bearer #{api_key()}"}, {"Content-Type", "application/json"}]
  end

  defp api_key do
    Application.get_env(:my_app, :envs)[:open_ai][:api_key]
  end
end
```
In `generate_completion/2`, the two important parts of the function are the `stream` option passed in the request body, which tells the API to send back partial progress while keeping the connection open, and the `consume_func` option, which parses the streamed response.
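To make the default accumulator concrete, here's a small sketch that feeds fabricated stream events through the same three-clause function with `Enum.reduce/3` (in production the events come from `Finch.stream/5`, not a list):

```elixir
# The same three-clause function used as the default consume_func.
consume_func = fn
  {:status, value}, {_, headers, body} -> {value, headers, body}
  {:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
  {:data, value}, {status, headers, body} -> {status, headers, [value | body]}
end

# Fabricated events for illustration; Finch emits these from the socket.
events = [
  {:status, 200},
  {:headers, [{"content-type", "text/event-stream"}]},
  {:data, "first chunk"},
  {:data, "second chunk"}
]

{status, _headers, body} =
  Enum.reduce(events, {nil, [], []}, fn event, acc -> consume_func.(event, acc) end)

# Data chunks are prepended, so the body ends up newest-first:
# status == 200, body == ["second chunk", "first chunk"]
```

Because each data chunk is prepended, `handle_stream/2` calls `Enum.reverse/1` before returning the body.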
Now, let's use this module to ask OpenAI a question!
```elixir
opts = [
  consume_func: fn
    {:status, value}, {_, headers, body} ->
      {value, headers, body}

    {:headers, value}, {status, headers, body} ->
      {status, headers ++ value, body}

    {:data, value}, {status, headers, body} ->
      if value == "data: [DONE]\n\n" do
        {status, headers, body}
      else
        value =
          value
          |> String.replace_prefix("data: ", "")
          |> String.replace_suffix("\n\n", "")
          |> String.split("\n\ndata: ")
          |> Enum.reduce("", fn data, acc ->
            if is_nil(data) || data == "[DONE]" do
              acc
            else
              value = Jason.decode!(data)
              value = List.first(value["choices"], %{}) |> Map.get("text", "")
              acc <> value
            end
          end)

        {status, headers, [value | body]}
      end
  end
]
```
```elixir
answer =
  case MyApp.OpenAI.generate_completion("How do I say hello in Thai?", opts) do
    {:ok, body} ->
      body |> Enum.join("")

    {:error, error} ->
      IO.inspect(error)
  end
```
We're adding a bit more logic to parse the data streamed from OpenAI. Per the API documentation, the `stream` option streams back data as it becomes available, with the stream terminated by a `data: [DONE]` message. We handle the end of the stream by returning the current `{status, headers, body}` tuple unchanged. Each chunk from OpenAI is text that starts with `"data: "` and ends with `"\n\n"`, which we'll want to remove using `String.replace_prefix("data: ", "")` and `String.replace_suffix("\n\n", "")`.
Sometimes a single chunk will sneak in a few extra instances of `"\n\ndata: "`, so we handle this edge case with `String.split("\n\ndata: ")`. Because of it, we need to check once more whether the stream has ended, and make sure the data is not empty. Once a piece of data passes all the checks, it gets decoded and appended to the previously parsed data.
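As a quick sketch, here's how that pipeline unpacks a chunk carrying two events; the JSON payloads below are simplified stand-ins for real completion objects:

```elixir
# A fabricated chunk containing two server-sent events in one {:data, value}.
chunk = ~s(data: {"text": "Sawas"}\n\ndata: {"text": "dee"}\n\n)

events =
  chunk
  |> String.replace_prefix("data: ", "")
  |> String.replace_suffix("\n\n", "")
  |> String.split("\n\ndata: ")

# events == [~s({"text": "Sawas"}), ~s({"text": "dee"})]
```

Each element of `events` is a bare JSON string, ready to be passed to `Jason.decode!/1` in the reduce step above.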
The final answer from the OpenAI completions endpoint should be `Sawasdee (สวัสดี)`.