Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
import_deps: [:typed_struct],
import_deps: [:typedstruct],
plugins: [Styler]
]
3 changes: 1 addition & 2 deletions lib/consumer/data_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ defmodule Mississippi.Consumer.DataUpdater do

# We'll be made alive on demand
use GenServer, restart: :temporary
use Efx

alias Horde.DynamicSupervisor
alias Horde.Registry
Expand Down Expand Up @@ -50,7 +49,7 @@ defmodule Mississippi.Consumer.DataUpdater do
"""
@spec get_data_updater_process(sharding_key :: term()) ::
{:ok, pid()} | {:error, :data_updater_start_fail}
defeffect get_data_updater_process(sharding_key) do
def get_data_updater_process(sharding_key) do
# TODO bring back :offload_start (?)
case DynamicSupervisor.start_child(
DataUpdater.Supervisor,
Expand Down
4 changes: 1 addition & 3 deletions lib/consumer/message_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Mississippi.Consumer.MessageTracker do
a message gets processed twice, but after all with strange aeons, even death may die.
"""

use Efx

alias Horde.DynamicSupervisor
alias Horde.Registry
alias Mississippi.Consumer.Message
Expand All @@ -26,7 +24,7 @@ defmodule Mississippi.Consumer.MessageTracker do
"""
@spec get_message_tracker(sharding_key :: term()) ::
{:ok, pid()} | {:error, :message_tracker_start_fail}
defeffect get_message_tracker(sharding_key) do
def get_message_tracker(sharding_key) do
name = {:via, Registry, {MessageTracker.Registry, {:sharding_key, sharding_key}}}

# TODO bring back :offload_start (?)
Expand Down
7 changes: 3 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@ defmodule Mississippi.MixProject do
{:amqp, "~> 3.3"},
{:credo, "~> 1.0", only: [:dev], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false},
{:efx, "~> 0.1"},
{:mimic, "~> 2.3", only: [:dev, :test, :ci]},
{:elixir_uuid, "~> 1.2"},
{:ex_check, "~> 0.16", only: [:dev], runtime: false},
{:excoveralls, "~> 0.18", only: :test},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:current_rabbit_pool, "~> 1.1"},
{:hammox, "~> 0.7", only: :test},
{:mix_audit, "~> 2.0", only: [:dev], runtime: false},
# Use tagged version when https://github.com/derekkraan/horde/pull/274 appears in a tag
{:horde, github: "derekkraan/horde"},
{:horde, "~> 0.9"},
{:nimble_options, "~> 1.0"},
{:pretty_log, "~> 0.1"},
{:styler, "~> 1.0.0-rc.1", only: [:dev], runtime: false},
{:typed_struct, "~> 0.3.0"}
{:typedstruct, "~> 0.5"}
]
end

Expand Down
47 changes: 24 additions & 23 deletions mix.lock

Large diffs are not rendered by default.

37 changes: 19 additions & 18 deletions test/consumer/amqp_data_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
use EfxCase, async: false
use ExUnit.Case, async: false

# We use Mox here because we don't care about the type safety
# guarantees of Hammox
Expand All @@ -16,6 +16,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
alias Mississippi.Consumer.Test.Placeholder

require Logger
require Mimic

@moduletag :unit

Expand All @@ -26,6 +27,8 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
:ok
end

setup {Mimic, :verify_on_exit!}

describe "AMQPDataConsumer message handling:" do
setup :create_queue_index
setup :create_sharding_key
Expand All @@ -39,9 +42,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
queue_index: queue_index
} do
data_consumer_pid =
queue_index
|> amqp_data_consumer_fixture()
|> start_supervised!()
start_amqp_data_consumer!(queue_index)

sharding_key_1 = get_sharding_key()
sharding_key_2 = get_sharding_key()
Expand All @@ -54,7 +55,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
sharding_key_2 => tracker_2
}

bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, trackers[sharding_key_1]} end, calls: 1)
Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, trackers[sharding_key_1]} end)

payload_1 = get_payload()
meta_1 = meta_fixture(sharding_key_1)
Expand All @@ -69,7 +70,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
payload_2 = get_payload()
meta_2 = meta_fixture(sharding_key_2)

bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, trackers[sharding_key_2]} end, calls: 1)
Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, trackers[sharding_key_2]} end)

send(data_consumer_pid, {:basic_deliver, payload_2, meta_2})

Expand All @@ -87,11 +88,9 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
meta: meta
} do
data_consumer_pid =
queue_index
|> amqp_data_consumer_fixture()
|> start_supervised!()
start_amqp_data_consumer!(queue_index)

bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1)
Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end)

send(data_consumer_pid, {:basic_deliver, payload, meta})

Expand All @@ -115,13 +114,11 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
meta: meta
} do
data_consumer_pid =
queue_index
|> amqp_data_consumer_fixture()
|> start_supervised!()
start_amqp_data_consumer!(queue_index)

:erlang.trace(data_consumer_pid, true, [:receive])

bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1)
Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end)

send(data_consumer_pid, {:basic_deliver, payload, meta})

Expand All @@ -146,13 +143,11 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
Process.flag(:trap_exit, true)

data_consumer_pid =
queue_index
|> amqp_data_consumer_fixture()
|> start_supervised!()
start_amqp_data_consumer!(queue_index)

consumer_ref = Process.monitor(data_consumer_pid)

bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1)
Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end)

send(data_consumer_pid, {:basic_deliver, payload, meta})

Expand Down Expand Up @@ -230,6 +225,12 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do
}
end

defp start_amqp_data_consumer!(queue_index) do
data_consumer = queue_index |> amqp_data_consumer_fixture() |> start_supervised!()
Mimic.allow(MessageTracker, self(), data_consumer)
data_consumer
end

defp get_sharding_key do
"sharding_#{System.unique_integer()}"
end
Expand Down
Loading
Loading