Skip to content

Leverage the Segment kafka-go writer #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ Usage:
--topic=<topic>
--broker=<url>...
[--listen=<addr>]
[--trusted-cert=<path> --client-cert=<path> --client-cert-key=<path>]
connect-kafka -h | --help
connect-kafka --version

Expand All @@ -86,18 +85,12 @@ Below is an example to connect to a Heroku Kafka in a public space (via SSL):
```bash
go get -u github.com/segment-integrations/connect-kafka
heroku config:get KAFKA_URL -a kafka-integration-demo # copy the kafka broker urls into command below
heroku config:get KAFKA_TRUSTED_CERT -a kafka-integration-demo > kafka_trusted_cert.cer
heroku config:get KAFKA_CLIENT_CERT -a kafka-integration-demo > kafka_client_cert.cer
heroku config:get KAFKA_CLIENT_CERT_KEY -a kafka-integration-demo > kafka_client_key_cert.cer
connect-kafka \
--debug \
--topic=segment \
--broker=kafka+ssl://ec2-51-16-10-109.compute-1.amazonaws.com:9096 \
--broker=kafka+ssl://ec2-62-7-61-181.compute-1.amazonaws.com:9096 \
--broker=kafka+ssl://ec2-33-20-240-35.compute-1.amazonaws.com:9096 \
--trusted-cert=kafka_trusted_cert.cer \
--client-cert=kafka_client_cert.cer \
--client-cert-key=kafka_client_key_cert.cer
--broker=kafka+ssl://ec2-33-20-240-35.compute-1.amazonaws.com:9096
```

### Setup Webhook
Expand Down
85 changes: 17 additions & 68 deletions integration.go
Original file line number Diff line number Diff line change
@@ -1,94 +1,43 @@
package main

import (
"crypto/tls"
"crypto/x509"
"context"
"io"
"io/ioutil"

"github.com/Shopify/sarama"
log "github.com/Sirupsen/logrus"
"github.com/segment-integrations/connect-kafka/internal/kafka"
"github.com/tj/docopt"
"github.com/segmentio/kafka-go"
)

// KafkaIntegration holds open connection to Kafka writer and allows
// caller to produce messages to Kafka stream.
type KafkaIntegration struct {
topic string
producer sarama.SyncProducer
}

func (k *KafkaIntegration) newTLSFromConfig(m map[string]interface{}) *tls.Config {
trustedCertPath, _ := m["--trusted-cert"].(string)
clientCertPath, _ := m["--client-cert"].(string)
clientCertKeyPath, _ := m["--client-cert-key"].(string)

if trustedCertPath == "" && clientCertPath == "" && clientCertKeyPath == "" {
return nil
}

trustedCertBytes, err := ioutil.ReadFile(trustedCertPath)
if err != nil {
log.Fatal(err)
}

clientCertBytes, err := ioutil.ReadFile(clientCertPath)
if err != nil {
log.Fatal(err)
}

clientCertKeyBytes, err := ioutil.ReadFile(clientCertKeyPath)
if err != nil {
log.Fatal(err)
}

cert, err := tls.X509KeyPair(clientCertBytes, clientCertKeyBytes)
if err != nil {
log.Fatal(err)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(trustedCertBytes)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
RootCAs: certPool,
}
tlsConfig.BuildNameToCertificate()
producer *kafka.Writer

return tlsConfig
Brokers []string
Topic string
}

// Init initializes the Kafka writer with brokers and topic
func (k *KafkaIntegration) Init() error {
m, err := docopt.Parse(usage, nil, true, Version, false)
if err != nil {
return err
}

kafkaConfig := &kafka.Config{BrokerAddresses: m["--broker"].([]string)}
kafkaConfig.TLSConfig = k.newTLSFromConfig(m)

producer, err := kafka.NewProducer(kafkaConfig)
if err != nil {
return err
}

k.producer = producer
k.topic = m["--topic"].(string)
k.producer = kafka.NewWriter(kafka.WriterConfig{
Brokers: k.Brokers,
Topic: k.Topic,
Balancer: &kafka.LeastBytes{},
})

return nil
}

// Process writes data to the Kafka stream
func (k *KafkaIntegration) Process(r io.ReadCloser) error {
defer r.Close()

b, err := ioutil.ReadAll(r)
if err != nil {
return err
}

_, _, err = k.producer.SendMessage(&sarama.ProducerMessage{
Topic: k.topic,
Value: sarama.ByteEncoder(b),
return k.producer.WriteMessages(context.Background(), kafka.Message{
Value: b,
})

return err
}
30 changes: 0 additions & 30 deletions internal/kafka/client.go

This file was deleted.

24 changes: 0 additions & 24 deletions internal/kafka/config.go

This file was deleted.

22 changes: 17 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package main
import (
_ "net/http/pprof"

"github.com/Sirupsen/logrus"
"github.com/segmentio/connect"
"github.com/tj/docopt"
)

const (
Version = "0.0.1-beta"
)
const version = "0.0.1-beta"

var usage = `
Usage:
connect-kafka
--topic=<topic>
--broker=<url>...
[--trusted-cert=<path> --client-cert=<path> --client-cert-key=<path>]
connect-kafka -h | --help
connect-kafka --version

Expand All @@ -27,5 +26,18 @@ Options:
`

func main() {
connect.Run(&KafkaIntegration{})
m, err := docopt.Parse(usage, nil, true, version, false)
if err != nil {
logrus.Fatalf("parse flags error: %v", err)
}

var (
brokers = m["--broker"].([]string)
topic = m["--topic"].(string)
)

connect.Run(&KafkaIntegration{
Brokers: brokers,
Topic: topic,
})
}