Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions redis-lettuce/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {

api libs.managed.lettuce
api libs.micronaut.cache
// FIXME
implementation 'io.micronaut:micronaut-messaging:3.3.4'

compileOnly libs.micronaut.session
compileOnly libs.micronaut.management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micronaut.context.BeanLocator;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.core.annotation.Internal;
Expand All @@ -37,6 +38,7 @@
* @since 1.0
*/
@Internal
@SuppressWarnings("rawtypes")
public class RedisConnectionUtil {
/**
* Utility method for establishing a redis connection.
Expand Down Expand Up @@ -117,6 +119,27 @@ public static StatefulConnection<byte[], byte[]> openBytesRedisConnection(BeanLo
throw new ConfigurationException(errorMessage);
}

/**
* Utility method for opening a new bytes redis pubsub connection.
*
* @param beanLocator The bean locator to use
* @param serverName The server name to use
* @param errorMessage The error message to use if the connection can't be found
* @return The connection
* @throws ConfigurationException If the connection cannot be found
*/
public static StatefulRedisPubSubConnection<byte[], byte[]> openBytesRedisPubSubConnection(BeanLocator beanLocator, Optional<String> serverName, String errorMessage) {
Optional<RedisClusterClient> redisClusterClient = findRedisClusterClient(beanLocator, serverName);
if (redisClusterClient.isPresent()) {
return redisClusterClient.get().connectPubSub(ByteArrayCodec.INSTANCE);
}
Optional<RedisClient> redisClient = findRedisClient(beanLocator, serverName);
if (redisClient.isPresent()) {
return redisClient.get().connectPubSub(ByteArrayCodec.INSTANCE);
}
throw new ConfigurationException(errorMessage);
}

private static Optional<RedisClusterClient> findRedisClusterClient(BeanLocator beanLocator, Optional<String> serverName) {
Optional<RedisClusterClient> namedClient = serverName.flatMap(name -> beanLocator.findBean(RedisClusterClient.class, Qualifiers.byName(name)));
if (namedClient.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ public interface RedisSetting {
* Default configuration for Redis caches pool.
*/
String REDIS_POOL = PREFIX + ".pool";
/**
* Configured Redis pubsub.
*/
String REDIS_PUBSUB = PREFIX + ".pubsub";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.micronaut.configuration.lettuce.pubsub;

import java.io.Serializable;
import java.util.Objects;

public class MessageChannel implements Serializable {

private final String value;

public MessageChannel(String value) {
this.value = value;
}

public static MessageChannel individual(String name) {
return new MessageChannel(name);
}

public static MessageChannel pattern(String pattern) {
return new MessageChannel(pattern);
}

public String getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MessageChannel that = (MessageChannel) o;
return value.equals(that.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.micronaut.configuration.lettuce.pubsub;

public class RedisMessage {

private final Object body;
private final MessageChannel channel;

public RedisMessage(Object body, MessageChannel channel) {
this.body = body;
this.channel = channel;
}

public Object getBody() {
return body;
}

public MessageChannel getChannel() {
return channel;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.configuration.lettuce.pubsub;

import io.micronaut.configuration.lettuce.RedisSetting;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.core.serialize.ObjectSerializer;
import io.micronaut.runtime.ApplicationConfiguration;

import java.util.Optional;

@EachProperty(RedisSetting.REDIS_CACHES)
@ConfigurationProperties(RedisSetting.REDIS_PUBSUB)
public class RedisPubSubConfiguration {

protected String server;
protected Class<ObjectSerializer> channelSerializer;
protected Class<ObjectSerializer> valueSerializer;

public RedisPubSubConfiguration(ApplicationConfiguration applicationConfiguration) {}

public Optional<String> getServer() {
return Optional.ofNullable(server);
}

public Optional<Class<ObjectSerializer>> getChannelSerializer() {
return Optional.ofNullable(channelSerializer);
}

public Optional<Class<ObjectSerializer>> getValueSerializer() {
return Optional.ofNullable(valueSerializer);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

package io.micronaut.configuration.lettuce.pubsub;

import io.micronaut.core.async.subscriber.TypedSubscriber;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.type.Argument;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Function;

@SuppressWarnings({"rawtypes", "unchecked"})
public class RedisSubscriber extends TypedSubscriber<RedisMessage> {

private static final Logger LOG = LoggerFactory.getLogger(RedisSubscriber.class);

private final Function<RedisMessage, BoundExecutable> onNext;
private final Object bean;

public RedisSubscriber(Object bean, Function<RedisMessage, BoundExecutable> onNext) {
super(Argument.of(RedisMessage.class));
this.onNext = onNext;
this.bean = bean;
}

@Override
protected void doOnSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}

@Override
protected void doOnNext(RedisMessage message) {
BoundExecutable executable = onNext.apply(message);
executable.invoke(bean);
}

@Override
protected void doOnError(Throwable t) {
LOG.error("Error", t);
}

@Override
protected void doOnComplete() {
LOG.info("Closing");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.micronaut.configuration.lettuce.pubsub.annotations;

import io.micronaut.context.annotation.Executable;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.messaging.annotation.MessageMapping;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Used to specify which message channel to subscribe to and mark the handler method
*
* @author Cristian Morales
* @since TODO
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE})
@Bindable
@Inherited
@Executable
@MessageMapping
public @interface MessageChannel {

String value() default "";

String[] channels() default {};

String[] patterns() default {};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.micronaut.configuration.lettuce.pubsub.annotations;

import io.micronaut.messaging.annotation.MessageListener;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Class level annotation to indicate that a bean will be consumers of messages
* from Redis.
*
* @author Cristian Morales
* @since TODO
*/
@Documented
@Retention(RUNTIME)
@Target({ElementType.TYPE})
@MessageListener
@Inherited
public @interface RedisListener {

String value() default "";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.micronaut.configuration.lettuce.pubsub.bind;

import io.micronaut.configuration.lettuce.pubsub.RedisMessage;

import java.lang.annotation.Annotation;

/**
* Interface for binders that bind method arguments from a {@link RedisMessage} via annotation.
*
* @param <T> The target type
* @param <A> The annotation type
* @author Cristian Morales
* @since TODO
*/
public interface AnnotatedRedisMessageBinder<A extends Annotation, T> extends RedisArgumentBinder<T> {

/**
* @return The annotation type
*/
Class<A> annotationType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.micronaut.configuration.lettuce.pubsub.bind;

import io.micronaut.configuration.lettuce.pubsub.RedisMessage;
import io.micronaut.core.bind.ArgumentBinder;

/**
* Interface for binders that bind method arguments from a {@link RedisMessage}.
*
* @param <T> The target type
* @author Cristian Morales
*/
@SuppressWarnings("WeakerAccess")
public interface RedisArgumentBinder<T> extends ArgumentBinder<T, RedisMessage> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.lettuce.pubsub.bind;

import io.micronaut.configuration.lettuce.pubsub.RedisMessage;
import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArrayUtils;
import jakarta.inject.Singleton;

import java.lang.annotation.Annotation;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
* Used to determine which {@link RedisArgumentBinder} to use for any given argument.
*
* @author Cristian Morales
* @since TODO
*/
@Singleton
public class RedisBinderRegistry implements ArgumentBinderRegistry<RedisMessage> {

private final Map<Class<? extends Annotation>, ArgumentBinder<?, RedisMessage>> byAnnotation = new LinkedHashMap<>();
private final Map<Integer, ArgumentBinder<?, RedisMessage>> byType = new LinkedHashMap<>();
private final RedisArgumentBinder<?> defaultBinder;

/**
* Default constructor.
*
* @param defaultBinder The binder to use when one cannot be found for an argument
* @param binders The list of binders to choose from to bind an argument
*/
public RedisBinderRegistry(RedisArgumentBinder<?> defaultBinder, RedisArgumentBinder<?>... binders) {
this.defaultBinder = defaultBinder;
if (ArrayUtils.isNotEmpty(binders)) {
for (RedisArgumentBinder<?> binder : binders) {
if (binder instanceof AnnotatedRedisMessageBinder) {
AnnotatedRedisMessageBinder<?, ?> annotatedBinder = (AnnotatedRedisMessageBinder<?, ?>) binder;
byAnnotation.put(annotatedBinder.annotationType(), binder);
} else if (binder instanceof TypedRedisMessageBinder) {
TypedRedisMessageBinder<?> typedBinder = (TypedRedisMessageBinder<?>) binder;
byType.put(typedBinder.argumentType().typeHashCode(), typedBinder);
}
}
}
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> Optional<ArgumentBinder<T, RedisMessage>> findArgumentBinder(Argument<T> argument, RedisMessage source) {
Optional<Class<? extends Annotation>> opt = argument.getAnnotationMetadata().getAnnotationTypeByStereotype(Bindable.class);

if (opt.isPresent()) {
Class<? extends Annotation> annotationType = opt.get();
ArgumentBinder binder = byAnnotation.get(annotationType);
if (binder != null) {
return Optional.of(binder);
}
} else {
ArgumentBinder binder = byType.get(argument.typeHashCode());
if (binder != null) {
return Optional.of(binder);
}
}

return Optional.of((ArgumentBinder<T, RedisMessage>) defaultBinder);
}

}
Loading