Skip to content

Commit

Permalink
Merge pull request #94 from Annopaolo/mississippi-vmq-dup
Browse files Browse the repository at this point in the history
 Move AMQP publish logic to Mississippi
  • Loading branch information
rbino authored Oct 30, 2024
2 parents df9326a + f01cbfb commit 9ca4d04
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 166 deletions.
24 changes: 14 additions & 10 deletions lib/astarte_vmq_plugin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ defmodule Astarte.VMQ.Plugin do
"""

alias Astarte.VMQ.Plugin.Config
alias Astarte.VMQ.Plugin.AMQPClient
alias Astarte.VMQ.Plugin.Connection.Synchronizer
alias Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, as: SynchronizerSupervisor
alias Astarte.VMQ.Plugin.Queries
Expand Down Expand Up @@ -246,15 +245,20 @@ defmodule Astarte.VMQ.Plugin do
] ++ additional_headers

message_id = generate_message_id(realm, device_id, timestamp)
sharding_key = {realm, device_id}

:ok =
AMQPClient.publish(payload,
headers: headers,
message_id: message_id,
timestamp: timestamp,
sharding_key: sharding_key
)

{:ok, decoded_device_id} =
Astarte.Core.Device.decode_device_id(device_id, allow_extended_id: true)

sharding_key = {realm, decoded_device_id}

publish_opts = [
headers: headers,
message_id: message_id,
timestamp: timestamp,
sharding_key: sharding_key
]

:ok = Mississippi.Producer.EventsProducer.publish(payload, publish_opts)
end

defp now_us_x10_timestamp do
Expand Down
136 changes: 0 additions & 136 deletions lib/astarte_vmq_plugin/amqp_client.ex

This file was deleted.

2 changes: 1 addition & 1 deletion lib/astarte_vmq_plugin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Astarte.VMQ.Plugin.Application do

# List all child processes to be supervised
children = [
Astarte.VMQ.Plugin.AMQPClient,
{Mississippi.Producer, Config.mississippi_opts!()},
{Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry},
Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor,
{Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]},
Expand Down
13 changes: 13 additions & 0 deletions lib/astarte_vmq_plugin/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ defmodule Astarte.VMQ.Plugin.Config do
Application.get_env(:astarte_vmq_plugin, :mirror_queue_name)
end

def mississippi_opts! do
[
amqp_producer_options: amqp_options(),
mississippi_config: [
queues: [
events_exchange_name: "",
total_count: data_queue_count(),
prefix: data_queue_prefix()
]
]
]
end

def registry_mfa do
Application.get_env(:astarte_vmq_plugin, :registry_mfa)
end
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do
{:amqp, "~> 3.3"},
{:vernemq_dev, github: "vernemq/vernemq_dev"},
{:excoveralls, "~> 0.15", only: :test},
{:mississippi, github: "secomind/mississippi"},
{:pretty_log, "~> 0.1"},
{:dialyxir, "~> 1.4", only: [:dev, :ci], runtime: false},
{:xandra, "~> 0.14"}
Expand Down
8 changes: 7 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,34 @@
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"ecto": {:hex, :ecto, "3.10.2", "6b887160281a61aa16843e47735b8a266caa437f80588c3ab80a8a960e6abe37", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a895778f0d7648a4b34b486af59a1c8009041fbdf2b17f1ac215eb829c60235"},
"ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"},
"efx": {:hex, :efx, "0.2.6", "ec7c42b05073e6fdc61d971cc02d366f73f40d8093272a5326d16861003036b0", [:mix], [{:process_tree, "0.1.2", [hex: :process_tree, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "7648dcfd05f9ac39b257d4a9aa7c5b243823fee31933e725b2d346371b6d6bc4"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_rabbit_pool": {:git, "https://github.com/leductam/ex_rabbit_pool.git", "9951452ab51d36648b9a9d3373609e48d1379a0d", []},
"excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
"logfmt": {:hex, :logfmt, "3.3.2", "c432765cff9c26cf4ba78cf66ece183e56562dfeba6e2d9f077804cc4c756677", [:mix], [], "hexpm", "8dfc07bf11d362d1ffb11fa34647f4e78dba47247589cc94fd8c9155889c8fcb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mississippi": {:git, "https://github.com/secomind/mississippi.git", "510a30878d1a990fca15b54eda6b4370c636cefa", []},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"pretty_log": {:hex, :pretty_log, "0.9.0", "f84aab76e20c551a624ddd4656f1e5f9ca2941625db07549e9cb6a84a346bd40", [:mix], [{:logfmt, "~> 3.3", [hex: :logfmt, repo: "hexpm", optional: false]}], "hexpm", "abf9605c50fdd9377a3ce02ea51696538f4f647b9bb63a8dac209427fc7badf4"},
"process_tree": {:hex, :process_tree, "0.1.2", "26218b086f5a2265a5c7a24050acd8b7416702633518f423ddde3cf38c6ff3cf", [:mix], [], "hexpm", "e5d0876a23cc7f0062f14f43c3a07de71e063d876bb256ab32055c9131f8f9aa"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"rabbit_common": {:hex, :rabbit_common, "3.12.10", "7fc633ee206ae48783d8a5302dfc8fe1e086a5d7de494785ed206f586ad64b34", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "908a8b1bd059f5baefe225fe9d3e2545d35a28db8f6a14d60372556ca7afe641"},
"recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"},
"skogsra": {:hex, :skogsra, "2.4.1", "50f0e984d7560ffab30f8f5bb66e177a75d2dc72ed12de373aed7b6dfb54fb8c", [:mix], [{:jason, "~> 1.3", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "ffef5de2bfb1618babf692803acdd158cc081324735e28deea982dc87c9e565f"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"},
"typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"vernemq_dev": {:git, "https://github.com/vernemq/vernemq_dev.git", "6d622aa8c901ae7777433aef2bd049e380c474a6", []},
"xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"},
Expand Down
32 changes: 23 additions & 9 deletions test/astarte_vmq_plugin_connection_synchronizer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,40 @@ defmodule Astarte.VMQ.Plugin.Connection.SynchronizerTest do
@device_id :crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false)
@realm "test"
@device_base_path "#{@realm}/#{@device_id}"
@queue_name "#{Config.data_queue_prefix()}0"

setup_all do
amqp_opts = Config.amqp_options()
{:ok, conn} = Connection.open(amqp_opts)
{:ok, chan} = Channel.open(conn)
Queue.declare(chan, @queue_name)
{:ok, chan: chan}
queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count]

queues =
for n <- 0..queue_total do
queue_name = "#{Config.data_queue_prefix()}#{n}"
{:ok, _} = Queue.declare(chan, queue_name, durable: true)
queue_name
end

{:ok, chan: chan, queues: queues}
end

setup %{chan: chan} do
setup %{chan: chan, queues: queues} do
test_pid = self()

{:ok, consumer_tag} =
Queue.subscribe(chan, @queue_name, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)
consumer_tags =
for queue <- queues do
{:ok, consumer_tag} =
Queue.subscribe(chan, queue, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)

consumer_tag
end

on_exit(fn ->
Queue.unsubscribe(chan, consumer_tag)
Enum.each(consumer_tags, fn consumer_tag ->
Queue.unsubscribe(chan, consumer_tag)
end)
end)

:ok
Expand Down
32 changes: 23 additions & 9 deletions test/astarte_vmq_plugin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,43 @@ defmodule Astarte.VMQ.PluginTest do
@other_device_base_path "#{@realm}/#{@other_device_id}"
@other_mqtt_user "other"
@another_mqtt_user "another"
@queue_name "#{Config.data_queue_prefix()}0"

setup_all do
amqp_opts = Config.amqp_options()
{:ok, conn} = Connection.open(amqp_opts)
{:ok, chan} = Channel.open(conn)
Queue.declare(chan, @queue_name)
queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count]

queues =
for n <- 0..queue_total do
queue_name = "#{Config.data_queue_prefix()}#{n}"
{:ok, _} = Queue.declare(chan, queue_name, durable: true)
queue_name
end

:ok = DatabaseTestHelper.await_xandra_cluster_connected!()
DatabaseTestHelper.setup_db!()
on_exit(&DatabaseTestHelper.teardown_db!/0)
{:ok, chan: chan}
{:ok, chan: chan, queues: queues}
end

setup %{chan: chan} do
setup %{chan: chan, queues: queues} do
test_pid = self()

{:ok, consumer_tag} =
Queue.subscribe(chan, @queue_name, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)
consumer_tags =
for queue <- queues do
{:ok, consumer_tag} =
Queue.subscribe(chan, queue, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)

consumer_tag
end

on_exit(fn ->
Queue.unsubscribe(chan, consumer_tag)
Enum.each(consumer_tags, fn consumer_tag ->
Queue.unsubscribe(chan, consumer_tag)
end)
end)

:ok
Expand Down

0 comments on commit 9ca4d04

Please sign in to comment.