Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.azure.messaging.servicebus.ServiceBusMessage;
import org.apache.camel.util.ObjectHelper;

import static org.apache.camel.Exchange.CONTENT_TYPE;

public final class ServiceBusUtils {

private ServiceBusUtils() {
Expand All @@ -44,6 +46,11 @@ public static ServiceBusMessage createServiceBusMessage(
}
if (applicationProperties != null) {
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);

final Object contentType = serviceBusMessage.getRawAmqpMessage().getApplicationProperties().get(CONTENT_TYPE);
if (contentType != null) {
serviceBusMessage.getRawAmqpMessage().getProperties().setContentType(contentType.toString());
}
}
if (ObjectHelper.isNotEmpty(correlationId)) {
serviceBusMessage.setCorrelationId(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import org.apache.camel.Exchange;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

public class ServiceBusUtilsTest {

private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";

@Test
void testCreateServiceBusMessage() {
// test string
Expand Down Expand Up @@ -137,6 +142,55 @@ void testCreateServiceBusMessagesWithSession() {
.anyMatch(record -> record.getSessionId().equals("session-2")));
}

@Test
void testCreateServiceBusMessageWithContentType() {
final Map<String, Object> applicationProperties = Map.of(
Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);

final ServiceBusMessage message
= ServiceBusUtils.createServiceBusMessage("test string", applicationProperties, null, null);

assertEquals("test string", message.getBody().toString());
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
}

@Test
void testCreateServiceBusMessagesWithContentType() {
final Map<String, Object> applicationProperties = Map.of(
Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);

final List<String> inputMessages = new LinkedList<>();
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));

final Iterable<ServiceBusMessage> busMessages
= ServiceBusUtils.createServiceBusMessages(inputMessages, applicationProperties, null, null);

assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("test data")));
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("12345")));
assertThat(StreamSupport.stream(busMessages.spliterator(), false))
.allMatch(message -> APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));

//Test bytes
final List<byte[]> inputMessages2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputMessages2.add(byteBody1);
inputMessages2.add(byteBody2);

final Iterable<ServiceBusMessage> busMessages2
= ServiceBusUtils.createServiceBusMessages(inputMessages2, applicationProperties, null, null);

assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody2)));
assertThat(StreamSupport.stream(busMessages.spliterator(), false))
.allMatch(message -> APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
}

@Test
void validateConfigurationMissingCredentials() {
assertThrows(IllegalArgumentException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.azure.servicebus.ServiceBusConstants;
Expand All @@ -47,6 +48,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport {
private static final String DIRECT_SEND_TO_SESSION_QUEUE_URI = "direct:sendToQueueSessions";
private static final Map<String, Object> PROPAGATED_HEADERS = new HashMap<>();
private static final Pattern MESSAGE_BODY_PATTERN = Pattern.compile("^message-[0-4]$");
private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";

static {
PROPAGATED_HEADERS.put("booleanHeader", true);
Expand All @@ -60,6 +62,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport {
PROPAGATED_HEADERS.put("stringHeader", "stringHeader");
PROPAGATED_HEADERS.put("timestampHeader", new Date());
PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID());
PROPAGATED_HEADERS.put(Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
}

private ProducerTemplate producerTemplate;
Expand Down Expand Up @@ -120,6 +123,7 @@ void camelSendsMessageToServiceBusQueue() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
});
}
}
Expand Down Expand Up @@ -148,6 +152,7 @@ void camelSendsMessageBatchToServiceBusQueue() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
});
}
}
Expand All @@ -170,6 +175,7 @@ void camelSendsMessageToServiceBusTopic() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
});
}
}
Expand All @@ -194,6 +200,7 @@ void camelSchedulesServiceBusMessage() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
});
}
Expand Down Expand Up @@ -225,6 +232,7 @@ void camelSchedulesServiceBusMessageBatch() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
});
}
Expand Down Expand Up @@ -252,6 +260,7 @@ void camelSendsMessageToServiceBusSessionEnabledQueue() throws InterruptedExcept
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
});
}
}
Expand Down Expand Up @@ -280,6 +289,7 @@ void camelSendsMessageBatchToServiceBusSessionEnabledQueue() throws InterruptedE
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
});
}
}
Expand Down Expand Up @@ -324,6 +334,7 @@ void camelSchedulesServiceBusMessageWithSessions() throws InterruptedException {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
});
}
Expand Down Expand Up @@ -355,6 +366,7 @@ void camelSchedulesServiceBusMessageBatchWIthSessions() throws InterruptedExcept
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties = message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime());
});
}
Expand Down