Skip to content
This repository was archived by the owner on Nov 26, 2024. It is now read-only.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package se.fortnox.reactivewizard.db;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import se.fortnox.reactivewizard.db.paging.PagingOutput;
import se.fortnox.reactivewizard.db.statement.DbStatementFactory;
import se.fortnox.reactivewizard.metrics.Metrics;

import java.lang.reflect.Method;

public class DaoMethodHandler {

private final Method method;
private final DbStatementFactory statementFactory;
private final PagingOutput pagingOutput;
private final Metrics metrics;

public DaoMethodHandler(Method method,
DbStatementFactory statementFactory, PagingOutput pagingOutput, Metrics metrics) {
this.method = method;
this.statementFactory = statementFactory;
this.pagingOutput = pagingOutput;
this.metrics = metrics;
}

public Publisher<Object> run(Object[] args, ReactiveStatementFactory reactiveStatementFactory) {
if (Mono.class.isAssignableFrom(method.getReturnType())) {
return reactiveStatementFactory.createMono(
metrics,
statementFactory.toString(),
() -> statementFactory.create(args)
);
} else if (Flux.class.isAssignableFrom(method.getReturnType())) {
return reactiveStatementFactory.createFlux(
metrics,
statementFactory.toString(),
() -> statementFactory.create(args),
flux -> pagingOutput.apply(flux, args)
);
} else {
throw new IllegalArgumentException(String.format(
"DAO method %s::%s must return a Flux or Mono. Found %s",
method.getDeclaringClass().getName(),
method.getName(),
method.getReturnType().getName()
));
}
}
}
129 changes: 33 additions & 96 deletions dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java
Original file line number Diff line number Diff line change
@@ -1,97 +1,52 @@
package se.fortnox.reactivewizard.db;

import com.fasterxml.jackson.core.type.TypeReference;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import se.fortnox.reactivewizard.db.config.DatabaseConfig;
import se.fortnox.reactivewizard.db.paging.PagingOutput;
import se.fortnox.reactivewizard.db.statement.DbStatementFactory;
import se.fortnox.reactivewizard.db.statement.DbStatementFactoryFactory;
import se.fortnox.reactivewizard.db.transactions.ConnectionScheduler;
import se.fortnox.reactivewizard.json.JsonSerializerFactory;
import se.fortnox.reactivewizard.metrics.Metrics;
import se.fortnox.reactivewizard.util.DebugUtil;
import se.fortnox.reactivewizard.util.ReflectionUtil;

import javax.annotation.Nullable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static java.text.MessageFormat.format;

@Singleton
public class DbProxy implements InvocationHandler {

private static final TypeReference<Object[]> OBJECT_ARRAY_TYPE_REFERENCE = new TypeReference<>() {
};
private final DbStatementFactoryFactory dbStatementFactoryFactory;
private final Scheduler scheduler;
protected final Map<Method, ReactiveStatementFactory> statementFactories;
private final ConnectionScheduler connectionScheduler;
protected final Function<Object[], String> paramSerializer;
private final DatabaseConfig databaseConfig;
private final Map<Method, DaoMethodHandler> handlers;
private final ReactiveStatementFactory reactiveStatementFactory;

@Inject
public DbProxy(DatabaseConfig databaseConfig,
@Nullable ConnectionProvider connectionProvider,
DbStatementFactoryFactory dbStatementFactoryFactory,
JsonSerializerFactory jsonSerializerFactory
) {
this(databaseConfig,
threadPool(databaseConfig.getPoolSize()),
connectionProvider,
dbStatementFactoryFactory,
jsonSerializerFactory);
}

public DbProxy(DatabaseConfig databaseConfig,
Scheduler scheduler,
ConnectionProvider connectionProvider,
DbStatementFactoryFactory dbStatementFactoryFactory,
JsonSerializerFactory jsonSerializerFactory
) {
this(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory,
jsonSerializerFactory.createStringSerializer(OBJECT_ARRAY_TYPE_REFERENCE),
new ConcurrentHashMap<>());
public DbProxy(ReactiveStatementFactory reactiveStatementFactory,
DbStatementFactoryFactory dbStatementFactoryFactory) {
this(reactiveStatementFactory, dbStatementFactoryFactory, new ConcurrentHashMap<>());
}

public DbProxy(DatabaseConfig databaseConfig, ConnectionProvider connectionProvider) {
this(databaseConfig,
Schedulers.boundedElastic(),
connectionProvider,
new DbStatementFactoryFactory(),
new JsonSerializerFactory());
this(databaseConfig, Schedulers.boundedElastic(), connectionProvider, new DbStatementFactoryFactory());
}

protected DbProxy(DatabaseConfig databaseConfig,
Scheduler scheduler,
ConnectionProvider connectionProvider,
DbStatementFactoryFactory dbStatementFactoryFactory,
Function<Object[], String> paramSerializer,
Map<Method, ReactiveStatementFactory> statementFactories
) {
this.scheduler = scheduler;
this.dbStatementFactoryFactory = dbStatementFactoryFactory;
this.paramSerializer = paramSerializer;
this.databaseConfig = databaseConfig;
this.statementFactories = statementFactories;
this.connectionScheduler = new ConnectionScheduler(connectionProvider, scheduler);
public DbProxy(DatabaseConfig databaseConfig, Scheduler scheduler, ConnectionProvider connectionProvider,
DbStatementFactoryFactory dbStatementFactoryFactory) {
this(new ReactiveStatementFactory(databaseConfig, scheduler, connectionProvider), dbStatementFactoryFactory);
}

private static Scheduler threadPool(int poolSize) {
if (poolSize == -1) {
return Schedulers.boundedElastic();
}
return Schedulers.newBoundedElastic(10, Integer.MAX_VALUE, "DbProxy");
protected DbProxy(ReactiveStatementFactory reactiveStatementFactory,
DbStatementFactoryFactory dbStatementFactoryFactory,
Map<Method, DaoMethodHandler> handlers) {
this.dbStatementFactoryFactory = dbStatementFactoryFactory;
this.reactiveStatementFactory = reactiveStatementFactory;
this.handlers = handlers;
}

/**
Expand All @@ -103,71 +58,53 @@ private static Scheduler threadPool(int poolSize) {
*/
public <T> T create(Class<T> daoInterface) {
return (T) Proxy.newProxyInstance(daoInterface.getClassLoader(),
new Class[]{daoInterface},
this);
new Class[]{daoInterface},
this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ReactiveStatementFactory reactiveStatementFactory = statementFactories.get(method);
if (reactiveStatementFactory == null || DebugUtil.IS_DEBUG) {
var handler = handlers.get(method);
if (handler == null || DebugUtil.IS_DEBUG) {
if (DebugUtil.IS_DEBUG) {
// Need to get the actual interface method in order to get updated annotations
method = ReflectionUtil.getRedefinedMethod(method);
}

DbStatementFactory statementFactory = dbStatementFactoryFactory.createStatementFactory(method);
PagingOutput pagingOutput = new PagingOutput(method);
reactiveStatementFactory = new ReactiveStatementFactory(
statementFactory,
pagingOutput,
createMetrics(method),
databaseConfig,
converterFromPublisher(method),
method);
statementFactories.put(method, reactiveStatementFactory);
handler = new DaoMethodHandler(
method,
new DbStatementFactoryFactory().createStatementFactory(method),
new PagingOutput(method),
createMetrics(method)
);
handlers.put(method, handler);
}

return reactiveStatementFactory.create(args, connectionScheduler);
}

private static Function<Publisher, Object> converterFromPublisher(Method method) {
Class<?> returnType = method.getReturnType();

if (Flux.class.isAssignableFrom(returnType)) {
return flux -> flux;
} else if (Mono.class.isAssignableFrom(returnType)) {
return Mono::from;
} else {
throw new IllegalArgumentException(String.format("DAO method %s::%s must return a Flux or Mono. Found %s",
method.getDeclaringClass().getName(),
method.getName(),
method.getReturnType().getName()));
}
return handler.run(args, reactiveStatementFactory);
}

private Metrics createMetrics(Method method) {
String type = method.isAnnotationPresent(Query.class) ? "query" : "update";
String metricsName = format(
"DAO_type:{0}_method:{1}.{2}_{3}",
type, method.getDeclaringClass().getName(), method.getName(), method.getParameterCount());
"DAO_type:{0}_method:{1}.{2}_{3}",
type, method.getDeclaringClass().getName(), method.getName(), method.getParameterCount());
return Metrics.get(metricsName);
}

public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider) {
return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories);
return new DbProxy(reactiveStatementFactory.usingConnectionProvider(connectionProvider), dbStatementFactoryFactory, handlers);
}

public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider, DatabaseConfig databaseConfig) {
return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories);
return new DbProxy(reactiveStatementFactory.usingConnectionProvider(connectionProvider, databaseConfig), dbStatementFactoryFactory, handlers);
}

public DbProxy usingConnectionProvider(ConnectionProvider newConnectionProvider, Scheduler newScheduler) {
return new DbProxy(databaseConfig, newScheduler, newConnectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories);
return new DbProxy(reactiveStatementFactory.usingConnectionProvider(newConnectionProvider, newScheduler), dbStatementFactoryFactory, handlers);
}

public DatabaseConfig getDatabaseConfig() {
return databaseConfig;
return reactiveStatementFactory.getDatabaseConfig();
}

}
Loading