symbiont_ex/lib/symbiont/queue.ex

178 lines
4.4 KiB
Elixir

defmodule Symbiont.Queue do
@moduledoc """
Persistent task queue backed by JSONL file.
Tasks flow through states: pending → processing → done | failed
The queue is durable — survives restarts via the JSONL file.
"""
use GenServer
require Logger
defstruct [:path, tasks: []]
# -- Client API --
@doc """
Starts the queue process and registers it under this module's name.

`opts` is passed to `init/1`, which requires a `:data_dir` entry.
"""
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@doc """
Adds a new pending task to the queue.

Returns `{:ok, task_id}`. `priority` defaults to `"normal"` and is stored on
the task as given.
"""
@spec enqueue(term(), term()) :: {:ok, String.t()}
def enqueue(task_text, priority \\ "normal"),
  do: GenServer.call(__MODULE__, {:enqueue, task_text, priority})
@doc """
Claims up to `n` pending tasks for processing.

Returns the claimed tasks with their status already set to `"processing"`.
"""
@spec take(integer()) :: [map()]
def take(n \\ 1), do: GenServer.call(__MODULE__, {:take, n})
@doc """
Marks the task identified by `task_id` as completed, optionally with a `result`.

Sent as a cast: returns `:ok` immediately without waiting for the queue
process to apply the change.
"""
@spec complete(term(), term()) :: :ok
def complete(task_id, result \\ nil),
  do: GenServer.cast(__MODULE__, {:complete, task_id, result})
@doc """
Marks the task identified by `task_id` as failed, optionally with a `reason`.

Sent as a cast: returns `:ok` immediately without waiting for the queue
process to apply the change.
"""
@spec fail(term(), term()) :: :ok
def fail(task_id, reason \\ nil),
  do: GenServer.cast(__MODULE__, {:fail, task_id, reason})
@doc """
Returns the number of tasks currently in the `"pending"` state.
"""
@spec size() :: non_neg_integer()
def size, do: GenServer.call(__MODULE__, :size)
@doc """
Lists the tasks held by the queue.

With `nil` (the default) every task is returned; otherwise only tasks whose
`"status"` equals `to_string(status)` are returned.
"""
@spec list(nil | atom() | String.t()) :: [map()]
def list(status \\ nil), do: GenServer.call(__MODULE__, {:list, status})
# -- Server Callbacks --
# Resolves the queue file inside the required `:data_dir` option, makes sure
# both the directory and the file exist, and loads the stored tasks into state.
# Raises KeyError when `:data_dir` is missing from `opts`.
@impl true
def init(opts) do
  data_dir = Keyword.fetch!(opts, :data_dir)
  queue_path = Path.join(data_dir, "queue.jsonl")

  # On a fresh deployment the data directory may not exist yet; without this
  # the File.write!/2 below raises :enoent and the process never starts.
  File.mkdir_p!(data_dir)
  if not File.exists?(queue_path), do: File.write!(queue_path, "")

  tasks = load_tasks(queue_path)
  Logger.info("Queue loaded: #{length(tasks)} tasks (#{count_pending(tasks)} pending)")
  {:ok, %__MODULE__{path: queue_path, tasks: tasks}}
end
# Creates a pending task with a fresh id and a UTC ISO 8601 creation time,
# appends it after the existing tasks, rewrites the queue file and replies
# with {:ok, id}.
@impl true
def handle_call({:enqueue, task_text, priority}, _from, state) do
  id = generate_id()
  created_at = DateTime.to_iso8601(DateTime.utc_now())

  entry = %{
    "created_at" => created_at,
    "id" => id,
    "priority" => priority,
    "status" => "pending",
    "task" => task_text
  }

  queue = state.tasks ++ [entry]
  persist!(state.path, queue)
  {:reply, {:ok, id}, %{state | tasks: queue}}
end
# Claims up to `n` pending tasks: flips their status to "processing" in place
# and replies with the claimed tasks (already updated), in stored order.
#
# Tasks keep their position in the list, so the stored order stays the
# insertion order. When nothing is claimed the state is returned untouched and
# the queue file is not rewritten.
@impl true
def handle_call({:take, n}, _from, state) do
  indexed = Enum.with_index(state.tasks)

  # Select by position rather than by id so that two tasks sharing an id can
  # never cause more tasks to be claimed than requested.
  claimed =
    indexed
    |> Enum.filter(fn {task, _idx} -> task["status"] == "pending" end)
    |> Enum.take(n)
    |> MapSet.new(fn {_task, idx} -> idx end)

  if Enum.empty?(claimed) do
    {:reply, [], state}
  else
    {new_tasks, taken_reversed} =
      Enum.map_reduce(indexed, [], fn {task, idx}, acc ->
        if MapSet.member?(claimed, idx) do
          updated = Map.put(task, "status", "processing")
          {updated, [updated | acc]}
        else
          {task, acc}
        end
      end)

    persist!(state.path, new_tasks)
    {:reply, Enum.reverse(taken_reversed), %{state | tasks: new_tasks}}
  end
end
# Replies with the number of pending tasks; the state is left unchanged.
@impl true
def handle_call(:size, _from, state) do
  pending = count_pending(state.tasks)
  {:reply, pending, state}
end
# Replies with every task, regardless of status; the state is left unchanged.
@impl true
def handle_call({:list, nil}, _from, state), do: {:reply, state.tasks, state}
# Replies with the tasks whose "status" equals the string form of `status`
# (so both :pending and "pending" work); the state is left unchanged.
@impl true
def handle_call({:list, status}, _from, state) do
  matching = for task <- state.tasks, task["status"] == to_string(status), do: task
  {:reply, matching, state}
end
# Marks the matching task as "done" (recording `result` when given), rewrites
# the queue file and keeps the updated list as the new state.
@impl true
def handle_cast({:complete, task_id, result}, state) do
  state.tasks
  |> update_task_status(task_id, "done", result)
  |> tap(&persist!(state.path, &1))
  |> then(&{:noreply, %{state | tasks: &1}})
end
# Marks the matching task as "failed" (recording `reason` when given), rewrites
# the queue file and keeps the updated list as the new state.
@impl true
def handle_cast({:fail, task_id, reason}, state) do
  state.tasks
  |> update_task_status(task_id, "failed", reason)
  |> tap(&persist!(state.path, &1))
  |> then(&{:noreply, %{state | tasks: &1}})
end
# -- Private --
# Reads the JSONL queue file at `path` and returns the decoded tasks in file
# order.
#
# Blank (including whitespace-only) lines are skipped silently. Lines that are
# not valid JSON, or that decode to anything other than a JSON object, are
# skipped with a warning: the next persist rewrites the file from memory, so a
# silently dropped line would otherwise vanish without any trace.
defp load_tasks(path) do
  path
  |> File.stream!()
  |> Stream.map(&String.trim/1)
  |> Stream.with_index(1)
  |> Enum.flat_map(fn
    {"", _line_no} ->
      []

    {line, line_no} ->
      case Jason.decode(line) do
        {:ok, task} when is_map(task) ->
          [task]

        {:ok, other} ->
          Logger.warning(
            "Queue: skipping line #{line_no} of #{path}: expected a JSON object, got #{inspect(other)}"
          )

          []

        {:error, error} ->
          Logger.warning(
            "Queue: skipping malformed line #{line_no} of #{path}: #{Exception.message(error)}"
          )

          []
      end
  end)
end
# Writes `tasks` to `path` as JSONL (one JSON object per line) and returns :ok.
#
# The content goes to a sibling temp file which is then renamed over `path`,
# so a crash part-way through the write leaves the previous queue file intact
# instead of a truncated one. (rename(2) replaces the target atomically on
# POSIX filesystems; behaviour on other platforms should be confirmed.)
#
# Raises if a task cannot be JSON-encoded or if the file operations fail;
# encoding happens before anything is written to disk.
defp persist!(path, tasks) do
  # iodata avoids building one large intermediate binary.
  lines = Enum.map(tasks, &[Jason.encode!(&1), ?\n])
  tmp_path = path <> ".tmp"

  File.write!(tmp_path, lines)
  File.rename!(tmp_path, path)
end
# Returns `tasks` with every task whose "id" equals `task_id` updated:
# "status" is set to `status`, "completed_at" to the current UTC time in
# ISO 8601 (for any status passed in), and "result" to `extra` unless `extra`
# is nil. Tasks with a different id are returned unchanged.
#
# Only nil means "no extra value": a legitimate `false` result is stored.
defp update_task_status(tasks, task_id, status, extra) do
  finished_at = DateTime.to_iso8601(DateTime.utc_now())

  Enum.map(tasks, fn task ->
    if task["id"] == task_id do
      updated = Map.merge(task, %{"status" => status, "completed_at" => finished_at})
      if is_nil(extra), do: updated, else: Map.put(updated, "result", extra)
    else
      task
    end
  end)
end
# Returns how many of `tasks` have "status" equal to "pending".
defp count_pending(tasks) do
  Enum.reduce(tasks, 0, fn task, pending ->
    if task["status"] == "pending", do: pending + 1, else: pending
  end)
end
# Returns a task id of the form "task-<microsecond timestamp>-<counter>".
#
# Ids must be unique because complete/fail update every task whose id matches.
# System.unique_integer/1 never repeats within one VM run, and the
# microsecond wall-clock timestamp keeps ids from different runs apart.
defp generate_id do
  ts = System.system_time(:microsecond)
  unique = System.unique_integer([:positive])
  "task-#{ts}-#{unique}"
end
end