Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions lib/pinchflat/diagnostics/queue_diagnostics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
defmodule Pinchflat.Diagnostics.QueueDiagnostics do
@moduledoc """
Provides diagnostic information about Oban job queues.
"""

import Ecto.Query

alias Pinchflat.Repo

@queues [:default, :fast_indexing, :media_collection_indexing, :media_fetching, :remote_metadata, :local_data]

@doc """
Returns a list of all queue names.
"""
def queue_names, do: @queues

@doc """
Returns health status for all queues including job counts by state.
"""
def get_all_queue_stats do
Enum.map(@queues, fn queue_name ->
queue_info = Oban.check_queue(queue: queue_name)
job_counts = get_job_counts_for_queue(queue_name)

%{
name: queue_name,
running: length(Map.get(queue_info, :running, [])),
limit: Map.get(queue_info, :limit, 0),
paused: Map.get(queue_info, :paused, false),
available: Map.get(job_counts, :available, 0),
scheduled: Map.get(job_counts, :scheduled, 0),
retryable: Map.get(job_counts, :retryable, 0),
executing: Map.get(job_counts, :executing, 0)
}
end)
end

@doc """
Returns jobs that are in a retryable state (failed but will retry).
"""
def get_retryable_jobs(limit \\ 50) do
from(j in Oban.Job,
where: j.state == "retryable",
order_by: [desc: j.attempted_at],
limit: ^limit,
select: %{
id: j.id,
queue: j.queue,
worker: j.worker,
state: j.state,
attempt: j.attempt,
max_attempts: j.max_attempts,
errors: j.errors,
args: j.args,
attempted_at: j.attempted_at,
scheduled_at: j.scheduled_at
}
)
|> Repo.all()
end

@doc """
Returns jobs that appear to be stuck (executing for too long or orphaned).
A job is considered stuck if it's been "executing" for more than the threshold.
"""
def get_stuck_jobs(threshold_minutes \\ 30) do
threshold = DateTime.add(DateTime.utc_now(), -threshold_minutes * 60, :second)

from(j in Oban.Job,
where: j.state == "executing",
where: j.attempted_at < ^threshold,
order_by: [asc: j.attempted_at],
select: %{
id: j.id,
queue: j.queue,
worker: j.worker,
attempt: j.attempt,
attempted_at: j.attempted_at,
args: j.args
}
)
|> Repo.all()
end

@doc """
Resets stuck jobs by marking them as available for retry.
Returns the number of jobs reset.
"""
def reset_stuck_jobs(threshold_minutes \\ 30) do
threshold = DateTime.add(DateTime.utc_now(), -threshold_minutes * 60, :second)

{count, _} =
from(j in Oban.Job,
where: j.state == "executing",
where: j.attempted_at < ^threshold
)
|> Repo.update_all(set: [state: "available", scheduled_at: DateTime.utc_now(), attempted_at: nil])

count
end

@doc """
Resets all retryable jobs by clearing their error history and marking as available.
Returns the number of jobs reset.
"""
def reset_retryable_jobs do
{count, _} =
from(j in Oban.Job,
where: j.state == "retryable"
)
|> Repo.update_all(set: [state: "available", attempt: 1, errors: [], scheduled_at: DateTime.utc_now()])

count
end

@doc """
Resets a specific job by ID.
"""
def reset_job(job_id) do
{count, _} =
from(j in Oban.Job,
where: j.id == ^job_id,
where: j.state in ["retryable", "executing"]
)
|> Repo.update_all(set: [state: "available", attempt: 1, errors: [], scheduled_at: DateTime.utc_now()])

count
end

@doc """
Cancels a specific job by ID.
"""
def cancel_job(job_id) do
case Oban.cancel_job(job_id) do
:ok -> {:ok, :cancelled}
{:error, reason} -> {:error, reason}
end
end

@doc """
Returns summary statistics for the system.
"""
def get_system_stats do
%{
total_pending_downloads: count_pending_downloads(),
total_downloaded: count_downloaded_media(),
total_sources: count_sources(),
database_size: get_database_size()
}
end

# Private functions

defp get_job_counts_for_queue(queue_name) do
queue_string = Atom.to_string(queue_name)

from(j in Oban.Job,
where: j.queue == ^queue_string,
where: j.state in ["available", "scheduled", "retryable", "executing"],
group_by: j.state,
select: {j.state, count(j.id)}
)
|> Repo.all()
|> Enum.into(%{}, fn {state, count} -> {String.to_atom(state), count} end)
end

defp count_pending_downloads do
from(m in Pinchflat.Media.MediaItem,
where: is_nil(m.media_filepath),
where: m.prevent_download == false
)
|> Repo.aggregate(:count)
end

defp count_downloaded_media do
from(m in Pinchflat.Media.MediaItem,
where: not is_nil(m.media_filepath)
)
|> Repo.aggregate(:count)
end

defp count_sources do
Repo.aggregate(Pinchflat.Sources.Source, :count)
end

defp get_database_size do
db_path = Application.get_env(:pinchflat, Pinchflat.Repo)[:database]

if db_path && File.exists?(db_path) do
case File.stat(db_path) do
{:ok, %{size: size}} -> format_bytes(size)
_ -> "Unknown"
end
else
"Unknown"
end
end

defp format_bytes(bytes) when bytes < 1024, do: "#{bytes} B"
defp format_bytes(bytes) when bytes < 1024 * 1024, do: "#{Float.round(bytes / 1024, 1)} KB"
defp format_bytes(bytes) when bytes < 1024 * 1024 * 1024, do: "#{Float.round(bytes / 1024 / 1024, 1)} MB"
defp format_bytes(bytes), do: "#{Float.round(bytes / 1024 / 1024 / 1024, 2)} GB"
end
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
>
<:submenu text="Settings" href={~p"/settings"} />
<:submenu text="App Info" href={~p"/app_info"} />
<:submenu text="Diagnostics" href={~p"/diagnostics"} />
</.sidebar_submenu>
</ul>
</div>
Expand Down
53 changes: 53 additions & 0 deletions lib/pinchflat_web/controllers/settings/diagnostics_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule PinchflatWeb.Settings.DiagnosticsController do
use PinchflatWeb, :controller

alias Pinchflat.Diagnostics.QueueDiagnostics

def show(conn, _params) do
render(conn, "show.html")
end

def reset_stuck_jobs(conn, _params) do
count = QueueDiagnostics.reset_stuck_jobs()

conn
|> put_flash(:info, "Reset #{count} stuck job(s). The queue will restart processing shortly.")
|> redirect(to: ~p"/diagnostics")
end

def reset_retryable_jobs(conn, _params) do
count = QueueDiagnostics.reset_retryable_jobs()

conn
|> put_flash(:info, "Reset #{count} retryable job(s). They will be retried shortly.")
|> redirect(to: ~p"/diagnostics")
end

def reset_job(conn, %{"id" => job_id}) do
case QueueDiagnostics.reset_job(String.to_integer(job_id)) do
1 ->
conn
|> put_flash(:info, "Job ##{job_id} has been reset and will retry shortly.")
|> redirect(to: ~p"/diagnostics")

0 ->
conn
|> put_flash(:error, "Job ##{job_id} could not be reset. It may have already completed or been cancelled.")
|> redirect(to: ~p"/diagnostics")
end
end

def cancel_job(conn, %{"id" => job_id}) do
case QueueDiagnostics.cancel_job(String.to_integer(job_id)) do
{:ok, :cancelled} ->
conn
|> put_flash(:info, "Job ##{job_id} has been cancelled.")
|> redirect(to: ~p"/diagnostics")

{:error, _reason} ->
conn
|> put_flash(:error, "Job ##{job_id} could not be cancelled.")
|> redirect(to: ~p"/diagnostics")
end
end
end
79 changes: 79 additions & 0 deletions lib/pinchflat_web/controllers/settings/diagnostics_html.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule PinchflatWeb.Settings.DiagnosticsHTML do
use PinchflatWeb, :html

alias Pinchflat.Diagnostics.QueueDiagnostics

embed_templates "diagnostics_html/*"

def queue_stats do
QueueDiagnostics.get_all_queue_stats()
end

def retryable_jobs do
QueueDiagnostics.get_retryable_jobs(20)
end

def stuck_jobs do
QueueDiagnostics.get_stuck_jobs(30)
end

def system_stats do
QueueDiagnostics.get_system_stats()
end

def format_worker_name(worker) do
worker
|> String.split(".")
|> Enum.at(-1)
|> format_worker_short_name()
end

defp format_worker_short_name("FastIndexingWorker"), do: "Fast Indexing"
defp format_worker_short_name("MediaDownloadWorker"), do: "Download"
defp format_worker_short_name("MediaCollectionIndexingWorker"), do: "Indexing"
defp format_worker_short_name("MediaQualityUpgradeWorker"), do: "Quality Upgrade"
defp format_worker_short_name("SourceMetadataStorageWorker"), do: "Metadata"
defp format_worker_short_name(other), do: other

def format_queue_name(queue) do
queue
|> Atom.to_string()
|> String.replace("_", " ")
|> String.split(" ")
|> Enum.map_join(" ", &String.capitalize/1)
end

def format_datetime(nil), do: "-"

def format_datetime(datetime) do
Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S")
end

def extract_last_error(errors) when is_list(errors) and length(errors) > 0 do
errors
|> List.last()
|> Map.get("error", "Unknown error")
|> String.slice(0, 200)
end

def extract_last_error(_), do: "No error details"

def queue_health_class(stats) do
cond do
stats.paused -> "bg-yellow-500/20 border-yellow-500"
stats.retryable > 0 -> "bg-red-500/20 border-red-500"
stats.running >= stats.limit and stats.available > 0 -> "bg-blue-500/20 border-blue-500"
true -> "bg-green-500/20 border-green-500"
end
end

def queue_status_text(stats) do
cond do
stats.paused -> "Paused"
stats.retryable > 0 -> "Has Failures"
stats.running >= stats.limit -> "At Capacity"
stats.running > 0 -> "Active"
true -> "Idle"
end
end
end
Loading