From 9bbd0515dd0f025cf3193092db92b326d41a079f Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 3 Mar 2026 10:54:34 +0100 Subject: [PATCH 1/2] fix: Consumers are now properly initialized without configuration Signed-off-by: Francesco Noacco --- lib/consumer.ex | 6 +++--- lib/consumer/consumers_supervisor.ex | 2 ++ test/consumer/options_test.exs | 17 +++++++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 test/consumer/options_test.exs diff --git a/lib/consumer.ex b/lib/consumer.ex index 4360873..58686fb 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -25,9 +25,9 @@ defmodule Mississippi.Consumer do amqp_consumer_options = opts[:amqp_consumer_options] - queue_config = opts[:mississippi_config][:queues] + mississippi_config = opts[:mississippi_config] - message_handler = opts[:mississippi_config][:message_handler] + queue_config = mississippi_config[:queues] channels_per_connection = amqp_consumer_options[:channels] @@ -47,7 +47,7 @@ defmodule Mississippi.Consumer do children = [ {ExRabbitPool.PoolSupervisor, rabbitmq_config: amqp_consumer_options, connection_pools: [events_consumer_pool_config(connection_number)]}, - {ConsumersSupervisor, queues: queue_config, message_handler: message_handler} + {ConsumersSupervisor, mississippi_config} ] Supervisor.init(children, strategy: :rest_for_one) diff --git a/lib/consumer/consumers_supervisor.ex b/lib/consumer/consumers_supervisor.ex index 5f9dde1..921a89d 100644 --- a/lib/consumer/consumers_supervisor.ex +++ b/lib/consumer/consumers_supervisor.ex @@ -64,9 +64,11 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do [ mississippi_config: [ type: :keyword_list, + default: [], keys: [ queues: [ type: :keyword_list, + default: [], keys: [ events_exchange_name: [ type: :string, diff --git a/test/consumer/options_test.exs b/test/consumer/options_test.exs new file mode 100644 index 0000000..095f66e --- /dev/null +++ b/test/consumer/options_test.exs @@ -0,0 +1,17 @@ +# Copyright 2026 SECO Mind Srl +# SPDX-License-Identifier: Apache-2.0 + +defmodule Mississippi.Consumer.Options.Test do + use ExUnit.Case, async: true + + alias Mississippi.Consumer.DataUpdater.Handler.Impl + alias Mississippi.Consumer.Options + + @schema Options.definition() + + test "an empty configuration returns defaults" do + assert {:ok, opts} = NimbleOptions.validate([], @schema) + assert opts[:mississippi_config][:message_handler] == Impl + assert is_integer(opts[:mississippi_config][:queues][:total_count]) + end +end From 1e415b1cafdef3f3f90bad5619a6a887219e30dc Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 3 Mar 2026 10:54:34 +0100 Subject: [PATCH 2/2] feat: Allow customizing process distribution strategy Use atom keys instead of horde modules to hide the implementation detail of horde being used for clustering Signed-off-by: Francesco Noacco --- lib/consumer/consumers_supervisor.ex | 19 ++++++++++++++++--- test/consumer/options_test.exs | 13 +++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/lib/consumer/consumers_supervisor.ex b/lib/consumer/consumers_supervisor.ex index 921a89d..3947f23 100644 --- a/lib/consumer/consumers_supervisor.ex +++ b/lib/consumer/consumers_supervisor.ex @@ -25,6 +25,8 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do queues_config = init_arg[:queues] + distribution_strategy = distribution_strategy!(init_arg[:cluster_distribution_strategy]) + children = [ {Registry, [keys: :unique, name: DataUpdater.Registry, members: :auto]}, {Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]}, @@ -35,19 +37,19 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do members: :auto, process_redistribution: :active, extra_arguments: [message_handler: message_handler], - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, {DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto, process_redistribution: :active, - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, {DynamicSupervisor, strategy: :one_for_one, name: AMQPDataConsumer.Supervisor, members: :auto, process_redistribution: :active, - distribution_strategy: Horde.UniformQuorumDistribution}, + distribution_strategy: distribution_strategy}, # This will make queue listeners start after re-sharding in a multi-node cluster {NodeListener, queues_config}, # This will make queue listeners start in a single-node cluster @@ -117,9 +119,20 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do The module that will be invoked by Mississippi to process incoming messages. It must implement the `Mississippi.Consumer.DataUpdater.Handler` behaviour. """ + ], + cluster_distribution_strategy: [ + type: {:in, [:uniform_quorum, :uniform_random, :uniform]}, + default: :uniform_quorum, + doc: """ + The strategy to use for redistributing consumer processes within the cluster. + """ ] ] ] ] end + + defp distribution_strategy!(:uniform_quorum), do: Horde.UniformQuorumDistribution + defp distribution_strategy!(:uniform_random), do: Horde.UniformRandomDistribution + defp distribution_strategy!(:uniform), do: Horde.UniformDistribution end diff --git a/test/consumer/options_test.exs b/test/consumer/options_test.exs index 095f66e..aa61ee2 100644 --- a/test/consumer/options_test.exs +++ b/test/consumer/options_test.exs @@ -12,6 +12,19 @@ defmodule Mississippi.Consumer.Options.Test do test "an empty configuration returns defaults" do assert {:ok, opts} = NimbleOptions.validate([], @schema) assert opts[:mississippi_config][:message_handler] == Impl + assert opts[:mississippi_config][:cluster_distribution_strategy] == :uniform_quorum assert is_integer(opts[:mississippi_config][:queues][:total_count]) end + + test "allows setting valid distribution strategies" do + valid_distribution_strategy = :uniform + opts = [mississippi_config: [cluster_distribution_strategy: valid_distribution_strategy]] + assert {:ok, res} = NimbleOptions.validate(opts, @schema) + assert res[:mississippi_config][:cluster_distribution_strategy] == valid_distribution_strategy + end + + test "does not allow invalid distribution strategies" do + opts = [mississippi_config: [cluster_distribution_strategy: :not_a_distribution_strategy]] + assert {:error, %{key: :cluster_distribution_strategy}} = NimbleOptions.validate(opts, @schema) + end end