-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpusher.go
113 lines (98 loc) · 2.35 KB
/
pusher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"log"
"bytes"
"io/ioutil"
"net/http"
"github.com/streadway/amqp"
"github.com/spf13/viper"
)
type trigger struct {
Source string
Endpoint string
Sink string
}
type config struct {
Url string
Concurrency int // not sure if we need this in go at all
Triggers []trigger
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func parseConfig() (cfg config) {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
failOnError(viper.ReadInConfig(), "Config could not be loaded")
failOnError(viper.Unmarshal(&cfg), "Config could not be unmarshaled")
return
}
func declareQueue(name string, ch *amqp.Channel) {
if name != "" {
_, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Println("Declared queue:", name)
}
}
func process(t trigger, ch *amqp.Channel, msg []byte) {
r := bytes.NewReader(msg)
resp, err := http.Post(t.Endpoint, "application/json", r)
failOnError(err, "Failed to send json to the endpoint")
defer resp.Body.Close()
// todo: check status code and nack the message
// (resp.StatusCode == http.StatusOK)
if t.Sink != "" {
body, err := ioutil.ReadAll(resp.Body)
failOnError(err, "Could not read response body")
data := amqp.Publishing {
ContentType: "text/plain",
Body: body,
}
err = ch.Publish(
"", // exchange
t.Sink, // routing key
false, // mandatory
false, // immediate
data)
failOnError(err, "Failed to publish response")
}
}
func listen(t trigger, ch *amqp.Channel) {
msgs, err := ch.Consume(
t.Source, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
log.Println("Listening on:", t.Source)
for msg := range msgs {
go process(t, ch, msg.Body)
}
}
func main() {
cfg := parseConfig()
conn, err := amqp.Dial(cfg.Url)
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
for _, t := range cfg.Triggers {
declareQueue(t.Source, ch)
declareQueue(t.Sink, ch)
go listen(t, ch)
}
for {}
}