178 lines
4.4 KiB
Elixir
178 lines
4.4 KiB
Elixir
defmodule Symbiont.Queue do
  @moduledoc """
  Persistent task queue backed by a JSONL file.

  Tasks flow through states: pending → processing → done | failed

  The queue is durable — it survives restarts because every state change is
  written back to `queue.jsonl` (one JSON object per line) inside the
  configured `:data_dir`.

  Each task is a map with string keys: `"id"`, `"task"`, `"priority"`,
  `"status"` and `"created_at"`. Finished tasks additionally carry
  `"completed_at"` and, when one was supplied, `"result"`.
  """

  use GenServer

  require Logger

  defstruct [:path, tasks: []]

  @typedoc "A queued task, stored as a map with string keys."
  @type task :: %{optional(String.t()) => term()}

  # -- Client API --

  @doc """
  Starts the queue process and registers it under the module name.

  `opts` must contain `:data_dir`, the directory holding `queue.jsonl`.
  The directory and the file are created when missing.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Adds a task to the queue with status `"pending"`.

  Returns `{:ok, task_id}`.
  """
  @spec enqueue(term(), term()) :: {:ok, String.t()}
  def enqueue(task_text, priority \\ "normal") do
    GenServer.call(__MODULE__, {:enqueue, task_text, priority})
  end

  @doc """
  Takes up to `n` pending tasks for processing.

  The taken tasks are switched to status `"processing"` and returned as a list
  (empty when nothing is pending).
  """
  @spec take(integer()) :: [task()]
  def take(n \\ 1) when is_integer(n) do
    # The guard makes a bad argument fail in the caller instead of crashing
    # the queue process inside `Enum.take/2`.
    GenServer.call(__MODULE__, {:take, n})
  end

  @doc """
  Marks a task as completed (status `"done"`), optionally storing `result`.

  Asynchronous: always returns `:ok`. An unknown `task_id` is logged and ignored.
  """
  @spec complete(String.t(), term()) :: :ok
  def complete(task_id, result \\ nil) do
    GenServer.cast(__MODULE__, {:complete, task_id, result})
  end

  @doc """
  Marks a task as failed (status `"failed"`), optionally storing `reason`
  under the `"result"` key.

  Asynchronous: always returns `:ok`. An unknown `task_id` is logged and ignored.
  """
  @spec fail(String.t(), term()) :: :ok
  def fail(task_id, reason \\ nil) do
    GenServer.cast(__MODULE__, {:fail, task_id, reason})
  end

  @doc "Returns the current queue size (pending tasks only)."
  @spec size() :: non_neg_integer()
  def size do
    GenServer.call(__MODULE__, :size)
  end

  @doc """
  Lists all tasks, optionally filtered by status.

  `status` may be a string or an atom (`"done"` / `:done`); `nil` returns
  every task.
  """
  @spec list(String.t() | atom() | nil) :: [task()]
  def list(status \\ nil) do
    GenServer.call(__MODULE__, {:list, status})
  end

  # -- Server Callbacks --

  @impl true
  def init(opts) do
    data_dir = Keyword.fetch!(opts, :data_dir)
    path = Path.join(data_dir, "queue.jsonl")

    # A fresh deployment may not have the data directory yet; without this the
    # initial write below raises and the queue never starts.
    File.mkdir_p!(data_dir)

    if not File.exists?(path), do: File.write!(path, "")

    tasks = load_tasks(path)
    Logger.info("Queue loaded: #{length(tasks)} tasks (#{count_pending(tasks)} pending)")

    {:ok, %__MODULE__{path: path, tasks: tasks}}
  end

  @impl true
  def handle_call({:enqueue, task_text, priority}, _from, state) do
    task = %{
      "id" => generate_id(),
      "task" => task_text,
      "priority" => priority,
      "status" => "pending",
      "created_at" => DateTime.utc_now() |> DateTime.to_iso8601()
    }

    new_tasks = state.tasks ++ [task]
    persist!(state.path, new_tasks)

    {:reply, {:ok, task["id"]}, %{state | tasks: new_tasks}}
  end

  def handle_call({:take, n}, _from, state) do
    {pending, others} = Enum.split_with(state.tasks, &(&1["status"] == "pending"))

    case Enum.take(pending, n) do
      [] ->
        # Nothing was claimed, so there is nothing to write back to disk.
        {:reply, [], state}

      taken ->
        claimed = Enum.map(taken, &Map.put(&1, "status", "processing"))

        # Claimed tasks go first, then the still-pending ones, then the rest.
        new_tasks = claimed ++ Enum.drop(pending, n) ++ others
        persist!(state.path, new_tasks)

        {:reply, claimed, %{state | tasks: new_tasks}}
    end
  end

  def handle_call(:size, _from, state) do
    {:reply, count_pending(state.tasks), state}
  end

  def handle_call({:list, nil}, _from, state) do
    {:reply, state.tasks, state}
  end

  def handle_call({:list, status}, _from, state) do
    # Convert once so atoms and strings are both accepted without redoing the
    # conversion for every task.
    wanted = to_string(status)
    filtered = Enum.filter(state.tasks, &(&1["status"] == wanted))
    {:reply, filtered, state}
  end

  @impl true
  def handle_cast({:complete, task_id, result}, state) do
    finish_task(state, task_id, "done", result)
  end

  def handle_cast({:fail, task_id, reason}, state) do
    finish_task(state, task_id, "failed", reason)
  end

  # -- Private --

  # Moves the task identified by `task_id` into a terminal `status` and persists
  # the change. Unknown ids are logged and leave both state and file untouched.
  defp finish_task(state, task_id, status, extra) do
    if Enum.any?(state.tasks, &(&1["id"] == task_id)) do
      new_tasks = update_task_status(state.tasks, task_id, status, extra)
      persist!(state.path, new_tasks)
      {:noreply, %{state | tasks: new_tasks}}
    else
      Logger.warning("Queue: cannot mark unknown task #{inspect(task_id)} as #{status}")
      {:noreply, state}
    end
  end

  # Reads the JSONL file at `path` into a list of task maps, preserving file
  # order. Blank lines are skipped; malformed lines are skipped with a warning.
  defp load_tasks(path) do
    path
    |> File.stream!()
    |> Stream.with_index(1)
    |> Enum.flat_map(fn {line, line_no} -> decode_line(String.trim(line), line_no, path) end)
  end

  # Decodes one trimmed line. Returns `[task]` for a JSON object and `[]`
  # otherwise, so the result can be used directly with `Enum.flat_map/2`.
  defp decode_line("", _line_no, _path), do: []

  defp decode_line(json, line_no, path) do
    case Jason.decode(json) do
      {:ok, task} when is_map(task) ->
        [task]

      _invalid ->
        # The line is dropped from memory and will vanish from the file on the
        # next persist, so leave a trace instead of discarding it silently.
        Logger.warning("Queue: skipping malformed line #{line_no} in #{path}")
        []
    end
  end

  # Writes all `tasks` to `path`, one JSON object per line.
  #
  # The data goes to a sibling temp file first and is then renamed over the
  # real file, so a crash mid-write cannot leave a truncated queue behind.
  defp persist!(path, tasks) do
    content = Enum.map(tasks, &[Jason.encode!(&1), ?\n])
    tmp_path = path <> ".tmp"

    File.write!(tmp_path, content)
    File.rename!(tmp_path, path)
  end

  # Returns `tasks` with every task whose "id" equals `task_id` updated to
  # `status`, stamped with "completed_at", and given "result" when `extra`
  # is not nil.
  defp update_task_status(tasks, task_id, status, extra) do
    Enum.map(tasks, fn
      %{"id" => ^task_id} = task ->
        task
        |> Map.put("status", status)
        |> Map.put("completed_at", DateTime.utc_now() |> DateTime.to_iso8601())
        |> put_result(extra)

      task ->
        task
    end)
  end

  # Only `nil` means "no result"; `false` is a legitimate value and is stored.
  defp put_result(task, nil), do: task
  defp put_result(task, extra), do: Map.put(task, "result", extra)

  defp count_pending(tasks) do
    Enum.count(tasks, &(&1["status"] == "pending"))
  end

  # Builds an id of the form "task-<timestamp>-<counter>".
  #
  # The counter from `System.unique_integer/1` never repeats within one runtime
  # instance, and the microsecond timestamp separates ids across restarts.
  defp generate_id do
    ts = System.system_time(:microsecond)
    unique = System.unique_integer([:positive])
    "task-#{ts}-#{unique}"
  end
end
|