From fbee48993317d52379c09290d1f7f1e444c8ab62 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 5 Dec 2025 14:40:45 +0000 Subject: [PATCH 1/5] support batched logs --- lib/lightning/runs.ex | 56 +++++++++++++++-------- lib/lightning_web/channels/run_channel.ex | 4 +- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 84153b4ecec..a25978cfd37 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -262,28 +262,44 @@ defmodule Lightning.Runs do end def append_run_log(run, params, scrubber \\ nil) do - LogLine.new(run, params, scrubber) - |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> - if is_nil(step_id) do - [] - else - where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) - |> Repo.exists?() - |> if do - [] - else - [{:step_id, "must be associated with the run"}] - end + log_entries = + case Map.get(params, :logs) || Map.get(params, "logs") do + logs when is_list(logs) -> logs + nil -> [params] end - end) - |> Repo.insert() - |> case do - {:ok, log_line} -> - Events.log_appended(log_line) - {:ok, log_line} - {:error, changeset} -> - {:error, changeset} + results = + Enum.map(log_entries, fn log_entry -> + IO.inspect(log_entry, label: "log object in append_run_log") + + LogLine.new(run, log_entry, scrubber) + |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> + if is_nil(step_id) do + [] + else + where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) + |> Repo.exists?() + |> if do + [] + else + [{:step_id, "must be associated with the run"}] + end + end + end) + |> Repo.insert() + |> case do + {:ok, log_line} -> + Events.log_appended(log_line) + {:ok, log_line} + + {:error, changeset} -> + {:error, changeset} + end + end) + + case Enum.find(results, fn result -> match?({:error, _}, result) end) do + nil -> :ok + error -> error end end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index a8a6d4fe5dc..c1d018fe4e6 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -219,8 +219,8 @@ defmodule LightningWeb.RunChannel do {:error, changeset} -> reply_with(socket, {:error, changeset}) - {:ok, log_line} -> - reply_with(socket, {:ok, %{log_line_id: log_line.id}}) + :ok -> + reply_with(socket, :ok) end end From ad5b757f2ac023f2b83c221115a58d9c90bf01cd Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 10 Dec 2025 12:08:47 +0000 Subject: [PATCH 2/5] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f906d42b0a..051253a82b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ and this project adheres to ### Changed +- Support a batch of logs submitted to the `run:log` channel by the worker + [#4123](https://github.com/OpenFn/lightning/issues/4123) (backwards + compatible) + ### Fixed - Fix new jobs are misplaced on the canvas in manual layout From d52d28f4e24bbb748152991ecc6aa8417a2dc324 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Thu, 8 Jan 2026 13:19:15 +0300 Subject: [PATCH 3/5] Introduce another event run:batch_logs for batch logs --- lib/lightning/runs.ex | 97 ++++++--- lib/lightning_web/channels/run_channel.ex | 14 +- .../channels/run_channel_test.exs | 184 ++++++++++++++++++ 3 files changed, 272 insertions(+), 23 deletions(-) diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index a25978cfd37..74361275a2b 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -262,44 +262,97 @@ defmodule Lightning.Runs do end def append_run_log(run, params, scrubber \\ nil) do - log_entries = - case Map.get(params, :logs) || Map.get(params, "logs") do - logs when is_list(logs) -> logs - nil -> [params] + LogLine.new(run, params, scrubber) + |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> + if is_nil(step_id) do + [] + else + where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) + |> Repo.exists?() + |> if do + [] + else + [{:step_id, "must be associated with the run"}] + end end + end) + |> Repo.insert() + |> case do + {:ok, log_line} -> + Events.log_appended(log_line) + {:ok, log_line} - results = - Enum.map(log_entries, fn log_entry -> - IO.inspect(log_entry, label: "log object in append_run_log") + {:error, changeset} -> + {:error, changeset} + end + end + + @doc """ + Appends multiple log lines to a run in a batch operation. + + Returns `{:ok, [%LogLine{}, ...]}` if all logs are inserted successfully, or `{:error, changeset}` + for the first validation error encountered. + """ + def append_run_logs_batch(run, log_entries, scrubber \\ nil) + when is_list(log_entries) do + step_ids = + log_entries + |> Enum.map(&(Map.get(&1, "step_id") || Map.get(&1, :step_id))) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + + valid_step_ids = + if Enum.empty?(step_ids) do + [] + else + from(s in Lightning.RunStep, + where: s.step_id in ^step_ids and s.run_id == ^run.id, + select: s.step_id + ) + |> Repo.all() + end + changesets = + Enum.map(log_entries, fn log_entry -> LogLine.new(run, log_entry, scrubber) |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> if is_nil(step_id) do [] else - where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) - |> Repo.exists?() - |> if do + if step_id in valid_step_ids do [] else [{:step_id, "must be associated with the run"}] end end end) - |> Repo.insert() - |> case do - {:ok, log_line} -> - Events.log_appended(log_line) - {:ok, log_line} - - {:error, changeset} -> - {:error, changeset} - end end) - case Enum.find(results, fn result -> match?({:error, _}, result) end) do - nil -> :ok - error -> error + # Check if any changeset is invalid + case Enum.find(changesets, fn cs -> not cs.valid? end) do + nil -> + entries = + Enum.map(changesets, fn changeset -> + changeset + |> Ecto.Changeset.apply_changes() + |> Map.take(LogLine.__schema__(:fields)) + end) + + {_count, log_lines} = + Repo.insert_all( + Lightning.Invocation.LogLine, + entries, + returning: true + ) + + Enum.each(log_lines, fn log_line -> + Events.log_appended(log_line) + end) + + {:ok, log_lines} + + invalid_changeset -> + {:error, invalid_changeset} end end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index c1d018fe4e6..a080b38db7f 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -219,7 +219,19 @@ defmodule LightningWeb.RunChannel do {:error, changeset} -> reply_with(socket, {:error, changeset}) - :ok -> + {:ok, log_line} -> + reply_with(socket, {:ok, %{log_line_id: log_line.id}}) + end + end + + def handle_in("run:batch_logs", %{"logs" => payload}, socket) do + %{run: run, scrubber: scrubber} = socket.assigns + + case Runs.append_run_logs_batch(run, payload, scrubber) do + {:error, changeset} -> + reply_with(socket, {:error, changeset}) + + {:ok, _} -> reply_with(socket, :ok) end end diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs index f25c3170324..24c23c6ae30 100644 --- a/test/lightning_web/channels/run_channel_test.exs +++ b/test/lightning_web/channels/run_channel_test.exs @@ -1140,6 +1140,190 @@ defmodule LightningWeb.RunChannelTest do persisted_log_line = Lightning.Repo.one(Lightning.Invocation.LogLine) assert persisted_log_line.timestamp == ~U[2023-11-08 11:57:33.874083Z] end + + test "run:batch_logs inserts multiple log lines in a single operation", %{ + socket: socket, + run: run + } do + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "level" => "info", + "message" => ["First log line"], + "source" => "R/T", + "timestamp" => "1699444653874083" + }, + %{ + "level" => "debug", + "message" => ["Second log line"], + "source" => "R/T", + "timestamp" => "1699444653874084" + }, + %{ + "level" => "error", + "message" => ["Third log line"], + "source" => "R/T", + "timestamp" => "1699444653874085" + } + ] + }) + + assert_reply ref, :ok, _ + + log_lines = + Lightning.Invocation.LogLine + |> where(run_id: ^run.id) + |> order_by(:timestamp) + |> Repo.all() + + assert length(log_lines) == 3 + assert Enum.at(log_lines, 0).message == "First log line" + assert Enum.at(log_lines, 1).message == "Second log line" + assert Enum.at(log_lines, 2).message == "Third log line" + end + + test "run:batch_logs validates all messages", %{ + socket: socket + } do + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "level" => "info", + "message" => ["Valid log"], + "source" => "R/T", + "timestamp" => "1699444653874083" + }, + %{ + "level" => "info", + "timestamp" => "1699444653874084" + # missing message + } + ] + }) + + assert_reply ref, :error, errors + assert errors == %{message: ["This field can't be blank."]} + end + + test "run:batch_logs validates step_id association", %{ + socket: socket, + run: run, + workflow: workflow + } do + [job] = workflow.jobs + step_id_1 = Ecto.UUID.generate() + step_id_2 = Ecto.UUID.generate() + invalid_step_id = Ecto.UUID.generate() + + # Start two steps + ref = + push(socket, "step:start", %{ + "step_id" => step_id_1, + "job_id" => job.id, + "input_dataclip_id" => run.dataclip_id + }) + + assert_reply ref, :ok, _ + + ref = + push(socket, "step:start", %{ + "step_id" => step_id_2, + "job_id" => job.id, + "input_dataclip_id" => run.dataclip_id + }) + + assert_reply ref, :ok, _ + + # Send batch with valid step_ids + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log for step 1"], + "timestamp" => "1699444653874083", + "step_id" => step_id_1 + }, + %{ + "message" => ["Log for step 2"], + "timestamp" => "1699444653874084", + "step_id" => step_id_2 + }, + %{ + "message" => ["Log without step"], + "timestamp" => "1699444653874085" + } + ] + }) + + assert_reply ref, :ok, _ + + log_lines = + Lightning.Invocation.LogLine + |> where(run_id: ^run.id) + |> order_by(:timestamp) + |> Repo.all() + + assert length(log_lines) == 3 + assert Enum.at(log_lines, 0).step_id == step_id_1 + assert Enum.at(log_lines, 1).step_id == step_id_2 + assert Enum.at(log_lines, 2).step_id == nil + + # Send batch with invalid step_id + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log with invalid step"], + "timestamp" => "1699444653874086", + "step_id" => invalid_step_id + } + ] + }) + + assert_reply ref, :error, errors + assert errors == %{step_id: ["must be associated with the run"]} + end + + test "run:batch_logs handles empty logs array", %{ + socket: socket + } do + ref = push(socket, "run:batch_logs", %{"logs" => []}) + + assert_reply ref, :ok, _ + end + + test "run:batch_logs broadcasts events for each inserted log line", %{ + socket: socket, + run: run + } do + # Subscribe to run events + Lightning.Runs.Events.subscribe(run) + + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log 1"], + "timestamp" => "1699444653874083" + }, + %{ + "message" => ["Log 2"], + "timestamp" => "1699444653874084" + } + ] + }) + + assert_reply ref, :ok, _ + + # Should receive 2 log_appended events + assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_1} + assert log_line_1.message == "Log 1" + + assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_2} + assert log_line_2.message == "Log 2" + end end describe "run:start" do From 5ce693cb52cbeb0686158265939cd39af2dfbbb4 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Thu, 8 Jan 2026 14:20:06 +0300 Subject: [PATCH 4/5] update changelog --- CHANGELOG.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 50b8daa86ca..c47c0846bfa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,9 +19,8 @@ and this project adheres to ### Changed -- Support a batch of logs submitted to the `run:log` channel by the worker - [#4123](https://github.com/OpenFn/lightning/issues/4123) (backwards - compatible) +- Support a batch of logs submitted via `run:batch_logs` in run channel + [#4123](https://github.com/OpenFn/lightning/issues/4123) ### Fixed From 65aaa4a4208f3508a2a47ba64f5939d40ebf5e9a Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Thu, 8 Jan 2026 14:51:24 +0300 Subject: [PATCH 5/5] refactor to make credo happy --- lib/lightning/runs.ex | 68 ++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 74361275a2b..9155db0607c 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -295,40 +295,52 @@ defmodule Lightning.Runs do """ def append_run_logs_batch(run, log_entries, scrubber \\ nil) when is_list(log_entries) do - step_ids = - log_entries - |> Enum.map(&(Map.get(&1, "step_id") || Map.get(&1, :step_id))) - |> Enum.reject(&is_nil/1) - |> Enum.uniq() - - valid_step_ids = - if Enum.empty?(step_ids) do - [] - else - from(s in Lightning.RunStep, - where: s.step_id in ^step_ids and s.run_id == ^run.id, - select: s.step_id - ) - |> Repo.all() - end + valid_step_ids = fetch_valid_step_ids(run, log_entries) changesets = Enum.map(log_entries, fn log_entry -> LogLine.new(run, log_entry, scrubber) |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> - if is_nil(step_id) do - [] - else - if step_id in valid_step_ids do - [] - else - [{:step_id, "must be associated with the run"}] - end - end + validate_step_id(step_id, valid_step_ids) end) end) - # Check if any changeset is invalid + with {:ok, log_lines} <- insert_all_logs(changesets) do + Enum.each(log_lines, &Events.log_appended/1) + + {:ok, log_lines} + end + end + + defp fetch_valid_step_ids(run, log_entries) do + step_ids = + log_entries + |> Enum.map(&(Map.get(&1, "step_id") || Map.get(&1, :step_id))) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + + if Enum.empty?(step_ids) do + [] + else + from(s in Lightning.RunStep, + where: s.step_id in ^step_ids and s.run_id == ^run.id, + select: s.step_id + ) + |> Repo.all() + end + end + + defp validate_step_id(nil, _valid_step_ids), do: [] + + defp validate_step_id(step_id, valid_step_ids) do + if step_id in valid_step_ids do + [] + else + [{:step_id, "must be associated with the run"}] + end + end + + defp insert_all_logs(changesets) do case Enum.find(changesets, fn cs -> not cs.valid? end) do nil -> entries = @@ -345,10 +357,6 @@ defmodule Lightning.Runs do returning: true ) - Enum.each(log_lines, fn log_line -> - Events.log_appended(log_line) - end) - {:ok, log_lines} invalid_changeset ->