Skip to content

Commit f5e0b4d

Browse files
tobiasakeemichaf
authored andcommitted
Fixed AMQP SSL/TLS connection between EI and RabbitMq. (#51)
* Fixed AMQP SSL/TLS connection between EI and RabbitMq. * Removed some connectionFactory code that was not needed. * Stepped version in pom file. * Change parameter name on tlsVer to tlsVersion. * Changed logging of ssl/tls exceptions.
1 parent a74e5a5 commit f5e0b4d

File tree

3 files changed

+188
-146
lines changed

3 files changed

+188
-146
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.github.ericsson</groupId>
55
<artifactId>eiffel-intelligence</artifactId>
6-
<version>0.0.2</version>
6+
<version>0.0.3</version>
77

88
<packaging>war</packaging>
99

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

+184-143
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,19 @@
1717
package com.ericsson.ei.rmqhandler;
1818

1919
import com.fasterxml.jackson.annotation.JsonIgnore;
20+
import com.rabbitmq.client.Channel;
21+
import com.rabbitmq.client.Connection;
22+
2023
import lombok.Getter;
2124
import lombok.Setter;
25+
26+
import java.io.IOException;
27+
import java.security.KeyManagementException;
28+
import java.security.NoSuchAlgorithmException;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.concurrent.TimeoutException;
32+
2233
import org.slf4j.Logger;
2334
import org.slf4j.LoggerFactory;
2435
import org.springframework.amqp.core.AcknowledgeMode;
@@ -41,147 +52,177 @@
4152

4253
@Component
4354
public class RmqHandler {
44-
static Logger log = (Logger) LoggerFactory.getLogger(RmqHandler.class);
45-
46-
@Getter @Setter
47-
@Value("${rabbitmq.queue.durable}")
48-
private Boolean queueDurable;
49-
50-
@Getter @Setter
51-
@Value("${rabbitmq.host}")
52-
private String host;
53-
54-
@Getter @Setter
55-
@Value("${rabbitmq.exchange.name}")
56-
private String exchangeName;
57-
58-
@Getter @Setter
59-
@Value("${rabbitmq.port}")
60-
private Integer port;
61-
62-
@Getter @Setter
63-
@Value("${rabbitmq.tls}")
64-
private String tlsVer;
65-
66-
@JsonIgnore
67-
@Getter @Setter
68-
@Value("${rabbitmq.user}")
69-
private String user;
70-
71-
@JsonIgnore
72-
@Getter @Setter
73-
@Value("${rabbitmq.password}")
74-
private String password;
75-
76-
@Getter @Setter
77-
@Value("${rabbitmq.domainId}")
78-
private String domainId;
79-
80-
@Getter @Setter
81-
@Value("${rabbitmq.componentName}")
82-
private String componentName;
83-
84-
@Getter @Setter
85-
@Value("${rabbitmq.waitlist.queue.suffix}")
86-
private String waitlistSufix;
87-
88-
@Getter @Setter
89-
@Value("${rabbitmq.routing.key}")
90-
private String routingKey;
91-
92-
@Getter @Setter
93-
@Value("${rabbitmq.consumerName}")
94-
private String consumerName;
95-
96-
private RabbitTemplate rabbitTemplate;
97-
private CachingConnectionFactory factory;
98-
private SimpleMessageListenerContainer container;
99-
private SimpleMessageListenerContainer waitlistContainer;
100-
101-
@Bean
102-
ConnectionFactory connectionFactory() {
103-
factory = new CachingConnectionFactory(host, port);
104-
factory.setPublisherConfirms(true);
105-
factory.setPublisherReturns(true);
106-
if(user != null && user.length() !=0 && password != null && password.length() !=0) {
107-
factory.setUsername(user);
108-
factory.setPassword(password);
109-
}
110-
return factory;
111-
}
112-
113-
@Bean
114-
Queue queue() {
115-
return new Queue(getQueueName(), true);
116-
}
117-
118-
@Bean
119-
TopicExchange exchange() {
120-
return new TopicExchange(exchangeName);
121-
}
122-
123-
@Bean
124-
Binding binding(Queue queue, TopicExchange exchange) {
125-
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
126-
}
127-
128-
@Bean
129-
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
130-
String queueName = getQueueName();
131-
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
132-
container = new SimpleMessageListenerContainer();
133-
container.setConnectionFactory(factory);
134-
container.setQueueNames(queueName);
135-
container.setMessageListener(listenerAdapter);
136-
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
137-
return container;
138-
}
139-
140-
public String getQueueName() {
141-
String durableName = queueDurable ? "durable" : "transient";
142-
return domainId + "." + componentName + "." + consumerName + "." + durableName;
143-
}
144-
145-
public String getWaitlistQueueName() {
146-
String durableName = queueDurable ? "durable" : "transient";
147-
return domainId + "." + componentName + "." + consumerName + "." + durableName + "." + waitlistSufix;
148-
}
149-
150-
@Bean
151-
public RabbitTemplate rabbitMqTemplate() {
152-
if (rabbitTemplate == null) {
153-
if (factory != null) {
154-
rabbitTemplate = new RabbitTemplate(factory);
155-
} else {
156-
rabbitTemplate = new RabbitTemplate(connectionFactory());
157-
}
158-
159-
rabbitTemplate.setExchange(exchangeName);
160-
rabbitTemplate.setRoutingKey(routingKey);
161-
rabbitTemplate.setQueue(getQueueName());
162-
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
163-
@Override
164-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
165-
log.info("Received confirm with result : {}", ack);
166-
}
167-
});
168-
}
169-
return rabbitTemplate;
170-
}
171-
172-
public void publishObjectToWaitlistQueue(String message) {
173-
log.info("publishing message to message bus...");
174-
rabbitMqTemplate().convertAndSend(message);
175-
}
176-
177-
public void close() {
178-
try {
179-
waitlistContainer.destroy();
180-
container.destroy();
181-
factory.destroy();
182-
} catch (Exception e) {
183-
log.info("exception occured while closing connections");
184-
log.info(e.getMessage(),e);
185-
}
186-
}
55+
static Logger log = (Logger) LoggerFactory.getLogger(RmqHandler.class);
56+
57+
@Getter
58+
@Setter
59+
@Value("${rabbitmq.queue.durable}")
60+
private Boolean queueDurable;
61+
62+
@Getter
63+
@Setter
64+
@Value("${rabbitmq.host}")
65+
private String host;
66+
67+
@Getter
68+
@Setter
69+
@Value("${rabbitmq.exchange.name}")
70+
private String exchangeName;
71+
72+
@Getter
73+
@Setter
74+
@Value("${rabbitmq.port}")
75+
private Integer port;
76+
77+
@Getter
78+
@Setter
79+
@Value("${rabbitmq.tlsVersion}")
80+
private String tlsVersion;
81+
82+
@JsonIgnore
83+
@Getter
84+
@Setter
85+
@Value("${rabbitmq.user}")
86+
private String user;
87+
88+
@JsonIgnore
89+
@Getter
90+
@Setter
91+
@Value("${rabbitmq.password}")
92+
private String password;
93+
94+
@Getter
95+
@Setter
96+
@Value("${rabbitmq.domainId}")
97+
private String domainId;
98+
99+
@Getter
100+
@Setter
101+
@Value("${rabbitmq.componentName}")
102+
private String componentName;
103+
104+
@Getter
105+
@Setter
106+
@Value("${rabbitmq.waitlist.queue.suffix}")
107+
private String waitlistSufix;
108+
109+
@Getter
110+
@Setter
111+
@Value("${rabbitmq.routing.key}")
112+
private String routingKey;
113+
114+
@Getter
115+
@Setter
116+
@Value("${rabbitmq.consumerName}")
117+
private String consumerName;
118+
119+
private RabbitTemplate rabbitTemplate;
120+
private CachingConnectionFactory factory;
121+
private SimpleMessageListenerContainer container;
122+
private SimpleMessageListenerContainer waitlistContainer;
123+
124+
@Bean
125+
ConnectionFactory connectionFactory() {
126+
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
127+
connectionFactory.setHost(host);
128+
connectionFactory.setPort(port);
129+
if (user != null && user.length() != 0 && password != null && password.length() != 0) {
130+
connectionFactory.setUsername(user);
131+
connectionFactory.setPassword(password);
132+
}
133+
134+
if (tlsVersion != null && !tlsVersion.isEmpty()) {
135+
try {
136+
log.info("Using SSL/TLS version " + tlsVersion + " connection to RabbitMQ.");
137+
connectionFactory.useSslProtocol(tlsVersion);
138+
} catch (KeyManagementException e) {
139+
log.error("Failed to set SSL/TLS version.");
140+
log.error(e.getMessage(), e);
141+
} catch (NoSuchAlgorithmException e) {
142+
log.error("Failed to set SSL/TLS version.");
143+
log.error(e.getMessage(), e);
144+
}
145+
}
146+
147+
factory = new CachingConnectionFactory(connectionFactory);
148+
factory.setPublisherConfirms(true);
149+
factory.setPublisherReturns(true);
150+
151+
return factory;
152+
}
153+
154+
@Bean
155+
Queue queue() {
156+
return new Queue(getQueueName(), true);
157+
}
158+
159+
@Bean
160+
TopicExchange exchange() {
161+
return new TopicExchange(exchangeName);
162+
}
163+
164+
@Bean
165+
Binding binding(Queue queue, TopicExchange exchange) {
166+
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
167+
}
168+
169+
@Bean
170+
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
171+
String queueName = getQueueName();
172+
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
173+
container = new SimpleMessageListenerContainer();
174+
container.setConnectionFactory(factory);
175+
container.setQueueNames(queueName);
176+
container.setMessageListener(listenerAdapter);
177+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
178+
return container;
179+
}
180+
181+
public String getQueueName() {
182+
String durableName = queueDurable ? "durable" : "transient";
183+
return domainId + "." + componentName + "." + consumerName + "." + durableName;
184+
}
185+
186+
public String getWaitlistQueueName() {
187+
String durableName = queueDurable ? "durable" : "transient";
188+
return domainId + "." + componentName + "." + consumerName + "." + durableName + "." + waitlistSufix;
189+
}
190+
191+
@Bean
192+
public RabbitTemplate rabbitMqTemplate() {
193+
if (rabbitTemplate == null) {
194+
if (factory != null) {
195+
rabbitTemplate = new RabbitTemplate(factory);
196+
} else {
197+
rabbitTemplate = new RabbitTemplate(connectionFactory());
198+
}
199+
200+
rabbitTemplate.setExchange(exchangeName);
201+
rabbitTemplate.setRoutingKey(routingKey);
202+
rabbitTemplate.setQueue(getQueueName());
203+
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
204+
@Override
205+
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
206+
log.info("Received confirm with result : {}", ack);
207+
}
208+
});
209+
}
210+
return rabbitTemplate;
211+
}
212+
213+
public void publishObjectToWaitlistQueue(String message) {
214+
log.info("publishing message to message bus...");
215+
rabbitMqTemplate().convertAndSend(message);
216+
}
217+
218+
public void close() {
219+
try {
220+
waitlistContainer.destroy();
221+
container.destroy();
222+
factory.destroy();
223+
} catch (Exception e) {
224+
log.info("exception occured while closing connections");
225+
log.info(e.getMessage(), e);
226+
}
227+
}
187228
}

src/main/resources/application.properties

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ rabbitmq.host: localhost
1515
rabbitmq.port: 5672
1616
rabbitmq.user:
1717
rabbitmq.password:
18-
rabbitmq.tls:
18+
# Valid TLS versions: 'TLSv1.2', 'TLSv1.1', 'TLSv1', 'TLS', 'SSLv3', 'SSLv2', 'SSL'
19+
rabbitmq.tlsVersion:
1920
rabbitmq.exchange.name: ei-poc-4
2021
rabbitmq.domainId: er001-eiffelxxx
2122
rabbitmq.componentName: eiffelintelligence
@@ -57,4 +58,4 @@ spring.mail.port:
5758
spring.mail.properties.mail.smtp.auth: false
5859
#spring.mail.properties.mail.smtp.starttls.enable: false
5960

60-
er.url: http://localhost:8080/search/
61+
er.url: http://localhost:8080/search/

0 commit comments

Comments
 (0)