diff --git a/lib/astarte_vmq_plugin.ex b/lib/astarte_vmq_plugin.ex index ab600a5..941a766 100644 --- a/lib/astarte_vmq_plugin.ex +++ b/lib/astarte_vmq_plugin.ex @@ -22,7 +22,6 @@ defmodule Astarte.VMQ.Plugin do """ alias Astarte.VMQ.Plugin.Config - alias Astarte.VMQ.Plugin.AMQPClient alias Astarte.VMQ.Plugin.Connection.Synchronizer alias Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, as: SynchronizerSupervisor alias Astarte.VMQ.Plugin.Queries @@ -246,15 +245,20 @@ defmodule Astarte.VMQ.Plugin do ] ++ additional_headers message_id = generate_message_id(realm, device_id, timestamp) - sharding_key = {realm, device_id} - - :ok = - AMQPClient.publish(payload, - headers: headers, - message_id: message_id, - timestamp: timestamp, - sharding_key: sharding_key - ) + + {:ok, decoded_device_id} = + Astarte.Core.Device.decode_device_id(device_id, allow_extended_id: true) + + sharding_key = {realm, decoded_device_id} + + publish_opts = [ + headers: headers, + message_id: message_id, + timestamp: timestamp, + sharding_key: sharding_key + ] + + :ok = Mississippi.Producer.EventsProducer.publish(payload, publish_opts) end defp now_us_x10_timestamp do diff --git a/lib/astarte_vmq_plugin/amqp_client.ex b/lib/astarte_vmq_plugin/amqp_client.ex deleted file mode 100644 index 97d3fe1..0000000 --- a/lib/astarte_vmq_plugin/amqp_client.ex +++ /dev/null @@ -1,136 +0,0 @@ -# -# This file is part of Astarte. -# -# Copyright 2017-2018 Ispirata Srl -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -defmodule Astarte.VMQ.Plugin.AMQPClient do - require Logger - use GenServer - - alias AMQP.Basic - alias AMQP.Channel - alias AMQP.Connection - alias Astarte.VMQ.Plugin.Config - - @connection_backoff 10000 - - # API - - def start_link(args \\ []) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) - end - - def publish(payload, opts \\ []) do - GenServer.call(__MODULE__, {:publish, payload, opts}) - end - - # Server callbacks - - def init(_args) do - send(self(), :try_to_connect) - {:ok, :not_connected} - end - - def terminate(reason, %Channel{conn: conn} = chan) do - Logger.warn("AMQPClient terminated with reason #{inspect(reason)}", - tag: "ampq_client_terminate" - ) - - Channel.close(chan) - Connection.close(conn) - end - - def terminate(reason, _state) do - Logger.warn("AMQPClient terminated with reason #{inspect(reason)}", - tag: "ampq_client_terminate" - ) - end - - def handle_call({:publish, _payload, _opts}, _from, :not_connected = state) do - {:reply, {:error, :not_connected}, state} - end - - def handle_call({:publish, payload, opts}, _from, chan) do - sharding_key = Keyword.fetch!(opts, :sharding_key) - - # TODO: handle basic.return - full_opts = - opts - |> Keyword.delete(:sharding_key) - |> Keyword.put(:persistent, true) - |> Keyword.put(:mandatory, true) - - queue_prefix = Config.data_queue_prefix() - queue_index = :erlang.phash2(sharding_key, Config.data_queue_count()) - queue_name = "#{queue_prefix}#{queue_index}" - res = Basic.publish(chan, "", queue_name, payload, full_opts) - - if Config.mirror_queue_name() do - Basic.publish(chan, "", Config.mirror_queue_name(), payload, full_opts) - end - - {:reply, res, chan} - end - - def handle_info(:try_to_connect, _state) do - with {:ok, channel} <- connect() do - {:noreply, channel} - else - {:error, :not_connected} -> - {:noreply, :not_connected} - end - end - - def handle_info({:DOWN, _, :process, _pid, reason}, _state) do - Logger.warn("RabbitMQ connection lost: #{inspect(reason)}. Trying to reconnect...", - tag: "rabbitmq_connection_lost" - ) - - with {:ok, channel} <- connect() do - {:noreply, channel} - else - {:error, :not_connected} -> - {:noreply, :not_connected} - end - end - - defp connect do - with {:ok, conn} <- Connection.open(Config.amqp_options()), - # Get notifications when the connection goes down - {:ok, chan} <- Channel.open(conn), - Process.monitor(conn.pid) do - {:ok, chan} - else - {:error, reason} -> - Logger.warn("RabbitMQ Connection error: " <> inspect(reason), - tag: "rabbitmq_connection_error" - ) - - retry_connection_after(@connection_backoff) - {:error, :not_connected} - - :error -> - Logger.warn("Unknown RabbitMQ connection error", tag: "rabbitmq_connection_error") - retry_connection_after(@connection_backoff) - {:error, :not_connected} - end - end - - defp retry_connection_after(backoff) do - Logger.warn("Retrying connection in #{backoff} ms", tag: "retrying_connection") - Process.send_after(self(), :try_to_connect, backoff) - end -end diff --git a/lib/astarte_vmq_plugin/application.ex b/lib/astarte_vmq_plugin/application.ex index f873f1f..de41327 100644 --- a/lib/astarte_vmq_plugin/application.ex +++ b/lib/astarte_vmq_plugin/application.ex @@ -38,7 +38,7 @@ defmodule Astarte.VMQ.Plugin.Application do # List all child processes to be supervised children = [ - Astarte.VMQ.Plugin.AMQPClient, + {Mississippi.Producer, Config.mississippi_opts!()}, {Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry}, Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, {Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]}, diff --git a/lib/astarte_vmq_plugin/config.ex b/lib/astarte_vmq_plugin/config.ex index 8ac86c3..69d9476 100644 --- a/lib/astarte_vmq_plugin/config.ex +++ b/lib/astarte_vmq_plugin/config.ex @@ -153,6 +153,19 @@ defmodule Astarte.VMQ.Plugin.Config do Application.get_env(:astarte_vmq_plugin, :mirror_queue_name) end + def mississippi_opts! do + [ + amqp_producer_options: amqp_options(), + mississippi_config: [ + queues: [ + events_exchange_name: "", + total_count: data_queue_count(), + prefix: data_queue_prefix() + ] + ] + ] + end + def registry_mfa do Application.get_env(:astarte_vmq_plugin, :registry_mfa) end diff --git a/mix.exs b/mix.exs index f1c2e17..71431f4 100644 --- a/mix.exs +++ b/mix.exs @@ -89,6 +89,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do {:amqp, "~> 3.3"}, {:vernemq_dev, github: "vernemq/vernemq_dev"}, {:excoveralls, "~> 0.15", only: :test}, + {:mississippi, github: "secomind/mississippi"}, {:pretty_log, "~> 0.1"}, {:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false}, {:xandra, "~> 0.14"} diff --git a/mix.lock b/mix.lock index c63872c..51721e3 100644 --- a/mix.lock +++ b/mix.lock @@ -12,21 +12,26 @@ "dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"}, "ecto": {:hex, :ecto, "3.10.2", "6b887160281a61aa16843e47735b8a266caa437f80588c3ab80a8a960e6abe37", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a895778f0d7648a4b34b486af59a1c8009041fbdf2b17f1ac215eb829c60235"}, "ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"}, + "efx": {:hex, :efx, "0.2.6", "ec7c42b05073e6fdc61d971cc02d366f73f40d8093272a5326d16861003036b0", [:mix], [{:process_tree, "0.1.2", [hex: :process_tree, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "7648dcfd05f9ac39b257d4a9aa7c5b243823fee31933e725b2d346371b6d6bc4"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "ex_rabbit_pool": {:git, "https://github.com/leductam/ex_rabbit_pool.git", "9951452ab51d36648b9a9d3373609e48d1379a0d", []}, "excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"}, "logfmt": {:hex, :logfmt, "3.3.2", "c432765cff9c26cf4ba78cf66ece183e56562dfeba6e2d9f077804cc4c756677", [:mix], [], "hexpm", "8dfc07bf11d362d1ffb11fa34647f4e78dba47247589cc94fd8c9155889c8fcb"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mississippi": {:git, "https://github.com/secomind/mississippi.git", "510a30878d1a990fca15b54eda6b4370c636cefa", []}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "pretty_log": {:hex, :pretty_log, "0.9.0", "f84aab76e20c551a624ddd4656f1e5f9ca2941625db07549e9cb6a84a346bd40", [:mix], [{:logfmt, "~> 3.3", [hex: :logfmt, repo: "hexpm", optional: false]}], "hexpm", "abf9605c50fdd9377a3ce02ea51696538f4f647b9bb63a8dac209427fc7badf4"}, + "process_tree": {:hex, :process_tree, "0.1.2", "26218b086f5a2265a5c7a24050acd8b7416702633518f423ddde3cf38c6ff3cf", [:mix], [], "hexpm", "e5d0876a23cc7f0062f14f43c3a07de71e063d876bb256ab32055c9131f8f9aa"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, "rabbit_common": {:hex, :rabbit_common, "3.12.10", "7fc633ee206ae48783d8a5302dfc8fe1e086a5d7de494785ed206f586ad64b34", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "908a8b1bd059f5baefe225fe9d3e2545d35a28db8f6a14d60372556ca7afe641"}, "recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"}, @@ -34,6 +39,7 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"}, + "typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "vernemq_dev": {:git, "https://github.com/vernemq/vernemq_dev.git", "6d622aa8c901ae7777433aef2bd049e380c474a6", []}, "xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"}, diff --git a/test/astarte_vmq_plugin_connection_synchronizer_test.exs b/test/astarte_vmq_plugin_connection_synchronizer_test.exs index f7504df..065f621 100644 --- a/test/astarte_vmq_plugin_connection_synchronizer_test.exs +++ b/test/astarte_vmq_plugin_connection_synchronizer_test.exs @@ -27,26 +27,40 @@ defmodule Astarte.VMQ.Plugin.Connection.SynchronizerTest do @device_id :crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false) @realm "test" @device_base_path "#{@realm}/#{@device_id}" - @queue_name "#{Config.data_queue_prefix()}0" setup_all do amqp_opts = Config.amqp_options() {:ok, conn} = Connection.open(amqp_opts) {:ok, chan} = Channel.open(conn) - Queue.declare(chan, @queue_name) - {:ok, chan: chan} + queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count] + + queues = + for n <- 0..queue_total do + queue_name = "#{Config.data_queue_prefix()}#{n}" + {:ok, _} = Queue.declare(chan, queue_name, durable: true) + queue_name + end + + {:ok, chan: chan, queues: queues} end - setup %{chan: chan} do + setup %{chan: chan, queues: queues} do test_pid = self() - {:ok, consumer_tag} = - Queue.subscribe(chan, @queue_name, fn payload, meta -> - send(test_pid, {:amqp_msg, payload, meta}) - end) + consumer_tags = + for queue <- queues do + {:ok, consumer_tag} = + Queue.subscribe(chan, queue, fn payload, meta -> + send(test_pid, {:amqp_msg, payload, meta}) + end) + + consumer_tag + end on_exit(fn -> - Queue.unsubscribe(chan, consumer_tag) + Enum.each(consumer_tags, fn consumer_tag -> + Queue.unsubscribe(chan, consumer_tag) + end) end) :ok diff --git a/test/astarte_vmq_plugin_test.exs b/test/astarte_vmq_plugin_test.exs index 4cb0b05..85dbaac 100644 --- a/test/astarte_vmq_plugin_test.exs +++ b/test/astarte_vmq_plugin_test.exs @@ -32,29 +32,43 @@ defmodule Astarte.VMQ.PluginTest do @other_device_base_path "#{@realm}/#{@other_device_id}" @other_mqtt_user "other" @another_mqtt_user "another" - @queue_name "#{Config.data_queue_prefix()}0" setup_all do amqp_opts = Config.amqp_options() {:ok, conn} = Connection.open(amqp_opts) {:ok, chan} = Channel.open(conn) - Queue.declare(chan, @queue_name) + queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count] + + queues = + for n <- 0..queue_total do + queue_name = "#{Config.data_queue_prefix()}#{n}" + {:ok, _} = Queue.declare(chan, queue_name, durable: true) + queue_name + end + :ok = DatabaseTestHelper.await_xandra_cluster_connected!() DatabaseTestHelper.setup_db!() on_exit(&DatabaseTestHelper.teardown_db!/0) - {:ok, chan: chan} + {:ok, chan: chan, queues: queues} end - setup %{chan: chan} do + setup %{chan: chan, queues: queues} do test_pid = self() - {:ok, consumer_tag} = - Queue.subscribe(chan, @queue_name, fn payload, meta -> - send(test_pid, {:amqp_msg, payload, meta}) - end) + consumer_tags = + for queue <- queues do + {:ok, consumer_tag} = + Queue.subscribe(chan, queue, fn payload, meta -> + send(test_pid, {:amqp_msg, payload, meta}) + end) + + consumer_tag + end on_exit(fn -> - Queue.unsubscribe(chan, consumer_tag) + Enum.each(consumer_tags, fn consumer_tag -> + Queue.unsubscribe(chan, consumer_tag) + end) end) :ok