Skip to content
6 changes: 6 additions & 0 deletions app/models/organization.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ def organization
self
end

# Same intent as #organization: a uniform interface so callers walking an arbitrary
# object can ask for the organization_id without special-casing Organization itself.
def organization_id
id
end

def maximum_wallets_per_customer
max_wallets if events_targeting_wallets_enabled?
end
Expand Down
92 changes: 61 additions & 31 deletions lib/active_job/json_log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ def perform(event)

if ex
error do
{
payload = {
level: "error",
event: "perform",
status: "error",
job: job.class.name,
duration: event.duration.round(2),
job_id: job.job_id,
queue: job.queue_name,
exception: {
class: ex.class.name,
message: ex.message
}
}.to_json
arguments: args_info(job),
retries: job.executions,
exception: exception_payload(ex)
}
merge_organization_id(payload, job).to_json
end
elsif event.payload[:aborted]
info do
Expand Down Expand Up @@ -141,31 +141,33 @@ def enqueue_retry(event)
wait = event.payload[:wait]

info do
if ex
payload = if ex
{
level: "error",
event: "retry",
status: "error",
job: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
arguments: args_info(job),
execution: job.executions,
wait: wait.to_i,
exception: {
class: ex.class.name,
message: ex.message
}
}.to_json
exception: exception_payload(ex)
}
else
{
level: "info",
event: "retry",
status: "success",
job: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
arguments: args_info(job),
execution: job.executions,
wait: wait.to_i
}.to_json
}
end
merge_organization_id(payload, job).to_json
end
end
subscribe_log_level :enqueue_retry, :info
Expand All @@ -175,19 +177,18 @@ def retry_stopped(event)
ex = event.payload[:error]

error do
{
payload = {
level: "error",
event: "retry",
status: "stopped",
job: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
arguments: args_info(job),
retries: job.executions,
exception: {
class: ex.class.name,
message: ex.message
}
}.to_json
exception: exception_payload(ex)
}
merge_organization_id(payload, job).to_json
end
end
subscribe_log_level :retry_stopped, :error
Expand All @@ -197,17 +198,18 @@ def discard(event)
ex = event.payload[:error]

error do
{
payload = {
level: "error",
event: "discard",
status: "error",
job: job.class.name,
job_id: job.job_id,
exception: {
class: ex.class.name,
message: ex.message
}
}.to_json
queue: job.queue_name,
arguments: args_info(job),
retries: job.executions,
exception: exception_payload(ex)
}
merge_organization_id(payload, job).to_json
end
end
subscribe_log_level :discard, :error
Expand Down Expand Up @@ -243,17 +245,17 @@ def scheduled_at(job)

def enqueue_error(job, ex)
error do
{
payload = {
level: "error",
event: "enqueue",
status: "error",
job: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
exception: {
class: ex.class.name,
message: ex.message
}
}.to_json
arguments: args_info(job),
exception: exception_payload(ex)
}
merge_organization_id(payload, job).to_json
end
end

Expand All @@ -271,6 +273,34 @@ def enqueue_success(job, **extra)
}.to_json
end
end

def exception_payload(ex)
{class: ex.class.name, message: ex.message}
end

def merge_organization_id(payload, job)
org_id = organization_id_from(job)
unless org_id.nil?
payload[:organization_id] = org_id
end
payload
end

def organization_id_from(job)
arg = job.arguments&.find { |a| !organization_id_in(a).nil? }
organization_id_in(arg) if arg
rescue
nil
end

def organization_id_in(arg)
case arg
when Hash
arg[:organization_id].presence || arg["organization_id"].presence
else
arg.organization_id if arg.respond_to?(:organization_id)
end
end
end
end

Expand Down
Loading
Loading