diff --git a/lib/consumer.ex b/lib/consumer.ex index 4360873..58686fb 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -25,9 +25,9 @@ defmodule Mississippi.Consumer do amqp_consumer_options = opts[:amqp_consumer_options] - queue_config = opts[:mississippi_config][:queues] + mississippi_config = opts[:mississippi_config] - message_handler = opts[:mississippi_config][:message_handler] + queue_config = mississippi_config[:queues] channels_per_connection = amqp_consumer_options[:channels] @@ -47,7 +47,7 @@ defmodule Mississippi.Consumer do children = [ {ExRabbitPool.PoolSupervisor, rabbitmq_config: amqp_consumer_options, connection_pools: [events_consumer_pool_config(connection_number)]}, - {ConsumersSupervisor, queues: queue_config, message_handler: message_handler} + {ConsumersSupervisor, mississippi_config} ] Supervisor.init(children, strategy: :rest_for_one) diff --git a/lib/consumer/consumers_supervisor.ex b/lib/consumer/consumers_supervisor.ex index 5f9dde1..3947f23 100644 --- a/lib/consumer/consumers_supervisor.ex +++ b/lib/consumer/consumers_supervisor.ex @@ -25,6 +25,8 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do queues_config = init_arg[:queues] + distribution_strategy = distribution_strategy!(init_arg[:cluster_distribution_strategy]) + children = [ {Registry, [keys: :unique, name: DataUpdater.Registry, members: :auto]}, {Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]}, @@ -35,19 +37,19 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do members: :auto, process_redistribution: :active, extra_arguments: [message_handler: message_handler], - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, {DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto, process_redistribution: :active, - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, {DynamicSupervisor, strategy: :one_for_one, name: AMQPDataConsumer.Supervisor, members: :auto, process_redistribution: :active, - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, # This will make queue listeners start after re-sharding in a multi-node cluster {NodeListener, queues_config}, # This will make queue listeners start in a single-node cluster @@ -64,9 +66,11 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do [ mississippi_config: [ type: :keyword_list, + default: [], keys: [ queues: [ type: :keyword_list, + default: [], keys: [ events_exchange_name: [ type: :string, @@ -115,9 +119,20 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do The module that will be invoked by Mississippi to process incoming messages. It must implement the `Mississippi.Consumer.DataUpdater.Handler` behaviour. """ + ], + cluster_distribution_strategy: [ + type: {:in, [:uniform_quorum, :uniform_random, :uniform]}, + default: :uniform_quorum, + doc: """ + The strategy to use for redistributing consumer processes within the cluster. + """ ] ] ] ] end + + defp distribution_strategy!(:uniform_quorum), do: Horde.UniformQuorumDistribution + defp distribution_strategy!(:uniform_random), do: Horde.UniformRandomDistribution + defp distribution_strategy!(:uniform), do: Horde.UniformDistribution end diff --git a/test/consumer/options_test.exs b/test/consumer/options_test.exs new file mode 100644 index 0000000..aa61ee2 --- /dev/null +++ b/test/consumer/options_test.exs @@ -0,0 +1,30 @@ +# Copyright 2026 SECO Mind Srl +# SPDX-License-Identifier: Apache-2.0 + +defmodule Mississippi.Consumer.Options.Test do + use ExUnit.Case, async: true + + alias Mississippi.Consumer.DataUpdater.Handler.Impl + alias Mississippi.Consumer.Options + + @schema Options.definition() + + test "an empty configuration returns defaults" do + assert {:ok, opts} = NimbleOptions.validate([], @schema) + assert opts[:mississippi_config][:message_handler] == Impl + assert opts[:mississippi_config][:cluster_distribution_strategy] == :uniform_quorum + assert is_integer(opts[:mississippi_config][:queues][:total_count]) + end + + test "allows setting valid distribution strategies" do + valid_distribution_strategy = :uniform + opts = [mississippi_config: [cluster_distribution_strategy: valid_distribution_strategy]] + assert {:ok, res} = NimbleOptions.validate(opts, @schema) + assert res[:mississippi_config][:cluster_distribution_strategy] == valid_distribution_strategy + end + + test "does not allow invalid distribution strategies" do + opts = [mississippi_config: [cluster_distribution_strategy: :not_a_distribution_strategy]] + assert {:error, %{key: :cluster_distribution_strategy}} = NimbleOptions.validate(opts, @schema) + end +end