2 changes: 2 additions & 0 deletions Dockerfile
@@ -74,6 +74,8 @@ COPY --from=base /bin/numaflow /bin/numaflow
COPY --from=base /bin/numaflow-rs /bin/numaflow-rs
COPY ui/build /ui/build

RUN apt update && apt install -y krb5-user libpam-krb5
Member: This might bring in glibc, which will create a lot of CVEs. Perhaps we can also provide a non-scratch image?

Member: Think about whether we should run them in one container with the same image.


# TODO: remove this when we have ported everything to Rust
COPY --from=base /bin/entrypoint /bin/entrypoint
ENTRYPOINT ["/bin/entrypoint"]
62 changes: 47 additions & 15 deletions examples/11-join-on-map.yaml
@@ -1,33 +1,65 @@
apiVersion: v1
kind: Secret
metadata:
name: gssapi
data:
username: a2Fma2F0ZXN0L2hvc3QubGltYS5pbnRlcm5hbAo=
password: cm9vdAo=
config: |
W2xpYmRlZmF1bHRzXQogIGRlZmF1bHRfcmVhbG0gPSBFWEFNUExFLkNPTQogIHVkcF9wcmVmZXJl
bmNlX2xpbWl0ID0gMQoKW3JlYWxtc10KICBFWEFNUExFLkNPTSA9IHsKICAgIGtkYyA9IGhvc3Qu
bGltYS5pbnRlcm5hbDo4OAogICAgYWRtaW5fc2VydmVyID0gaG9zdC5saW1hLmludGVybmFsOjc0
OQogIH0KCltkb21haW5fcmVhbG1dCiAgLmV4YW1wbGUuY29tID0gRVhBTVBMRS5DT00KICBleGFt
cGxlLmNvbSA9IEVYQU1QTEUuQ09NCg==
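# For reference (decoded here for illustration only): `username` above base64-decodes to
# `kafkatest/host.lima.internal`, `password` to `root`, and `config` to a krb5.conf along these lines:
#   [libdefaults]
#     default_realm = EXAMPLE.COM
#     udp_preference_limit = 1
#   [realms]
#     EXAMPLE.COM = {
#       kdc = host.lima.internal:88
#       admin_server = host.lima.internal:749
#     }
#   [domain_realm]
#     .example.com = EXAMPLE.COM
#     example.com = EXAMPLE.COM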

---
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: join-on-map
spec:
vertices:
- name: http-in
source:
http: {}
- name: kafka-in
scale:
min: 1
max: 1
containerTemplate:
env:
- name: NUMAFLOW_RUNTIME
value: "rust"
source:
kafka:
brokers:
- my-broker1:19700
- my-broker2:19700
- host.lima.internal:9094
topic: my-topic
consumerGroup: my-consumer-group
# Both of the incoming Vertices join on this Map Vertex
- name: cat
udf:
builtin:
name: cat # A built-in UDF which simply cats the message
sasl:
mechanism: GSSAPI
gssapi:
serviceName: kafka
realm: EXAMPLE.COM
authType: KRB5_USER_AUTH
usernameSecret: # Pointing to a secret reference which contains the username
name: gssapi
key: username
passwordSecret: # Pointing to a secret reference which contains the password
name: gssapi
key: password
kerberosConfigSecret: # Pointing to a secret reference which contains the kerberos config
name: gssapi
key: config

- name: out
scale:
min: 1
max: 1
containerTemplate:
env:
- name: NUMAFLOW_RUNTIME
value: "rust"
sink:
# A simple log printing sink
log: {}
edges:
- from: http-in
to: cat
- from: kafka-in
to: cat
- from: cat
to: out
to: out
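For orientation, here is a rough sketch (not part of this change) of the KafkaSaslAuth::Gssapi value that parse_kafka_auth_config should produce for the example above once the Secret keys are decoded. The field names follow the lib.rs diff below; the decoded strings may carry trailing newlines and the config content is truncated for brevity.

use numaflow_kafka::{GssAPIAuthType, KafkaSaslAuth};

// Hypothetical value that parsing the `gssapi` block of the kafka-in source above
// should yield once the Secret keys are base64-decoded.
fn example_gssapi_auth() -> KafkaSaslAuth {
    KafkaSaslAuth::Gssapi {
        service_name: "kafka".to_string(),
        realm: "EXAMPLE.COM".to_string(),
        // decoded `username` key of the `gssapi` Secret
        username: "kafkatest/host.lima.internal".to_string(),
        // decoded `password` key
        password: Some("root".to_string()),
        keytab: None,
        // decoded `config` key; lib.rs writes this content to /tmp/krb5.conf before running kinit
        kerberos_config: Some("[libdefaults]\n  default_realm = EXAMPLE.COM\n  # ...".to_string()),
        auth_type: GssAPIAuthType::UserAuth,
    }
}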
3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -47,7 +47,8 @@ verbose_file_reads = "warn"
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
[profile.release]
lto = "fat"
codegen-units = 256
lto = false

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
68 changes: 53 additions & 15 deletions rust/extns/numaflow-kafka/src/lib.rs
@@ -20,6 +20,12 @@ pub enum Error {
Other(String),
}

#[derive(Debug, Clone, PartialEq)]
pub enum GssAPIAuthType {
KeyTab,
UserAuth,
}

/// Represents the SASL authentication mechanism for connecting to Kafka.
///
/// See the [Kafka Security documentation](https://kafka.apache.org/documentation/#security_sasl) for more details on each mechanism.
@@ -61,8 +67,10 @@ pub enum KafkaSaslAuth {
keytab: Option<String>,
/// Path to the Kerberos configuration file (optional).
kerberos_config: Option<String>,
/// Authentication type (e.g., "kinit").
auth_type: String,
/// Authentication type:
/// KRB5_USER_AUTH for auth using password
/// KRB5_KEYTAB_AUTH for auth using keytab
auth_type: GssAPIAuthType,
},
/// SASL/OAUTHBEARER authentication mechanism.
///
@@ -108,7 +116,7 @@ fn update_auth_config(
client_config: &mut ClientConfig,
tls_config: Option<TlsConfig>,
auth_config: Option<KafkaSaslAuth>,
) {
) -> Result<()> {
let tls_enabled = tls_config.is_some();
if let Some(tls_config) = tls_config {
client_config.set("security.protocol", "SSL");
@@ -160,20 +168,49 @@ fn update_auth_config(
service_name,
realm: _,
username,
password: _,
password,
keytab,
kerberos_config,
auth_type: _,
auth_type,
} => {
client_config.set("sasl.mechanisms", "GSSAPI");
client_config.set("sasl.kerberos.service.name", service_name);
client_config.set("sasl.kerberos.principal", username);
if let Some(keytab) = keytab {
client_config.set("sasl.kerberos.keytab", keytab);
}

const CONFIG_FILE_PATH: &str = "/tmp/krb5.conf";
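// Build the shell command librdkafka will run (sasl.kerberos.kinit.cmd) to acquire or refresh
// the Kerberos ticket; the %{...} tokens are librdkafka config placeholders that are expanded
// with the corresponding property values when the command is executed.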
let kinit_cmd = match auth_type {
GssAPIAuthType::UserAuth => {
let password = password.unwrap_or_default(); // FIXME:
format!(
r#"echo '{password}' | kinit -R %{{sasl.kerberos.principal}} || echo '{password}' | kinit %{{sasl.kerberos.principal}}"#
)
}
GssAPIAuthType::KeyTab => {
// Default value. https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
r##"kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}"##.to_string()
}
};

if let Some(kerberos_config) = kerberos_config {
client_config.set("sasl.kerberos.kinit.cmd", kerberos_config);
std::fs::write(CONFIG_FILE_PATH, &kerberos_config).map_err(|err| {
Error::Other(format!(
"Saving kerberos config to file {kerberos_config}: {err:?}"
))
})?;
client_config.set(
"sasl.kerberos.kinit.cmd",
format!("export KRB5_CONFIG={CONFIG_FILE_PATH} && {kinit_cmd}"),
);
} else {
client_config.set("sasl.kerberos.kinit.cmd", kinit_cmd);
}
tracing::info!(
kinit_cmd = client_config.get("sasl.kerberos.kinit.cmd"),
"Running kinit command"
);
}
KafkaSaslAuth::Oauth {
client_id,
Expand All @@ -187,6 +224,7 @@ fn update_auth_config(
}
}
}
Ok(())
}

#[cfg(test)]
@@ -198,7 +236,7 @@ mod tests {
#[test]
fn test_update_auth_config_none() {
let mut config = ClientConfig::new();
update_auth_config(&mut config, None, None);
update_auth_config(&mut config, None, None).unwrap();
// Should not set any security or SASL keys
assert!(config.get("security.protocol").is_none());
assert!(config.get("sasl.mechanisms").is_none());
@@ -215,7 +253,7 @@
client_cert_private_key: "CLIENT_KEY_DATA".to_string(),
}),
};
update_auth_config(&mut config, Some(tls), None);
update_auth_config(&mut config, Some(tls), None).unwrap();
let expected_config = [
("security.protocol", "SSL"),
("enable.ssl.certificate.verification", "false"),
@@ -236,7 +274,7 @@
username: "user".to_string(),
password: "pass".to_string(),
};
update_auth_config(&mut config, None, Some(auth));
update_auth_config(&mut config, None, Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_PLAINTEXT"),
("sasl.mechanisms", "PLAIN"),
@@ -256,7 +294,7 @@
username: "user256".to_string(),
password: "pass256".to_string(),
};
update_auth_config(&mut config, None, Some(auth));
update_auth_config(&mut config, None, Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_PLAINTEXT"),
("sasl.mechanisms", "SCRAM-SHA-256"),
@@ -276,7 +314,7 @@
username: "user512".to_string(),
password: "pass512".to_string(),
};
update_auth_config(&mut config, None, Some(auth));
update_auth_config(&mut config, None, Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_PLAINTEXT"),
("sasl.mechanisms", "SCRAM-SHA-512"),
@@ -299,9 +337,9 @@
password: None,
keytab: Some("/path/to/keytab".to_string()),
kerberos_config: Some("/path/to/krb5.conf".to_string()),
auth_type: "kinit".to_string(),
auth_type: GssAPIAuthType::KeyTab,
};
update_auth_config(&mut config, None, Some(auth));
update_auth_config(&mut config, None, Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_PLAINTEXT"),
("sasl.mechanisms", "GSSAPI"),
@@ -324,7 +362,7 @@
client_secret: "csecret".to_string(),
token_endpoint: "https://token".to_string(),
};
update_auth_config(&mut config, None, Some(auth));
update_auth_config(&mut config, None, Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_PLAINTEXT"),
("sasl.mechanisms", "OAUTHBEARER"),
@@ -350,7 +388,7 @@
username: "user".to_string(),
password: "pass".to_string(),
};
update_auth_config(&mut config, Some(tls), Some(auth));
update_auth_config(&mut config, Some(tls), Some(auth)).unwrap();
let expected_config = [
("security.protocol", "SASL_SSL"),
("sasl.mechanisms", "PLAIN"),
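A possible follow-up test for the password-based GSSAPI path, sketched here only (not part of this change); it assumes it lives in the same mod tests as the cases above and that /tmp is writable so update_auth_config can write the generated krb5.conf.

#[test]
fn test_update_auth_config_gssapi_user_auth() {
    let mut config = ClientConfig::new();
    let auth = KafkaSaslAuth::Gssapi {
        service_name: "kafka".to_string(),
        realm: "EXAMPLE.COM".to_string(),
        username: "kuser".to_string(),
        password: Some("kpass".to_string()),
        keytab: None,
        kerberos_config: Some("[libdefaults]\n  default_realm = EXAMPLE.COM\n".to_string()),
        auth_type: GssAPIAuthType::UserAuth,
    };
    update_auth_config(&mut config, None, Some(auth)).unwrap();
    assert_eq!(config.get("sasl.mechanisms"), Some("GSSAPI"));
    assert_eq!(config.get("sasl.kerberos.principal"), Some("kuser"));
    // The configured command should point at the generated krb5.conf and pipe the password into kinit.
    let kinit_cmd = config.get("sasl.kerberos.kinit.cmd").unwrap();
    assert!(kinit_cmd.starts_with("export KRB5_CONFIG=/tmp/krb5.conf && "));
    assert!(kinit_cmd.contains("| kinit"));
}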
2 changes: 1 addition & 1 deletion rust/extns/numaflow-kafka/src/sink.rs
@@ -87,7 +87,7 @@ pub fn new_sink(config: KafkaSinkConfig) -> crate::Result<KafkaSink> {
.set("bootstrap.servers", config.brokers.join(","))
.set_log_level(RDKafkaLogLevel::Warning);

crate::update_auth_config(&mut client_config, config.tls, config.auth);
crate::update_auth_config(&mut client_config, config.tls, config.auth)?;

let producer: FutureProducer = client_config
.create()
2 changes: 1 addition & 1 deletion rust/extns/numaflow-kafka/src/source.rs
@@ -137,7 +137,7 @@ impl KafkaActor {
.set("enable.auto.commit", "false")
.set_log_level(RDKafkaLogLevel::Warning);

crate::update_auth_config(&mut client_config, config.tls, config.auth);
crate::update_auth_config(&mut client_config, config.tls, config.auth)?;

let context = KafkaContext;
let consumer: Arc<NumaflowConsumer> =
16 changes: 12 additions & 4 deletions rust/numaflow-core/src/config/components.rs
@@ -1,4 +1,4 @@
use numaflow_kafka::TlsConfig;
use numaflow_kafka::{GssAPIAuthType, TlsConfig};
use numaflow_models::models::{Sasl, Tls};

use crate::Error;
Expand Down Expand Up @@ -1199,7 +1199,14 @@ fn parse_kafka_auth_config(
} else {
None
};
let auth_type = format!("{:?}", gssapi.auth_type);
let auth_type = match gssapi.auth_type {
numaflow_models::models::gssapi::AuthType::KeytabAuth => {
GssAPIAuthType::KeyTab
}
numaflow_models::models::gssapi::AuthType::UserAuth => {
GssAPIAuthType::UserAuth
}
};
Some(numaflow_kafka::KafkaSaslAuth::Gssapi {
service_name,
realm,
@@ -1847,6 +1854,7 @@ mod kafka_tests {
use super::sink::SinkType;
use super::source::SourceType;
use k8s_openapi::api::core::v1::SecretKeySelector;
use numaflow_kafka::GssAPIAuthType;
use numaflow_models::models::gssapi::AuthType;
use numaflow_models::models::{Gssapi, KafkaSource, Sasl, SaslPlain, SasloAuth, Tls};
use std::collections::HashMap;
@@ -2402,7 +2410,7 @@ mod kafka_tests {
assert_eq!(password, Some("test-pass".to_string()));
assert!(keytab.is_none());
assert!(kerberos_config.is_none());
assert_eq!(auth_type, "UserAuth");
assert_eq!(auth_type, GssAPIAuthType::UserAuth);
}
_ => panic!("Unexpected KafkaAuth variant"),
}
@@ -2475,7 +2483,7 @@ mod kafka_tests {
assert!(password.is_none());
assert_eq!(keytab, Some("test-keytab".to_string()));
assert!(kerberos_config.is_none());
assert_eq!(auth_type, "KeytabAuth");
assert_eq!(auth_type, GssAPIAuthType::KeyTab);
}
_ => panic!("Unexpected KafkaAuth variant"),
}