This repository has been archived by the owner on May 19, 2022. It is now read-only.
forked from carbonblack/cb-event-forwarder
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathamqp.go
126 lines (103 loc) · 2.73 KB
/
amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"io/ioutil"
)
/*
* AMQP bookkeeping
*/
// NewConsumer connects to the AMQP broker at amqpURI, declares queueName,
// binds it to the requested exchanges and starts consuming from it.
//
// When config.AMQPTLSEnabled is set the connection is made with
// amqp.DialTLS; if config.AMQPTLSCACert, config.AMQPTLSClientCert and
// config.AMQPTLSClientKey are all non-empty, the CA bundle and client key
// pair are loaded into the TLS configuration first. Failing to read the CA
// file or load the key pair terminates the process via log.Fatal.
//
// The queue is declared non-durable and non-exclusive, with auto-delete
// controlled by autoDelete. If bindToRawExchange is true the queue is bound
// to the "api.rawsensordata" exchange with an empty routing key; each entry
// of routingKeys is then bound on the "api.events" exchange. The consumer is
// registered under ctag with automatic acknowledgement.
//
// On success it returns the Consumer and the delivery channel. On any
// failure it returns a nil Consumer, a nil channel and an error; a
// connection that was already established is closed before returning so it
// is not leaked.
func NewConsumer(amqpURI, queueName string, autoDelete bool, ctag string, bindToRawExchange bool,
	routingKeys []string) (*Consumer, <-chan amqp.Delivery, error) {
	c := &Consumer{
		conn:    nil,
		channel: nil,
		tag:     ctag,
	}

	var err error

	if config.AMQPTLSEnabled {
		log.Info("Connecting to message bus via TLS...")

		cfg := new(tls.Config)

		// Check if we have client SSL config to use and load it
		if config.AMQPTLSCACert != "" && config.AMQPTLSClientCert != "" && config.AMQPTLSClientKey != "" {
			caCert, err := ioutil.ReadFile(config.AMQPTLSCACert)
			if err != nil {
				log.Fatal(err)
			}
			caCertPool := x509.NewCertPool()
			// AppendCertsFromPEM reports false when nothing could be parsed;
			// surface that instead of silently using an empty pool.
			if !caCertPool.AppendCertsFromPEM(caCert) {
				log.Warnf("No CA certificates could be parsed from %s", config.AMQPTLSCACert)
			}
			cfg.RootCAs = caCertPool

			cert, err := tls.LoadX509KeyPair(config.AMQPTLSClientCert, config.AMQPTLSClientKey)
			if err != nil {
				log.Fatal(err)
			}
			cfg.Certificates = []tls.Certificate{cert}
		}

		// NOTE(review): server certificate verification is disabled, so the
		// RootCAs loaded above are not used to authenticate the broker.
		// Kept as-is for compatibility with existing deployments.
		cfg.InsecureSkipVerify = true

		c.conn, err = amqp.DialTLS(amqpURI, cfg)
	} else {
		log.Info("Connecting to message bus...")
		c.conn, err = amqp.Dial(amqpURI)
	}
	if err != nil {
		return nil, nil, fmt.Errorf("Dial: %s", err)
	}

	// From here on the connection is open. Close it on every failure path so
	// a caller that retries does not accumulate broker connections. Closing
	// the connection also closes any channel opened on it.
	established := false
	defer func() {
		if established {
			return
		}
		if closeErr := c.conn.Close(); closeErr != nil {
			log.Warnf("AMQP connection close error: %s", closeErr)
		}
	}()

	c.channel, err = c.conn.Channel()
	if err != nil {
		return nil, nil, fmt.Errorf("Channel: %s", err)
	}

	queue, err := c.channel.QueueDeclare(
		queueName,
		false,      // durable,
		autoDelete, // delete when unused
		false,      // exclusive
		false,      // nowait
		nil,        // arguments
	)
	if err != nil {
		return nil, nil, fmt.Errorf("Queue declare: %s", err)
	}

	if bindToRawExchange {
		err = c.channel.QueueBind(queueName, "", "api.rawsensordata", false, nil)
		if err != nil {
			return nil, nil, fmt.Errorf("QueueBind: %s", err)
		}
		log.Info("Subscribed to bulk raw sensor event exchange")
	}

	for _, key := range routingKeys {
		err = c.channel.QueueBind(queueName, key, "api.events", false, nil)
		if err != nil {
			return nil, nil, fmt.Errorf("QueueBind: %s", err)
		}
		log.Infof("Subscribed to %s", key)
	}

	deliveries, err := c.channel.Consume(
		queue.Name,
		c.tag,
		true,  // automatic ack
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,   // arguments
	)
	if err != nil {
		return nil, nil, fmt.Errorf("Queue consume: %s", err)
	}

	established = true
	return c, deliveries, nil
}
// Shutdown cancels the consumer registered under c.tag and closes the AMQP
// connection.
//
// The connection is closed even when the cancel fails, so a failed cancel
// does not leave the connection open. If the cancel fails, its error is
// returned (a close error in that case is only logged); otherwise any close
// error is returned. On full success it logs "AMQP shutdown OK" and returns
// nil.
func (c *Consumer) Shutdown() error {
	cancelErr := c.channel.Cancel(c.tag, true)
	closeErr := c.conn.Close()

	if cancelErr != nil {
		if closeErr != nil {
			log.Warnf("AMQP connection close error: %s", closeErr)
		}
		return fmt.Errorf("Consumer cancel failed: %s", cancelErr)
	}
	if closeErr != nil {
		return fmt.Errorf("AMQP connection close error: %s", closeErr)
	}

	log.Infof("AMQP shutdown OK")
	return nil
}