Skip to content

WIP: MQTT/Websocket support for C8y #3575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/common/tedge_config/src/tedge_toml/models/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::str::FromStr;

/// A flag that can be set to auto,
/// meaning the system will have to detect the appropriate true/false setting
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document)]
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document,
)]
pub enum AutoFlag {
True,
False,
Expand Down
42 changes: 42 additions & 0 deletions crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;

/// A flag that can be HTTP or HTTPS
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document,
)]
pub enum HttpOrS {
Http,
Https,
}

#[derive(thiserror::Error, Debug)]
#[error("Failed to parse flag: {input}. Supported values are: HTTP, HTTPS")]
pub struct InvalidScheme {
input: String,
}

impl FromStr for HttpOrS {
type Err = InvalidScheme;

fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.to_lowercase().as_str() {
"http" => Ok(Self::Http),
"https" => Ok(Self::Https),
_ => Err(Self::Err {
input: input.to_string(),
}),
}
}
}

impl Display for HttpOrS {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let output = match self {
Self::Http => "HTTP",
Self::Https => "HTTPS",
};
output.fmt(f)
}
}
2 changes: 2 additions & 0 deletions crates/common/tedge_config/src/tedge_toml/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ pub mod connect_url;
pub mod cryptoki;
pub mod flag;
pub mod host_port;
pub mod http_or_s;
pub mod ipaddress;
pub mod mqtt_protocol;
pub mod path;
pub mod port;
pub mod seconds;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;

/// The protocol used to connect to a cloud's MQTT service
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document,
)]
pub enum MqttProtocol {
Tcp,
Websocket,
}

#[derive(thiserror::Error, Debug)]
#[error("Failed to parse flag: {input}. Supported values are: tcp, ws")]
pub struct InvalidScheme {
input: String,
}

impl FromStr for MqttProtocol {
type Err = InvalidScheme;

fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.to_lowercase().as_str() {
"tcp" => Ok(Self::Tcp),
"ws" => Ok(Self::Websocket),
_ => Err(Self::Err {
input: input.to_string(),
}),
}
}
}

impl Display for MqttProtocol {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let output = match self {
Self::Tcp => "tcp",
Self::Websocket => "ws",
};
output.fmt(f)
}
}
21 changes: 21 additions & 0 deletions crates/common/tedge_config/src/tedge_toml/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod append_remove;
pub use append_remove::AppendRemoveItem;

use super::models::auth_method::AuthMethod;
use super::models::http_or_s::HttpOrS;
use super::models::mqtt_protocol::MqttProtocol;
use super::models::timestamp::TimeFormat;
use super::models::AptConfig;
use super::models::AutoFlag;
Expand Down Expand Up @@ -354,6 +356,25 @@ define_tedge_config! {
/// The amount of time after which the bridge should send a ping if no other traffic has occurred
#[tedge_config(example = "60s", default(from_str = "60s"))]
keepalive_interval: SecondsOrHumanTime,

/// The protocol to connect the MQTT bridge with (only affects built-in bridge)
#[tedge_config(example = "tcp", default(from_str = "tcp"))]
protocol: MqttProtocol,

proxy: {
/// The address (host:port) of an HTTP CONNECT proxy to use when connecting to Cumulocity
address: HostPort<8000>,

/// The username for the proxy connection to Cumulocity's MQTT broker
username: String,

/// The password for the proxy connection to Cumulocity's MQTT broker
password: String,

#[tedge_config(rename = "type", default(variable = "HttpOrS::Https"), example = "HTTPS")]
/// The type of the proxy connection to use, either `HTTP` or `HTTPS`
ty: HttpOrS,
}
},

entity_store: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::models::http_or_s::HttpOrS;

use super::*;

#[diagnostic::on_unimplemented(
Expand All @@ -13,7 +15,7 @@ pub trait AppendRemoveItem {
}

macro_rules! impl_append_remove_for_single_value {
($($type:ty),*) => {
($($type:ty),* $(,)?) => {
$(
impl AppendRemoveItem for $type {
type Item = $type;
Expand Down Expand Up @@ -56,7 +58,10 @@ impl_append_remove_for_single_value!(
AptConfig,
MqttPayloadLimit,
AuthMethod,
Cryptoki
Cryptoki,
HostPort<8000>,
HttpOrS,
MqttProtocol,
);

impl AppendRemoveItem for TemplatesSet {
Expand Down
43 changes: 42 additions & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@ use c8y_mapper_ext::config::C8yMapperConfig;
use c8y_mapper_ext::converter::CumulocityConverter;
use mqtt_channel::Config;
use std::borrow::Cow;
use std::sync::Arc;
use tedge_api::entity::EntityExternalId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_config::all_or_nothing;
use tedge_config::models::http_or_s::HttpOrS;
use tedge_config::models::mqtt_protocol::MqttProtocol;
use tedge_config::tedge_toml::ProfileName;
use tedge_config::TEdgeConfig;
use tedge_downloader_ext::DownloaderActor;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_http_ext::HttpActor;
use tedge_mqtt_bridge::rumqttc::LastWill;
use tedge_mqtt_bridge::rumqttc::Proxy;
use tedge_mqtt_bridge::rumqttc::ProxyAuth;
use tedge_mqtt_bridge::rumqttc::ProxyType;
use tedge_mqtt_bridge::rumqttc::TlsConfiguration;
use tedge_mqtt_bridge::rumqttc::Transport;
use tedge_mqtt_bridge::use_credentials;
use tedge_mqtt_bridge::BridgeConfig;
Expand Down Expand Up @@ -143,16 +151,26 @@ impl TEdgeComponent for CumulocityMapper {
}

let c8y = c8y_config.mqtt.or_config_not_set()?;
let mqtt_host = match c8y_config.bridge.protocol {
MqttProtocol::Websocket => format!("wss://{}/mqtt", c8y.host()),
MqttProtocol::Tcp => c8y.host().to_string(),
};
let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
c8y_config.device.id()?,
c8y.host().to_string(),
mqtt_host,
// The port is ignored in websocket mode, so this only applies
// to MQTT/TCP connections
c8y.port().into(),
);
// Cumulocity tells us not to not set clean session to false, so don't
// https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session
cloud_config.set_clean_session(true);

if use_certificate {
ensure!(
c8y_config.bridge.mqtt_protocol == MqttProtocol::Tcp,
"To use MQTT over websockets, please enable basic authentication"
);
let cloud_broker_auth_config = tedge_config
.mqtt_auth_config_cloud_broker(c8y_profile)
.expect("error getting cloud broker auth config");
Expand All @@ -165,6 +183,7 @@ impl TEdgeComponent for CumulocityMapper {
let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?;
use_credentials(
&mut cloud_config,
c8y_config.bridge.protocol,
&c8y_config.root_cert_path,
username,
password,
Expand Down Expand Up @@ -224,6 +243,28 @@ impl TEdgeComponent for CumulocityMapper {
});
cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration());

let rustls_config = tedge_config.cloud_client_tls_config();
let proxy_config = &c8y_config.bridge.proxy;
if let Some(address) = proxy_config.address.or_none() {
let credentials =
all_or_nothing((proxy_config.username.clone(), proxy_config.password.clone()))
.map_err(|e| anyhow::anyhow!(e))?;
cloud_config.set_proxy(Proxy {
addr: address.host().to_string(),
port: address.port().0,
auth: match credentials {
Some((username, password)) => ProxyAuth::Basic { username, password },
None => ProxyAuth::None,
},
ty: match c8y_config.bridge.proxy.ty {
HttpOrS::Http => ProxyType::Http,
HttpOrS::Https => {
ProxyType::Https(TlsConfiguration::Rustls(Arc::new(rustls_config)))
}
},
});
}

runtime
.spawn(
MqttBridgeActorBuilder::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/tedge_mqtt_bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ certificate = { workspace = true }
futures = { workspace = true }
mqtt_channel = { workspace = true }
mutants = { workspace = true }
rumqttc = { workspace = true }
rumqttc = { workspace = true, features = ["proxy", "websocket"] }
tedge_actors = { workspace = true }
tedge_config = { workspace = true }
thiserror = { workspace = true }
Expand Down
9 changes: 8 additions & 1 deletion crates/extensions/tedge_mqtt_bridge/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rumqttc::MqttOptions;
use rumqttc::Transport;
use std::borrow::Cow;
use std::path::Path;
use tedge_config::models::mqtt_protocol::MqttProtocol;
use tedge_config::tedge_toml::CloudConfig;

pub fn use_key_and_cert(
Expand All @@ -25,12 +26,18 @@ pub fn use_key_and_cert(

pub fn use_credentials(
config: &mut MqttOptions,
protocol: MqttProtocol,
root_cert_path: impl AsRef<Path>,
username: String,
password: String,
) -> anyhow::Result<()> {
let tls_config = create_tls_config_without_client_cert(root_cert_path)?;
config.set_transport(Transport::tls_with_config(tls_config.into()));
match protocol {
MqttProtocol::Tcp => config.set_transport(Transport::tls_with_config(tls_config.into())),
MqttProtocol::Websocket => {
config.set_transport(Transport::wss_with_config(tls_config.into()))
}
};
config.set_credentials(username, password);
Ok(())
}
Expand Down
Loading