Skip to content

Commit

Permalink
AMQP publish: use Mississippi
Browse files Browse the repository at this point in the history
Replace the old messy AMQP code.
The sharding key is now included (as a binary header)
in the message, according to Mississippi best practices.

This change is retrocompatible.

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Oct 22, 2024
1 parent 7159bf8 commit f01cbfb
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 165 deletions.
24 changes: 14 additions & 10 deletions lib/astarte_vmq_plugin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ defmodule Astarte.VMQ.Plugin do
"""

alias Astarte.VMQ.Plugin.Config
alias Astarte.VMQ.Plugin.AMQPClient
alias Astarte.VMQ.Plugin.Connection.Synchronizer
alias Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, as: SynchronizerSupervisor
alias Astarte.VMQ.Plugin.Queries
Expand Down Expand Up @@ -246,15 +245,20 @@ defmodule Astarte.VMQ.Plugin do
] ++ additional_headers

message_id = generate_message_id(realm, device_id, timestamp)
sharding_key = {realm, device_id}

:ok =
AMQPClient.publish(payload,
headers: headers,
message_id: message_id,
timestamp: timestamp,
sharding_key: sharding_key
)

{:ok, decoded_device_id} =
Astarte.Core.Device.decode_device_id(device_id, allow_extended_id: true)

sharding_key = {realm, decoded_device_id}

publish_opts = [
headers: headers,
message_id: message_id,
timestamp: timestamp,
sharding_key: sharding_key
]

:ok = Mississippi.Producer.EventsProducer.publish(payload, publish_opts)
end

defp now_us_x10_timestamp do
Expand Down
136 changes: 0 additions & 136 deletions lib/astarte_vmq_plugin/amqp_client.ex

This file was deleted.

2 changes: 1 addition & 1 deletion lib/astarte_vmq_plugin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Astarte.VMQ.Plugin.Application do

# List all child processes to be supervised
children = [
Astarte.VMQ.Plugin.AMQPClient,
{Mississippi.Producer, Config.mississippi_opts!()},
{Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry},
Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor,
{Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]},
Expand Down
13 changes: 13 additions & 0 deletions lib/astarte_vmq_plugin/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ defmodule Astarte.VMQ.Plugin.Config do
Application.get_env(:astarte_vmq_plugin, :mirror_queue_name)
end

def mississippi_opts! do
[
amqp_producer_options: amqp_options(),
mississippi_config: [
queues: [
events_exchange_name: "",
total_count: data_queue_count(),
prefix: data_queue_prefix()
]
]
]
end

def registry_mfa do
Application.get_env(:astarte_vmq_plugin, :registry_mfa)
end
Expand Down
32 changes: 23 additions & 9 deletions test/astarte_vmq_plugin_connection_synchronizer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,40 @@ defmodule Astarte.VMQ.Plugin.Connection.SynchronizerTest do
@device_id :crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false)
@realm "test"
@device_base_path "#{@realm}/#{@device_id}"
@queue_name "#{Config.data_queue_prefix()}0"

setup_all do
amqp_opts = Config.amqp_options()
{:ok, conn} = Connection.open(amqp_opts)
{:ok, chan} = Channel.open(conn)
Queue.declare(chan, @queue_name)
{:ok, chan: chan}
queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count]

queues =
for n <- 0..queue_total do
queue_name = "#{Config.data_queue_prefix()}#{n}"
{:ok, _} = Queue.declare(chan, queue_name, durable: true)
queue_name
end

{:ok, chan: chan, queues: queues}
end

setup %{chan: chan} do
setup %{chan: chan, queues: queues} do
test_pid = self()

{:ok, consumer_tag} =
Queue.subscribe(chan, @queue_name, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)
consumer_tags =
for queue <- queues do
{:ok, consumer_tag} =
Queue.subscribe(chan, queue, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)

consumer_tag
end

on_exit(fn ->
Queue.unsubscribe(chan, consumer_tag)
Enum.each(consumer_tags, fn consumer_tag ->
Queue.unsubscribe(chan, consumer_tag)
end)
end)

:ok
Expand Down
32 changes: 23 additions & 9 deletions test/astarte_vmq_plugin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,43 @@ defmodule Astarte.VMQ.PluginTest do
@other_device_base_path "#{@realm}/#{@other_device_id}"
@other_mqtt_user "other"
@another_mqtt_user "another"
@queue_name "#{Config.data_queue_prefix()}0"

setup_all do
amqp_opts = Config.amqp_options()
{:ok, conn} = Connection.open(amqp_opts)
{:ok, chan} = Channel.open(conn)
Queue.declare(chan, @queue_name)
queue_total = Config.mississippi_opts!()[:mississippi_config][:queues][:total_count]

queues =
for n <- 0..queue_total do
queue_name = "#{Config.data_queue_prefix()}#{n}"
{:ok, _} = Queue.declare(chan, queue_name, durable: true)
queue_name
end

:ok = DatabaseTestHelper.await_xandra_cluster_connected!()
DatabaseTestHelper.setup_db!()
on_exit(&DatabaseTestHelper.teardown_db!/0)
{:ok, chan: chan}
{:ok, chan: chan, queues: queues}
end

setup %{chan: chan} do
setup %{chan: chan, queues: queues} do
test_pid = self()

{:ok, consumer_tag} =
Queue.subscribe(chan, @queue_name, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)
consumer_tags =
for queue <- queues do
{:ok, consumer_tag} =
Queue.subscribe(chan, queue, fn payload, meta ->
send(test_pid, {:amqp_msg, payload, meta})
end)

consumer_tag
end

on_exit(fn ->
Queue.unsubscribe(chan, consumer_tag)
Enum.each(consumer_tags, fn consumer_tag ->
Queue.unsubscribe(chan, consumer_tag)
end)
end)

:ok
Expand Down

0 comments on commit f01cbfb

Please sign in to comment.