diff --git a/.formatter.exs b/.formatter.exs index f34d806..be56844 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -4,6 +4,6 @@ # Used by "mix format" [ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], - import_deps: [:typed_struct], + import_deps: [:typedstruct], plugins: [Styler] ] diff --git a/lib/consumer/data_updater.ex b/lib/consumer/data_updater.ex index a6db468..b304684 100644 --- a/lib/consumer/data_updater.ex +++ b/lib/consumer/data_updater.ex @@ -12,7 +12,6 @@ defmodule Mississippi.Consumer.DataUpdater do # We'll be made alive on demand use GenServer, restart: :temporary - use Efx alias Horde.DynamicSupervisor alias Horde.Registry @@ -50,7 +49,7 @@ defmodule Mississippi.Consumer.DataUpdater do """ @spec get_data_updater_process(sharding_key :: term()) :: {:ok, pid()} | {:error, :data_updater_start_fail} - defeffect get_data_updater_process(sharding_key) do + def get_data_updater_process(sharding_key) do # TODO bring back :offload_start (?) case DynamicSupervisor.start_child( DataUpdater.Supervisor, diff --git a/lib/consumer/message_tracker.ex b/lib/consumer/message_tracker.ex index dfe7b45..c4f9a98 100644 --- a/lib/consumer/message_tracker.ex +++ b/lib/consumer/message_tracker.ex @@ -11,8 +11,6 @@ defmodule Mississippi.Consumer.MessageTracker do a message gets processed twice, but after all with strange aeons, even death may die. """ - use Efx - alias Horde.DynamicSupervisor alias Horde.Registry alias Mississippi.Consumer.Message @@ -26,7 +24,7 @@ defmodule Mississippi.Consumer.MessageTracker do """ @spec get_message_tracker(sharding_key :: term()) :: {:ok, pid()} | {:error, :message_tracker_start_fail} - defeffect get_message_tracker(sharding_key) do + def get_message_tracker(sharding_key) do name = {:via, Registry, {MessageTracker.Registry, {:sharding_key, sharding_key}}} # TODO bring back :offload_start (?) diff --git a/mix.exs b/mix.exs index f1237e7..f0792bb 100644 --- a/mix.exs +++ b/mix.exs @@ -34,7 +34,7 @@ defmodule Mississippi.MixProject do {:amqp, "~> 3.3"}, {:credo, "~> 1.0", only: [:dev], runtime: false}, {:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false}, - {:efx, "~> 0.1"}, + {:mimic, "~> 2.3", only: [:dev, :test, :ci]}, {:elixir_uuid, "~> 1.2"}, {:ex_check, "~> 0.16", only: [:dev], runtime: false}, {:excoveralls, "~> 0.18", only: :test}, @@ -42,12 +42,11 @@ defmodule Mississippi.MixProject do {:current_rabbit_pool, "~> 1.1"}, {:hammox, "~> 0.7", only: :test}, {:mix_audit, "~> 2.0", only: [:dev], runtime: false}, - # Use tagged version when https://github.com/derekkraan/horde/pull/274 appears in a tag - {:horde, github: "derekkraan/horde"}, + {:horde, "~> 0.9"}, {:nimble_options, "~> 1.0"}, {:pretty_log, "~> 0.1"}, {:styler, "~> 1.0.0-rc.1", only: [:dev], runtime: false}, - {:typed_struct, "~> 0.3.0"} + {:typedstruct, "~> 0.5"} ] end diff --git a/mix.lock b/mix.lock index a63258c..2f7d05a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,44 +1,45 @@ %{ - "amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"}, + "amqp": {:hex, :amqp, "3.3.2", "6cad7469957b29c517a26a27474828f1db28278a13bcc2e7970db9854a3d3080", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "f977c41d81b65a21234a9158e6491b2296f8bd5bda48d5b611a64b6e0d2c3f31"}, "amqp_client": {:hex, :amqp_client, "3.12.14", "2b677bc3f2e2234ba7517042b25d72071a79735042e91f9116bd3c176854b622", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "5f70b6c3b1a739790080da4fddc94a867e99f033c4b1edc20d6ff8b8fb4bd160"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"}, - "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, + "credo": {:hex, :credo, "1.7.16", "a9f1389d13d19c631cb123c77a813dbf16449a2aebf602f590defa08953309d4", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d0562af33756b21f248f066a9119e3890722031b6d199f22e3cf95550e4f1579"}, "current_rabbit_pool": {:hex, :current_rabbit_pool, "1.1.1", "3a9594f62fdeef845ebd75833659dd821174b51bd013a53c269bf43e67ba8df6", [:mix], [{:amqp, "~> 3.1", [hex: :amqp, repo: "hexpm", optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "69791022a2e818e55a6535ddea623b1240d44c9640ac5ddc99e07262ee711b2b"}, "delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"}, - "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.43", "34b2f401fe473080e39ff2b90feb8ddfeef7639f8ee0bbf71bb41911831d77c5", [:mix], [], "hexpm", "970a3cd19503f5e8e527a190662be2cee5d98eed1ff72ed9b3d1a3d466692de8"}, - "efx": {:hex, :efx, "0.1.11", "333152b5a045ca3123f7d5c34dd09e62b34d7ae2eadb220a89ac8b6903302ceb", [:mix], [{:process_tree, "0.1.2", [hex: :process_tree, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "124a9b131b248c869d01bd0d92acc006ae0a6b2f372abc67b2e3f08ddffd406f"}, + "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, "ex_check": {:hex, :ex_check, "0.16.0", "07615bef493c5b8d12d5119de3914274277299c6483989e52b0f6b8358a26b5f", [:mix], [], "hexpm", "4d809b72a18d405514dda4809257d8e665ae7cf37a7aee3be6b74a34dec310f5"}, - "ex_doc": {:hex, :ex_doc, "0.37.0", "970f92b39e62c460aa8a367508e938f5e4da6e2ff3eaed3f8530b25870f45471", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "b0ee7f17373948e0cf471e59c3a0ee42f3bd1171c67d91eb3626456ef9c6202c"}, - "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, - "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, - "hammox": {:hex, :hammox, "0.7.0", "a49dc95e0a78e1c38db11c2b6eadff38f25418ef92ecf408bd90d95d459f35a2", [:mix], [{:mox, "~> 1.0", [hex: :mox, repo: "hexpm", optional: false]}, {:ordinal, "~> 0.1", [hex: :ordinal, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5e228c4587f23543f90c11394957878178c489fad46da421c37ca696e37dd91b"}, - "horde": {:git, "https://github.com/derekkraan/horde.git", "f9ef5c4c9d1ad6f24a619a2252b5f25ec6602493", []}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, + "excoveralls": {:hex, :excoveralls, "0.18.5", "e229d0a65982613332ec30f07940038fe451a2e5b29bce2a5022165f0c9b157e", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "523fe8a15603f86d64852aab2abe8ddbd78e68579c8525ae765facc5eae01562"}, + "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, + "ham": {:hex, :ham, "0.3.2", "02ae195f49970ef667faf9d01bc454fb80909a83d6c775bcac724ca567aeb7b3", [:mix], [], "hexpm", "b71cc684c0e5a3d32b5f94b186770551509e93a9ae44ca1c1a313700f2f6a69a"}, + "hammox": {:hex, :hammox, "0.7.1", "826a01d3caa574eb6f4cd2b7b0815b16f95951527b1a30871c2ea80bbc00aaec", [:mix], [{:mox, "~> 1.0", [hex: :mox, repo: "hexpm", optional: false]}, {:ordinal, "~> 0.1", [hex: :ordinal, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "add4d7d341c1df7898a96358798e7a7e73587cb28fd4b7127abe55232ae3f6e8"}, + "horde": {:hex, :horde, "0.10.0", "31c6a633057c3ec4e73064d7b11ba409c9f3c518aa185377d76bee441b76ceb0", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.7", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "0b51c435cb698cac9bf9c17391dce3ebb1376ae6154c81f077fc61db771b9432"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "logfmt": {:hex, :logfmt, "3.3.3", "6521ee4a5c532088e15d487fab9f736c07bdd161d643560c73cd4b10685deb65", [:mix], [], "hexpm", "dbd51cd3fe37c3429b9bd687bad1f531a533505f4a641592129e7a47e24104d1"}, - "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, - "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, - "mix_audit": {:hex, :mix_audit, "2.1.4", "0a23d5b07350cdd69001c13882a4f5fb9f90fbd4cbf2ebc190a2ee0d187ea3e9", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "fd807653cc8c1cada2911129c7eb9e985e3cc76ebf26f4dd628bb25bbcaa7099"}, - "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, - "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, + "merkle_map": {:hex, :merkle_map, "0.2.2", "f36ff730cca1f2658e317a3c73406f50bbf5ac8aff54cf837d7ca2069a6e251c", [:mix], [], "hexpm", "383107f0503f230ac9175e0631647c424efd027e89ea65ab5ea12eeb54257aaf"}, + "mimic": {:hex, :mimic, "2.3.0", "88b1d13c285e57df6ea57204317bb56e49e7329668006cdcb80a9aafc73a9616", [:mix], [{:ham, "~> 0.3", [hex: :ham, repo: "hexpm", optional: false]}], "hexpm", "52771f23689398c5d41c7d05e91c2c28e10df273b784f40ca8b02e35e46850d3"}, + "mix_audit": {:hex, :mix_audit, "2.1.5", "c0f77cee6b4ef9d97e37772359a187a166c7a1e0e08b50edf5bf6959dfe5a016", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "87f9298e21da32f697af535475860dc1d3617a010e0b418d2ec6142bc8b42d69"}, + "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_ownership": {:hex, :nimble_ownership, "1.0.2", "fa8a6f2d8c592ad4d79b2ca617473c6aefd5869abfa02563a77682038bf916cf", [:mix], [], "hexpm", "098af64e1f6f8609c6672127cfe9e9590a5d3fcdd82bc17a377b8692fd81a879"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "ordinal": {:hex, :ordinal, "0.2.0", "d3eda0cb04ee1f0ca0aae37bf2cf56c28adce345fe56a75659031b6068275191", [:mix], [], "hexpm", "defca8f10dee9f03a090ed929a595303252700a9a73096b6f2f8d88341690d65"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "pretty_log": {:hex, :pretty_log, "0.9.0", "f84aab76e20c551a624ddd4656f1e5f9ca2941625db07549e9cb6a84a346bd40", [:mix], [{:logfmt, "~> 3.3", [hex: :logfmt, repo: "hexpm", optional: false]}], "hexpm", "abf9605c50fdd9377a3ce02ea51696538f4f647b9bb63a8dac209427fc7badf4"}, - "process_tree": {:hex, :process_tree, "0.1.2", "26218b086f5a2265a5c7a24050acd8b7416702633518f423ddde3cf38c6ff3cf", [:mix], [], "hexpm", "e5d0876a23cc7f0062f14f43c3a07de71e063d876bb256ab32055c9131f8f9aa"}, "rabbit_common": {:hex, :rabbit_common, "3.12.14", "466123ee7346a3cdac078c0c302bcd36da4523e8acd678c1b992f7b4df1f7914", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "70c31a51f7401cc0204ddef2745d98680c2e0df67e3b0c9e198916881fde3293"}, "recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"}, - "styler": {:hex, :styler, "1.0.0-rc.2", "3fb6949ef1fa07128415631827d6331523923ca9d38c2f113bea5f6c98428440", [:mix], [], "hexpm", "35f8f0eca03c9547f8f1319aa6a95105c6a23586177dbb132e1a5c31d8c708dd"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, + "styler": {:hex, :styler, "1.0.0", "87daf4a8e421d7678da78f9532a632974de6b8060b80d7827abec3bca5140173", [:mix], [], "hexpm", "4ba5bc40c5eaebe2bb05ec0bb7b5a889d38c6ea6865c584dffd360b3a94ec625"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, "thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"}, - "typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"}, + "typedstruct": {:hex, :typedstruct, "0.5.4", "d1d33d58460a74f413e9c26d55e66fd633abd8ac0fb12639add9a11a60a0462a", [:make, :mix], [], "hexpm", "ffaef36d5dbaebdbf4ed07f7fb2ebd1037b2c1f757db6fb8e7bcbbfabbe608d8"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, - "yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"}, + "yaml_elixir": {:hex, :yaml_elixir, "2.12.1", "d74f2d82294651b58dac849c45a82aaea639766797359baff834b64439f6b3f4", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "d9ac16563c737d55f9bfeed7627489156b91268a3a21cd55c54eb2e335207fed"}, } diff --git a/test/consumer/amqp_data_consumer_test.exs b/test/consumer/amqp_data_consumer_test.exs index 59b8f73..41484d9 100644 --- a/test/consumer/amqp_data_consumer_test.exs +++ b/test/consumer/amqp_data_consumer_test.exs @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 defmodule Mississippi.Consumer.AMQPDataConsumer.Test do - use EfxCase, async: false + use ExUnit.Case, async: false # We use Mox here because we don't care about the type safety # guarantees of Hammox @@ -16,6 +16,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do alias Mississippi.Consumer.Test.Placeholder require Logger + require Mimic @moduletag :unit @@ -26,6 +27,8 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do :ok end + setup {Mimic, :verify_on_exit!} + describe "AMQPDataConsumer message handling:" do setup :create_queue_index setup :create_sharding_key @@ -39,9 +42,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do queue_index: queue_index } do data_consumer_pid = - queue_index - |> amqp_data_consumer_fixture() - |> start_supervised!() + start_amqp_data_consumer!(queue_index) sharding_key_1 = get_sharding_key() sharding_key_2 = get_sharding_key() @@ -54,7 +55,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do sharding_key_2 => tracker_2 } - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, trackers[sharding_key_1]} end, calls: 1) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, trackers[sharding_key_1]} end) payload_1 = get_payload() meta_1 = meta_fixture(sharding_key_1) @@ -69,7 +70,7 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do payload_2 = get_payload() meta_2 = meta_fixture(sharding_key_2) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, trackers[sharding_key_2]} end, calls: 1) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, trackers[sharding_key_2]} end) send(data_consumer_pid, {:basic_deliver, payload_2, meta_2}) @@ -87,11 +88,9 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do meta: meta } do data_consumer_pid = - queue_index - |> amqp_data_consumer_fixture() - |> start_supervised!() + start_amqp_data_consumer!(queue_index) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) send(data_consumer_pid, {:basic_deliver, payload, meta}) @@ -115,13 +114,11 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do meta: meta } do data_consumer_pid = - queue_index - |> amqp_data_consumer_fixture() - |> start_supervised!() + start_amqp_data_consumer!(queue_index) :erlang.trace(data_consumer_pid, true, [:receive]) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) send(data_consumer_pid, {:basic_deliver, payload, meta}) @@ -146,13 +143,11 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do Process.flag(:trap_exit, true) data_consumer_pid = - queue_index - |> amqp_data_consumer_fixture() - |> start_supervised!() + start_amqp_data_consumer!(queue_index) consumer_ref = Process.monitor(data_consumer_pid) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) send(data_consumer_pid, {:basic_deliver, payload, meta}) @@ -230,6 +225,12 @@ defmodule Mississippi.Consumer.AMQPDataConsumer.Test do } end + defp start_amqp_data_consumer!(queue_index) do + data_consumer = queue_index |> amqp_data_consumer_fixture() |> start_supervised!() + Mimic.allow(MessageTracker, self(), data_consumer) + data_consumer + end + defp get_sharding_key do "sharding_#{System.unique_integer()}" end diff --git a/test/consumer/data_updater_test.exs b/test/consumer/data_updater_test.exs index dad90bd..63443b3 100644 --- a/test/consumer/data_updater_test.exs +++ b/test/consumer/data_updater_test.exs @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 defmodule Mississippi.Consumer.DataUpdater.Test do - use EfxCase, async: false + use ExUnit.Case, async: false import Hammox @@ -12,6 +12,9 @@ defmodule Mississippi.Consumer.DataUpdater.Test do alias Mississippi.Consumer.DataUpdater.State alias Mississippi.Consumer.MessageTracker alias Mississippi.Consumer.Test.Placeholder + alias Mississippi.DataUpdater.Helpers + + require Mimic @moduletag :unit @@ -29,60 +32,60 @@ defmodule Mississippi.Consumer.DataUpdater.Test do :ok end + setup {Mimic, :verify_on_exit!} + doctest Mississippi.Consumer.DataUpdater describe "DataUpdater works as an Orleans grain:" do setup :setup_mock_message_handler setup :create_sharding_key + setup :setup_data_updater @tag :data_updater_orleans test "a process is successfully started with a given sharding key", %{ - sharding_key: sharding_key + sharding_key: sharding_key, + data_updater: data_updater } do - {:ok, pid} = DataUpdater.get_data_updater_process(sharding_key) - du_processes = Registry.select(DataUpdater.Registry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]) - assert {{:sharding_key, sharding_key}, pid} in du_processes + assert {{:sharding_key, sharding_key}, data_updater} in du_processes end @tag :data_updater_orleans test "a process is not duplicated when using the same sharding key", %{ - sharding_key: sharding_key + sharding_key: sharding_key, + data_updater: data_updater } do - {:ok, pid} = DataUpdater.get_data_updater_process(sharding_key) - du_processes = Registry.select(DataUpdater.Registry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]) - assert {{:sharding_key, sharding_key}, pid} in du_processes + assert {{:sharding_key, sharding_key}, data_updater} in du_processes - {:ok, ^pid} = DataUpdater.get_data_updater_process(sharding_key) - assert {{:sharding_key, sharding_key}, pid} in du_processes + {:ok, ^data_updater} = DataUpdater.get_data_updater_process(sharding_key) + assert {{:sharding_key, sharding_key}, data_updater} in du_processes end @tag :data_updater_orleans test "a process is spawned again if requested after termination", %{ - sharding_key: sharding_key + sharding_key: sharding_key, + data_updater: data_updater } do - {:ok, first_pid} = DataUpdater.get_data_updater_process(sharding_key) - du_processes = Registry.select(DataUpdater.Registry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]) - assert {{:sharding_key, sharding_key}, first_pid} in du_processes + assert {{:sharding_key, sharding_key}, data_updater} in du_processes - DynamicSupervisor.terminate_child(DataUpdater.Supervisor, first_pid) + DynamicSupervisor.terminate_child(DataUpdater.Supervisor, data_updater) {:ok, second_pid} = DataUpdater.get_data_updater_process(sharding_key) - assert first_pid != second_pid + assert data_updater != second_pid du_processes = Registry.select(DataUpdater.Registry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]) assert {{:sharding_key, sharding_key}, second_pid} in du_processes - refute {{:sharding_key, sharding_key}, first_pid} in du_processes + refute {{:sharding_key, sharding_key}, data_updater} in du_processes end end @@ -92,52 +95,44 @@ defmodule Mississippi.Consumer.DataUpdater.Test do setup :create_message setup :setup_mock_message_tracker setup :add_on_exit + setup :setup_data_updater @tag :data_updater_message_handling test "acking when ok", %{ - sharding_key: sharding_key, message: message, - message_tracker: message_tracker + message_tracker: message_tracker, + data_updater: data_updater } do expect(MockMessageHandler, :handle_message, fn _, _, _, _, state -> {:ack, :ok, state} end) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) - {:ok, data_updater_pid} = DataUpdater.get_data_updater_process(sharding_key) - - :erlang.trace(data_updater_pid, true, [:receive]) - - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) - - DataUpdater.handle_message(data_updater_pid, message) + DataUpdater.handle_message(data_updater, message) assert_receive {:trace, ^message_tracker, :receive, {_, {_, _}, {:ack_delivery, ^message}}} end @tag :data_updater_message_handling test "rejecting when error", %{ - sharding_key: sharding_key, message: message, - message_tracker: message_tracker + message_tracker: message_tracker, + data_updater: data_updater } do expect(MockMessageHandler, :handle_message, fn _, _, _, _, state -> {:discard, :aaaa, state} end) - {:ok, data_updater_pid} = DataUpdater.get_data_updater_process(sharding_key) - - :erlang.trace(data_updater_pid, true, [:receive]) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) - - DataUpdater.handle_message(data_updater_pid, message) + DataUpdater.handle_message(data_updater, message) assert_receive {:trace, ^message_tracker, :receive, {_, {_, _}, {:reject, ^message}}} end @tag :data_updater_message_handling test "terminating when requested", %{ - sharding_key: sharding_key, message: message, - message_tracker: message_tracker + message_tracker: message_tracker, + data_updater: data_updater } do Process.flag(:trap_exit, true) @@ -147,15 +142,13 @@ defmodule Mississippi.Consumer.DataUpdater.Test do expect(MockMessageHandler, :terminate, fn _, _ -> :ok end) - {:ok, data_updater_pid} = DataUpdater.get_data_updater_process(sharding_key) + ref = Process.monitor(data_updater) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) - ref = Process.monitor(data_updater_pid) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker} end, calls: 1) + DataUpdater.handle_message(data_updater, message) - DataUpdater.handle_message(data_updater_pid, message) - - assert_receive {:DOWN, ^ref, :process, ^data_updater_pid, {:shutdown, :requested}} - refute Process.alive?(data_updater_pid) + assert_receive {:DOWN, ^ref, :process, ^data_updater, {:shutdown, :requested}} + refute Process.alive?(data_updater) end end @@ -163,27 +156,19 @@ defmodule Mississippi.Consumer.DataUpdater.Test do setup :setup_mock_message_handler setup :create_sharding_key setup :create_signal + setup :add_signal_expectations + setup :setup_data_updater @tag :data_updater_signal_handling test "updating the handler state", %{ - sharding_key: sharding_key, + data_updater: data_updater, signal: signal } do - expect(MockMessageHandler, :init, fn _sharding_key -> - {:ok, %{signals: []}} - end) - - expect(MockMessageHandler, :handle_signal, fn signal, _state -> - {:ok, %{signals: [signal]}} - end) - - {:ok, data_updater_pid} = DataUpdater.get_data_updater_process(sharding_key) + assert %State{handler_state: %{signals: []}} = :sys.get_state(data_updater) - assert %State{handler_state: %{signals: []}} = :sys.get_state(data_updater_pid) + DataUpdater.handle_signal(data_updater, signal) - DataUpdater.handle_signal(data_updater_pid, signal) - - assert %State{handler_state: %{signals: [^signal]}} = :sys.get_state(data_updater_pid) + assert %State{handler_state: %{signals: [^signal]}} = :sys.get_state(data_updater) end end @@ -218,6 +203,18 @@ defmodule Mississippi.Consumer.DataUpdater.Test do context end + defp add_signal_expectations(_context) do + expect(MockMessageHandler, :init, fn _sharding_key -> + {:ok, %{signals: []}} + end) + + expect(MockMessageHandler, :handle_signal, fn signal, _state -> + {:ok, %{signals: [signal]}} + end) + + :ok + end + defp setup_mock_message_tracker(context) do {:ok, message_tracker} = GenServer.start(Placeholder, []) @@ -225,6 +222,12 @@ defmodule Mississippi.Consumer.DataUpdater.Test do Map.put(context, :message_tracker, message_tracker) end + defp setup_data_updater(context) do + %{sharding_key: sharding_key} = context + data_updater = Helpers.setup_data_updater!(sharding_key) + %{data_updater: data_updater} + end + defp message_fixture(sharding_key) do %Mississippi.Consumer.Message{ payload: "payload_#{System.unique_integer()}", diff --git a/test/consumer/message_tracker_test.exs b/test/consumer/message_tracker_test.exs index 6a14a3a..50cd14f 100644 --- a/test/consumer/message_tracker_test.exs +++ b/test/consumer/message_tracker_test.exs @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 defmodule Mississippi.Consumer.MessageTracker.Test do - use EfxCase, async: false + use ExUnit.Case, async: false import Hammox @@ -12,8 +12,10 @@ defmodule Mississippi.Consumer.MessageTracker.Test do alias Mississippi.Consumer.DataUpdater alias Mississippi.Consumer.MessageTracker alias Mississippi.Consumer.Test.Placeholder + alias Mississippi.DataUpdater.Helpers require Logger + require Mimic @moduletag :unit @@ -25,6 +27,8 @@ defmodule Mississippi.Consumer.MessageTracker.Test do :ok end + setup {Mimic, :verify_on_exit!} + doctest Mississippi.Consumer.MessageTracker describe "MessageTracker works as an Orleans grain:" do @@ -83,6 +87,7 @@ defmodule Mississippi.Consumer.MessageTracker.Test do setup :setup_mock_channel setup :setup_mock_data_updater setup :add_on_exit + setup :setup_message_tracker @tag :message_tracker_fault_tolerance test "a process crashes if the related AMQP Channel crashes", %{ @@ -94,7 +99,7 @@ defmodule Mississippi.Consumer.MessageTracker.Test do {:ok, message_tracker_pid} = MessageTracker.get_message_tracker(sharding_key) mt_ref = Process.monitor(message_tracker_pid) - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater} end) + Mimic.stub(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater} end) MessageTracker.handle_message(message_tracker_pid, message, channel) send(message_tracker_pid, {:DOWN, :dontcare, :process, channel.pid, :crash}) @@ -114,13 +119,13 @@ defmodule Mississippi.Consumer.MessageTracker.Test do data_updater_2_pid = get_mock_data_updater!() Enum.each([message_tracker_pid, data_updater_1_pid, data_updater_2_pid], &:erlang.trace(&1, true, [:receive])) - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_1_pid} end, calls: 1) + Mimic.expect(DataUpdater, :get_data_updater_process, 1, fn _ -> {:ok, data_updater_1_pid} end) MessageTracker.handle_message(message_tracker_pid, message, channel) assert_receive {:trace, ^data_updater_1_pid, :receive, {_, {:handle_message, ^message}}} - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_2_pid} end, calls: 1) + Mimic.expect(DataUpdater, :get_data_updater_process, 1, fn _ -> {:ok, data_updater_2_pid} end) kill_data_updater(data_updater_1_pid) assert_receive {:trace, ^message_tracker_pid, :receive, {:DOWN, _, :process, ^data_updater_1_pid, _}} @@ -134,33 +139,24 @@ defmodule Mississippi.Consumer.MessageTracker.Test do setup :create_message setup :setup_mock_channel setup :add_on_exit + setup :setup_message_tracker + setup :setup_data_updater @tag :message_tracker_message_handling test "is successful on valid message", %{ - sharding_key: sharding_key, channel: channel, - message: message + message: message, + message_tracker: message_tracker, + data_updater: data_updater } do - {:ok, message_tracker_pid} = MessageTracker.get_message_tracker(sharding_key) - :erlang.trace(message_tracker_pid, true, [:receive]) + Mimic.expect(DataUpdater, :get_data_updater_process, 1, fn _ -> {:ok, data_updater} end) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) - {:ok, data_updater_pid} = - GenServer.start_link(DataUpdater, - sharding_key: sharding_key, - message_handler: MockMessageHandler - ) + MessageTracker.handle_message(message_tracker, message, channel) - :erlang.trace(data_updater_pid, true, [:receive]) - - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_pid} end, calls: 1) - - MessageTracker.handle_message(message_tracker_pid, message, channel) - - assert_receive {:trace, ^data_updater_pid, :receive, {_, {:handle_message, ^message}}} + assert_receive {:trace, ^data_updater, :receive, {_, {:handle_message, ^message}}} - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker_pid} end, calls: 1) - - assert_receive {:trace, ^message_tracker_pid, :receive, {_, {_, _}, {:ack_delivery, ^message}}} + assert_receive {:trace, ^message_tracker, :receive, {_, {_, _}, {:ack_delivery, ^message}}} end @tag :message_tracker_message_handling @@ -172,7 +168,7 @@ defmodule Mississippi.Consumer.MessageTracker.Test do message_2 = message_fixture(sharding_key) {:ok, message_tracker_pid} = MessageTracker.get_message_tracker(sharding_key) :erlang.trace(message_tracker_pid, true, [:receive]) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker_pid} end) + Mimic.stub(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker_pid} end) {:ok, data_updater_pid} = GenServer.start_link(DataUpdater, @@ -181,7 +177,7 @@ defmodule Mississippi.Consumer.MessageTracker.Test do ) :erlang.trace(data_updater_pid, true, [:receive]) - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_pid} end) + Mimic.stub(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_pid} end) MessageTracker.handle_message(message_tracker_pid, message_1, channel) MessageTracker.handle_message(message_tracker_pid, message_2, channel) @@ -203,15 +199,14 @@ defmodule Mississippi.Consumer.MessageTracker.Test do end test "shuts down if message forces process termination", %{ - sharding_key: sharding_key, channel: channel, - message: message + message: message, + message_tracker: message_tracker, + data_updater: data_updater } do Process.flag(:trap_exit, true) - {:ok, message_tracker_pid} = MessageTracker.get_message_tracker(sharding_key) - :erlang.trace(message_tracker_pid, true, [:receive]) - mt_ref = Process.monitor(message_tracker_pid) + mt_ref = Process.monitor(message_tracker) MockMessageHandler |> expect(:init, fn _ -> {:ok, []} end) @@ -220,27 +215,19 @@ defmodule Mississippi.Consumer.MessageTracker.Test do end) |> expect(:terminate, fn _, _ -> :ok end) - {:ok, data_updater_pid} = - GenServer.start_link(DataUpdater, - sharding_key: sharding_key, - message_handler: MockMessageHandler - ) - - du_ref = Process.monitor(data_updater_pid) + du_ref = Process.monitor(data_updater) - bind(DataUpdater, :get_data_updater_process, fn _ -> {:ok, data_updater_pid} end, calls: 1) - - MessageTracker.handle_message(message_tracker_pid, message, channel) + Mimic.expect(DataUpdater, :get_data_updater_process, 1, fn _ -> {:ok, data_updater} end) + Mimic.expect(MessageTracker, :get_message_tracker, 1, fn _ -> {:ok, message_tracker} end) - bind(MessageTracker, :get_message_tracker, fn _ -> {:ok, message_tracker_pid} end, calls: 1) + MessageTracker.handle_message(message_tracker, message, channel) - assert_receive {:trace, ^message_tracker_pid, :receive, {_, {_, _}, {:ack_delivery, ^message}}} + assert_receive {:trace, ^message_tracker, :receive, {_, {_, _}, {:ack_delivery, ^message}}} + assert_receive {:DOWN, ^du_ref, :process, ^data_updater, {:shutdown, :requested}} + refute Process.alive?(data_updater) - assert_receive {:DOWN, ^du_ref, :process, ^data_updater_pid, {:shutdown, :requested}} - refute Process.alive?(data_updater_pid) - - assert_receive {:DOWN, ^mt_ref, :process, ^message_tracker_pid, {:shutdown, :requested}} - refute Process.alive?(message_tracker_pid) + assert_receive {:DOWN, ^mt_ref, :process, ^message_tracker, {:shutdown, :requested}} + refute Process.alive?(message_tracker) end end @@ -334,4 +321,19 @@ defmodule Mississippi.Consumer.MessageTracker.Test do 100 -> false end end + + defp setup_message_tracker(context) do + %{sharding_key: sharding_key} = context + {:ok, message_tracker} = MessageTracker.get_message_tracker(sharding_key) + :erlang.trace(message_tracker, true, [:receive]) + Mimic.allow(DataUpdater, self(), message_tracker) + + %{message_tracker: message_tracker} + end + + defp setup_data_updater(context) do + %{sharding_key: sharding_key} = context + data_updater = Helpers.setup_standalone_data_updater!(MockMessageHandler, sharding_key) + %{data_updater: data_updater} + end end diff --git a/test/support/data_updater/helpers.ex b/test/support/data_updater/helpers.ex new file mode 100644 index 0000000..376282c --- /dev/null +++ b/test/support/data_updater/helpers.ex @@ -0,0 +1,38 @@ +# Copyright 2026 SECO Mind Srl +# SPDX-License-Identifier: Apache-2.0 + +defmodule Mississippi.DataUpdater.Helpers do + @moduledoc """ + Helpers for DataUpdaters in tests + """ + + alias Mississippi.Consumer.DataUpdater + alias Mississippi.Consumer.DataUpdater.Handler.Impl + alias Mississippi.Consumer.MessageTracker + + require Hammox + require Mimic + + def setup_data_updater!(sharding_key) do + {:ok, data_updater} = + DataUpdater.get_data_updater_process(sharding_key) + + do_setup_data_updater!(data_updater) + + data_updater + end + + def setup_standalone_data_updater!(message_handler \\ Impl, sharding_key) do + opts = [message_handler: message_handler, sharding_key: sharding_key] + {:ok, data_updater} = GenServer.start_link(DataUpdater, opts) + do_setup_data_updater!(data_updater) + data_updater + end + + defp do_setup_data_updater!(data_updater) do + :erlang.trace(data_updater, true, [:receive]) + + Mimic.allow(MessageTracker, self(), data_updater) + Hammox.allow(MockMessageHandler, self(), data_updater) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index c9b3feb..d1bde4c 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,7 @@ # Copyright 2024 SECO Mind Srl # SPDX-License-Identifier: Apache-2.0 -ExUnit.start() +Mimic.copy(Mississippi.Consumer.DataUpdater) +Mimic.copy(Mississippi.Consumer.MessageTracker) + +ExUnit.start(capture_log: true)