Skip to content

Commit 044c705

Browse files
committed
initial commit
0 parents  commit 044c705

File tree

15 files changed

+443
-0
lines changed

15 files changed

+443
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea
2+
cmd

Makefile

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
GOBIN=./cmd/main
2+
3+
## build: Build go binary
4+
build:
5+
go build -o $(GOBIN)
6+
7+
## run: Run go server
8+
.PHONY: run
9+
run:
10+
$(GOBIN)
11+
12+
## get: Run go get missing dependencies
13+
get:
14+
go get ./...
15+
16+
## deploy: Run commands to deploy app to container
17+
deploy:
18+
make get
19+
GOOS=linux GOARCH=amd64 go build \
20+
-ldflags='-w -s -extldflags "-static"' -a \
21+
-o /go/bin/main .
22+
23+
.PHONY: help
24+
all: help
25+
help: Makefile
26+
@echo
27+
@echo " Choose a command"
28+
@echo
29+
@sed -n 's/^##//p' $< | column -t -s ':' | sed -e 's/^/ /'
30+
@echo
31+
32+
.DEFAULT_GOAL := help

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Kafka server client example
2+
## Installation
3+
### Docker
4+
You need pre install kafka and zookeeper in docker.
5+
Enter at deploy folder and run
6+
```sh
7+
$ docker-compose up -d
8+
```
9+
This command automatically installs kafka and zookeeper and starts them.
10+
Kafka default expose port `<you docker ip>:9092`
11+
### Go cleint and server
12+
For install all dependencies run
13+
```sh
14+
$ make get
15+
```
16+
Then build your binary
17+
```sh
18+
$ make build
19+
```
20+
21+
## Run
22+
For starting consumer start your binary
23+
```sh
24+
$ ./cmd/main --act consumer
25+
```
26+
For Windows:
27+
```sh
28+
$ ./cmd/main.exe --act consumer
29+
```
30+
For starting producer
31+
```sh
32+
$ ./cmd/main --act producer
33+
```
34+
For Windows:
35+
```sh
36+
$ ./cmd/main.exe --act producer
37+
```
38+
## Usage
39+
For publish message at broker in producer cli simply write next command to cli
40+
```sh
41+
$ send###<your message here>
42+
```
43+
`send` this is event. So far, there is one event in the app.
44+
45+
Then in consumer cli you will see that message delivered.
46+

client/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package client
2+
3+
import (
4+
"kafka-server-client/client/consumer"
5+
"kafka-server-client/configuration"
6+
)
7+
8+
func Start(brokers []string, topic ...string) {
9+
cons := consumer.New(brokers, configuration.NewKafkaConfiguration())
10+
consumer.Start(cons, topic...)
11+
}

client/consumer/consumer.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package consumer
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/Shopify/sarama"
8+
"kafka-server-client/events"
9+
"log"
10+
"os"
11+
"strings"
12+
)
13+
14+
func New(brokers []string, config ...*sarama.Config) sarama.Consumer {
15+
var conf *sarama.Config
16+
if len(config) > 0 {
17+
conf = config[0]
18+
}
19+
cons, err := sarama.NewConsumer(brokers, conf)
20+
if err != nil {
21+
log.Println("NEW FUNC")
22+
log.Fatal(err)
23+
}
24+
25+
return cons
26+
}
27+
28+
func Consume(consumer sarama.PartitionConsumer) {
29+
for {
30+
select {
31+
case err := <- consumer.Errors():
32+
fmt.Println(err)
33+
case msg := <-consumer.Messages():
34+
message := msg.Value
35+
var logs events.Event
36+
if err := json.Unmarshal(message, &logs); err != nil {
37+
fmt.Printf("can not unmarshal value: %v", string(message))
38+
} else {
39+
switch logs.Type {
40+
case "send":
41+
var event events.MessageEvent
42+
if err := json.Unmarshal(message, &event); err != nil {
43+
log.Println(err)
44+
}
45+
fmt.Printf("new message: %s", event.Message)
46+
default:
47+
fmt.Println("unknown command")
48+
}
49+
}
50+
}
51+
}
52+
}
53+
54+
func Start(consumer sarama.Consumer, topic ...string) {
55+
defer consumer.Close()
56+
57+
go ConsumeAllTopics(topic, consumer)
58+
59+
fmt.Println("press ENTER to exit\n")
60+
bufio.NewReader(os.Stdin).ReadString('\n')
61+
fmt.Println("terminating...")
62+
}
63+
64+
func ConsumeAllTopics(topics []string, master sarama.Consumer) {
65+
for _, topic := range topics {
66+
if strings.Contains(topic, "__consumer_offsets") {
67+
continue
68+
}
69+
partitions, _ := master.Partitions(topic)
70+
// this only consumes partition no 1, you would probably want to consume all partitions
71+
consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
72+
if nil != err {
73+
fmt.Printf("Topic %v Partitions: %v", topic, partitions)
74+
log.Fatal(err)
75+
}
76+
fmt.Println(" Start consuming topic ", topic)
77+
go func(topic string, consumer sarama.PartitionConsumer) {
78+
for {
79+
select {
80+
case consumerError := <-consumer.Errors():
81+
fmt.Println("consumerError: ", consumerError.Err)
82+
83+
case msg := <-consumer.Messages():
84+
fmt.Println("Got message on topic ", topic, string(msg.Value))
85+
}
86+
}
87+
}(topic, consumer)
88+
}
89+
}

configuration/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package configuration
2+
3+
import "github.com/Shopify/sarama"
4+
5+
func NewKafkaConfiguration() *sarama.Config {
6+
conf := sarama.NewConfig()
7+
conf.ClientID = "kafka-server-client"
8+
conf.Producer.RequiredAcks = sarama.WaitForAll
9+
conf.Producer.Return.Successes = true
10+
conf.Consumer.Return.Errors = true
11+
conf.ChannelBufferSize = 1
12+
conf.Version = sarama.V0_10_1_0
13+
14+
return conf
15+
}

deploy/docker-compose.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
version: '3.5'
2+
3+
services:
4+
zookeeper:
5+
image: wurstmeister/zookeeper
6+
container_name: zookeeper
7+
ports:
8+
- "2181:2181"
9+
environment:
10+
ZOOKEEPER_CLIENT_PORT: 2181
11+
12+
kafka:
13+
image: wurstmeister/kafka
14+
ports:
15+
- "9092:9092"
16+
expose:
17+
- "9093"
18+
hostname: kafka
19+
environment:
20+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
21+
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.99.100:9092
22+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
23+
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
24+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
25+
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
26+
depends_on:
27+
- zookeeper
28+

events/event.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package events
2+
3+
type Event struct {
4+
ID int `json:"ID,omitempty"`
5+
Type string `json:"Type,omitempty"`
6+
}

events/new_message_event.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package events
2+
3+
type MessageEvent struct {
4+
Event
5+
Message string `json:"Message,omitempty"`
6+
}
7+
8+
func (m MessageEvent) Process() error {
9+
return nil
10+
}
11+
12+
func NewMessageEvent(message string) MessageEvent {
13+
return MessageEvent{
14+
Event: Event{Type: "send"},
15+
Message: message,
16+
}
17+
}

events/processor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package events
2+
3+
type Processor interface {
4+
Process() error
5+
}

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module kafka-server-client
2+
3+
go 1.15
4+
5+
require github.com/Shopify/sarama v1.27.2

go.sum

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
github.com/Shopify/sarama v1.27.2 h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc=
2+
github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
3+
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
4+
github.com/confluentinc/confluent-kafka-go v1.5.2 h1:l+qt+a0Okmq0Bdr1P55IX4fiwFJyg0lZQmfHkAFkv7E=
5+
github.com/confluentinc/confluent-kafka-go v1.5.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
6+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
7+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
9+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
10+
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
11+
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
12+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
13+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
14+
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
15+
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
16+
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
17+
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
18+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
19+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
20+
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
21+
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
22+
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
23+
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
24+
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
25+
github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg=
26+
github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
27+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
28+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
29+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
30+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
31+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
32+
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
33+
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
34+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
35+
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
36+
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
37+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
38+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
39+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
40+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
41+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
42+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
43+
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
44+
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
45+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
46+
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
47+
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
48+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
49+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
50+
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
51+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
52+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
53+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
54+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
55+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
56+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
57+
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
58+
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.5.2 h1:g0WBLy6fobNUU8W/e9zx6I0Yl79Ya+BDW1NwzAlTiiQ=
59+
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.5.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
60+
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
61+
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
62+
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
63+
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
64+
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
65+
gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
66+
gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
67+
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
68+
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
69+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
70+
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

main.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"github.com/Shopify/sarama"
6+
"kafka-server-client/client"
7+
"kafka-server-client/server"
8+
"log"
9+
"os"
10+
)
11+
12+
var (
13+
brokers = []string { "192.168.99.100:9092" }
14+
topic = "chart1"
15+
topics = []string { topic, "chart2" }
16+
)
17+
18+
func main() {
19+
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
20+
act := flag.String("act", "consumer", "producer or consumer")
21+
flag.Parse()
22+
23+
switch *act {
24+
case "producer":
25+
server.Start(brokers, topic)
26+
case "consumer":
27+
client.Start(brokers, topics...)
28+
}
29+
}

0 commit comments

Comments
 (0)