Ever wanted to raise millions of dollars by adding "AI" to your company name, and you happen to want to build it with Elixir, Phoenix, and LiveView? This is a tutorial on making API calls to model providers like Together AI. Wait, not just plain calls, but HTTP streams, so we receive the data in chunks.
Big picture for any stack:
- Get a prompt from the user
- Send the prompt to Together AI through an API call
- Handle the incoming chunks of LLM output
TLDR:
In the Elixir world, we are going to have two processes: one for the LiveView and another that handles the HTTP call with streaming. The LiveView sends the prompt and its pid (process id) to a handler module, which in turn spawns a separate process that makes the HTTP call and sends the chunks of LLM output to the LiveView as they arrive. When the last chunk arrives, we notify the LiveView that text generation has finished.
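Concretely, the streaming process will send plain messages to the LiveView's pid, and the LiveView will pick them up in handle_info/2. The tuples below are the ones we define later in this tutorial; the sketch is only here to show the direction of the flow:
# in the streaming process (pid is the LiveView's pid)
send(pid, {PhoenixPlayground.TogetherAi, "chunk", "some generated text"})
send(pid, {PhoenixPlayground.TogetherAi, "last_chunk", "the final piece"})

# in the LiveView, each message lands in handle_info/2
def handle_info({PhoenixPlayground.TogetherAi, "chunk", text}, socket) do
  {:noreply, update(socket, :text, &(&1 <> text))}
end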
Setup
Generate a new Phoenix project by running
mix phx.new phoenix_playground
cd ./phoenix_playground
# you will need a running PostgreSQL database. Alternatively, pass --database sqlite3 when generating the project
mix ecto.create
iex -S mix phx.server
Go to mix.exs and add Req to dependencies:
{:req, "~> 0.5.0"}
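If you are unsure where exactly the tuple goes, it belongs in the deps/0 function of mix.exs; afterwards run mix deps.get to fetch it:
defp deps do
  [
    # ...keep the dependencies that phx.new generated...
    {:req, "~> 0.5.0"}
  ]
end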
Implementation
Let's start with the LiveView. Here is a scaffold of the LiveView with a form that handles input and, on submission, toggles the loading state and sends the prompt to Together AI.
defmodule PhoenixPlaygroundWeb.HomeLive do
alias PhoenixPlayground.TogetherAi
use PhoenixPlaygroundWeb, :live_view
def mount(_params, _session, socket) do
socket =
socket |> assign(:loading, false) |> assign(:text, "")
{:ok, socket}
end
def render(assigns) do
~H"""
<div class="mx-auto max-w-3xl my-20">
<div class="flex items-start space-x-4">
<div class="min-w-0 flex-1">
<form phx-change="validate" phx-submit="submit" id="prompt-form" class="relative">
<div class="">
<label for="prompt" class="sr-only">Add your prompt</label>
<input
type="text"
name="prompt"
id="prompt"
class="block w-full "
placeholder="Let me know what you want to achieve"
/>
</div>
<div class="flex-shrink-0">
<button
:if={not @loading}
type="submit"
class="tailwind-goes-here"
>
Send
</button>
<button
:if={@loading}
type="submit"
class="tailwind-goes-here"
>
Loading...
</button>
</div>
</form>
</div>
</div>
<p :if={@text != ""}><%= @text %></p>
</div>
"""
def handle_event("validate", _, socket) do
{:noreply, socket}
end
def handle_event("submit", %{"prompt" => prompt}, socket) do
# submit our prompt to Together AI (or OpenAI or Mistral)
TogetherAi.stream_completion(prompt, self())
socket =
socket |> update(:loading, &toggle_loading/1)
{:noreply, socket}
end

# flips the :loading assign between true and false
defp toggle_loading(loading), do: not loading
end
Now, add this LiveView to router.ex:
live "/home", HomeLive
You will need to register at Together AI and get your API key to access the platform. Upon registration, you will receive $5 in credits, which is more than enough for this tutorial. Assuming you have your API key, go to the platform and pick an LLM you would like to use.
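A small aside: the module below hard-codes the API key for brevity, but you may prefer to read it from an environment variable. A minimal sketch, assuming you export TOGETHER_API_KEY before starting the server:
# read the key at runtime; raises if the variable is missing
api_key = System.fetch_env!("TOGETHER_API_KEY")
# ...then pass it to Req as: auth: {:bearer, api_key}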
Let's start writing the TogetherAi module.
defmodule PhoenixPlayground.TogetherAi do
def stream_completion(prompt, pid) when is_binary(prompt) do
url = "https://api.together.xyz/v1/chat/completions"
body = %{
model: "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo", # your model id
messages: [
# a system prompt to steer the output. Write whatever you want here or just leave it out
%{
role: "system",
content:
"Your response must be in a format of bullet list with actionable items, like a todo-list. Your response must not start with 'Here is...'. Give only 2 actionable items, no more. Do not add any additional comments after actionable items. Give brief response."
},
# here, we are including the prompt by user
%{role: "user", content: prompt}
],
stream: true,
max_tokens: 512,
temperature: 0.7,
top_p: 0.7,
top_k: 50,
stop: "[DONE]"
}
Task.async(fn ->
Req.post(
url,
json: body,
auth: {:bearer, "<your-api-key>"},
into: fn {:data, _data} = data, {_req, _resp} = req_resp ->
handle_stream(data, req_resp, pid)
end
)
end)
end
end
Here, we construct the request body as needed, then create a separate process with Task.async() in which we make a POST request to the platform and specify the function that will handle the stream.
Req does a lot of the heavy lifting for us when it comes to HTTP requests and streaming. We just need to write handle_stream(). More on it here: https://hexdocs.pm/req/Req.Request.html.
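If the into: option is new to you, here is a stripped-down sketch of the callback contract we rely on (auth omitted, url and body reused from above): Req calls the function once per received chunk with the chunk and the {request, response} pair, and the return value tells Req whether to keep streaming. The IO.inspect is only there so you can see what the raw chunks look like.
Req.post!(url,
  json: body,
  into: fn {:data, chunk}, {req, resp} ->
    IO.inspect(chunk, label: "raw chunk")
    # return {:cont, acc} to keep receiving, or {:halt, acc} to stop early
    {:cont, {req, resp}}
  end
)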
Here is the implementation of handle_stream() for the case where the request succeeds with a 200 status code.
defp handle_stream(
{:data, data},
{req = %Req.Request{}, resp = %Req.Response{status: 200}},
pid
) do
# string manipulations
decoded =
data
|> String.split("data: ")
|> Enum.map(fn str ->
str
|> String.trim()
|> decode_body()
end)
|> Enum.filter(fn d -> d != :ok end)
case handle_response(decoded) do
# LLM finished generating, so we are informing the LiveView about it
{text, :finish} ->
send(pid, {__MODULE__, "last_chunk", text})
# LLM generated text, so we are sending it to the LiveView process
generated_text ->
send(pid, {__MODULE__, "chunk", generated_text})
end
{:cont, {req, resp}}
end
defp handle_response(decoded) when is_map(decoded) do
decoded
|> Map.get("choices")
|> List.first()
|> Map.get("delta")
|> Map.get("content")
end
defp handle_response(decoded) when is_list(decoded) do
result =
Enum.reduce(decoded, "", fn choices_map, acc ->
case choices_map do
:finish ->
{acc, :finish}
map ->
acc <> handle_response(map)
end
end)
result
end
defp decode_body(""), do: :ok
defp decode_body("[DONE]"), do: :finish
defp decode_body(json), do: Jason.decode!(json)
After receiving a response chunk, we need to do a couple of string manipulations before decoding it with Jason. Note that the decode_body() function returns :ok on an empty string and :finish when we reach the stream terminator (in our case "[DONE]").
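To make the string manipulation concrete, here is what the pipeline inside handle_stream() does to a hypothetical raw chunk (shortened for readability; real chunks look like the sample below):
# a made-up raw SSE chunk containing one JSON payload and the terminator
chunk = ~s(data: {"choices": [{"delta": {"content": "Hello"}}]}\n\ndata: [DONE]\n\n)

chunk
|> String.split("data: ")
|> Enum.map(fn str -> str |> String.trim() |> decode_body() end)
|> Enum.filter(fn d -> d != :ok end)
# => [%{"choices" => [%{"delta" => %{"content" => "Hello"}}]}, :finish]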
After we decode the response, we need to extract the text generated by the LLM. Here is a sample decoded response chunk:
%{
"choices" => [
%{
"delta" => %{
"content" => "•",
"role" => "assistant",
"token_id" => 6806,
"tool_calls" => nil
},
"finish_reason" => nil,
"index" => 0,
"logprobs" => nil,
"seed" => nil,
"text" => "•"
}
],
"created" => 1721912534,
"id" => "some-id",
"model" => "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
"object" => "chat.completion.chunk",
"usage" => nil
}
Streamed LLM output might contain a single word or multiple words. When a chunk decodes into a single map, we handle it as a map. When one response chunk contains several decoded maps, we need to extract the text from each of them and concatenate the pieces. That's what is going on in this function:
defp handle_response(decoded) when is_list(decoded) do
result =
Enum.reduce(decoded, "", fn choices_map, acc ->
case choices_map do
:finish ->
{acc, :finish}
map ->
acc <> handle_response(map)
end
end)
result
end
Here, we loop through the decoded maps and extract the text by recursively calling handle_response(map).
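For example, if one decoded chunk contains two maps followed by the terminator, the reduce builds the text up and then wraps it together with :finish (the content values here are made up):
handle_response([
  %{"choices" => [%{"delta" => %{"content" => "• Buy"}}]},
  %{"choices" => [%{"delta" => %{"content" => " milk"}}]},
  :finish
])
# => {"• Buy milk", :finish}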
Chunks of output will keep arriving. Eventually a chunk arrives whose last entry is "[DONE]", marking the end of the LLM output. We decode that entry to :finish, which is what we send to the LiveView.
When we reach the end of the output, we receive the :finish atom, but we also receive a chunk like the one below: usage tells us how many tokens were used, and finish_reason carries the finish reason instead of nil. Token usage and finish reasons are not to be ignored when you build your SaaS. Remember, this is the part where you multiply the token cost by X to make a profit :)
%{
"choices" => [
%{
"delta" => %{
"content" => " measurements",
"role" => "assistant",
"token_id" => 22323,
"tool_calls" => nil
},
"finish_reason" => "length",
"index" => 0,
"logprobs" => nil,
"seed" => 16606951688656440000,
"text" => " measurements"
}
],
"created" => 1721827856,
"id" => "8a8dcdd7e2d74-ARN",
"model" => "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
"object" => "chat.completion.chunk",
"usage" => %{
"completion_tokens" => 512,
"prompt_tokens" => 93,
"total_tokens" => 605
}
}
Now, let's handle the responses in the LiveView. There are two functions that handle the incoming messages from the other process: one handles the regular LLM chunk(s) and the other handles the finishing chunk. As text arrives, we append it to the text we have received so far.
When we receive the last chunk, we also show a flash with "Finished generating".
def handle_info({PhoenixPlayground.TogetherAi, "chunk", text}, socket) when is_binary(text) do
socket =
socket |> update(:text, &(&1 <> text))
{:noreply, socket}
end
def handle_info({PhoenixPlayground.TogetherAi, "last_chunk", text}, socket)
when is_binary(text) do
socket =
socket
|> update(:text, &(&1 <> text))
|> update(:loading, &toggle_loading/1)
|> put_flash(:info, "Finished generating")
{:noreply, socket}
end
There are still a couple more things we need to consider: error handling and a message handler for the Task.
Let's assume the API call to Together AI does not succeed. In that case, we need another handle_stream() clause that receives the response with an error status code. For simplicity, I put the clause matching the successful 200 case first, so it is tried first; if it does not pattern-match (e.g. a different status code), the call falls through to the clause below, where we assume the response carries an error and an error message.
defp handle_stream({:data, data}, {req = %Req.Request{}, resp = %Req.Response{status: _}}, pid) do
error_msg = data |> Jason.decode!() |> Map.get("error") |> Map.get("message")
send(pid, {__MODULE__, :error, error_msg})
{:cont, {req, resp}}
end
Remember that we are spawning a new process with Task.async()? We also need to handle the other messages that the Task sends to its parent. Please refer to the official docs on Task.async.
def handle_info({PhoenixPlayground.TogetherAi, :error, error_msg}, socket) do
socket = socket |> put_flash(:error, error_msg)
{:noreply, socket}
end
def handle_info(_msg, socket) do
# messages that arrive here unhandled are:
# 1. {:DOWN, _ref, :process, _pid, :normal}
# 2. {_ref, {:ok, response = %Req.Response{}}}
# TODO: other Task.async responses
{:noreply, socket}
end
Now, run iex -S mix phx.server
go to /home
and enjoy!
Here is the repo for the project: https://github.com/azyzz228/elixir-llm-tutorial
Discussion:
- Should more error handling be added?
- Handle the last chunk (the one carrying token usage info and finish_reason) better
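On the second point, one possible direction (a sketch only, not something the repo does) would be a small helper that pulls usage and finish_reason out of the final decoded chunk so the streaming process can forward them to the LiveView along with the last piece of text:
# hypothetical helper, matching the shape of the final chunk shown above
defp extract_usage(%{"usage" => usage, "choices" => [%{"finish_reason" => reason} | _]}) do
  %{usage: usage, finish_reason: reason}
end
# e.g. send(pid, {__MODULE__, "last_chunk", text, extract_usage(decoded_chunk)})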