diff --git a/src/app/config.rs b/src/app/config.rs index cc1cd4ca..5c54d39b 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -8,6 +8,7 @@ use std::{borrow::Cow, collections::BTreeMap, net::SocketAddr}; use serde::{Deserialize, Deserializer, Serialize}; use serde_with::formats::CommaSeparator; use serde_with::serde_as; +use serde_with::with_prefix; use std::str::FromStr; use crate::{app::cli, logging}; @@ -46,6 +47,51 @@ pub struct MetricsConfig { pub hostname_tag: Option, } +#[serde_as] +#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum KafkaSecurityProtocol { + Plaintext, + Ssl, + SaslPlaintext, + SaslSsl, +} + +// We need snake-case to_string() elsewhere, and during serialization, so.... +impl std::fmt::Display for KafkaSecurityProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let serialized = serde_json::to_string(self).expect("This must be serializable"); + // Ignore the quotes! + write!(f, "{}", &serialized[1..serialized.len() - 1]) + } +} + +#[serde_as] +#[derive(PartialEq, Debug, Serialize, Deserialize, Default)] +pub struct KafkaConfig { + /// Kafka security protocol to use. The value must be one of "plaintext, "ssl", "sasl_plaintext", "sasl_ssl". + /// If not specified, defaults to "plaintext". + pub security_protocol: Option, + + /// TLS CA certificate location for Kafka. + pub ssl_ca_location: Option, + + /// TLS certificate location for Kafka. + pub ssl_cert_location: Option, + + /// TLS private key location for Kafka. + pub ssl_key_location: Option, + + /// SASL mechanism to use for Kafka. The value must be one of "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512". + pub sasl_mechanism: Option, + + /// SASL username for Kafka. + pub sasl_username: Option, + + /// SASL password for Kafka. + pub sasl_password: Option, +} + #[serde_as] #[derive(PartialEq, Debug, Serialize, Deserialize)] pub struct Config { @@ -86,6 +132,10 @@ pub struct Config { /// The topic to produce uptime checks into. pub results_kafka_topic: String, + /// Kafka extended configuration + #[serde(flatten, with = "kafka_config")] + pub kafka_config: KafkaConfig, + /// Which config provider to use to load configs into memory pub config_provider_mode: ConfigProviderMode, @@ -158,6 +208,9 @@ pub struct Config { pub enable_metrics: bool, } +// Adding a kafka_ prefix to all the kafka config fields. +with_prefix!(kafka_config "kafka_"); + impl Default for Config { fn default() -> Self { Self { @@ -175,6 +228,7 @@ impl Default for Config { }, results_kafka_cluster: vec!["127.0.0.1:9092".to_owned()], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig::default(), config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, vector_batch_size: 10, @@ -263,9 +317,12 @@ mod tests { use std::net::IpAddr; use std::{borrow::Cow, collections::BTreeMap, path::PathBuf}; + use crate::app::config::KafkaSecurityProtocol; use crate::{app::cli, logging}; - use super::{CheckerMode, Config, ConfigProviderMode, MetricsConfig, ProducerMode}; + use super::{ + CheckerMode, Config, ConfigProviderMode, KafkaConfig, MetricsConfig, ProducerMode, + }; fn test_with_config(yaml: &str, env_vars: &[(&str, &str)], test_fn: F) where @@ -330,6 +387,7 @@ mod tests { "10.0.0.2:9000".to_owned() ], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig::default(), config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, config_provider_redis_update_ms: 1000, @@ -376,6 +434,16 @@ mod tests { "UPTIME_CHECKER_CONFIGS_KAFKA_CLUSTER", "10.0.0.1,10.0.0.2:7000", ), + ("UPTIME_CHECKER_KAFKA_SECURITY_PROTOCOL", "sasl_plaintext"), + ("UPTIME_CHECKER_KAFKA_SSL_CA_LOCATION", "/path/to/ca.crt"), + ( + "UPTIME_CHECKER_KAFKA_SSL_CERT_LOCATION", + "/path/to/cert.crt", + ), + ("UPTIME_CHECKER_KAFKA_SSL_KEY_LOCATION", "/path/to/key.key"), + ("UPTIME_CHECKER_KAFKA_SASL_MECHANISM", "scram-sha-256"), + ("UPTIME_CHECKER_KAFKA_SASL_USERNAME", "my_user"), + ("UPTIME_CHECKER_KAFKA_SASL_PASSWORD", "my_password"), ("UPTIME_CHECKER_CONFIG_PROVIDER_MODE", "redis"), ("UPTIME_CHECKER_CONFIG_PROVIDER_REDIS_UPDATE_MS", "2000"), ( @@ -420,6 +488,15 @@ mod tests { "10.0.0.2:7000".to_owned() ], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig { + security_protocol: Some(KafkaSecurityProtocol::SaslPlaintext), + ssl_ca_location: Some("/path/to/ca.crt".to_owned()), + ssl_cert_location: Some("/path/to/cert.crt".to_owned()), + ssl_key_location: Some("/path/to/key.key".to_owned()), + sasl_mechanism: Some("scram-sha-256".to_owned()), + sasl_username: Some("my_user".to_owned()), + sasl_password: Some("my_password".to_owned()), + }, config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, config_provider_redis_update_ms: 2000, @@ -450,6 +527,61 @@ mod tests { }, ) } + // We have to do some impl Display tomfoolery to convert our enum into a nice + // snake_case string, so this just sanity-checks that feature. + #[test] + fn test_config_kafka_tostring() { + let c = Config { + sentry_dsn: Some("my_dsn".to_owned()), + sentry_env: Some(Cow::from("my_env_override")), + checker_concurrency: 200, + checker_parallel: false, + log_level: logging::Level::Warn, + log_format: logging::LogFormat::Json, + interface: Some("eth0".to_owned()), + metrics: MetricsConfig { + statsd_addr: "10.0.0.1:1234".parse().unwrap(), + default_tags: BTreeMap::new(), + hostname_tag: None, + }, + results_kafka_cluster: vec!["10.0.0.1".to_owned(), "10.0.0.2:7000".to_owned()], + results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig { + security_protocol: Some(KafkaSecurityProtocol::SaslPlaintext), + ..Default::default() + }, + config_provider_mode: ConfigProviderMode::Redis, + checker_mode: CheckerMode::Reqwest, + config_provider_redis_update_ms: 2000, + config_provider_redis_total_partitions: 32, + redis_enable_cluster: true, + redis_host: "10.0.0.3:6379".to_owned(), + region: "us-west", + allow_internal_ips: true, + disable_connection_reuse: false, + record_task_metrics: false, + pool_idle_timeout_secs: 600, + checker_number: 2, + total_checkers: 5, + producer_mode: ProducerMode::Kafka, + vector_batch_size: 10, + vector_endpoint: "http://localhost:8020".to_owned(), + retry_vector_errors_forever: false, + failure_retries: 2, + http_checker_dns_nameservers: Some(vec![ + IpAddr::from([8, 8, 8, 8]), + IpAddr::from([8, 8, 4, 4]), + ]), + thread_cpu_scale_factor: 3, + redis_timeouts_ms: 30_000, + enable_metrics: false, + }; + + assert_eq!( + c.kafka_config.security_protocol.unwrap().to_string(), + "sasl_plaintext" + ); + } #[test] fn test_config_default_checker_ordinal() { diff --git a/src/manager.rs b/src/manager.rs index 30809f96..2dbe4f4c 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -188,8 +188,52 @@ impl Manager { (sender, handle, results_worker) } ProducerMode::Kafka => { - let kafka_overrides = - HashMap::from([("compression.type".to_string(), "lz4".to_string())]); + let mut kafka_overrides = + HashMap::from([("compression.type".to_owned(), "lz4".to_owned())]); + + if let Some(kafka_ssl_ca_location) = &config.kafka_config.ssl_ca_location { + kafka_overrides.insert( + "ssl.ca.location".to_owned(), + kafka_ssl_ca_location.to_owned(), + ); + } + + if let Some(kafka_ssl_cert_location) = &config.kafka_config.ssl_cert_location { + kafka_overrides.insert( + "ssl.certificate.location".to_owned(), + kafka_ssl_cert_location.to_owned(), + ); + } + + if let Some(kafka_ssl_key_location) = &config.kafka_config.ssl_key_location { + kafka_overrides.insert( + "ssl.key.location".to_owned(), + kafka_ssl_key_location.to_owned(), + ); + } + + if let Some(kafka_security_protocol) = &config.kafka_config.security_protocol { + kafka_overrides.insert( + "security.protocol".to_owned(), + kafka_security_protocol.to_string(), + ); + } + + if let Some(kafka_sasl_mechanism) = &config.kafka_config.sasl_mechanism { + kafka_overrides + .insert("sasl.mechanism".to_owned(), kafka_sasl_mechanism.to_owned()); + } + + if let Some(kafka_sasl_username) = &config.kafka_config.sasl_username { + kafka_overrides + .insert("sasl.username".to_owned(), kafka_sasl_username.to_owned()); + } + + if let Some(kafka_sasl_password) = &config.kafka_config.sasl_password { + kafka_overrides + .insert("sasl.password".to_owned(), kafka_sasl_password.to_owned()); + } + let kafka_config = KafkaConfig::new_config( config.results_kafka_cluster.to_owned(), Some(kafka_overrides),