Skip to content

feat(raw-handler): Raw event handler #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.QueryHandler;
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
Expand Down Expand Up @@ -50,6 +52,12 @@ public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, C
return this;
}

public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler<?> handler) {
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
.add(new RegisteredEventListener<>(eventName, handler, RawMessage.class));
return this;
}

public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler, Class<T> eventClass) {
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.reactivecommons.async.api.handlers;

import org.reactivecommons.api.domain.RawMessage;

public interface RawEventHandler<T extends RawMessage> extends EventHandler<T> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import org.junit.jupiter.api.Test;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.QueryHandler;
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
Expand Down Expand Up @@ -54,6 +56,20 @@ void shouldListenDomainCloudEvent() {
.containsExactly(name, CloudEvent.class, eventHandler)).hasSize(1);
}

@Test
void shouldListenDomainRawEvent() {
SomeRawEventHandler eventHandler = new SomeRawEventHandler();

registry.listenDomainRawEvent(domain, name, eventHandler);

assertThat(registry.getDomainEventListeners().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass,
RegisteredEventListener::getHandler
)
.containsExactly(name, RawMessage.class, eventHandler)).hasSize(1);
}

@Test
void shouldListenEvent() {
SomeDomainEventHandler<SomeDataClass> eventHandler = new SomeDomainEventHandler<>();
Expand Down Expand Up @@ -269,6 +285,13 @@ public Mono<Void> handle(CloudEvent message) {
}
}

private static class SomeRawEventHandler implements RawEventHandler<RawMessage> {
@Override
public Mono<Void> handle(RawMessage message) {
return null;
}
}

private static class SomeDomainCommandHandler<SomeDataClass> implements DomainCommandHandler<SomeDataClass> {
@Override
public Mono<Void> handle(Command<SomeDataClass> message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.reactivecommons.async.commons.communications;

import org.reactivecommons.api.domain.RawMessage;

import java.util.Map;

/**
* Simple Internal Message representation
*
* @author Daniel Bustamante Ospina
*/
public interface Message {
public interface Message extends RawMessage {

byte[] getBody();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.RequiredArgsConstructor;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;

Expand Down Expand Up @@ -31,4 +32,14 @@ public Publisher<Void> emit(CloudEvent event) {
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(RawMessage event) {
return sender.send(event);
}

@Override
public Publisher<Void> emit(String domain, RawMessage event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
public class KafkaMessage implements Message {
private final byte[] body;
private final Properties properties;
private final String type;

@Data
public static class KafkaMessageProperties implements Properties {
Expand All @@ -30,7 +31,11 @@ public String getContentType() {
}

public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> receiverRecord) {
return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord));
return fromDelivery(receiverRecord, null);
}

public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> receiverRecord, String type) {
return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord), type);
}

private static Properties createMessageProps(ReceiverRecord<String, byte[]> receiverRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public KafkaJacksonMessageConverter(ObjectMapper objectMapper) {

@Override
public Message toMessage(Object object) {
if (object instanceof KafkaMessage) {
return (KafkaMessage) object;
}
byte[] bytes;
try {
String jsonString = this.objectMapper.writeValueAsString(object);
Expand All @@ -30,7 +33,7 @@ public Message toMessage(Object object) {
throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e);
}
KafkaMessageProperties props = buildProperties(object);
return new KafkaMessage(bytes, props);
return new KafkaMessage(bytes, props, null);
}

private KafkaMessageProperties buildProperties(Object message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected Mono<ReceiverRecord<String, byte[]>> handle(ReceiverRecord<String, byt
try {
final String executorPath = getExecutorPath(msj);
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
final Message message = KafkaMessage.fromDelivery(msj);
final Message message = KafkaMessage.fromDelivery(msj, executorPath);

Mono<Object> flow = Mono.defer(() -> handler.apply(message))
.transform(enrichPostProcess(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -21,6 +22,8 @@ class KafkaDomainEventBusTest {
@Mock
private CloudEvent cloudEvent;
@Mock
private RawMessage rawMessage;
@Mock
private ReactiveMessageSender sender;
@InjectMocks
private KafkaDomainEventBus kafkaDomainEventBus;
Expand Down Expand Up @@ -48,9 +51,21 @@ void shouldEmitCloudEvent() {
.verifyComplete();
}

@Test
void shouldEmitRawMessage() {
// Arrange
when(sender.send(rawMessage)).thenReturn(Mono.empty());
// Act
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(rawMessage));
// Assert
StepVerifier.create(flow)
.verifyComplete();
}

@Test
void operationsShouldNotBeAbleForDomains() {
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, domainEvent));
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, cloudEvent));
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, rawMessage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void shouldSerializeDomainEvent() {
String expectedJson = "{\"name\":\"test\",\"eventId\":\"" + id + "\",\"data\":{\"name\":\"name\",\"age\":1}}";
// Act
Message message = converter.toMessage(testEvent);
assertEquals(message, converter.toMessage(message));
// Assert
assertEquals("test", message.getProperties().getTopic());
assertEquals(id, message.getProperties().getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;
Expand All @@ -12,6 +13,8 @@

public class RabbitDomainEventBus implements DomainEventBus {

private static final String EVENT_SEND_FAILURE = "Event send failure: ";
private static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
private final ReactiveMessageSender sender;
private final String exchange;
private final boolean persistentEvents;
Expand All @@ -29,24 +32,35 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
@Override
public <T> Mono<Void> emit(DomainEvent<T> event) {
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + event.getName(), err));
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException("Not implemented yet");
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(CloudEvent cloudEvent) {
return sender.sendWithConfirm(cloudEvent, exchange, cloudEvent.getType(),
Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + cloudEvent.getType(), err));
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException("Not implemented yet");
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(RawMessage rawEvent) {
return sender.sendWithConfirm(rawEvent, exchange, rawEvent.getType(),
Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + rawEvent.getType(), err));
}

@Override
public Publisher<Void> emit(String domain, RawMessage event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
public class RabbitMessage implements Message {
private final byte[] body;
private final Properties properties;
private final String type;

@Data
public static class RabbitMessageProperties implements Properties {
Expand All @@ -22,7 +23,11 @@ public static class RabbitMessageProperties implements Properties {
}

public static RabbitMessage fromDelivery(Delivery delivery) {
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery));
return fromDelivery(delivery, null);
}

public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) {
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery), executorPath);
}

private static Message.Properties createMessageProps(Delivery msj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public RabbitJacksonMessageConverter(ObjectMapper objectMapper) {

@Override
public Message toMessage(Object object) {
if (object instanceof RabbitMessage) {
return (RabbitMessage) object;
}
byte[] bytes;
try {
String jsonString = this.objectMapper.writeValueAsString(object);
Expand All @@ -29,10 +32,10 @@ public Message toMessage(Object object) {
if (object instanceof CloudEvent) {
props.setContentType(APPLICATION_CLOUD_EVENT_JSON);
} else {
props.setContentType(CONTENT_TYPE);
props.setContentType(APPLICATION_JSON);
}
props.setContentEncoding(StandardCharsets.UTF_8.name());
props.setContentLength(bytes.length);
return new RabbitMessage(bytes, props);
return new RabbitMessage(bytes, props, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
Expand Down Expand Up @@ -133,6 +134,9 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
if (registeredEventListener.getHandler() instanceof CloudEventHandler) {
return messageConverter::readCloudEvent;
}
if (registeredEventListener.getHandler() instanceof RawEventHandler) {
return message -> message;
}
throw new RuntimeException("Unknown handler type");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
try {
final String executorPath = getExecutorPath(msj);
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
final Message message = RabbitMessage.fromDelivery(msj);
final Message message = RabbitMessage.fromDelivery(msj, executorPath);

Mono<Object> flow = defer(() -> handler.apply(message))
.transform(enrichPostProcess(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ public static Message mockMessage() {
properties.getHeaders().put(CORRELATION_ID, "correlation");
properties.getHeaders().put(SERVED_QUERY_ID, "my-query");
return new RabbitMessage("{\"id\":\"id\",\"name\":\"name\",\"date\":\"2020-10-22T17:03:26.062Z\"}".getBytes(),
properties);
properties, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.RawMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -24,6 +25,8 @@ class RabbitDomainEventBusTest {
@Mock
private CloudEvent cloudEvent;
@Mock
private RawMessage rawMessage;
@Mock
private ReactiveMessageSender sender;
private RabbitDomainEventBus rabbitDomainEventBus;
private final String domain = "domain";
Expand Down Expand Up @@ -59,9 +62,23 @@ void shouldEmitCloudEvent() {
.verifyComplete();
}

@Test
void shouldEmitRawMessage() {
// Arrange
when(rawMessage.getType()).thenReturn("event");
when(sender.sendWithConfirm(any(RawMessage.class), anyString(), anyString(), any(), anyBoolean()))
.thenReturn(Mono.empty());
// Act
Mono<Void> flow = Mono.from(rabbitDomainEventBus.emit(rawMessage));
// Assert
StepVerifier.create(flow)
.verifyComplete();
}

@Test
void operationsShouldNotBeAbleForDomains() {
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, domainEvent));
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, cloudEvent));
assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, rawMessage));
}
}
Loading