From f01cbfb3cdc9e78f921bd38e28d64d32b33a3778 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Thu, 23 May 2024 15:06:06 +0200 Subject: [PATCH] AMQP publish: use Mississippi Replace the old messy AMQP code. The sharding key is now included (as a binary header) in the message, according to Mississippi best practices. This change is retrocompatible. Signed-off-by: Arnaldo Cesco --- lib/astarte_vmq_plugin.ex | 24 ++-- lib/astarte_vmq_plugin/amqp_client.ex | 136 ------------------ lib/astarte_vmq_plugin/application.ex | 2 +- lib/astarte_vmq_plugin/config.ex | 13 ++ ...mq_plugin_connection_synchronizer_test.exs | 32 +++-- test/astarte_vmq_plugin_test.exs | 32 +++-- 6 files changed, 74 insertions(+), 165 deletions(-) delete mode 100644 lib/astarte_vmq_plugin/amqp_client.ex diff --git a/lib/astarte_vmq_plugin.ex b/lib/astarte_vmq_plugin.ex index ab600a5..941a766 100644 --- a/lib/astarte_vmq_plugin.ex +++ b/lib/astarte_vmq_plugin.ex @@ -22,7 +22,6 @@ defmodule Astarte.VMQ.Plugin do """ alias Astarte.VMQ.Plugin.Config - alias Astarte.VMQ.Plugin.AMQPClient alias Astarte.VMQ.Plugin.Connection.Synchronizer alias Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, as: SynchronizerSupervisor alias Astarte.VMQ.Plugin.Queries @@ -246,15 +245,20 @@ defmodule Astarte.VMQ.Plugin do ] ++ additional_headers message_id = generate_message_id(realm, device_id, timestamp) - sharding_key = {realm, device_id} - - :ok = - AMQPClient.publish(payload, - headers: headers, - message_id: message_id, - timestamp: timestamp, - sharding_key: sharding_key - ) + + {:ok, decoded_device_id} = + Astarte.Core.Device.decode_device_id(device_id, allow_extended_id: true) + + sharding_key = {realm, decoded_device_id} + + publish_opts = [ + headers: headers, + message_id: message_id, + timestamp: timestamp, + sharding_key: sharding_key + ] + + :ok = Mississippi.Producer.EventsProducer.publish(payload, publish_opts) end defp now_us_x10_timestamp do diff --git a/lib/astarte_vmq_plugin/amqp_client.ex b/lib/astarte_vmq_plugin/amqp_client.ex deleted file mode 100644 index 97d3fe1..0000000 --- a/lib/astarte_vmq_plugin/amqp_client.ex +++ /dev/null @@ -1,136 +0,0 @@ -# -# This file is part of Astarte. -# -# Copyright 2017-2018 Ispirata Srl -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -defmodule Astarte.VMQ.Plugin.AMQPClient do - require Logger - use GenServer - - alias AMQP.Basic - alias AMQP.Channel - alias AMQP.Connection - alias Astarte.VMQ.Plugin.Config - - @connection_backoff 10000 - - # API - - def start_link(args \\ []) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) - end - - def publish(payload, opts \\ []) do - GenServer.call(__MODULE__, {:publish, payload, opts}) - end - - # Server callbacks - - def init(_args) do - send(self(), :try_to_connect) - {:ok, :not_connected} - end - - def terminate(reason, %Channel{conn: conn} = chan) do - Logger.warn("AMQPClient terminated with reason #{inspect(reason)}", - tag: "ampq_client_terminate" - ) - - Channel.close(chan) - Connection.close(conn) - end - - def terminate(reason, _state) do - Logger.warn("AMQPClient terminated with reason #{inspect(reason)}", - tag: "ampq_client_terminate" - ) - end - - def handle_call({:publish, _payload, _opts}, _from, :not_connected = state) do - {:reply, {:error, :not_connected}, state} - end - - def handle_call({:publish, payload, opts}, _from, chan) do - sharding_key = Keyword.fetch!(opts, :sharding_key) - - # TODO: handle basic.return - full_opts = - opts - |> Keyword.delete(:sharding_key) - |> Keyword.put(:persistent, true) - |> Keyword.put(:mandatory, true) - - queue_prefix = Config.data_queue_prefix() - queue_index = :erlang.phash2(sharding_key, Config.data_queue_count()) - queue_name = "#{queue_prefix}#{queue_index}" - res = Basic.publish(chan, "", queue_name, payload, full_opts) - - if Config.mirror_queue_name() do - Basic.publish(chan, "", Config.mirror_queue_name(), payload, full_opts) - end - - {:reply, res, chan} - end - - def handle_info(:try_to_connect, _state) do - with {:ok, channel} <- connect() do - {:noreply, channel} - else - {:error, :not_connected} -> - {:noreply, :not_connected} - end - end - - def handle_info({:DOWN, _, :process, _pid, reason}, _state) do - Logger.warn("RabbitMQ connection lost: #{inspect(reason)}. Trying to reconnect...", - tag: "rabbitmq_connection_lost" - ) - - with {:ok, channel} <- connect() do - {:noreply, channel} - else - {:error, :not_connected} -> - {:noreply, :not_connected} - end - end - - defp connect do - with {:ok, conn} <- Connection.open(Config.amqp_options()), - # Get notifications when the connection goes down - {:ok, chan} <- Channel.open(conn), - Process.monitor(conn.pid) do - {:ok, chan} - else - {:error, reason} -> - Logger.warn("RabbitMQ Connection error: " <> inspect(reason), - tag: "rabbitmq_connection_error" - ) - - retry_connection_after(@connection_backoff) - {:error, :not_connected} - - :error -> - Logger.warn("Unknown RabbitMQ connection error", tag: "rabbitmq_connection_error") - retry_connection_after(@connection_backoff) - {:error, :not_connected} - end - end - - defp retry_connection_after(backoff) do - Logger.warn("Retrying connection in #{backoff} ms", tag: "retrying_connection") - Process.send_after(self(), :try_to_connect, backoff) - end -end diff --git a/lib/astarte_vmq_plugin/application.ex b/lib/astarte_vmq_plugin/application.ex index f873f1f..de41327 100644 --- a/lib/astarte_vmq_plugin/application.ex +++ b/lib/astarte_vmq_plugin/application.ex @@ -38,7 +38,7 @@ defmodule Astarte.VMQ.Plugin.Application do # List all child processes to be supervised children = [ - Astarte.VMQ.Plugin.AMQPClient, + {Mississippi.Producer, Config.mississippi_opts!()}, {Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry}, Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, {Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]}, diff --git a/lib/astarte_vmq_plugin/config.ex b/lib/astarte_vmq_plugin/config.ex index 8ac86c3..69d9476 100644 --- a/lib/astarte_vmq_plugin/config.ex +++ b/lib/astarte_vmq_plugin/config.ex @@ -153,6 +153,19 @@ defmodule Astarte.VMQ.Plugin.Config do Application.get_env(:astarte_vmq_plugin, :mirror_queue_name) end + def mississippi_opts! do + [ + amqp_producer_options: amqp_options(), + mississippi_config: [ + queues: [ + events_exchange_name: "", + total_count: data_queue_count(), + prefix: data_queue_prefix() + ] + ] + ] + end + def registry_mfa do Application.get_env(:astarte_vmq_plugin, :registry_mfa) end diff --git a/test/astarte_vmq_plugin_connection_synchronizer_test.exs b/test/astarte_vmq_plugin_connection_synchronizer_test.exs index f7504df..065f621 100644 --- a/test/astarte_vmq_plugin_connection_synchronizer_test.exs +++ b/test/astarte_vmq_plugin_connection_synchronizer_test.exs @@ -27,26 +27,40 @@ defmodule Astarte.VMQ.Plugin.Connection.SynchronizerTest do @device_id :crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false) @realm "test" @device_base_path "#{@realm}/#{@device_id}" - @queue_name "#{Config.data_queue_prefix()}0" setup_all do amqp_opts = Config.amqp_options() {:ok, conn} = Connection.open(amqp_opts) {:ok, chan} = Channel.open(conn) - Queue.declare(chan, @queue_name) - {:ok, chan: chan} + queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count] + + queues = + for n <- 0..queue_total do + queue_name = "#{Config.data_queue_prefix()}#{n}" + {:ok, _} = Queue.declare(chan, queue_name, durable: true) + queue_name + end + + {:ok, chan: chan, queues: queues} end - setup %{chan: chan} do + setup %{chan: chan, queues: queues} do test_pid = self() - {:ok, consumer_tag} = - Queue.subscribe(chan, @queue_name, fn payload, meta -> - send(test_pid, {:amqp_msg, payload, meta}) - end) + consumer_tags = + for queue <- queues do + {:ok, consumer_tag} = + Queue.subscribe(chan, queue, fn payload, meta -> + send(test_pid, {:amqp_msg, payload, meta}) + end) + + consumer_tag + end on_exit(fn -> - Queue.unsubscribe(chan, consumer_tag) + Enum.each(consumer_tags, fn consumer_tag -> + Queue.unsubscribe(chan, consumer_tag) + end) end) :ok diff --git a/test/astarte_vmq_plugin_test.exs b/test/astarte_vmq_plugin_test.exs index 4cb0b05..85dbaac 100644 --- a/test/astarte_vmq_plugin_test.exs +++ b/test/astarte_vmq_plugin_test.exs @@ -32,29 +32,43 @@ defmodule Astarte.VMQ.PluginTest do @other_device_base_path "#{@realm}/#{@other_device_id}" @other_mqtt_user "other" @another_mqtt_user "another" - @queue_name "#{Config.data_queue_prefix()}0" setup_all do amqp_opts = Config.amqp_options() {:ok, conn} = Connection.open(amqp_opts) {:ok, chan} = Channel.open(conn) - Queue.declare(chan, @queue_name) + queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count] + + queues = + for n <- 0..queue_total do + queue_name = "#{Config.data_queue_prefix()}#{n}" + {:ok, _} = Queue.declare(chan, queue_name, durable: true) + queue_name + end + :ok = DatabaseTestHelper.await_xandra_cluster_connected!() DatabaseTestHelper.setup_db!() on_exit(&DatabaseTestHelper.teardown_db!/0) - {:ok, chan: chan} + {:ok, chan: chan, queues: queues} end - setup %{chan: chan} do + setup %{chan: chan, queues: queues} do test_pid = self() - {:ok, consumer_tag} = - Queue.subscribe(chan, @queue_name, fn payload, meta -> - send(test_pid, {:amqp_msg, payload, meta}) - end) + consumer_tags = + for queue <- queues do + {:ok, consumer_tag} = + Queue.subscribe(chan, queue, fn payload, meta -> + send(test_pid, {:amqp_msg, payload, meta}) + end) + + consumer_tag + end on_exit(fn -> - Queue.unsubscribe(chan, consumer_tag) + Enum.each(consumer_tags, fn consumer_tag -> + Queue.unsubscribe(chan, consumer_tag) + end) end) :ok