Skip to content

Commit 0166d26

Browse files
fernandobalieirodavsclaus
authored andcommitted
feat(azure-servicebus): Ensure Content-Type header is propagated into AMQP Message.
1 parent 670fce1 commit 0166d26

File tree

3 files changed

+73
-0
lines changed

3 files changed

+73
-0
lines changed

components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.azure.messaging.servicebus.ServiceBusMessage;
2525
import org.apache.camel.util.ObjectHelper;
2626

27+
import static org.apache.camel.Exchange.CONTENT_TYPE;
28+
2729
public final class ServiceBusUtils {
2830

2931
private ServiceBusUtils() {
@@ -44,6 +46,11 @@ public static ServiceBusMessage createServiceBusMessage(
4446
}
4547
if (applicationProperties != null) {
4648
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
49+
50+
final Object contentType = serviceBusMessage.getRawAmqpMessage().getApplicationProperties().get(CONTENT_TYPE);
51+
if (contentType != null) {
52+
serviceBusMessage.getRawAmqpMessage().getProperties().setContentType(contentType.toString());
53+
}
4754
}
4855
if (ObjectHelper.isNotEmpty(correlationId)) {
4956
serviceBusMessage.setCorrelationId(correlationId);

components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,23 @@
2020
import java.util.Arrays;
2121
import java.util.LinkedList;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.stream.StreamSupport;
2425

2526
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
2627
import com.azure.messaging.servicebus.ServiceBusMessage;
2728
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
2829
import com.azure.messaging.servicebus.ServiceBusSenderClient;
30+
import org.apache.camel.Exchange;
2931
import org.junit.jupiter.api.Test;
3032

33+
import static org.assertj.core.api.Assertions.assertThat;
3134
import static org.junit.jupiter.api.Assertions.*;
3235

3336
public class ServiceBusUtilsTest {
3437

38+
private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";
39+
3540
@Test
3641
void testCreateServiceBusMessage() {
3742
// test string
@@ -137,6 +142,55 @@ void testCreateServiceBusMessagesWithSession() {
137142
.anyMatch(record -> record.getSessionId().equals("session-2")));
138143
}
139144

145+
@Test
146+
void testCreateServiceBusMessageWithContentType() {
147+
final Map<String, Object> applicationProperties = Map.of(
148+
Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
149+
150+
final ServiceBusMessage message
151+
= ServiceBusUtils.createServiceBusMessage("test string", applicationProperties, null, null);
152+
153+
assertEquals("test string", message.getBody().toString());
154+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
155+
}
156+
157+
@Test
158+
void testCreateServiceBusMessagesWithContentType() {
159+
final Map<String, Object> applicationProperties = Map.of(
160+
Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
161+
162+
final List<String> inputMessages = new LinkedList<>();
163+
inputMessages.add("test data");
164+
inputMessages.add(String.valueOf(12345));
165+
166+
final Iterable<ServiceBusMessage> busMessages
167+
= ServiceBusUtils.createServiceBusMessages(inputMessages, applicationProperties, null, null);
168+
169+
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
170+
.anyMatch(record -> record.getBody().toString().equals("test data")));
171+
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
172+
.anyMatch(record -> record.getBody().toString().equals("12345")));
173+
assertThat(StreamSupport.stream(busMessages.spliterator(), false))
174+
.allMatch(message -> APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
175+
176+
//Test bytes
177+
final List<byte[]> inputMessages2 = new LinkedList<>();
178+
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
179+
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
180+
inputMessages2.add(byteBody1);
181+
inputMessages2.add(byteBody2);
182+
183+
final Iterable<ServiceBusMessage> busMessages2
184+
= ServiceBusUtils.createServiceBusMessages(inputMessages2, applicationProperties, null, null);
185+
186+
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
187+
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
188+
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
189+
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody2)));
190+
assertThat(StreamSupport.stream(busMessages.spliterator(), false))
191+
.allMatch(message -> APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
192+
}
193+
140194
@Test
141195
void validateConfigurationMissingCredentials() {
142196
assertThrows(IllegalArgumentException.class,

components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
2626
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
2727
import org.apache.camel.CamelContext;
28+
import org.apache.camel.Exchange;
2829
import org.apache.camel.ProducerTemplate;
2930
import org.apache.camel.builder.RouteBuilder;
3031
import org.apache.camel.component.azure.servicebus.ServiceBusConstants;
@@ -47,6 +48,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport {
4748
private static final String DIRECT_SEND_TO_SESSION_QUEUE_URI = "direct:sendToQueueSessions";
4849
private static final Map<String, Object> PROPAGATED_HEADERS = new HashMap<>();
4950
private static final Pattern MESSAGE_BODY_PATTERN = Pattern.compile("^message-[0-4]$");
51+
private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";
5052

5153
static {
5254
PROPAGATED_HEADERS.put("booleanHeader", true);
@@ -60,6 +62,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport {
6062
PROPAGATED_HEADERS.put("stringHeader", "stringHeader");
6163
PROPAGATED_HEADERS.put("timestampHeader", new Date());
6264
PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID());
65+
PROPAGATED_HEADERS.put(Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
6366
}
6467

6568
private ProducerTemplate producerTemplate;
@@ -120,6 +123,7 @@ void camelSendsMessageToServiceBusQueue() throws InterruptedException {
120123
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
121124
Map<String, Object> applicationProperties = message.getApplicationProperties();
122125
assertEquals(PROPAGATED_HEADERS, applicationProperties);
126+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
123127
});
124128
}
125129
}
@@ -148,6 +152,7 @@ void camelSendsMessageBatchToServiceBusQueue() throws InterruptedException {
148152
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
149153
Map<String, Object> applicationProperties = message.getApplicationProperties();
150154
assertEquals(PROPAGATED_HEADERS, applicationProperties);
155+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
151156
});
152157
}
153158
}
@@ -170,6 +175,7 @@ void camelSendsMessageToServiceBusTopic() throws InterruptedException {
170175
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
171176
Map<String, Object> applicationProperties = message.getApplicationProperties();
172177
assertEquals(PROPAGATED_HEADERS, applicationProperties);
178+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
173179
});
174180
}
175181
}
@@ -194,6 +200,7 @@ void camelSchedulesServiceBusMessage() throws InterruptedException {
194200
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
195201
Map<String, Object> applicationProperties = message.getApplicationProperties();
196202
assertEquals(PROPAGATED_HEADERS, applicationProperties);
203+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
197204
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
198205
});
199206
}
@@ -225,6 +232,7 @@ void camelSchedulesServiceBusMessageBatch() throws InterruptedException {
225232
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
226233
Map<String, Object> applicationProperties = message.getApplicationProperties();
227234
assertEquals(PROPAGATED_HEADERS, applicationProperties);
235+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
228236
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
229237
});
230238
}
@@ -252,6 +260,7 @@ void camelSendsMessageToServiceBusSessionEnabledQueue() throws InterruptedExcept
252260
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
253261
Map<String, Object> applicationProperties = message.getApplicationProperties();
254262
assertEquals(PROPAGATED_HEADERS, applicationProperties);
263+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
255264
});
256265
}
257266
}
@@ -280,6 +289,7 @@ void camelSendsMessageBatchToServiceBusSessionEnabledQueue() throws InterruptedE
280289
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
281290
Map<String, Object> applicationProperties = message.getApplicationProperties();
282291
assertEquals(PROPAGATED_HEADERS, applicationProperties);
292+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
283293
});
284294
}
285295
}
@@ -324,6 +334,7 @@ void camelSchedulesServiceBusMessageWithSessions() throws InterruptedException {
324334
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
325335
Map<String, Object> applicationProperties = message.getApplicationProperties();
326336
assertEquals(PROPAGATED_HEADERS, applicationProperties);
337+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
327338
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
328339
});
329340
}
@@ -355,6 +366,7 @@ void camelSchedulesServiceBusMessageBatchWIthSessions() throws InterruptedExcept
355366
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
356367
Map<String, Object> applicationProperties = message.getApplicationProperties();
357368
assertEquals(PROPAGATED_HEADERS, applicationProperties);
369+
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
358370
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
359371
});
360372
}

0 commit comments

Comments
 (0)