From ad0d3d4e73cbc09bef8bcb8a41e143b797883085 Mon Sep 17 00:00:00 2001
From: wty4427300 <wty4427300@gmail.com>
Date: Mon, 6 Jul 2020 16:30:45 +0800
Subject: [PATCH 1/3] add setnx

---
 .../streaming/connectors/redis/RedisSink.java   |  2 ++
 .../common/container/RedisClusterContainer.java | 13 +++++++++++++
 .../container/RedisCommandsContainer.java       |  2 ++
 .../redis/common/container/RedisContainer.java  | 17 +++++++++++++++++
 .../redis/common/mapper/RedisCommand.java       |  3 +++
 .../redis/common/mapper/row/SetNxMapper.java    | 14 ++++++++++++++
 6 files changed, 51 insertions(+)
 create mode 100644 flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 4b6898d9..08fbbd59 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -158,6 +158,8 @@ public void invoke(IN input, Context context) throws Exception {
             case SETEX:
                 this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
                 break;
+            case SETNX:
+                this.redisCommandsContainer.setnx(key,value);
             case PFADD:
                 this.redisCommandsContainer.pfadd(key, value);
                 break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 018b8ce5..2f0e6387 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -166,6 +166,19 @@ public void setex(final String key, final String value, final Integer ttl) {
         }
     }
 
+    @Override
+    public void setnx(String key, String value) {
+        try {
+            jedisCluster.setnx(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETNX to key {} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
     @Override
     public void pfadd(final String key, final String element) {
         try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 6f3e6c14..ae13703c 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -103,6 +103,8 @@ public interface RedisCommandsContainer extends Serializable {
      */
     void setex(String key, String value, Integer ttl);
 
+    void setnx(String key, String value);
+
     /**
      * Adds all the element arguments to the HyperLogLog data structure
      * stored at the variable name specified as first argument.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index d573bde6..15928f23 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -227,6 +227,23 @@ public void setex(final String key, final String value, final Integer ttl) {
         }
     }
 
+    @Override
+    public void setnx(String key, String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.setnx(key,value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETNX to key {} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
     @Override
     public void pfadd(final String key, final String element) {
         Jedis jedis = null;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 858720bf..8413a7ac 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -51,9 +51,12 @@ public enum RedisCommand {
      */
     SETEX(RedisDataType.STRING),
 
+    SETNX(RedisDataType.STRING),
+
     /**
      * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
      */
+
     PFADD(RedisDataType.HYPER_LOG_LOG),
 
     /**
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
new file mode 100644
index 00000000..e559d9bc
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
@@ -0,0 +1,14 @@
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+public class SetNxMapper extends RowRedisMapper{
+
+    public SetNxMapper() {
+        super(RedisCommand.SETNX);
+    }
+
+    public SetNxMapper(Integer ttl) {
+        super(ttl, RedisCommand.SETNX);
+    }
+}

From ea54d9289fed3d00aa6cba8f31cad82d666f932a Mon Sep 17 00:00:00 2001
From: wty4427300 <wty4427300@gmail.com>
Date: Mon, 6 Jul 2020 17:12:22 +0800
Subject: [PATCH 2/3] fix

---
 .../streaming/connectors/redis/RedisSink.java    |  1 +
 .../redis/common/mapper/row/SetNxMapper.java     | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 08fbbd59..3a7007e4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -160,6 +160,7 @@ public void invoke(IN input, Context context) throws Exception {
                 break;
             case SETNX:
                 this.redisCommandsContainer.setnx(key,value);
+                break;
             case PFADD:
                 this.redisCommandsContainer.pfadd(key, value);
                 break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
index e559d9bc..21c9cd2e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.connectors.redis.common.mapper.row;
 
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;

From e2af93839143939c8dd04e023c81cb20f9851abf Mon Sep 17 00:00:00 2001
From: wty4427300 <wty4427300@gmail.com>
Date: Mon, 6 Jul 2020 18:05:46 +0800
Subject: [PATCH 3/3] del code

---
 .../connectors/redis/common/mapper/row/SetNxMapper.java        | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
index 21c9cd2e..d7a3ffb3 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetNxMapper.java
@@ -24,7 +24,4 @@ public SetNxMapper() {
         super(RedisCommand.SETNX);
     }
 
-    public SetNxMapper(Integer ttl) {
-        super(ttl, RedisCommand.SETNX);
-    }
 }