Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 195 additions & 17 deletions lib/lightning/channels.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ defmodule Lightning.Channels do

import Ecto.Query

alias Ecto.Multi
alias Lightning.Accounts.User
alias Lightning.Channels.Audit
alias Lightning.Channels.Channel
alias Lightning.Channels.ChannelAuthMethod
alias Lightning.Channels.ChannelRequest
alias Lightning.Channels.ChannelSnapshot
alias Lightning.Repo

Expand All @@ -21,6 +26,53 @@ defmodule Lightning.Channels do
|> Repo.all()
end

@doc """
Returns channels for a project with aggregate stats from channel_requests.

Each entry is a map with keys:
- all Channel fields (via struct)
- `:request_count` — total number of requests
- `:last_activity` — datetime of most recent request, or nil
"""
def list_channels_for_project_with_stats(project_id) do
from(c in Channel,
where: c.project_id == ^project_id,
left_join: cr in ChannelRequest,
on: cr.channel_id == c.id,
group_by: c.id,
order_by: [asc: c.name],
select: %{
channel: c,
request_count: count(cr.id),
last_activity: max(cr.started_at)
}
)
|> Repo.all()
end

@doc """
Returns aggregate stats for all channels in a project.

Returns a map with:
- `:total_channels` — number of channels in the project
- `:total_requests` — total channel requests across all channels

Uses a single query with a LEFT JOIN so both counts are fetched in one
database round-trip.
"""
def get_channel_stats_for_project(project_id) do
from(c in Channel,
where: c.project_id == ^project_id,
left_join: cr in ChannelRequest,
on: cr.channel_id == c.id,
select: %{
total_channels: count(c.id, :distinct),
total_requests: count(cr.id)
}
)
|> Repo.one()
end

@doc """
Gets a single channel by ID. Returns nil if not found.
"""
Expand Down Expand Up @@ -55,26 +107,59 @@ defmodule Lightning.Channels do
@doc """
Gets a single channel. Raises if not found.
"""
def get_channel!(id) do
Repo.get!(Channel, id)
def get_channel!(id, opts \\ []) do
preloads = Keyword.get(opts, :include, [])
Repo.get!(Channel, id) |> Repo.preload(preloads)
end

@doc """
Gets a channel by ID scoped to a project. Returns `nil` if the channel
does not exist or belongs to a different project.
"""
def get_channel_for_project(project_id, channel_id) do
Repo.get_by(Channel, id: channel_id, project_id: project_id)
end

@doc """
Creates a channel.
"""
def create_channel(attrs) do
%Channel{}
|> Channel.changeset(attrs)
|> Repo.insert()
@spec create_channel(map(), actor: User.t()) ::
{:ok, Channel.t()} | {:error, Ecto.Changeset.t()}
def create_channel(attrs, actor: %User{} = actor) do
changeset = Channel.changeset(%Channel{}, attrs)

Multi.new()
|> Multi.insert(:channel, changeset)
|> Multi.insert(:audit, fn %{channel: channel} ->
Audit.event("created", channel.id, actor, changeset)
end)
|> maybe_audit_auth_method_changes(changeset, actor)
|> Repo.transaction()
|> case do
{:ok, %{channel: channel}} -> {:ok, channel}
{:error, :channel, changeset, _} -> {:error, changeset}
end
end

@doc """
Updates a channel's config fields, bumping lock_version.
"""
def update_channel(%Channel{} = channel, attrs) do
channel
|> Channel.changeset(attrs)
|> Repo.update(stale_error_field: :lock_version)
@spec update_channel(Channel.t(), map(), actor: User.t()) ::
{:ok, Channel.t()} | {:error, Ecto.Changeset.t()}
def update_channel(%Channel{} = channel, attrs, actor: %User{} = actor) do
changeset = Channel.changeset(channel, attrs)

Multi.new()
|> Multi.update(:channel, changeset, stale_error_field: :lock_version)
|> Multi.insert(:audit, fn %{channel: updated} ->
Audit.event("updated", updated.id, actor, changeset)
end)
|> maybe_audit_auth_method_changes(changeset, actor)
|> Repo.transaction()
|> case do
{:ok, %{channel: channel}} -> {:ok, channel}
{:error, :channel, changeset, _} -> {:error, changeset}
end
end

@doc """
Expand All @@ -83,14 +168,107 @@ defmodule Lightning.Channels do
Returns `{:error, changeset}` if the channel has snapshots
(due to `:restrict` FK on `channel_snapshots`).
"""
def delete_channel(%Channel{} = channel) do
channel
|> Ecto.Changeset.change()
|> Ecto.Changeset.foreign_key_constraint(:channel_snapshots,
name: "channel_snapshots_channel_id_fkey",
message: "has history that must be retained"
@spec delete_channel(Channel.t(), actor: User.t()) ::
{:ok, Channel.t()} | {:error, Ecto.Changeset.t()}
def delete_channel(%Channel{} = channel, actor: %User{} = actor) do
changeset =
channel
|> Ecto.Changeset.change()
|> Ecto.Changeset.foreign_key_constraint(:channel_snapshots,
name: "channel_snapshots_channel_id_fkey",
message: "has history that must be retained"
)

Multi.new()
|> Multi.insert(:audit, Audit.event("deleted", channel.id, actor, %{}))
|> Multi.delete(:channel, changeset)
|> Repo.transaction()
|> case do
{:ok, %{channel: channel}} -> {:ok, channel}
{:error, :channel, changeset, _} -> {:error, changeset}
end
end

# Emits one "auth_method_added" or "auth_method_removed" audit step per
# association change. No-op when the changeset has no auth method changes
# (e.g. the toggle handler, which passes no "channel_auth_methods" key).
defp maybe_audit_auth_method_changes(multi, changeset, actor) do
auth_changes =
Ecto.Changeset.get_change(changeset, :channel_auth_methods, [])

inserted = Enum.filter(auth_changes, &(&1.action == :insert))
deleted = Enum.filter(auth_changes, &(&1.action == :delete))

multi
|> add_auth_method_added_audits(inserted, actor)
|> add_auth_method_removed_audits(deleted, actor)
end

defp add_auth_method_added_audits(multi, inserted, actor) do
inserted
|> Enum.with_index()
|> Enum.reduce(multi, fn {cs, idx}, acc ->
role = Ecto.Changeset.get_field(cs, :role)
fields = auth_method_fields_for(cs, role)

Multi.insert(
acc,
:"audit_auth_method_added_#{idx}",
fn %{channel: channel} ->
Audit.event("auth_method_added", channel.id, actor, %{
before: nil,
after: fields
})
end
)
end)
end

defp add_auth_method_removed_audits(multi, deleted, actor) do
deleted
|> Enum.with_index()
|> Enum.reduce(multi, fn {cs, idx}, acc ->
role = cs.data.role
fields = auth_method_fields_for(cs, role)

Multi.insert(
acc,
:"audit_auth_method_removed_#{idx}",
fn %{channel: channel} ->
Audit.event("auth_method_removed", channel.id, actor, %{
before: fields,
after: nil
})
end
)
end)
end

defp auth_method_fields_for(cs, :source) do
%{
role: "source",
webhook_auth_method_id:
Ecto.Changeset.get_field(cs, :webhook_auth_method_id)
}
end

defp auth_method_fields_for(cs, :sink) do
%{
role: "sink",
project_credential_id: Ecto.Changeset.get_field(cs, :project_credential_id)
}
end

@doc """
Returns all ChannelAuthMethod records for a channel, preloading
their associated webhook_auth_method and project_credential (with credential).
"""
def list_channel_auth_methods(%Channel{} = channel) do
from(cam in ChannelAuthMethod,
where: cam.channel_id == ^channel.id,
preload: [:webhook_auth_method, project_credential: :credential]
)
|> Repo.delete()
|> Repo.all()
end

@doc """
Expand Down
15 changes: 15 additions & 0 deletions lib/lightning/channels/audit.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Lightning.Channels.Audit do
@moduledoc """
Audit trail for channel CRUD operations.
"""
use Lightning.Auditing.Audit,
repo: Lightning.Repo,
item: "channel",
events: [
"created",
"updated",
"deleted",
"auth_method_added",
"auth_method_removed"
]
end
1 change: 1 addition & 0 deletions lib/lightning/channels/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ defmodule Lightning.Channels.Channel do
|> assoc_constraint(:project)
|> unique_constraint([:project_id, :name])
|> optimistic_lock(:lock_version)
|> cast_assoc(:channel_auth_methods)
end
end
14 changes: 11 additions & 3 deletions lib/lightning/channels/channel_auth_method.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Lightning.Channels.ChannelAuthMethod do

schema "channel_auth_methods" do
field :role, Ecto.Enum, values: @roles
field :delete, :boolean, virtual: true

belongs_to :channel, Channel
belongs_to :webhook_auth_method, WebhookAuthMethod
Expand All @@ -29,11 +30,11 @@ defmodule Lightning.Channels.ChannelAuthMethod do
struct
|> cast(attrs, [
:role,
:channel_id,
:webhook_auth_method_id,
:project_credential_id
:project_credential_id,
:delete
])
|> validate_required([:role, :channel_id])
|> validate_required([:role])
|> Validators.validate_exclusive(
[:webhook_auth_method_id, :project_credential_id],
"webhook_auth_method_id and project_credential_id are mutually exclusive"
Expand All @@ -52,6 +53,13 @@ defmodule Lightning.Channels.ChannelAuthMethod do
|> unique_constraint(:project_credential_id,
name: :channel_auth_methods_pc_unique
)
|> then(fn changeset ->
if get_change(changeset, :delete) do
%{changeset | action: :delete}
else
changeset
end
end)
end

defp validate_role_target_consistency(changeset) do
Expand Down
23 changes: 23 additions & 0 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ defmodule Lightning.Config.Bootstrap do
if config_env() == :dev do
enabled = env!("LIVE_DEBUGGER", &Utils.ensure_boolean/1, true)
config :live_debugger, :disabled?, not enabled

live_debugger_ip =
env!(
"LIVE_DEBUGGER_IP",
fn address ->
address
|> String.split(".")
|> Enum.map(&String.to_integer/1)
|> List.to_tuple()
end,
nil
)

if live_debugger_ip do
config :live_debugger, :ip, live_debugger_ip
end

live_debugger_external_url =
env!("LIVE_DEBUGGER_EXTERNAL_URL", :string, nil)

if live_debugger_external_url do
config :live_debugger, :external_url, live_debugger_external_url
end
end

# Load storage and webhook retry config early so endpoint can respect it.
Expand Down
8 changes: 7 additions & 1 deletion lib/lightning/policies/project_users.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ defmodule Lightning.Policies.ProjectUsers do
| :initiate_github_sync
| :create_collection
| :publish_template
| :create_channel
| :delete_channel
| :update_channel

@doc """
authorize/3 takes an action, a user, and a project. It checks the user's role
Expand Down Expand Up @@ -110,7 +113,10 @@ defmodule Lightning.Policies.ProjectUsers do
:delete_workflow,
:run_workflow,
:create_project_credential,
:initiate_github_sync
:initiate_github_sync,
:create_channel,
:delete_channel,
:update_channel
]

def authorize(
Expand Down
3 changes: 3 additions & 0 deletions lib/lightning/setup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,9 @@ defmodule Lightning.SetupUtils do
Lightning.Projects.File,
Lightning.Projects.ProjectOauthClient,
Lightning.Credentials.OauthClient,
Lightning.Channels.ChannelRequest,
Lightning.Channels.ChannelSnapshot,
Lightning.Channels.Channel,
Lightning.Projects.Project,
Lightning.Collaboration.DocumentState
])
Expand Down
Loading