diff --git a/redis-lettuce/build.gradle b/redis-lettuce/build.gradle index 9479d841d..d5da720c7 100644 --- a/redis-lettuce/build.gradle +++ b/redis-lettuce/build.gradle @@ -7,6 +7,8 @@ dependencies { api libs.managed.lettuce api libs.micronaut.cache + // FIXME + implementation 'io.micronaut:micronaut-messaging:3.3.4' compileOnly libs.micronaut.session compileOnly libs.micronaut.management diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisConnectionUtil.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisConnectionUtil.java index 5519fc171..e919ce893 100644 --- a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisConnectionUtil.java +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisConnectionUtil.java @@ -22,6 +22,7 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.micronaut.context.BeanLocator; import io.micronaut.context.exceptions.ConfigurationException; import io.micronaut.core.annotation.Internal; @@ -37,6 +38,7 @@ * @since 1.0 */ @Internal +@SuppressWarnings("rawtypes") public class RedisConnectionUtil { /** * Utility method for establishing a redis connection. @@ -117,6 +119,27 @@ public static StatefulConnection openBytesRedisConnection(BeanLo throw new ConfigurationException(errorMessage); } + /** + * Utility method for opening a new bytes redis pubsub connection. + * + * @param beanLocator The bean locator to use + * @param serverName The server name to use + * @param errorMessage The error message to use if the connection can't be found + * @return The connection + * @throws ConfigurationException If the connection cannot be found + */ + public static StatefulRedisPubSubConnection openBytesRedisPubSubConnection(BeanLocator beanLocator, Optional serverName, String errorMessage) { + Optional redisClusterClient = findRedisClusterClient(beanLocator, serverName); + if (redisClusterClient.isPresent()) { + return redisClusterClient.get().connectPubSub(ByteArrayCodec.INSTANCE); + } + Optional redisClient = findRedisClient(beanLocator, serverName); + if (redisClient.isPresent()) { + return redisClient.get().connectPubSub(ByteArrayCodec.INSTANCE); + } + throw new ConfigurationException(errorMessage); + } + private static Optional findRedisClusterClient(BeanLocator beanLocator, Optional serverName) { Optional namedClient = serverName.flatMap(name -> beanLocator.findBean(RedisClusterClient.class, Qualifiers.byName(name))); if (namedClient.isPresent()) { diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisSetting.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisSetting.java index 3fb288039..3453c2eb5 100644 --- a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisSetting.java +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/RedisSetting.java @@ -53,4 +53,8 @@ public interface RedisSetting { * Default configuration for Redis caches pool. */ String REDIS_POOL = PREFIX + ".pool"; + /** + * Configured Redis pubsub. + */ + String REDIS_PUBSUB = PREFIX + ".pubsub"; } diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/MessageChannel.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/MessageChannel.java new file mode 100644 index 000000000..12a46ed3b --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/MessageChannel.java @@ -0,0 +1,39 @@ +package io.micronaut.configuration.lettuce.pubsub; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageChannel implements Serializable { + + private final String value; + + public MessageChannel(String value) { + this.value = value; + } + + public static MessageChannel individual(String name) { + return new MessageChannel(name); + } + + public static MessageChannel pattern(String pattern) { + return new MessageChannel(pattern); + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MessageChannel that = (MessageChannel) o; + return value.equals(that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisMessage.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisMessage.java new file mode 100644 index 000000000..799564b16 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisMessage.java @@ -0,0 +1,21 @@ +package io.micronaut.configuration.lettuce.pubsub; + +public class RedisMessage { + + private final Object body; + private final MessageChannel channel; + + public RedisMessage(Object body, MessageChannel channel) { + this.body = body; + this.channel = channel; + } + + public Object getBody() { + return body; + } + + public MessageChannel getChannel() { + return channel; + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisPubSubConfiguration.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisPubSubConfiguration.java new file mode 100644 index 000000000..cff999957 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisPubSubConfiguration.java @@ -0,0 +1,33 @@ +package io.micronaut.configuration.lettuce.pubsub; + +import io.micronaut.configuration.lettuce.RedisSetting; +import io.micronaut.context.annotation.ConfigurationProperties; +import io.micronaut.context.annotation.EachProperty; +import io.micronaut.core.serialize.ObjectSerializer; +import io.micronaut.runtime.ApplicationConfiguration; + +import java.util.Optional; + +@EachProperty(RedisSetting.REDIS_CACHES) +@ConfigurationProperties(RedisSetting.REDIS_PUBSUB) +public class RedisPubSubConfiguration { + + protected String server; + protected Class channelSerializer; + protected Class valueSerializer; + + public RedisPubSubConfiguration(ApplicationConfiguration applicationConfiguration) {} + + public Optional getServer() { + return Optional.ofNullable(server); + } + + public Optional> getChannelSerializer() { + return Optional.ofNullable(channelSerializer); + } + + public Optional> getValueSerializer() { + return Optional.ofNullable(valueSerializer); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisSubscriber.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisSubscriber.java new file mode 100644 index 000000000..af6c868bd --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/RedisSubscriber.java @@ -0,0 +1,49 @@ + +package io.micronaut.configuration.lettuce.pubsub; + +import io.micronaut.core.async.subscriber.TypedSubscriber; +import io.micronaut.core.bind.BoundExecutable; +import io.micronaut.core.type.Argument; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Function; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class RedisSubscriber extends TypedSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(RedisSubscriber.class); + + private final Function onNext; + private final Object bean; + + public RedisSubscriber(Object bean, Function onNext) { + super(Argument.of(RedisMessage.class)); + this.onNext = onNext; + this.bean = bean; + } + + @Override + protected void doOnSubscribe(Subscription subscription) { + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + protected void doOnNext(RedisMessage message) { + BoundExecutable executable = onNext.apply(message); + executable.invoke(bean); + } + + @Override + protected void doOnError(Throwable t) { + LOG.error("Error", t); + } + + @Override + protected void doOnComplete() { + LOG.info("Closing"); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/MessageChannel.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/MessageChannel.java new file mode 100644 index 000000000..584b2109f --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/MessageChannel.java @@ -0,0 +1,35 @@ +package io.micronaut.configuration.lettuce.pubsub.annotations; + +import io.micronaut.context.annotation.Executable; +import io.micronaut.core.bind.annotation.Bindable; +import io.micronaut.messaging.annotation.MessageMapping; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Used to specify which message channel to subscribe to and mark the handler method + * + * @author Cristian Morales + * @since TODO + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE}) +@Bindable +@Inherited +@Executable +@MessageMapping +public @interface MessageChannel { + + String value() default ""; + + String[] channels() default {}; + + String[] patterns() default {}; + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/RedisListener.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/RedisListener.java new file mode 100644 index 000000000..b915f5931 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/annotations/RedisListener.java @@ -0,0 +1,29 @@ +package io.micronaut.configuration.lettuce.pubsub.annotations; + +import io.micronaut.messaging.annotation.MessageListener; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Class level annotation to indicate that a bean will be consumers of messages + * from Redis. + * + * @author Cristian Morales + * @since TODO + */ +@Documented +@Retention(RUNTIME) +@Target({ElementType.TYPE}) +@MessageListener +@Inherited +public @interface RedisListener { + + String value() default ""; + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/AnnotatedRedisMessageBinder.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/AnnotatedRedisMessageBinder.java new file mode 100644 index 000000000..78ea11aa2 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/AnnotatedRedisMessageBinder.java @@ -0,0 +1,21 @@ +package io.micronaut.configuration.lettuce.pubsub.bind; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; + +import java.lang.annotation.Annotation; + +/** + * Interface for binders that bind method arguments from a {@link RedisMessage} via annotation. + * + * @param The target type + * @param The annotation type + * @author Cristian Morales + * @since TODO + */ +public interface AnnotatedRedisMessageBinder extends RedisArgumentBinder { + + /** + * @return The annotation type + */ + Class annotationType(); +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisArgumentBinder.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisArgumentBinder.java new file mode 100644 index 000000000..4a4e61a7a --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisArgumentBinder.java @@ -0,0 +1,14 @@ +package io.micronaut.configuration.lettuce.pubsub.bind; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.core.bind.ArgumentBinder; + +/** + * Interface for binders that bind method arguments from a {@link RedisMessage}. + * + * @param The target type + * @author Cristian Morales + */ +@SuppressWarnings("WeakerAccess") +public interface RedisArgumentBinder extends ArgumentBinder { +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBinderRegistry.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBinderRegistry.java new file mode 100644 index 000000000..57b704ab8 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBinderRegistry.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.lettuce.pubsub.bind; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.core.bind.ArgumentBinder; +import io.micronaut.core.bind.ArgumentBinderRegistry; +import io.micronaut.core.bind.annotation.Bindable; +import io.micronaut.core.type.Argument; +import io.micronaut.core.util.ArrayUtils; +import jakarta.inject.Singleton; + +import java.lang.annotation.Annotation; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Used to determine which {@link RedisArgumentBinder} to use for any given argument. + * + * @author Cristian Morales + * @since TODO + */ +@Singleton +public class RedisBinderRegistry implements ArgumentBinderRegistry { + + private final Map, ArgumentBinder> byAnnotation = new LinkedHashMap<>(); + private final Map> byType = new LinkedHashMap<>(); + private final RedisArgumentBinder defaultBinder; + + /** + * Default constructor. + * + * @param defaultBinder The binder to use when one cannot be found for an argument + * @param binders The list of binders to choose from to bind an argument + */ + public RedisBinderRegistry(RedisArgumentBinder defaultBinder, RedisArgumentBinder... binders) { + this.defaultBinder = defaultBinder; + if (ArrayUtils.isNotEmpty(binders)) { + for (RedisArgumentBinder binder : binders) { + if (binder instanceof AnnotatedRedisMessageBinder) { + AnnotatedRedisMessageBinder annotatedBinder = (AnnotatedRedisMessageBinder) binder; + byAnnotation.put(annotatedBinder.annotationType(), binder); + } else if (binder instanceof TypedRedisMessageBinder) { + TypedRedisMessageBinder typedBinder = (TypedRedisMessageBinder) binder; + byType.put(typedBinder.argumentType().typeHashCode(), typedBinder); + } + } + } + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public Optional> findArgumentBinder(Argument argument, RedisMessage source) { + Optional> opt = argument.getAnnotationMetadata().getAnnotationTypeByStereotype(Bindable.class); + + if (opt.isPresent()) { + Class annotationType = opt.get(); + ArgumentBinder binder = byAnnotation.get(annotationType); + if (binder != null) { + return Optional.of(binder); + } + } else { + ArgumentBinder binder = byType.get(argument.typeHashCode()); + if (binder != null) { + return Optional.of(binder); + } + } + + return Optional.of((ArgumentBinder) defaultBinder); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBodyBinder.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBodyBinder.java new file mode 100644 index 000000000..e902ee61f --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/RedisBodyBinder.java @@ -0,0 +1,26 @@ +package io.micronaut.configuration.lettuce.pubsub.bind; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.core.convert.ArgumentConversionContext; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.messaging.annotation.MessageBody; +import jakarta.inject.Singleton; + +import java.util.Optional; + +@Singleton +public class RedisBodyBinder implements AnnotatedRedisMessageBinder{ + + @Override + public Class annotationType() { + return MessageBody.class; + } + + @Override + public BindingResult bind(ArgumentConversionContext context, RedisMessage source) { + Object value = source.getBody(); + Optional converted = ConversionService.SHARED.convert(value, context); + return () -> converted; + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/TypedRedisMessageBinder.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/TypedRedisMessageBinder.java new file mode 100644 index 000000000..502647b7f --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/bind/TypedRedisMessageBinder.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.lettuce.pubsub.bind; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.core.bind.TypeArgumentBinder; + +/** + * Allows binding by type. + * + * @param The target type + * @author Cristian Morales + * @since TODO + */ +public interface TypedRedisMessageBinder extends TypeArgumentBinder, RedisArgumentBinder { +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/executor/RedisConsumerExecutorFactory.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/executor/RedisConsumerExecutorFactory.java new file mode 100644 index 000000000..9ac5a7cfe --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/executor/RedisConsumerExecutorFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.lettuce.pubsub.executor; + +import io.micronaut.context.annotation.Bean; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.executor.ExecutorConfiguration; +import io.micronaut.scheduling.executor.ExecutorType; +import io.micronaut.scheduling.executor.UserExecutorConfiguration; +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +/** + * Configures a {@link java.util.concurrent.ScheduledExecutorService} for running {@link io.micronaut.configuration.lettuce.pubsub.annotations.RedisListener} instances. + * + * @author Cristian Morales + * @since TODO + */ +@Requires(missingProperty = ExecutorConfiguration.PREFIX_CONSUMER) +@Factory +public class RedisConsumerExecutorFactory { + + /** + * @return The executor configurations + */ + @Singleton + @Bean + @Named(TaskExecutors.MESSAGE_CONSUMER) + public ExecutorConfiguration executor() { + return UserExecutorConfiguration.of(ExecutorType.FIXED); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/ChannelHandlersProcessor.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/ChannelHandlersProcessor.java new file mode 100644 index 000000000..1382cd400 --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/ChannelHandlersProcessor.java @@ -0,0 +1,105 @@ +package io.micronaut.configuration.lettuce.pubsub.processor; + +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.configuration.lettuce.pubsub.RedisSubscriber; +import io.micronaut.configuration.lettuce.pubsub.annotations.MessageChannel; +import io.micronaut.configuration.lettuce.pubsub.annotations.RedisListener; +import io.micronaut.configuration.lettuce.pubsub.bind.RedisBinderRegistry; +import io.micronaut.context.BeanContext; +import io.micronaut.context.Qualifier; +import io.micronaut.context.processor.ExecutableMethodProcessor; +import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.bind.BoundExecutable; +import io.micronaut.core.bind.DefaultExecutableBinder; +import io.micronaut.core.bind.ExecutableBinder; +import io.micronaut.inject.BeanDefinition; +import io.micronaut.inject.ExecutableMethod; +import io.micronaut.inject.qualifiers.Qualifiers; +import jakarta.inject.Singleton; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +@Singleton +public class ChannelHandlersProcessor implements ExecutableMethodProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(RedisSubscriber.class); + + private final RedisPubSub redisPubSub; + private final BeanContext beanContext; + private final RedisBinderRegistry redisBinderRegistry; + + public ChannelHandlersProcessor(RedisPubSub redisPubSub, BeanContext beanContext, + RedisBinderRegistry redisBinderRegistry) { + this.redisPubSub = redisPubSub; + this.beanContext = beanContext; + this.redisBinderRegistry = redisBinderRegistry; + initializeListeners(); + } + + @Override + public void process(BeanDefinition beanDefinition, ExecutableMethod method) { + AnnotationValue subscriberConfig = beanDefinition.getAnnotation(RedisListener.class); + + if (subscriberConfig == null) { + // TODO: add proper logging + LOG.warn("Missing RedisListener"); + return; + } + + AnnotationValue messageChannelAnnotation = method.getAnnotation(MessageChannel.class); + + final ExecutableBinder binder = new DefaultExecutableBinder<>(); + + MessageHandler messageHandler = (message) -> binder.bind(method, redisBinderRegistry, message); + Object bean = getExecutableMethodBean(beanContext, beanDefinition, method); + RedisSubscriber subscriber = new RedisSubscriber(bean, messageHandler::handle); + redisPubSub.subscribe(getMessageChannels(messageChannelAnnotation), subscriber); + } + + @FunctionalInterface + private interface MessageHandler { + BoundExecutable handle(RedisMessage redisMessage); + } + + /** + * Pre-initialize singletons before processing + */ + private void initializeListeners() { + this.beanContext.getBeanDefinitions(Qualifiers.byType(RedisListener.class)) + .stream().filter(BeanDefinition::isSingleton).forEach(definition -> { + try { + beanContext.getBean(definition.getBeanType()); + } catch (Exception e) { + // TODO: Create proper exception + throw new RuntimeException( + "Error creating bean for @RedisListener of type [" + definition.getBeanType() + "]: " + e.getMessage(), e + ); + } + }); + } + + @SuppressWarnings("unchecked") + private static Object getExecutableMethodBean(BeanContext beanContext, BeanDefinition beanDefinition, + ExecutableMethod method) { + Qualifier qualifier = beanDefinition + .getAnnotationNameByStereotype(jakarta.inject.Qualifier.class) + .map(type -> Qualifiers.byAnnotation(beanDefinition, type)) + .orElse(null); + + Class beanType = (Class) beanDefinition.getBeanType(); + + return beanContext.findBean(beanType, qualifier) + .orElseThrow(() -> new RuntimeException("Could not find the bean to execute the method " + method)); + } + + private static Set getMessageChannels(AnnotationValue annotationValue) { + io.micronaut.configuration.lettuce.pubsub.MessageChannel channels = annotationValue.stringValue() + .map(io.micronaut.configuration.lettuce.pubsub.MessageChannel::individual).orElse(null); + + return Collections.singleton(channels); + } + +} diff --git a/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/RedisPubSub.java b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/RedisPubSub.java new file mode 100644 index 000000000..a4fbc843b --- /dev/null +++ b/redis-lettuce/src/main/java/io/micronaut/configuration/lettuce/pubsub/processor/RedisPubSub.java @@ -0,0 +1,130 @@ +package io.micronaut.configuration.lettuce.pubsub.processor; + +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.micronaut.configuration.lettuce.RedisConnectionUtil; +import io.micronaut.configuration.lettuce.pubsub.MessageChannel; +import io.micronaut.configuration.lettuce.pubsub.RedisMessage; +import io.micronaut.configuration.lettuce.pubsub.RedisPubSubConfiguration; +import io.micronaut.configuration.lettuce.pubsub.RedisSubscriber; +import io.micronaut.context.BeanLocator; +import io.micronaut.core.async.publisher.Publishers; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.serialize.JdkSerializer; +import io.micronaut.core.serialize.ObjectSerializer; +import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import org.reactivestreams.Publisher; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +@Singleton +public class RedisPubSub { + + private final ObjectSerializer channelSerializer; + private final ObjectSerializer valueSerializer; + private final RedisPubSubCommands syncCommands; + private final StatefulRedisPubSubConnection connection; + private final MessageRouter messageRouter = new MessageRouter(); + private final ExecutorService executorService; + private RedisSubscriber subscriber; + + public RedisPubSub( + @Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService, + ConversionService conversionService, + BeanLocator beanLocator) { + + this.executorService = executorService; + + RedisPubSubConfiguration configuration = new RedisPubSubConfiguration(null); + this.valueSerializer = configuration.getValueSerializer() + .flatMap(beanLocator::findOrInstantiateBean) + .orElseGet(() -> new JdkSerializer(conversionService)); + + this.channelSerializer = configuration.getChannelSerializer() + .flatMap(beanLocator::findOrInstantiateBean) + .orElseGet(() -> new JdkSerializer(conversionService)); + + this.connection = RedisConnectionUtil.openBytesRedisPubSubConnection(beanLocator, + configuration.getServer(), "No Redis server configured."); + + // TODO: use reactive instead ? + this.syncCommands = connection.sync(); + + this.connection.addListener(messageRouter); + } + + void subscribe(Set channels, RedisSubscriber subscriber) { + syncCommands.subscribe(channels.stream().map(this::serializeChannel).toArray(byte[][]::new)); + this.messageRouter.addSubscriber(channels, subscriber); + this.subscriber = subscriber; + } + + public void publish(MessageChannel channel, String message) { + syncCommands.publish(serializeChannel(channel), serializeValue(message)); + } + + private byte[] serializeChannel(MessageChannel channel) { + // FIXME -- proper serialization + return channel.getValue().getBytes(StandardCharsets.UTF_8); + } + + private byte[] serializeValue(Object value) { + return valueSerializer.serialize(value).orElse(null); + } + + private class MessageRouter implements RedisPubSubListener { + + final Map> subscribers = new HashMap<>(); + + void addSubscriber(Set channels, RedisSubscriber subscriber) { + // TODO + channels.forEach(channel -> subscribers.put(channel, Collections.singleton(subscriber))); + } + + @Override + public void message(byte[] channelArg, byte[] message) { + // FIXME -- Proper conversion + MessageChannel channel = MessageChannel.individual(new String(channelArg)); + + subscribers.get(channel).forEach(subscriber -> executorService.submit(() -> { + Publisher publisher = Publishers.just(new RedisMessage(message, channel)); + publisher.subscribe(subscriber); + })); + } + + @Override + public void message(byte[] pattern, byte[] channel, byte[] message) { + // TODO + } + + @Override + public void subscribed(byte[] channel, long count) { + // TODO + } + + @Override + public void psubscribed(byte[] pattern, long count) { + // TODO + } + + @Override + public void unsubscribed(byte[] channel, long count) { + // TODO + } + + @Override + public void punsubscribed(byte[] pattern, long count) { + // TODO + } + + } + +} diff --git a/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/PubSubBaseSpec.groovy b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/PubSubBaseSpec.groovy new file mode 100644 index 000000000..e86d66c9f --- /dev/null +++ b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/PubSubBaseSpec.groovy @@ -0,0 +1,55 @@ +package io.micronaut.configuration.lettuce.pubsub + +import io.micronaut.configuration.lettuce.pubsub.executor.RedisConsumerExecutorFactory +import io.micronaut.configuration.lettuce.pubsub.processor.RedisPubSub +import io.micronaut.context.ApplicationContext +import io.micronaut.context.BeanLocator +import io.micronaut.context.Qualifier +import io.micronaut.core.convert.DefaultConversionService +import io.micronaut.inject.qualifiers.Qualifiers +import io.micronaut.runtime.ApplicationConfiguration +import io.micronaut.scheduling.TaskExecutors +import spock.lang.AutoCleanup +import spock.lang.Specification + +import java.time.Instant +import java.util.concurrent.ExecutorService + +abstract class PubSubBaseSpec extends Specification { + + @AutoCleanup("stop") + ApplicationContext app + + // Extra connection for sending messages + RedisPubSub publisher + + public static final String DUMMY_MESSAGE = """ { "message": ["hello", "world"], "timestamp": ${Instant.now().toEpochMilli()} } """ + public static final String DEFAULT_CHANNEL_NAME = "individual_channel" + public static final MessageChannel DEFAULT_CHANNEL = MessageChannel.individual(DEFAULT_CHANNEL_NAME) + + def setup() { + app = ApplicationContext.run('redis.type': 'embedded') + publisher = newPubSubInstance() + } + + protected T getBean(Class type) { + return app.getBean(type) + } + + def publishDummyMessage() { + publish(DUMMY_MESSAGE) + } + + def publish(String msg, MessageChannel channel = DEFAULT_CHANNEL) { + publisher.publish(channel, msg) + } + + protected RedisPubSub newPubSubInstance(Map props = [:]) { + ApplicationConfiguration appConfig = new ApplicationConfiguration() + def executor = app.getBean(ExecutorService, Qualifiers.byName(TaskExecutors.MESSAGE_CONSUMER)) + + return new RedisPubSub(executor, new DefaultConversionService(), + app.getBean(BeanLocator)) + } + +} diff --git a/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/RedisPubSubSpec.groovy b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/RedisPubSubSpec.groovy new file mode 100644 index 000000000..7480d049e --- /dev/null +++ b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/RedisPubSubSpec.groovy @@ -0,0 +1,30 @@ +package io.micronaut.configuration.lettuce.pubsub + +import io.micronaut.configuration.lettuce.pubsub.processor.RedisPubSub +import spock.lang.Ignore + +class RedisPubSubSpec extends PubSubBaseSpec { + + RedisPubSub target + + def setup() { + target = newPubSubInstance() + } + + @Ignore + def "Test simple subscriber" () { + given: + boolean sense = false + RedisSubscriber subscriber = new RedisSubscriber({ sense = true }) + + when: + target.subscribe(DEFAULT_CHANNEL, subscriber) + + and: + publish(DUMMY_MESSAGE, DEFAULT_CHANNEL) + + then: + assert sense + } + +} diff --git a/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/MessageChannelSubscriber.java b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/MessageChannelSubscriber.java new file mode 100644 index 000000000..308e9e1e4 --- /dev/null +++ b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/MessageChannelSubscriber.java @@ -0,0 +1,20 @@ +package io.micronaut.configuration.lettuce.pubsub.subscriber; + +import io.micronaut.configuration.lettuce.pubsub.annotations.MessageChannel; +import io.micronaut.configuration.lettuce.pubsub.annotations.RedisListener; +import io.micronaut.messaging.annotation.MessageBody; + +import java.util.ArrayList; +import java.util.List; + +@RedisListener +class MessageChannelSubscriber { + + public List log = new ArrayList<>(); + + @MessageChannel("individual_channel") + public void handle(@MessageBody byte[] message) { + log.add(message); + } + +} diff --git a/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/RedisSubscriberSpec.groovy b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/RedisSubscriberSpec.groovy new file mode 100644 index 000000000..9c2adfe20 --- /dev/null +++ b/redis-lettuce/src/test/groovy/io/micronaut/configuration/lettuce/pubsub/subscriber/RedisSubscriberSpec.groovy @@ -0,0 +1,21 @@ +package io.micronaut.configuration.lettuce.pubsub.subscriber + +import io.micronaut.configuration.lettuce.pubsub.PubSubBaseSpec + +class RedisSubscriberSpec extends PubSubBaseSpec { + + MessageChannelSubscriber target + + def setup() { + target = getBean(MessageChannelSubscriber) + } + + def "Message handling test" () { + when: + publishDummyMessage() + + then: + target.log.size() == 1 + } + +}