Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(circular-dependency): Extract listener config and remove handler resolver from replies listener #129

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.stream.Stream;

import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
Expand Down Expand Up @@ -45,43 +44,51 @@ public static HandlerResolver buildResolver(String domain,
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
ConcurrentHashMap::putAll);

final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain, registries);
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain,
registries);

final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
getEventHandlersWithDynamics(domain, registries);

return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) {
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener,
commandHandlers) {
@Override
@SuppressWarnings("unchecked")
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
final RegisteredCommandHandler<T, D> handler = super.getCommandHandler(path);
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler,
Object.class);
}
};
}


final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain, registries);
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
getEventHandlersWithDynamics(domain, registries);

return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), new ConcurrentHashMap<>()) {
return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(),
new ConcurrentHashMap<>()) {
@Override
@SuppressWarnings("unchecked")
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
final RegisteredCommandHandler<T, D> handler = super.getCommandHandler(path);
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler,
Object.class);
}
};
}

private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventHandlersWithDynamics(String domain, Map<String, HandlerRegistry> registries) {
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventHandlersWithDynamics(String domain,
Map<String,
HandlerRegistry> registries) {
// event handlers and dynamic handlers
return registries
.values().stream()
.flatMap(r -> {
if (r.getDomainEventListeners().containsKey(domain)) {
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
}
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
Expand All @@ -95,14 +102,14 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
return Stream.of();
}

private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventsToBind(String domain, Map<String, HandlerRegistry> registries) {
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventsToBind(String domain, Map<String,
HandlerRegistry> registries) {
return registries
.values().stream()
.flatMap(r -> {
if (r.getDomainEventListeners().containsKey(domain)) {
return r.getDomainEventListeners().get(domain).stream();
}
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class LoggerSubscriber<T> extends BaseSubscriber<T> {

private final String flowName;
private static final String ON_COMPLETE_MSG = "%s: ##On Complete Hook!!";
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!!";
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!! %s";
private static final String ON_CANCEL_MSG = "%s: ##On Cancel Hook!!";
private static final String ON_FINALLY_MSG = "%s: ##On Finally Hook! Signal type: %s";

Expand All @@ -29,7 +29,7 @@ protected void hookOnComplete() {

@Override
protected void hookOnError(Throwable throwable) {
log.log(Level.SEVERE, format(ON_ERROR_MSG), throwable);
log.log(Level.SEVERE, format(ON_ERROR_MSG, throwable.getMessage()), throwable);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
id 'org.sonarqube' version '6.0.1.5171'
id 'org.springframework.boot' version '3.4.1' apply false
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
id 'co.com.bancolombia.cleanArchitecture' version '3.20.7'
id 'co.com.bancolombia.cleanArchitecture' version '3.20.8'
}

repositories {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion main.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,5 @@ tasks.register('generateMergedReport', JacocoReport) {
}

tasks.named('wrapper') {
gradleVersion = '8.11'
gradleVersion = '8.11.1'
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface BrokerProvider<T extends GenericAsyncProps> {

DomainEventBus getDomainBus();

DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver);
DirectAsyncGateway getDirectAsyncGateway();

void listenDomainEvents(HandlerResolver resolver);

Expand All @@ -23,7 +23,7 @@ public interface BrokerProvider<T extends GenericAsyncProps> {

void listenQueries(HandlerResolver resolver);

void listenReplies(HandlerResolver resolver);
void listenReplies();

Mono<RCHealth> healthCheck();
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package org.reactivecommons.async.starter.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.DefaultQueryHandler;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.HandlerResolverBuilder;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.broker.BrokerProviderFactory;
Expand All @@ -27,7 +21,6 @@
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;

import java.util.Map;

Expand Down Expand Up @@ -66,24 +59,6 @@ public ConnectionManager buildConnectionManager(ApplicationContext context) {
return connectionManager;
}

@Bean
@SuppressWarnings({"rawtypes", "unchecked"})
public DomainHandlers buildHandlers(ApplicationContext context,
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
DomainHandlers handlers = new DomainHandlers();
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
if (!registries.containsValue(primaryRegistry)) {
registries.put("primaryHandlerRegistry", primaryRegistry);
}
final Map<String, GenericAsyncPropsDomain> props = context.getBeansOfType(GenericAsyncPropsDomain.class);
props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> {
String domainName = (String) domain;
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler);
handlers.add(domainName, resolver);
}));
return handlers;
}

@Bean
@ConditionalOnMissingBean
public BrokerConfig brokerConfig() {
Expand All @@ -98,29 +73,8 @@ public ObjectMapperSupplier objectMapperSupplier() {

@Bean
@ConditionalOnMissingBean
public CustomReporter reactiveCommonsCustomErrorReporter() {
return new DefaultCustomReporter();
}

@Bean
@ConditionalOnMissingBean
@SuppressWarnings("rawtypes")
public DefaultQueryHandler defaultHandler() {
return (DefaultQueryHandler<Object, Object>) command ->
Mono.error(new RuntimeException("No Handler Registered"));
}

@Bean
@ConditionalOnMissingBean
@SuppressWarnings("rawtypes")
public DefaultCommandHandler defaultCommandHandler() {
return message -> Mono.error(new RuntimeException("No Handler Registered"));
}

@Bean
@ConditionalOnMissingBean
public HandlerRegistry defaultHandlerRegistry() {
return HandlerRegistry.register();
public ObjectMapper defaultReactiveCommonsObjectMapper(ObjectMapperSupplier supplier) {
return supplier.get();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.reactivecommons.async.starter.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.DefaultQueryHandler;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.HandlerResolverBuilder;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
import org.reactivecommons.async.starter.props.GenericAsyncPropsDomain;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;

import java.util.Map;

@Log
@Configuration
@RequiredArgsConstructor
public class ReactiveCommonsListenersConfig {

@Bean
@SuppressWarnings({"rawtypes", "unchecked"})
public DomainHandlers buildHandlers(ApplicationContext context,
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
DomainHandlers handlers = new DomainHandlers();
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
if (!registries.containsValue(primaryRegistry)) {
registries.put("primaryHandlerRegistry", primaryRegistry);
}
final Map<String, GenericAsyncPropsDomain> props = context.getBeansOfType(GenericAsyncPropsDomain.class);
props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> {
String domainName = (String) domain;
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler);
handlers.add(domainName, resolver);
}));
return handlers;
}

@Bean
@ConditionalOnMissingBean
public CustomReporter reactiveCommonsCustomErrorReporter() {
return new DefaultCustomReporter();
}

@Bean
@ConditionalOnMissingBean
@SuppressWarnings("rawtypes")
public DefaultQueryHandler defaultHandler() {
return (DefaultQueryHandler<Object, Object>) command ->
Mono.error(new RuntimeException("No Handler Registered"));
}

@Bean
@ConditionalOnMissingBean
@SuppressWarnings("rawtypes")
public DefaultCommandHandler defaultCommandHandler() {
return message -> Mono.error(new RuntimeException("No Handler Registered"));
}

@Bean
@ConditionalOnMissingBean
public HandlerRegistry defaultHandlerRegistry() {
return HandlerRegistry.register();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@


import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.config.DomainHandlers;
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(ReactiveCommonsConfig.class)
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
public class CommandsListenerConfig extends AbstractListenerConfig {

public CommandsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.config.DomainHandlers;
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(ReactiveCommonsConfig.class)
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
public class EventsListenerConfig extends AbstractListenerConfig {

public EventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.config.DomainHandlers;
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(ReactiveCommonsConfig.class)
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
public class NotificationEventsListenerConfig extends AbstractListenerConfig {

public NotificationEventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@


import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.broker.BrokerProvider;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.config.DomainHandlers;
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(ReactiveCommonsConfig.class)
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
public class QueriesListenerConfig extends AbstractListenerConfig {

public QueriesListenerConfig(ConnectionManager manager, DomainHandlers handlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.extern.java.Log;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.starter.config.ConnectionManager;
import org.reactivecommons.async.starter.config.DomainHandlers;
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -20,10 +19,10 @@
public class DirectAsyncGatewayConfig {

@Bean
public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager, DomainHandlers handlers) {
public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager) {
ConcurrentMap<String, DirectAsyncGateway> directAsyncGateways = new ConcurrentHashMap<>();
manager.forDomain((domain, provider) -> directAsyncGateways.put(domain,
provider.getDirectAsyncGateway(handlers.get(domain))));
provider.getDirectAsyncGateway()));
return new GenericDirectAsyncGateway(directAsyncGateways);
}
}
Loading
Loading