From 337f15a012a0d36de997f3a75824ba26c2a29a8e Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Tue, 10 Jun 2025 13:53:03 +0530 Subject: [PATCH] Kerberos keytab/password auth Signed-off-by: Sreekanth --- Dockerfile | 2 + examples/11-join-on-map.yaml | 62 ++++++++++++++----- rust/Cargo.toml | 3 +- rust/extns/numaflow-kafka/src/lib.rs | 68 ++++++++++++++++----- rust/extns/numaflow-kafka/src/sink.rs | 2 +- rust/extns/numaflow-kafka/src/source.rs | 2 +- rust/numaflow-core/src/config/components.rs | 16 +++-- 7 files changed, 118 insertions(+), 37 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2aec6b636c..0e4fbbe57d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -74,6 +74,8 @@ COPY --from=base /bin/numaflow /bin/numaflow COPY --from=base /bin/numaflow-rs /bin/numaflow-rs COPY ui/build /ui/build +RUN apt update && apt install -y krb5-user libpam-krb5 + # TODO: remove this when we are ported everything to Rust COPY --from=base /bin/entrypoint /bin/entrypoint ENTRYPOINT ["/bin/entrypoint"] diff --git a/examples/11-join-on-map.yaml b/examples/11-join-on-map.yaml index 18235350e8..a921c5eeb4 100644 --- a/examples/11-join-on-map.yaml +++ b/examples/11-join-on-map.yaml @@ -1,33 +1,65 @@ +apiVersion: v1 +kind: Secret +metadata: + name: gssapi +data: + username: a2Fma2F0ZXN0L2hvc3QubGltYS5pbnRlcm5hbAo= + password: cm9vdAo= + config: | + W2xpYmRlZmF1bHRzXQogIGRlZmF1bHRfcmVhbG0gPSBFWEFNUExFLkNPTQogIHVkcF9wcmVmZXJl + bmNlX2xpbWl0ID0gMQoKW3JlYWxtc10KICBFWEFNUExFLkNPTSA9IHsKICAgIGtkYyA9IGhvc3Qu + bGltYS5pbnRlcm5hbDo4OAogICAgYWRtaW5fc2VydmVyID0gaG9zdC5saW1hLmludGVybmFsOjc0 + OQogIH0KCltkb21haW5fcmVhbG1dCiAgLmV4YW1wbGUuY29tID0gRVhBTVBMRS5DT00KICBleGFt + cGxlLmNvbSA9IEVYQU1QTEUuQ09NCg== + +--- apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: name: join-on-map spec: vertices: - - name: http-in - source: - http: {} - name: kafka-in + scale: + min: 1 + max: 1 + containerTemplate: + env: + - name: NUMAFLOW_RUNTIME + value: "rust" source: kafka: brokers: - - my-broker1:19700 - - my-broker2:19700 + - host.lima.internal:9094 topic: my-topic consumerGroup: my-consumer-group - # Both of the incoming Vertices join on this Map Vertex - - name: cat - udf: - builtin: - name: cat # A built-in UDF which simply cats the message + sasl: + mechanism: GSSAPI + gssapi: + serviceName: kafka + realm: EXAMPLE.COM + authType: KRB5_USER_AUTH + usernameSecret: # Pointing to a secret reference which contains the username + name: gssapi + key: username + passwordSecret: # Pointing to a secret reference which contains the username + name: gssapi + key: password + kerberosConfigSecret: # Pointing to a secret reference which contains the kerberos config + name: gssapi + key: config + - name: out + scale: + min: 1 + max: 1 + containerTemplate: + env: + - name: NUMAFLOW_RUNTIME + value: "rust" sink: # A simple log printing sink log: {} edges: - - from: http-in - to: cat - from: kafka-in - to: cat - - from: cat - to: out \ No newline at end of file + to: out diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1c06047e0c..62a02f035d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -47,7 +47,8 @@ verbose_file_reads = "warn" # Compared to default release profile, this profile reduced binary size from 29MB to 21MB # and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max). [profile.release] -lto = "fat" +codegen-units = 256 +lto = false # This profile optimizes for short build times at the expense of larger binary size and slower runtime performance. # If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release` diff --git a/rust/extns/numaflow-kafka/src/lib.rs b/rust/extns/numaflow-kafka/src/lib.rs index 38505d576e..1bab382488 100644 --- a/rust/extns/numaflow-kafka/src/lib.rs +++ b/rust/extns/numaflow-kafka/src/lib.rs @@ -20,6 +20,12 @@ pub enum Error { Other(String), } +#[derive(Debug, Clone, PartialEq)] +pub enum GssAPIAuthType { + KeyTab, + UserAuth, +} + /// Represents the SASL authentication mechanism for connecting to Kafka. /// /// See the [Kafka Security documentation](https://kafka.apache.org/documentation/#security_sasl) for more details on each mechanism. @@ -61,8 +67,10 @@ pub enum KafkaSaslAuth { keytab: Option, /// Path to the Kerberos configuration file (optional). kerberos_config: Option, - /// Authentication type (e.g., "kinit"). - auth_type: String, + /// Authentication type (e.g., ""). + /// KRB5_USER_AUTH for auth using password + /// KRB5_KEYTAB_AUTH for auth using keytab + auth_type: GssAPIAuthType, }, /// SASL/OAUTHBEARER authentication mechanism. /// @@ -108,7 +116,7 @@ fn update_auth_config( client_config: &mut ClientConfig, tls_config: Option, auth_config: Option, -) { +) -> Result<()> { let tls_enabled = tls_config.is_some(); if let Some(tls_config) = tls_config { client_config.set("security.protocol", "SSL"); @@ -160,10 +168,10 @@ fn update_auth_config( service_name, realm: _, username, - password: _, + password, keytab, kerberos_config, - auth_type: _, + auth_type, } => { client_config.set("sasl.mechanisms", "GSSAPI"); client_config.set("sasl.kerberos.service.name", service_name); @@ -171,9 +179,38 @@ fn update_auth_config( if let Some(keytab) = keytab { client_config.set("sasl.kerberos.keytab", keytab); } + + const CONFIG_FILE_PATH: &str = "/tmp/krb5.conf"; + let kinit_cmd = match auth_type { + GssAPIAuthType::UserAuth => { + let password = password.unwrap_or_default(); // FIXME: + format!( + r#"echo '{password}' | kinit -R %{{sasl.kerberos.principal}} || echo '{password}' | kinit %{{sasl.kerberos.principal}}"# + ) + } + GssAPIAuthType::KeyTab => { + // Default value. https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html + r##"kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}"##.to_string() + } + }; + if let Some(kerberos_config) = kerberos_config { - client_config.set("sasl.kerberos.kinit.cmd", kerberos_config); + std::fs::write(CONFIG_FILE_PATH, &kerberos_config).map_err(|err| { + Error::Other(format!( + "Saving kerberos config to file {kerberos_config}: {err:?}" + )) + })?; + client_config.set( + "sasl.kerberos.kinit.cmd", + format!("export KRB5_CONFIG={CONFIG_FILE_PATH} && {kinit_cmd}"), + ); + } else { + client_config.set("sasl.kerberos.kinit.cmd", kinit_cmd); } + tracing::info!( + kinit_cmd = client_config.get("sasl.kerberos.kinit.cmd"), + "Running kinit command" + ); } KafkaSaslAuth::Oauth { client_id, @@ -187,6 +224,7 @@ fn update_auth_config( } } } + Ok(()) } #[cfg(test)] @@ -198,7 +236,7 @@ mod tests { #[test] fn test_update_auth_config_none() { let mut config = ClientConfig::new(); - update_auth_config(&mut config, None, None); + update_auth_config(&mut config, None, None).unwrap(); // Should not set any security or SASL keys assert!(config.get("security.protocol").is_none()); assert!(config.get("sasl.mechanisms").is_none()); @@ -215,7 +253,7 @@ mod tests { client_cert_private_key: "CLIENT_KEY_DATA".to_string(), }), }; - update_auth_config(&mut config, Some(tls), None); + update_auth_config(&mut config, Some(tls), None).unwrap(); let expected_config = [ ("security.protocol", "SSL"), ("enable.ssl.certificate.verification", "false"), @@ -236,7 +274,7 @@ mod tests { username: "user".to_string(), password: "pass".to_string(), }; - update_auth_config(&mut config, None, Some(auth)); + update_auth_config(&mut config, None, Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_PLAINTEXT"), ("sasl.mechanisms", "PLAIN"), @@ -256,7 +294,7 @@ mod tests { username: "user256".to_string(), password: "pass256".to_string(), }; - update_auth_config(&mut config, None, Some(auth)); + update_auth_config(&mut config, None, Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_PLAINTEXT"), ("sasl.mechanisms", "SCRAM-SHA-256"), @@ -276,7 +314,7 @@ mod tests { username: "user512".to_string(), password: "pass512".to_string(), }; - update_auth_config(&mut config, None, Some(auth)); + update_auth_config(&mut config, None, Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_PLAINTEXT"), ("sasl.mechanisms", "SCRAM-SHA-512"), @@ -299,9 +337,9 @@ mod tests { password: None, keytab: Some("/path/to/keytab".to_string()), kerberos_config: Some("/path/to/krb5.conf".to_string()), - auth_type: "kinit".to_string(), + auth_type: GssAPIAuthType::KeyTab, }; - update_auth_config(&mut config, None, Some(auth)); + update_auth_config(&mut config, None, Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_PLAINTEXT"), ("sasl.mechanisms", "GSSAPI"), @@ -324,7 +362,7 @@ mod tests { client_secret: "csecret".to_string(), token_endpoint: "https://token".to_string(), }; - update_auth_config(&mut config, None, Some(auth)); + update_auth_config(&mut config, None, Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_PLAINTEXT"), ("sasl.mechanisms", "OAUTHBEARER"), @@ -350,7 +388,7 @@ mod tests { username: "user".to_string(), password: "pass".to_string(), }; - update_auth_config(&mut config, Some(tls), Some(auth)); + update_auth_config(&mut config, Some(tls), Some(auth)).unwrap(); let expected_config = [ ("security.protocol", "SASL_SSL"), ("sasl.mechanisms", "PLAIN"), diff --git a/rust/extns/numaflow-kafka/src/sink.rs b/rust/extns/numaflow-kafka/src/sink.rs index d5fd015961..c95266ad3c 100644 --- a/rust/extns/numaflow-kafka/src/sink.rs +++ b/rust/extns/numaflow-kafka/src/sink.rs @@ -87,7 +87,7 @@ pub fn new_sink(config: KafkaSinkConfig) -> crate::Result { .set("bootstrap.servers", config.brokers.join(",")) .set_log_level(RDKafkaLogLevel::Warning); - crate::update_auth_config(&mut client_config, config.tls, config.auth); + crate::update_auth_config(&mut client_config, config.tls, config.auth)?; let producer: FutureProducer = client_config .create() diff --git a/rust/extns/numaflow-kafka/src/source.rs b/rust/extns/numaflow-kafka/src/source.rs index 67fb501297..62ec5dee51 100644 --- a/rust/extns/numaflow-kafka/src/source.rs +++ b/rust/extns/numaflow-kafka/src/source.rs @@ -137,7 +137,7 @@ impl KafkaActor { .set("enable.auto.commit", "false") .set_log_level(RDKafkaLogLevel::Warning); - crate::update_auth_config(&mut client_config, config.tls, config.auth); + crate::update_auth_config(&mut client_config, config.tls, config.auth)?; let context = KafkaContext; let consumer: Arc = diff --git a/rust/numaflow-core/src/config/components.rs b/rust/numaflow-core/src/config/components.rs index d37c0c2210..484a21f28a 100644 --- a/rust/numaflow-core/src/config/components.rs +++ b/rust/numaflow-core/src/config/components.rs @@ -1,4 +1,4 @@ -use numaflow_kafka::TlsConfig; +use numaflow_kafka::{GssAPIAuthType, TlsConfig}; use numaflow_models::models::{Sasl, Tls}; use crate::Error; @@ -1199,7 +1199,14 @@ fn parse_kafka_auth_config( } else { None }; - let auth_type = format!("{:?}", gssapi.auth_type); + let auth_type = match gssapi.auth_type { + numaflow_models::models::gssapi::AuthType::KeytabAuth => { + GssAPIAuthType::KeyTab + } + numaflow_models::models::gssapi::AuthType::UserAuth => { + GssAPIAuthType::UserAuth + } + }; Some(numaflow_kafka::KafkaSaslAuth::Gssapi { service_name, realm, @@ -1847,6 +1854,7 @@ mod kafka_tests { use super::sink::SinkType; use super::source::SourceType; use k8s_openapi::api::core::v1::SecretKeySelector; + use numaflow_kafka::GssAPIAuthType; use numaflow_models::models::gssapi::AuthType; use numaflow_models::models::{Gssapi, KafkaSource, Sasl, SaslPlain, SasloAuth, Tls}; use std::collections::HashMap; @@ -2402,7 +2410,7 @@ mod kafka_tests { assert_eq!(password, Some("test-pass".to_string())); assert!(keytab.is_none()); assert!(kerberos_config.is_none()); - assert_eq!(auth_type, "UserAuth"); + assert_eq!(auth_type, GssAPIAuthType::UserAuth); } _ => panic!("Unexpected KafkaAuth variant"), } @@ -2475,7 +2483,7 @@ mod kafka_tests { assert!(password.is_none()); assert_eq!(keytab, Some("test-keytab".to_string())); assert!(kerberos_config.is_none()); - assert_eq!(auth_type, "KeytabAuth"); + assert_eq!(auth_type, GssAPIAuthType::KeyTab); } _ => panic!("Unexpected KafkaAuth variant"), }