Skip to content

Commit

Permalink
Connect to Astarte database
Browse files Browse the repository at this point in the history
The plugin has now access to the Astarte database. Relevant
environment variables for Cassandra connection have been added:
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__NODES` (default: "localhost:9042")
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__USERNAME` (default: "cassandra")
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__PASSWORD` (default: "cassandra")
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__POOL_SIZE` (default: 1)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_ENABLED` (default: false)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_DISABLE_SNI`(default: true)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CUSTOM_SNI`
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CA_FILE`

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Aug 31, 2023
1 parent 0beca12 commit e8919ed
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 8 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- The plugin now accesses the Astarte database. The following
env variables have been added:
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__NODES`
(defaults to `localhost:9042`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__USERNAME`
(defaults to `cassandra`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__PASSWORD`
(defaults to `cassandra`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__POOL_SIZE`
(defaults to 10)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_ENABLED`
(defaults to `false`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_DISABLE_SNI`
(defaults to `true`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CUSTOM_SNI`
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CA_FILE`

## [1.1.0] - 2023-06-20

Expand Down
5 changes: 3 additions & 2 deletions lib/astarte_vmq_plugin/application.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,8 @@ defmodule Astarte.VMQ.Plugin.Application do
{Registry, keys: :unique, name: AstarteVMQPluginConnectionSynchronizer.Registry},
Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor,
{Astarte.VMQ.Plugin.Publisher, [Config.registry_mfa()]},
{Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: RPCHandler]}
{Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: RPCHandler]},
{Xandra.Cluster, Config.xandra_options!()}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
67 changes: 66 additions & 1 deletion lib/astarte_vmq_plugin/config.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,24 @@ defmodule Astarte.VMQ.Plugin.Config do

Application.put_env(:astarte_vmq_plugin, :amqp_options, amqp_opts)

cassandra_nodes =
Application.get_env(:astarte_vmq_plugin, :cassandra_nodes, ["localhost:9042"])
|> normalize_cassandra_nodes()

Application.put_env(:astarte_vmq_plugin, :cassandra_nodes, cassandra_nodes)

cassandra_ssl_custom_sni =
Application.get_env(:astarte_vmq_plugin, :cassandra_ssl_custom_sni, "")
|> to_string()

Application.put_env(:astarte_vmq_plugin, :cassandra_ssl_custom_sni, cassandra_ssl_custom_sni)

cassandra_ssl_ca_file =
Application.get_env(:astarte_vmq_plugin, :cassandra_ssl_ca_file, CAStore.file_path())
|> to_string()

Application.put_env(:astarte_vmq_plugin, :cassandra_ssl_ca_file, cassandra_ssl_ca_file)

data_queue_prefix =
Application.get_env(:astarte_vmq_plugin, :data_queue_prefix, "astarte_data_")
|> to_string()
Expand Down Expand Up @@ -147,6 +165,47 @@ defmodule Astarte.VMQ.Plugin.Config do
)
end

def xandra_authentication_options do
password_auth_opts = [
username:
Application.get_env(:astarte_vmq_plugin, :cassandra_username, "cassandra") |> to_string(),
password:
Application.get_env(:astarte_vmq_plugin, :cassandra_password, "cassandra") |> to_string()
]

{Xandra.Authenticator.Password, password_auth_opts}
end

def xandra_options! do
# TODO handle SNI
[
nodes: Application.get_env(:astarte_vmq_plugin, :cassandra_nodes),
authentication: xandra_authentication_options(),
pool_size: Application.get_env(:astarte_vmq_plugin, :cassandra_pool_size, 10),
encryption: Application.get_env(:astarte_vmq_plugin, :cassandra_ssl_enabled, false),
name: :xandra
]
|> populate_xandra_ssl_options!()
end

defp populate_xandra_ssl_options!(options) do
if Application.get_env(:astarte_vmq_plugin, :cassandra_ssl_enabled, false) do
ssl_options = build_xandra_ssl_options!()
Keyword.put(options, :transport_options, ssl_options)
else
options
end
end

defp build_xandra_ssl_options! do
[
cacertfile: Application.fetch_env!(:astarte_vmq_plugin, :cassandra_ssl_ca_file),
verify: :verify_peer,
depth: 10,
server_name_indication: :disable
]
end

defp normalize_opts_strings(amqp_options) do
Enum.map(amqp_options, fn
{:username, value} -> {:username, to_string(value)}
Expand All @@ -156,4 +215,10 @@ defmodule Astarte.VMQ.Plugin.Config do
other -> other
end)
end

defp normalize_cassandra_nodes(nodes) when is_list(nodes) do
nodes
# convert from Erlang strings to Elixir strings
|> Enum.map(&to_string/1)
end
end
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do

defp astarte_required_modules(_) do
[
{:astarte_rpc, "~> 1.1"}
{:astarte_rpc, github: "astarte-platform/astarte_rpc"}
]
end

Expand All @@ -87,7 +87,8 @@ defmodule Astarte.VMQ.Plugin.Mixfile do
{:vernemq_dev, github: "vernemq/vernemq_dev"},
{:excoveralls, "~> 0.15", only: :test},
{:pretty_log, "~> 0.1"},
{:dialyzex, github: "Comcast/dialyzex", only: [:dev, :ci]}
{:dialyzex, github: "Comcast/dialyzex", only: [:dev, :ci]},
{:xandra, "~> 0.14"}
]
end
end
12 changes: 9 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@
"db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []},
"excoveralls": {:hex, :excoveralls, "0.15.0", "ac941bf85f9f201a9626cc42b2232b251ad8738da993cf406a4290cacf562ea4", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9631912006b27eca30a2f3c93562bc7ae15980afb014ceb8147dc5cdd8f376f1"},
"ecto": {:hex, :ecto, "3.10.1", "c6757101880e90acc6125b095853176a02da8f1afe056f91f1f90b80c9389822", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d2ac4255f1601bdf7ac74c0ed971102c6829dc158719b94bd30041bbad77f87a"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
"lager": {:hex, :lager, "3.9.2", "4cab289120eb24964e3886bd22323cb5fefe4510c076992a23ad18cf85413d8c", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "7f904d9e87a8cb7e66156ed31768d1c8e26eba1d54f4bc85b1aa4ac1f6340c28"},
"logfmt": {:hex, :logfmt, "3.3.2", "c432765cff9c26cf4ba78cf66ece183e56562dfeba6e2d9f077804cc4c756677", [:mix], [], "hexpm", "8dfc07bf11d362d1ffb11fa34647f4e78dba47247589cc94fd8c9155889c8fcb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"pretty_log": {:hex, :pretty_log, "0.9.0", "f84aab76e20c551a624ddd4656f1e5f9ca2941625db07549e9cb6a84a346bd40", [:mix], [{:logfmt, "~> 3.3", [hex: :logfmt, repo: "hexpm", optional: false]}], "hexpm", "abf9605c50fdd9377a3ce02ea51696538f4f647b9bb63a8dac209427fc7badf4"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"rabbit_common": {:hex, :rabbit_common, "3.8.35", "392b43d3242bc9b02b6889be1726ce9ee6bb8e9a226e20253cd8512a147fa765", [:make, :rebar3], [{:credentials_obfuscation, "3.1.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "3.1.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.9.2", [hex: :lager, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "aa79691f95bc86f6383fea2be6a781c1c5efa76d3e6cc0670af2232d8515baac"},
"recon": {:hex, :recon, "2.5.1", "430ffa60685ac1efdfb1fe4c97b8767c92d0d92e6e7c3e8621559ba77598678a", [:mix, :rebar3], [], "hexpm", "5721c6b6d50122d8f68cccac712caa1231f97894bab779eff5ff0f886cb44648"},
"skogsra": {:hex, :skogsra, "2.3.3", "90ea76d98ad749241b31e724ca17ed8aca0202001972aeca3cb834f44027f3ea", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.8", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "e36880922431d41ac56d6cb4529b0526039a108fb44f8ecc90b517d494b86c28"},
"skogsra": {:hex, :skogsra, "2.4.1", "50f0e984d7560ffab30f8f5bb66e177a75d2dc72ed12de373aed7b6dfb54fb8c", [:mix], [{:jason, "~> 1.3", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "ffef5de2bfb1618babf692803acdd158cc081324735e28deea982dc87c9e565f"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"vernemq_dev": {:git, "https://github.com/vernemq/vernemq_dev.git", "6d622aa8c901ae7777433aef2bd049e380c474a6", []},
"xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"},
}
46 changes: 46 additions & 0 deletions priv/astarte_vmq_plugin.schema
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,56 @@
hidden
]}.

{mapping, "astarte_vmq_plugin.cassandra.nodes", "astarte_vmq_plugin.cassandra_nodes", [
{default, "localhost:9042"},
{datatype, string}
]}.

{mapping, "astarte_vmq_plugin.cassandra.username", "astarte_vmq_plugin.cassandra_username", [
{default, "cassandra"},
{datatype, string}
]}.

{mapping, "astarte_vmq_plugin.cassandra.password", "astarte_vmq_plugin.cassandra_password", [
{default, "cassandra"},
{datatype, string}
]}.

{mapping, "astarte_vmq_plugin.cassandra.pool_size", "astarte_vmq_plugin.cassandra_pool_size", [
{default, 10},
{datatype, integer}
]}.

{mapping, "astarte_vmq_plugin.cassandra.ssl_enabled", "astarte_vmq_plugin.cassandra_ssl_enabled", [
{default, false},
{datatype, {enum, [true, false]}}
]}.

%TODO handle enabled SNI
{mapping, "astarte_vmq_plugin.cassandra.ssl_disable_sni", "astarte_vmq_plugin.cassandra_ssl_disable_sni", [
{default, true},
{datatype, {enum, [true, false]}}
]}.

{mapping, "astarte_vmq_plugin.cassandra.ssl_custom_sni", "astarte_vmq_plugin.cassandra_ssl_custom_sni", [
{datatype, string}
]}.

{mapping, "astarte_vmq_plugin.cassandra.ssl_ca_file", "astarte_vmq_plugin.cassandra_ssl_ca_file", [
{datatype, string}
]}.

{translation, "astarte_vmq_plugin.registry_mfa",
fun(Conf) ->
S = cuttlefish:conf_get("astarte_vmq_plugin.registry_mfa", Conf),
{ok, T, _} = erl_scan:string(S ++ "."),
{ok, Term} = erl_parse:parse_term(T),
Term
end}.

{translation, "astarte_vmq_plugin.cassandra_nodes",
fun(Conf) ->
Nodes = cuttlefish:conf_get("astarte_vmq_plugin.cassandra.nodes", Conf),
Splitted = string:split(Nodes, ",", all),
lists:map(fun(E) -> string:trim(E, both) end, Splitted)
end}.

0 comments on commit e8919ed

Please sign in to comment.