diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 4b6898d9..3a7007e4 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -158,6 +158,9 @@ public void invoke(IN input, Context context) throws Exception { case SETEX: this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL)); break; + case SETNX: + this.redisCommandsContainer.setnx(key,value); + break; case PFADD: this.redisCommandsContainer.pfadd(key, value); break; diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index 018b8ce5..2f0e6387 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -166,6 +166,19 @@ public void setex(final String key, final String value, final Integer ttl) { } } + @Override + public void setnx(String key, String value) { + try { + jedisCluster.setnx(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SETNX to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + @Override public void pfadd(final String key, final String element) { try { diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index 6f3e6c14..ae13703c 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -103,6 +103,8 @@ public interface RedisCommandsContainer extends Serializable { */ void setex(String key, String value, Integer ttl); + void setnx(String key, String value); + /** * Adds all the element arguments to the HyperLogLog data structure * stored at the variable name specified as first argument. diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index d573bde6..15928f23 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -227,6 +227,23 @@ public void setex(final String key, final String value, final Integer ttl) { } } + @Override + public void setnx(String key, String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.setnx(key,value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SETNX to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + @Override public void pfadd(final String key, final String element) { Jedis jedis = null; diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java index 858720bf..8413a7ac 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -51,9 +51,12 @@ public enum RedisCommand { */ SETEX(RedisDataType.STRING), + SETNX(RedisDataType.STRING), + /** * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. */ + PFADD(RedisDataType.HYPER_LOG_LOG), /** diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java new file mode 100644 index 00000000..d7a3ffb3 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper.row; + +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; + +public class SetNxMapper extends RowRedisMapper{ + + public SetNxMapper() { + super(RedisCommand.SETNX); + } + +}