Skip to content

Commit 8dbdfeb

Browse files
authoredFeb 24, 2025··
feat(raw-handler): Raw event handler (#136)
* feat(rawhandler): Allow raw message handler to listening events
1 parent 4235ab8 commit 8dbdfeb

File tree

24 files changed

+220
-20
lines changed

24 files changed

+220
-20
lines changed
 

‎async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

+8
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import lombok.AccessLevel;
55
import lombok.Getter;
66
import lombok.NoArgsConstructor;
7+
import org.reactivecommons.api.domain.RawMessage;
78
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
89
import org.reactivecommons.async.api.handlers.CloudEventHandler;
910
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
1011
import org.reactivecommons.async.api.handlers.DomainEventHandler;
1112
import org.reactivecommons.async.api.handlers.QueryHandler;
1213
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
14+
import org.reactivecommons.async.api.handlers.RawEventHandler;
1315
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1416
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1517
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -50,6 +52,12 @@ public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, C
5052
return this;
5153
}
5254

55+
public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler<?> handler) {
56+
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
57+
.add(new RegisteredEventListener<>(eventName, handler, RawMessage.class));
58+
return this;
59+
}
60+
5361
public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler, Class<T> eventClass) {
5462
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
5563
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import org.reactivecommons.api.domain.RawMessage;
4+
5+
public interface RawEventHandler<T extends RawMessage> extends EventHandler<T> {
6+
}

‎async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
import org.junit.jupiter.api.Test;
66
import org.reactivecommons.api.domain.Command;
77
import org.reactivecommons.api.domain.DomainEvent;
8+
import org.reactivecommons.api.domain.RawMessage;
89
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
910
import org.reactivecommons.async.api.handlers.CloudEventHandler;
1011
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
1112
import org.reactivecommons.async.api.handlers.DomainEventHandler;
1213
import org.reactivecommons.async.api.handlers.QueryHandler;
1314
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
15+
import org.reactivecommons.async.api.handlers.RawEventHandler;
1416
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1517
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1618
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -54,6 +56,20 @@ void shouldListenDomainCloudEvent() {
5456
.containsExactly(name, CloudEvent.class, eventHandler)).hasSize(1);
5557
}
5658

59+
@Test
60+
void shouldListenDomainRawEvent() {
61+
SomeRawEventHandler eventHandler = new SomeRawEventHandler();
62+
63+
registry.listenDomainRawEvent(domain, name, eventHandler);
64+
65+
assertThat(registry.getDomainEventListeners().get(domain))
66+
.anySatisfy(registered -> assertThat(registered)
67+
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass,
68+
RegisteredEventListener::getHandler
69+
)
70+
.containsExactly(name, RawMessage.class, eventHandler)).hasSize(1);
71+
}
72+
5773
@Test
5874
void shouldListenEvent() {
5975
SomeDomainEventHandler<SomeDataClass> eventHandler = new SomeDomainEventHandler<>();
@@ -269,6 +285,13 @@ public Mono<Void> handle(CloudEvent message) {
269285
}
270286
}
271287

288+
private static class SomeRawEventHandler implements RawEventHandler<RawMessage> {
289+
@Override
290+
public Mono<Void> handle(RawMessage message) {
291+
return null;
292+
}
293+
}
294+
272295
private static class SomeDomainCommandHandler<SomeDataClass> implements DomainCommandHandler<SomeDataClass> {
273296
@Override
274297
public Mono<Void> handle(Command<SomeDataClass> message) {

‎async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package org.reactivecommons.async.commons.communications;
22

3+
import org.reactivecommons.api.domain.RawMessage;
4+
35
import java.util.Map;
46

57
/**
68
* Simple Internal Message representation
79
*
810
* @author Daniel Bustamante Ospina
911
*/
10-
public interface Message {
12+
public interface Message extends RawMessage {
1113

1214
byte[] getBody();
1315

‎async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java

+11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.api.domain.DomainEvent;
66
import org.reactivecommons.api.domain.DomainEventBus;
7+
import org.reactivecommons.api.domain.RawMessage;
78
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
89
import org.reactivestreams.Publisher;
910

@@ -31,4 +32,14 @@ public Publisher<Void> emit(CloudEvent event) {
3132
public Publisher<Void> emit(String domain, CloudEvent event) {
3233
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
3334
}
35+
36+
@Override
37+
public Publisher<Void> emit(RawMessage event) {
38+
return sender.send(event);
39+
}
40+
41+
@Override
42+
public Publisher<Void> emit(String domain, RawMessage event) {
43+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
44+
}
3445
}

‎async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
public class KafkaMessage implements Message {
1616
private final byte[] body;
1717
private final Properties properties;
18+
private final String type;
1819

1920
@Data
2021
public static class KafkaMessageProperties implements Properties {
@@ -30,7 +31,11 @@ public String getContentType() {
3031
}
3132

3233
public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> receiverRecord) {
33-
return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord));
34+
return fromDelivery(receiverRecord, null);
35+
}
36+
37+
public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> receiverRecord, String type) {
38+
return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord), type);
3439
}
3540

3641
private static Properties createMessageProps(ReceiverRecord<String, byte[]> receiverRecord) {

‎async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public KafkaJacksonMessageConverter(ObjectMapper objectMapper) {
2222

2323
@Override
2424
public Message toMessage(Object object) {
25+
if (object instanceof KafkaMessage) {
26+
return (KafkaMessage) object;
27+
}
2528
byte[] bytes;
2629
try {
2730
String jsonString = this.objectMapper.writeValueAsString(object);
@@ -30,7 +33,7 @@ public Message toMessage(Object object) {
3033
throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e);
3134
}
3235
KafkaMessageProperties props = buildProperties(object);
33-
return new KafkaMessage(bytes, props);
36+
return new KafkaMessage(bytes, props, null);
3437
}
3538

3639
private KafkaMessageProperties buildProperties(Object message) {

‎async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected Mono<ReceiverRecord<String, byte[]>> handle(ReceiverRecord<String, byt
112112
try {
113113
final String executorPath = getExecutorPath(msj);
114114
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
115-
final Message message = KafkaMessage.fromDelivery(msj);
115+
final Message message = KafkaMessage.fromDelivery(msj, executorPath);
116116

117117
Mono<Object> flow = Mono.defer(() -> handler.apply(message))
118118
.transform(enrichPostProcess(message));

‎async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaDomainEventBusTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.mockito.Mock;
88
import org.mockito.junit.jupiter.MockitoExtension;
99
import org.reactivecommons.api.domain.DomainEvent;
10+
import org.reactivecommons.api.domain.RawMessage;
1011
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
1112
import reactor.core.publisher.Mono;
1213
import reactor.test.StepVerifier;
@@ -21,6 +22,8 @@ class KafkaDomainEventBusTest {
2122
@Mock
2223
private CloudEvent cloudEvent;
2324
@Mock
25+
private RawMessage rawMessage;
26+
@Mock
2427
private ReactiveMessageSender sender;
2528
@InjectMocks
2629
private KafkaDomainEventBus kafkaDomainEventBus;
@@ -48,9 +51,21 @@ void shouldEmitCloudEvent() {
4851
.verifyComplete();
4952
}
5053

54+
@Test
55+
void shouldEmitRawMessage() {
56+
// Arrange
57+
when(sender.send(rawMessage)).thenReturn(Mono.empty());
58+
// Act
59+
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(rawMessage));
60+
// Assert
61+
StepVerifier.create(flow)
62+
.verifyComplete();
63+
}
64+
5165
@Test
5266
void operationsShouldNotBeAbleForDomains() {
5367
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, domainEvent));
5468
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, cloudEvent));
69+
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, rawMessage));
5570
}
5671
}

‎async/async-kafka/src/test/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverterTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ void shouldSerializeDomainEvent() {
4242
String expectedJson = "{\"name\":\"test\",\"eventId\":\"" + id + "\",\"data\":{\"name\":\"name\",\"age\":1}}";
4343
// Act
4444
Message message = converter.toMessage(testEvent);
45+
assertEquals(message, converter.toMessage(message));
4546
// Assert
4647
assertEquals("test", message.getProperties().getTopic());
4748
assertEquals(id, message.getProperties().getKey());

‎async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.cloudevents.CloudEvent;
44
import org.reactivecommons.api.domain.DomainEvent;
55
import org.reactivecommons.api.domain.DomainEventBus;
6+
import org.reactivecommons.api.domain.RawMessage;
67
import org.reactivecommons.async.commons.config.BrokerConfig;
78
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
89
import org.reactivestreams.Publisher;
@@ -12,6 +13,8 @@
1213

1314
public class RabbitDomainEventBus implements DomainEventBus {
1415

16+
private static final String EVENT_SEND_FAILURE = "Event send failure: ";
17+
private static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
1518
private final ReactiveMessageSender sender;
1619
private final String exchange;
1720
private final boolean persistentEvents;
@@ -29,24 +32,35 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
2932
@Override
3033
public <T> Mono<Void> emit(DomainEvent<T> event) {
3134
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
32-
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
35+
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + event.getName(), err));
3336
}
3437

3538
@Override
3639
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
37-
throw new UnsupportedOperationException("Not implemented yet");
40+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
3841
}
3942

4043
@Override
4144
public Publisher<Void> emit(CloudEvent cloudEvent) {
4245
return sender.sendWithConfirm(cloudEvent, exchange, cloudEvent.getType(),
4346
Collections.emptyMap(), persistentEvents)
44-
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
47+
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + cloudEvent.getType(), err));
4548
}
4649

4750
@Override
4851
public Publisher<Void> emit(String domain, CloudEvent event) {
49-
throw new UnsupportedOperationException("Not implemented yet");
52+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
5053
}
5154

55+
@Override
56+
public Publisher<Void> emit(RawMessage rawEvent) {
57+
return sender.sendWithConfirm(rawEvent, exchange, rawEvent.getType(),
58+
Collections.emptyMap(), persistentEvents)
59+
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + rawEvent.getType(), err));
60+
}
61+
62+
@Override
63+
public Publisher<Void> emit(String domain, RawMessage event) {
64+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
65+
}
5266
}

‎async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
public class RabbitMessage implements Message {
1212
private final byte[] body;
1313
private final Properties properties;
14+
private final String type;
1415

1516
@Data
1617
public static class RabbitMessageProperties implements Properties {
@@ -22,7 +23,11 @@ public static class RabbitMessageProperties implements Properties {
2223
}
2324

2425
public static RabbitMessage fromDelivery(Delivery delivery) {
25-
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery));
26+
return fromDelivery(delivery, null);
27+
}
28+
29+
public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) {
30+
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery), executorPath);
2631
}
2732

2833
private static Message.Properties createMessageProps(Delivery msj) {

‎async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ public RabbitJacksonMessageConverter(ObjectMapper objectMapper) {
1818

1919
@Override
2020
public Message toMessage(Object object) {
21+
if (object instanceof RabbitMessage) {
22+
return (RabbitMessage) object;
23+
}
2124
byte[] bytes;
2225
try {
2326
String jsonString = this.objectMapper.writeValueAsString(object);
@@ -29,10 +32,10 @@ public Message toMessage(Object object) {
2932
if (object instanceof CloudEvent) {
3033
props.setContentType(APPLICATION_CLOUD_EVENT_JSON);
3134
} else {
32-
props.setContentType(CONTENT_TYPE);
35+
props.setContentType(APPLICATION_JSON);
3336
}
3437
props.setContentEncoding(StandardCharsets.UTF_8.name());
3538
props.setContentLength(bytes.length);
36-
return new RabbitMessage(bytes, props);
39+
return new RabbitMessage(bytes, props, null);
3740
}
3841
}

‎async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import lombok.extern.java.Log;
55
import org.reactivecommons.async.api.handlers.CloudEventHandler;
66
import org.reactivecommons.async.api.handlers.DomainEventHandler;
7+
import org.reactivecommons.async.api.handlers.RawEventHandler;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
89
import org.reactivecommons.async.commons.DiscardNotifier;
910
import org.reactivecommons.async.commons.EventExecutor;
@@ -133,6 +134,9 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
133134
if (registeredEventListener.getHandler() instanceof CloudEventHandler) {
134135
return messageConverter::readCloudEvent;
135136
}
137+
if (registeredEventListener.getHandler() instanceof RawEventHandler) {
138+
return message -> message;
139+
}
136140
throw new RuntimeException("Unknown handler type");
137141
}
138142
}

‎async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
121121
try {
122122
final String executorPath = getExecutorPath(msj);
123123
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
124-
final Message message = RabbitMessage.fromDelivery(msj);
124+
final Message message = RabbitMessage.fromDelivery(msj, executorPath);
125125

126126
Mono<Object> flow = defer(() -> handler.apply(message))
127127
.transform(enrichPostProcess(message));

‎async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public static Message mockMessage() {
1515
properties.getHeaders().put(CORRELATION_ID, "correlation");
1616
properties.getHeaders().put(SERVED_QUERY_ID, "my-query");
1717
return new RabbitMessage("{\"id\":\"id\",\"name\":\"name\",\"date\":\"2020-10-22T17:03:26.062Z\"}".getBytes(),
18-
properties);
18+
properties, null);
1919
}
2020
}

‎async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDomainEventBusTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.mockito.Mock;
88
import org.mockito.junit.jupiter.MockitoExtension;
99
import org.reactivecommons.api.domain.DomainEvent;
10+
import org.reactivecommons.api.domain.RawMessage;
1011
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1112
import reactor.core.publisher.Mono;
1213
import reactor.test.StepVerifier;
@@ -24,6 +25,8 @@ class RabbitDomainEventBusTest {
2425
@Mock
2526
private CloudEvent cloudEvent;
2627
@Mock
28+
private RawMessage rawMessage;
29+
@Mock
2730
private ReactiveMessageSender sender;
2831
private RabbitDomainEventBus rabbitDomainEventBus;
2932
private final String domain = "domain";
@@ -59,9 +62,23 @@ void shouldEmitCloudEvent() {
5962
.verifyComplete();
6063
}
6164

65+
@Test
66+
void shouldEmitRawMessage() {
67+
// Arrange
68+
when(rawMessage.getType()).thenReturn("event");
69+
when(sender.sendWithConfirm(any(RawMessage.class), anyString(), anyString(), any(), anyBoolean()))
70+
.thenReturn(Mono.empty());
71+
// Act
72+
Mono<Void> flow = Mono.from(rabbitDomainEventBus.emit(rawMessage));
73+
// Assert
74+
StepVerifier.create(flow)
75+
.verifyComplete();
76+
}
77+
6278
@Test
6379
void operationsShouldNotBeAbleForDomains() {
6480
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, domainEvent));
6581
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, cloudEvent));
82+
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, rawMessage));
6683
}
6784
}

0 commit comments

Comments
 (0)
Please sign in to comment.