From 719ca4fa7f5427f02d7a8de20ea7c88afc9bd7a1 Mon Sep 17 00:00:00 2001 From: "[zylk] Aian Cantabrana" Date: Thu, 6 Aug 2020 10:01:43 +0200 Subject: [PATCH] =?UTF-8?q?configuraci=C3=B3n=20como=20argumentos=20kafka?= =?UTF-8?q?=20producer=20with=20kerberos?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kerberos/KafkaProducerWithKerberos.java | 95 +++++++++++++++++-- 1 file changed, 88 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/zylklab/kafka/kerberos/KafkaProducerWithKerberos.java b/src/main/java/net/zylklab/kafka/kerberos/KafkaProducerWithKerberos.java index f76f8d7..264e0e5 100644 --- a/src/main/java/net/zylklab/kafka/kerberos/KafkaProducerWithKerberos.java +++ b/src/main/java/net/zylklab/kafka/kerberos/KafkaProducerWithKerberos.java @@ -3,24 +3,104 @@ import java.util.Date; import java.util.Properties; +import org.apache.commons.math3.exception.NullArgumentException; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaProducerWithKerberos { - private static final String KAFKA_BROKER = "enbarr001.bigdata.zylk.net:6667,enbarr002.bigdata.zylk.net:6667"; - private static final String KAFKA_PROTOCOL = "SASL_PLAINTEXT"; - private static final String KAFKA_TOPIC = "KERBEROS_JSON"; - private static final String KAFKA_KERBEROS_SERVICE_NAME = "kafka"; - private static final String JAAS_CLIENT_CONF = "/home/gus/git/flink/flink_cep_examples/external-resources/jaas/jaas-client.conf"; - private static final String KRB5_CONF = "/home/gus/git/flink/flink_cep_examples/external-resources/jaas/krb5.conf"; + private static final Logger _log = LoggerFactory.getLogger(KafkaProducerWithKerberos.class); + + private static String KAFKA_BROKER = "enbarr001.bigdata.zylk.net:6667,enbarr002.bigdata.zylk.net:6667"; + private static String KAFKA_PROTOCOL = "SASL_PLAINTEXT"; + private static String KAFKA_TOPIC = "KERBEROS_JSON"; + private static String KAFKA_KERBEROS_SERVICE_NAME = "kafka"; + private static String JAAS_CLIENT_CONF = "/home/gus/git/flink/flink_cep_examples/external-resources/jaas/jaas-client.conf"; + private static String KRB5_CONF = "/home/gus/git/flink/flink_cep_examples/external-resources/jaas/krb5.conf"; + private static String MSG = ""; + + private static final String KAFKA_BROKER_FLAG = "kafka-broker"; + private static final String KAFKA_PROTOCOL_FLAG = "kafka-protocol"; + private static final String KAFKA_TOPIC_FLAG = "kafka-topic"; + private static final String KAFKA_KERBEROS_SERVICE_NAME_FLAG = "kerberos-service-name"; + private static final String JAAS_CLIENT_CONF_FLAG = "jaas-client-conf"; + private static final String KRB5_CONF_FLAG = "krb5-conf"; + private static final String MSG_FLAG = "message"; + private static void usage() { + _log.error(String.format("Usage:")); + _log.error(String.format("$ java -jar JarExample.jar")); + _log.error(String.format("--%s\t%s", KAFKA_BROKER_FLAG, "Kafka brokers separated by commas")); + _log.error(String.format("--%s\t%s", KAFKA_PROTOCOL_FLAG, "Kafka protocol (SASL_PLAINTEXT)")); + _log.error(String.format("--%s\t%s", KAFKA_TOPIC_FLAG, "Kafka topic name")); + _log.error(String.format("--%s\t%s", KAFKA_KERBEROS_SERVICE_NAME_FLAG, "Kerberos service name")); + _log.error(String.format("--%s\t%s", JAAS_CLIENT_CONF_FLAG, "jaas client configuration file")); + _log.error(String.format("--%s\t%s", KRB5_CONF_FLAG, "KRB5 configuration file")); + _log.error(String.format("--%s\t%s", MSG_FLAG, "Message to send")); + } + public static void main(String[] args) { + + // Get configurations from arguments + ParameterTool parameters = ParameterTool.fromArgs(args); + if (parameters.has(KAFKA_BROKER_FLAG)) { + KAFKA_BROKER = parameters.get(KAFKA_BROKER_FLAG); + } else { + _log.error(String.format("Kafka broker is not specified (--%s)", KAFKA_BROKER_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(KAFKA_PROTOCOL_FLAG)) { + KAFKA_PROTOCOL = parameters.get(KAFKA_PROTOCOL_FLAG); + } else { + _log.error(String.format("Kafka protocol is not specified (--%s)", KAFKA_PROTOCOL_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(KAFKA_TOPIC_FLAG)) { + KAFKA_TOPIC = parameters.get(KAFKA_TOPIC_FLAG); + } else { + _log.error(String.format("Kafka topic is not specified (--%s)", KAFKA_TOPIC_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(KAFKA_KERBEROS_SERVICE_NAME_FLAG)) { + KAFKA_KERBEROS_SERVICE_NAME = parameters.get(KAFKA_KERBEROS_SERVICE_NAME_FLAG); + } else { + _log.error(String.format("Kafka kerberos service name is not specified (--%s)", KAFKA_KERBEROS_SERVICE_NAME_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(JAAS_CLIENT_CONF_FLAG)) { + JAAS_CLIENT_CONF = parameters.get(JAAS_CLIENT_CONF_FLAG); + } else { + _log.error(String.format("JAAS client configuration file is not specified (--%s)", JAAS_CLIENT_CONF_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(KRB5_CONF_FLAG)) { + KRB5_CONF = parameters.get(KRB5_CONF_FLAG); + } else { + _log.error(String.format("KRB5 configuration file is not specified (--%s)", KRB5_CONF_FLAG)); + usage(); + throw new NullArgumentException(); + } + if (parameters.has(MSG_FLAG)) { + MSG = parameters.get(MSG_FLAG); + } else { + _log.error(String.format("Kafka message is not specified (--%s)", MSG_FLAG)); + usage(); + throw new NullArgumentException(); + } + System.setProperty("java.security.auth.login.config", JAAS_CLIENT_CONF); System.setProperty("java.security.krb5.conf", KRB5_CONF); System.setProperty("sun.security.krb5.debug", "true"); @@ -41,7 +121,8 @@ public static void main(String[] args) { String key = "key-" + 1; msg.setKey(key); - msg.setMsg("This is a message with key: " + key); +// msg.setMsg("This is a message with key: " + key); + msg.setMsg(MSG); msg.setTs(new Date().getTime()); ProducerRecord event = new ProducerRecord(KAFKA_TOPIC, key, new JSONObject(msg).toString() );