diff --git a/README.md b/README.md index 392e247..d2b3ec9 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,6 @@ Usage: --topic= --broker=... [--listen=] - [--trusted-cert= --client-cert= --client-cert-key=] connect-kafka -h | --help connect-kafka --version @@ -86,18 +85,12 @@ Below is an example to connect to a Heroku Kafka in a public space (via SSL): ```bash go get -u github.com/segment-integrations/connect-kafka heroku config:get KAFKA_URL -a kafka-integration-demo # copy the kafka broker urls into command below -heroku config:get KAFKA_TRUSTED_CERT -a kafka-integration-demo > kafka_trusted_cert.cer -heroku config:get KAFKA_CLIENT_CERT -a kafka-integration-demo > kafka_client_cert.cer -heroku config:get KAFKA_CLIENT_CERT_KEY -a kafka-integration-demo > kafka_client_key_cert.cer connect-kafka \ --debug \ --topic=segment \ --broker=kafka+ssl://ec2-51-16-10-109.compute-1.amazonaws.com:9096 \ --broker=kafka+ssl://ec2-62-7-61-181.compute-1.amazonaws.com:9096 \ - --broker=kafka+ssl://ec2-33-20-240-35.compute-1.amazonaws.com:9096 \ - --trusted-cert=kafka_trusted_cert.cer \ - --client-cert=kafka_client_cert.cer \ - --client-cert-key=kafka_client_key_cert.cer + --broker=kafka+ssl://ec2-33-20-240-35.compute-1.amazonaws.com:9096 ``` ### Setup Webhook diff --git a/integration.go b/integration.go index 60e9a6f..d376bda 100644 --- a/integration.go +++ b/integration.go @@ -1,94 +1,43 @@ package main import ( - "crypto/tls" - "crypto/x509" + "context" "io" "io/ioutil" - "github.com/Shopify/sarama" - log "github.com/Sirupsen/logrus" - "github.com/segment-integrations/connect-kafka/internal/kafka" - "github.com/tj/docopt" + "github.com/segmentio/kafka-go" ) +// KafkaIntegration holds open connection to Kafka writer and allows +// caller to produce messages to Kafka stream. type KafkaIntegration struct { - topic string - producer sarama.SyncProducer -} - -func (k *KafkaIntegration) newTLSFromConfig(m map[string]interface{}) *tls.Config { - trustedCertPath, _ := m["--trusted-cert"].(string) - clientCertPath, _ := m["--client-cert"].(string) - clientCertKeyPath, _ := m["--client-cert-key"].(string) - - if trustedCertPath == "" && clientCertPath == "" && clientCertKeyPath == "" { - return nil - } - - trustedCertBytes, err := ioutil.ReadFile(trustedCertPath) - if err != nil { - log.Fatal(err) - } - - clientCertBytes, err := ioutil.ReadFile(clientCertPath) - if err != nil { - log.Fatal(err) - } - - clientCertKeyBytes, err := ioutil.ReadFile(clientCertKeyPath) - if err != nil { - log.Fatal(err) - } - - cert, err := tls.X509KeyPair(clientCertBytes, clientCertKeyBytes) - if err != nil { - log.Fatal(err) - } - certPool := x509.NewCertPool() - certPool.AppendCertsFromPEM(trustedCertBytes) - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - InsecureSkipVerify: true, - RootCAs: certPool, - } - tlsConfig.BuildNameToCertificate() + producer *kafka.Writer - return tlsConfig + Brokers []string + Topic string } +// Init initializes the Kafka writer with brokers and topic func (k *KafkaIntegration) Init() error { - m, err := docopt.Parse(usage, nil, true, Version, false) - if err != nil { - return err - } - - kafkaConfig := &kafka.Config{BrokerAddresses: m["--broker"].([]string)} - kafkaConfig.TLSConfig = k.newTLSFromConfig(m) - - producer, err := kafka.NewProducer(kafkaConfig) - if err != nil { - return err - } - - k.producer = producer - k.topic = m["--topic"].(string) + k.producer = kafka.NewWriter(kafka.WriterConfig{ + Brokers: k.Brokers, + Topic: k.Topic, + Balancer: &kafka.LeastBytes{}, + }) return nil } +// Process writes data to the Kafka stream func (k *KafkaIntegration) Process(r io.ReadCloser) error { defer r.Close() + b, err := ioutil.ReadAll(r) if err != nil { return err } - _, _, err = k.producer.SendMessage(&sarama.ProducerMessage{ - Topic: k.topic, - Value: sarama.ByteEncoder(b), + return k.producer.WriteMessages(context.Background(), kafka.Message{ + Value: b, }) - - return err } diff --git a/internal/kafka/client.go b/internal/kafka/client.go deleted file mode 100644 index b9962a5..0000000 --- a/internal/kafka/client.go +++ /dev/null @@ -1,30 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "github.com/satori/go.uuid" -) - -func NewProducer(c *Config) (sarama.SyncProducer, error) { - config := sarama.NewConfig() - config.Producer.Return.Errors = true - config.ClientID = uuid.NewV4().String() - - // TLS - if c.TLSConfig != nil { - config.Net.TLS.Config = c.TLSConfig - config.Net.TLS.Enable = true - } - - err := config.Validate() - if err != nil { - return nil, err - } - - producer, err := sarama.NewSyncProducer(c.getBrokers(), config) - if err != nil { - return nil, err - } - - return producer, nil -} diff --git a/internal/kafka/config.go b/internal/kafka/config.go deleted file mode 100644 index a5d5bb0..0000000 --- a/internal/kafka/config.go +++ /dev/null @@ -1,24 +0,0 @@ -package kafka - -import ( - "crypto/tls" - "log" - "net/url" -) - -type Config struct { - BrokerAddresses []string - TLSConfig *tls.Config -} - -func (c *Config) getBrokers() []string { - addrs := make([]string, len(c.BrokerAddresses)) - for i, v := range c.BrokerAddresses { - u, err := url.Parse(v) - if err != nil { - log.Fatal(err) - } - addrs[i] = u.Host - } - return addrs -} diff --git a/main.go b/main.go index 01c54b0..24c534a 100644 --- a/main.go +++ b/main.go @@ -3,19 +3,18 @@ package main import ( _ "net/http/pprof" + "github.com/Sirupsen/logrus" "github.com/segmentio/connect" + "github.com/tj/docopt" ) -const ( - Version = "0.0.1-beta" -) +const version = "0.0.1-beta" var usage = ` Usage: connect-kafka --topic= --broker=... - [--trusted-cert= --client-cert= --client-cert-key=] connect-kafka -h | --help connect-kafka --version @@ -27,5 +26,18 @@ Options: ` func main() { - connect.Run(&KafkaIntegration{}) + m, err := docopt.Parse(usage, nil, true, version, false) + if err != nil { + logrus.Fatalf("parse flags error: %v", err) + } + + var ( + brokers = m["--broker"].([]string) + topic = m["--topic"].(string) + ) + + connect.Run(&KafkaIntegration{ + Brokers: brokers, + Topic: topic, + }) }