diff --git a/app/models/organization.rb b/app/models/organization.rb index 7f48b16fe0d..5844f271217 100644 --- a/app/models/organization.rb +++ b/app/models/organization.rb @@ -268,6 +268,12 @@ def organization self end + # Same intent as #organization: a uniform interface so callers walking an arbitrary + # object can ask for the organization_id without special-casing Organization itself. + def organization_id + id + end + def maximum_wallets_per_customer max_wallets if events_targeting_wallets_enabled? end diff --git a/lib/active_job/json_log_subscriber.rb b/lib/active_job/json_log_subscriber.rb index 630bd519706..bf1b0f667d0 100644 --- a/lib/active_job/json_log_subscriber.rb +++ b/lib/active_job/json_log_subscriber.rb @@ -93,7 +93,7 @@ def perform(event) if ex error do - { + payload = { level: "error", event: "perform", status: "error", @@ -101,11 +101,11 @@ def perform(event) duration: event.duration.round(2), job_id: job.job_id, queue: job.queue_name, - exception: { - class: ex.class.name, - message: ex.message - } - }.to_json + arguments: args_info(job), + retries: job.executions, + exception: exception_payload(ex) + } + merge_organization_id(payload, job).to_json end elsif event.payload[:aborted] info do @@ -141,20 +141,19 @@ def enqueue_retry(event) wait = event.payload[:wait] info do - if ex + payload = if ex { level: "error", event: "retry", status: "error", job: job.class.name, job_id: job.job_id, + queue: job.queue_name, + arguments: args_info(job), execution: job.executions, wait: wait.to_i, - exception: { - class: ex.class.name, - message: ex.message - } - }.to_json + exception: exception_payload(ex) + } else { level: "info", @@ -162,10 +161,13 @@ def enqueue_retry(event) status: "success", job: job.class.name, job_id: job.job_id, + queue: job.queue_name, + arguments: args_info(job), execution: job.executions, wait: wait.to_i - }.to_json + } end + merge_organization_id(payload, job).to_json end end subscribe_log_level :enqueue_retry, :info @@ -175,19 +177,18 @@ def retry_stopped(event) ex = event.payload[:error] error do - { + payload = { level: "error", event: "retry", status: "stopped", job: job.class.name, job_id: job.job_id, queue: job.queue_name, + arguments: args_info(job), retries: job.executions, - exception: { - class: ex.class.name, - message: ex.message - } - }.to_json + exception: exception_payload(ex) + } + merge_organization_id(payload, job).to_json end end subscribe_log_level :retry_stopped, :error @@ -197,17 +198,18 @@ def discard(event) ex = event.payload[:error] error do - { + payload = { level: "error", event: "discard", status: "error", job: job.class.name, job_id: job.job_id, - exception: { - class: ex.class.name, - message: ex.message - } - }.to_json + queue: job.queue_name, + arguments: args_info(job), + retries: job.executions, + exception: exception_payload(ex) + } + merge_organization_id(payload, job).to_json end end subscribe_log_level :discard, :error @@ -243,17 +245,17 @@ def scheduled_at(job) def enqueue_error(job, ex) error do - { + payload = { level: "error", event: "enqueue", status: "error", job: job.class.name, + job_id: job.job_id, queue: job.queue_name, - exception: { - class: ex.class.name, - message: ex.message - } - }.to_json + arguments: args_info(job), + exception: exception_payload(ex) + } + merge_organization_id(payload, job).to_json end end @@ -271,6 +273,34 @@ def enqueue_success(job, **extra) }.to_json end end + + def exception_payload(ex) + {class: ex.class.name, message: ex.message} + end + + def merge_organization_id(payload, job) + org_id = organization_id_from(job) + unless org_id.nil? + payload[:organization_id] = org_id + end + payload + end + + def organization_id_from(job) + arg = job.arguments&.find { |a| !organization_id_in(a).nil? } + organization_id_in(arg) if arg + rescue + nil + end + + def organization_id_in(arg) + case arg + when Hash + arg[:organization_id].presence || arg["organization_id"].presence + else + arg.organization_id if arg.respond_to?(:organization_id) + end + end end end diff --git a/spec/lib/active_job/json_log_subscriber_spec.rb b/spec/lib/active_job/json_log_subscriber_spec.rb index 7cfc3b98786..0156f72cdc8 100644 --- a/spec/lib/active_job/json_log_subscriber_spec.rb +++ b/spec/lib/active_job/json_log_subscriber_spec.rb @@ -10,6 +10,13 @@ def perform(*) end end +class TestLogJobWithArgs < ApplicationJob + self.log_arguments = true + + def perform(*) + end +end + RSpec.describe ActiveJob::JsonLogSubscriber do subject(:subscriber) { described_class.new } @@ -42,6 +49,14 @@ def build_job( end end + def build_job_with_args(*args, job_id: "test-job-id", queue_name: "default", executions: 0) + TestLogJobWithArgs.new(*args).tap do |job| + job.job_id = job_id + job.queue_name = queue_name + job.executions = executions + end + end + def build_event(name, payload) ActiveSupport::Notifications::Event.new(name, nil, nil, "transaction_id", payload) end @@ -88,7 +103,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "test-job-id", "queue" => "default", + "arguments" => {}, "exception" => {"class" => "RuntimeError", "message" => "redis down"} }) end @@ -109,7 +126,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "test-job-id", "queue" => "low_priority", + "arguments" => {}, "exception" => {"class" => "ArgumentError", "message" => "invalid args"} }) end @@ -133,6 +152,30 @@ def parsed_log_lines }) end end + + context "when the job has an exception and an argument carries an organization_id" do + it "logs an error entry enriched with the organization_id" do + exception = RuntimeError.new("redis down") + job = build_job_with_args({organization_id: "org-enq"}, job_id: "abc-123") + event = build_event("enqueue.active_job", {job: job, exception_object: exception}) + + subscriber.enqueue(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "enqueue", + "status" => "error", + "job" => "TestLogJobWithArgs", + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-enq\"}", + "exception" => {"class" => "RuntimeError", "message" => "redis down"}, + "organization_id" => "org-enq" + }) + end + end end describe "#enqueue_all" do @@ -174,8 +217,8 @@ def parsed_log_lines context "when there is a global exception" do it "logs an error entry with all expected attributes for each job" do - job1 = build_job(queue_name: "billing") - job2 = build_job(queue_name: "default") + job1 = build_job(job_id: "id-1", queue_name: "billing") + job2 = build_job(job_id: "id-2", queue_name: "default") exception = RuntimeError.new("connection failed") event = build_event("enqueue_all.active_job", {jobs: [job1, job2], exception_object: exception}) @@ -194,7 +237,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "id-1", "queue" => "billing", + "arguments" => {}, "exception" => expected_exception }) @@ -203,7 +248,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "id-2", "queue" => "default", + "arguments" => {}, "exception" => expected_exception }) end @@ -229,7 +276,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "id-fail", "queue" => "default", + "arguments" => {}, "exception" => {"class" => "ArgumentError", "message" => "queue full"} }) @@ -284,11 +333,41 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "test-job-id", "queue" => "default", + "arguments" => {}, "exception" => {"class" => "RuntimeError", "message" => "global failure"} }) end end + + context "when an individual job's enqueue_error happens and its arguments carry an organization_id" do + it "logs an error entry enriched with arguments and organization_id" do + error = ArgumentError.new("queue full") + failed_job = TestLogJobWithArgs.new({organization_id: "org-bulk"}).tap do |j| + j.job_id = "id-fail" + j.queue_name = "default" + j.enqueue_error = error + end + event = build_event("enqueue_all.active_job", {jobs: [failed_job], exception_object: nil}) + + subscriber.enqueue_all(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "enqueue", + "status" => "error", + "job" => "TestLogJobWithArgs", + "job_id" => "id-fail", + "queue" => "default", + "arguments" => "{organization_id: \"org-bulk\"}", + "exception" => {"class" => "ArgumentError", "message" => "queue full"}, + "organization_id" => "org-bulk" + }) + end + end end describe "#enqueue_at" do @@ -331,7 +410,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "test-job-id", "queue" => "default", + "arguments" => {}, "exception" => {"class" => "RuntimeError", "message" => "redis down"} }) end @@ -352,7 +433,9 @@ def parsed_log_lines "event" => "enqueue", "status" => "error", "job" => "TestLogJob", + "job_id" => "test-job-id", "queue" => "low_priority", + "arguments" => {}, "exception" => {"class" => "ArgumentError", "message" => "invalid args"} }) end @@ -376,6 +459,30 @@ def parsed_log_lines }) end end + + context "when the job has an exception and an argument carries an organization_id" do + it "logs an error entry enriched with the organization_id" do + exception = RuntimeError.new("redis down") + job = build_job_with_args({organization_id: "org-enq-at"}, job_id: "abc-123") + event = build_event("enqueue_at.active_job", {job: job, exception_object: exception}) + + subscriber.enqueue_at(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "enqueue", + "status" => "error", + "job" => "TestLogJobWithArgs", + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-enq-at\"}", + "exception" => {"class" => "RuntimeError", "message" => "redis down"}, + "organization_id" => "org-enq-at" + }) + end + end end describe "#perform_start" do @@ -450,7 +557,7 @@ def parsed_log_lines context "when the job raises an exception" do it "logs an error entry with all expected attributes" do exception = RuntimeError.new("something broke") - job = build_job(job_id: "abc-123") + job = build_job(job_id: "abc-123", executions: 2) event = build_event("perform.active_job", {job: job, exception_object: exception}) allow(event).to receive(:duration).and_return(45.678) @@ -466,9 +573,274 @@ def parsed_log_lines "duration" => 45.68, "job_id" => "abc-123", "queue" => "default", + "arguments" => {}, + "retries" => 2, "exception" => {"class" => "RuntimeError", "message" => "something broke"} }) end + + context "when an argument is a hash with a symbol :organization_id key" do + it "logs an error entry with the extracted organization_id" do + exception = RuntimeError.new("boom") + job = build_job_with_args({organization_id: "org-symbol"}, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-symbol\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => "org-symbol" + }) + end + end + + context "when an argument is a hash with a string 'organization_id' key" do + it "logs an error entry with the extracted organization_id" do + exception = RuntimeError.new("boom") + job = build_job_with_args({"organization_id" => "org-string"}, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{\"organization_id\" => \"org-string\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => "org-string" + }) + end + end + + context "when an argument responds to organization_id" do + it "logs an error entry with the extracted organization_id" do + exception = RuntimeError.new("boom") + org_carrier = Struct.new(:organization_id).new("org-from-method") + job = build_job_with_args(org_carrier, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => org_carrier.inspect, + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => "org-from-method" + }) + end + end + + context "when an argument hash has organization_id: false" do + it "treats false as blank and falls through to the next argument's organization_id" do + exception = RuntimeError.new("boom") + job = build_job_with_args({organization_id: false}, {organization_id: "fallback"}, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: false}, {organization_id: \"fallback\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => "fallback" + }) + end + end + + context "when an argument hash has organization_id: 0" do + it "logs an error entry with the zero organization_id rather than falling through" do + exception = RuntimeError.new("boom") + job = build_job_with_args({organization_id: 0}, {organization_id: "fallback"}, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: 0}, {organization_id: \"fallback\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => 0 + }) + end + end + + context "when an argument hash has organization_id: nil" do + it "logs an error entry with the next argument's organization_id" do + exception = RuntimeError.new("boom") + job = build_job_with_args({organization_id: nil}, {organization_id: "fallback"}, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJobWithArgs", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: nil}, {organization_id: \"fallback\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "boom"}, + "organization_id" => "fallback" + }) + end + end + + context "when no argument carries an organization_id" do + it "logs an error entry without the organization_id key" do + exception = RuntimeError.new("boom") + job = build_job(job_id: "abc-123") + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "perform", + "status" => "error", + "job" => "TestLogJob", + "duration" => 1.0, + "job_id" => "abc-123", + "queue" => "default", + "arguments" => {}, + "retries" => 0, + "exception" => {"class" => "RuntimeError", "message" => "boom"} + }) + end + end + + context "when an argument is an Organization instance" do + it "logs an error entry with the Organization id as organization_id" do + exception = RuntimeError.new("boom") + organization = build(:organization) + job = build_job_with_args(organization, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first["organization_id"]).to eq(organization.id) + end + end + + context "when the first argument does not carry an organization_id and a later argument does" do + it "walks past the first argument and extracts the later organization_id" do + exception = RuntimeError.new("boom") + plain = Struct.new(:name).new("nothing") + org_carrier = Struct.new(:organization_id).new("org-later") + job = build_job_with_args(plain, org_carrier, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first["organization_id"]).to eq("org-later") + end + end + + context "when an argument's organization_id accessor raises" do + it "logs an error entry without the organization_id key" do + exception = RuntimeError.new("boom") + raising_arg = Class.new do + def organization_id + raise "kaboom" + end + + def inspect + "#" + end + end.new + job = build_job_with_args(raising_arg, job_id: "abc-123", executions: 1) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).not_to have_key("organization_id") + expect(logs.first["exception"]).to eq({"class" => "RuntimeError", "message" => "boom"}) + end + end + + context "when the job arguments collection is nil" do + it "logs an error entry without the organization_id key" do + exception = RuntimeError.new("boom") + job = build_job(job_id: "abc-123") + allow(job).to receive(:arguments).and_return(nil) + event = build_event("perform.active_job", {job: job, exception_object: exception}) + allow(event).to receive(:duration).and_return(1.0) + + subscriber.perform(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).not_to have_key("organization_id") + end + end end context "when the job is aborted" do @@ -511,11 +883,39 @@ def parsed_log_lines "status" => "error", "job" => "TestLogJob", "job_id" => "abc-123", + "queue" => "default", + "arguments" => {}, "execution" => 3, "wait" => 30, "exception" => {"class" => "RuntimeError", "message" => "transient failure"} }) end + + context "when the job arguments contain an organization_id" do + it "logs a retry error entry with the extracted organization_id" do + exception = RuntimeError.new("transient failure") + job = build_job_with_args({organization_id: "org-retry"}, job_id: "abc-123", executions: 3) + event = build_event("enqueue_retry.active_job", {job: job, error: exception, wait: 30.5}) + + subscriber.enqueue_retry(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "retry", + "status" => "error", + "job" => "TestLogJobWithArgs", + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-retry\"}", + "execution" => 3, + "wait" => 30, + "exception" => {"class" => "RuntimeError", "message" => "transient failure"}, + "organization_id" => "org-retry" + }) + end + end end context "when there is no error" do @@ -533,6 +933,8 @@ def parsed_log_lines "status" => "success", "job" => "TestLogJob", "job_id" => "abc-123", + "queue" => "default", + "arguments" => {}, "execution" => 1, "wait" => 5 }) @@ -557,16 +959,42 @@ def parsed_log_lines "job" => "TestLogJob", "job_id" => "abc-123", "queue" => "default", + "arguments" => {}, "retries" => 5, "exception" => {"class" => "RuntimeError", "message" => "permanent failure"} }) end + + context "when the job arguments contain an organization_id" do + it "logs a stopped entry with the extracted organization_id" do + exception = RuntimeError.new("permanent failure") + job = build_job_with_args({organization_id: "org-77"}, job_id: "abc-123", executions: 5) + event = build_event("retry_stopped.active_job", {job: job, error: exception}) + + subscriber.retry_stopped(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "retry", + "status" => "stopped", + "job" => "TestLogJobWithArgs", + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-77\"}", + "retries" => 5, + "exception" => {"class" => "RuntimeError", "message" => "permanent failure"}, + "organization_id" => "org-77" + }) + end + end end describe "#discard" do it "logs a discard entry with all expected attributes" do exception = RuntimeError.new("unrecoverable error") - job = build_job(job_id: "abc-123") + job = build_job(job_id: "abc-123", executions: 2) event = build_event("discard.active_job", {job: job, error: exception}) subscriber.discard(event) @@ -579,8 +1007,36 @@ def parsed_log_lines "status" => "error", "job" => "TestLogJob", "job_id" => "abc-123", + "queue" => "default", + "arguments" => {}, + "retries" => 2, "exception" => {"class" => "RuntimeError", "message" => "unrecoverable error"} }) end + + context "when the job arguments contain an organization_id" do + it "logs a discard entry with the extracted organization_id" do + exception = RuntimeError.new("unrecoverable") + job = build_job_with_args({organization_id: "org-99"}, job_id: "abc-123", executions: 1) + event = build_event("discard.active_job", {job: job, error: exception}) + + subscriber.discard(event) + + logs = parsed_log_lines + expect(logs.size).to eq(1) + expect(logs.first).to eq({ + "level" => "error", + "event" => "discard", + "status" => "error", + "job" => "TestLogJobWithArgs", + "job_id" => "abc-123", + "queue" => "default", + "arguments" => "{organization_id: \"org-99\"}", + "retries" => 1, + "exception" => {"class" => "RuntimeError", "message" => "unrecoverable"}, + "organization_id" => "org-99" + }) + end + end end end diff --git a/spec/models/organization_spec.rb b/spec/models/organization_spec.rb index b5c75193264..4bb43882b58 100644 --- a/spec/models/organization_spec.rb +++ b/spec/models/organization_spec.rb @@ -561,6 +561,12 @@ end end + describe "#organization_id" do + it "returns the organization id" do + expect(organization.organization_id).to eq(organization.id) + end + end + # this requires double confirmation: value on the org + premium integration describe "#maximum_wallets_per_customer", :premium do subject { organization.maximum_wallets_per_customer }