Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ defmodule Mississippi.Consumer do

amqp_consumer_options = opts[:amqp_consumer_options]

queue_config = opts[:mississippi_config][:queues]
mississippi_config = opts[:mississippi_config]

message_handler = opts[:mississippi_config][:message_handler]
queue_config = mississippi_config[:queues]

channels_per_connection = amqp_consumer_options[:channels]

Expand All @@ -47,7 +47,7 @@ defmodule Mississippi.Consumer do
children = [
{ExRabbitPool.PoolSupervisor,
rabbitmq_config: amqp_consumer_options, connection_pools: [events_consumer_pool_config(connection_number)]},
{ConsumersSupervisor, queues: queue_config, message_handler: message_handler}
{ConsumersSupervisor, mississippi_config}
]

Supervisor.init(children, strategy: :rest_for_one)
Expand Down
21 changes: 18 additions & 3 deletions lib/consumer/consumers_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do

queues_config = init_arg[:queues]

distribution_strategy = distribution_strategy!(init_arg[:cluster_distribution_strategy])

children = [
{Registry, [keys: :unique, name: DataUpdater.Registry, members: :auto]},
{Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]},
Expand All @@ -35,19 +37,19 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
members: :auto,
process_redistribution: :active,
extra_arguments: [message_handler: message_handler],
distribution_strategy: Horde.UniformQuorumDistribution},
distribution_strategy: distribution_strategy},
{DynamicSupervisor,
strategy: :one_for_one,
name: MessageTracker.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
distribution_strategy: distribution_strategy},
{DynamicSupervisor,
strategy: :one_for_one,
name: AMQPDataConsumer.Supervisor,
members: :auto,
process_redistribution: :active,
distribution_strategy: Horde.UniformQuorumDistribution},
distribution_strategy: distribution_strategy},
# This will make queue listeners start after re-sharding in a multi-node cluster
{NodeListener, queues_config},
# This will make queue listeners start in a single-node cluster
Expand All @@ -64,9 +66,11 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
[
mississippi_config: [
type: :keyword_list,
default: [],
keys: [
queues: [
type: :keyword_list,
default: [],
keys: [
events_exchange_name: [
type: :string,
Expand Down Expand Up @@ -115,9 +119,20 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
The module that will be invoked by Mississippi to process incoming messages.
It must implement the `Mississippi.Consumer.DataUpdater.Handler` behaviour.
"""
],
cluster_distribution_strategy: [
type: {:in, [:uniform_quorum, :uniform_random, :uniform]},
default: :uniform_quorum,
doc: """
The strategy to use for redistributing consumer processes within the cluster.
"""
]
]
]
]
end

defp distribution_strategy!(:uniform_quorum), do: Horde.UniformQuorumDistribution
defp distribution_strategy!(:uniform_random), do: Horde.UniformRandomDistribution
defp distribution_strategy!(:uniform), do: Horde.UniformDistribution
end
30 changes: 30 additions & 0 deletions test/consumer/options_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2026 SECO Mind Srl
# SPDX-License-Identifier: Apache-2.0

defmodule Mississippi.Consumer.Options.Test do
use ExUnit.Case, async: true

alias Mississippi.Consumer.DataUpdater.Handler.Impl
alias Mississippi.Consumer.Options

@schema Options.definition()

test "an empty configuration returns defaults" do
assert {:ok, opts} = NimbleOptions.validate([], @schema)
assert opts[:mississippi_config][:message_handler] == Impl
assert opts[:mississippi_config][:cluster_distribution_strategy] == :uniform_quorum
assert is_integer(opts[:mississippi_config][:queues][:total_count])
end

test "allows setting valid distribution strategies" do
valid_distribution_strategy = :uniform
opts = [mississippi_config: [cluster_distribution_strategy: valid_distribution_strategy]]
assert {:ok, res} = NimbleOptions.validate(opts, @schema)
assert res[:mississippi_config][:cluster_distribution_strategy] == valid_distribution_strategy
end

test "does not allow invalid distribution strategies" do
opts = [mississippi_config: [cluster_distribution_strategy: :not_a_distribution_strategy]]
assert {:error, %{key: :cluster_distribution_strategy}} = NimbleOptions.validate(opts, @schema)
end
end
Loading