From 644a29131cfb5ceffe54415c9517b03bec187e96 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Fri, 17 Oct 2025 08:45:33 +0800 Subject: [PATCH 1/3] Support a new read_consistency_level --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 7 +++++- .../plan/AbstractFragmentParallelPlanner.java | 23 +++++++++++++++---- .../conf/iotdb-system.properties.template | 1 + .../commons/enums/ReadConsistencyLevel.java | 3 ++- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 64d0dc6d2145..231a3b77f1da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -3342,8 +3342,13 @@ public ReadConsistencyLevel getReadConsistencyLevel() { public void setReadConsistencyLevel(String readConsistencyLevel) { if ("weak".equalsIgnoreCase(readConsistencyLevel)) { this.readConsistencyLevel = ReadConsistencyLevel.WEAK; - } else { + } else if ("strong".equalsIgnoreCase(readConsistencyLevel)) { this.readConsistencyLevel = ReadConsistencyLevel.STRONG; + } else if ("follower_read".equalsIgnoreCase(readConsistencyLevel)) { + this.readConsistencyLevel = ReadConsistencyLevel.FOLLOWER_READ; + } else { + throw new IllegalArgumentException( + String.format("Unknown readConsistencyLevel %s", readConsistencyLevel)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index 49ed6e36ee65..bcc6e83f14c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -116,7 +116,6 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica throw new IllegalArgumentException( String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); } - boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel; // When planning fragment onto specific DataNode, the DataNode whose endPoint is in // black list won't be considered because it may have connection issue now. @@ -133,13 +132,29 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { LOGGER.info("available replicas: {}", availableDataNodes); } + int targetIndex = getTargetIndex(availableDataNodes); + return availableDataNodes.get(targetIndex); + } + + private int getTargetIndex(List availableDataNodes) { int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { + if (ReadConsistencyLevel.STRONG == this.readConsistencyLevel + || queryContext.getSession() == null) { targetIndex = 0; - } else { + } else if (ReadConsistencyLevel.WEAK == this.readConsistencyLevel) { targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + } else if (ReadConsistencyLevel.FOLLOWER_READ == this.readConsistencyLevel) { + // The first available data node is always leader which is guaranteed by ConfigNode and + // PartitionFetcher in DataNode + // We only need to randomly choose any one from [1, availableDataNodes.size()). 
+ // SessionId is an unchanged long value for each connection, so we can use that as random seed + targetIndex = + (int) (queryContext.getSession().getSessionId() % (availableDataNodes.size() - 1)) + 1; + } else { + throw new IllegalArgumentException( + String.format("Unknown readConsistencyLevel %s", readConsistencyLevel)); } - return availableDataNodes.get(targetIndex); + return targetIndex; } protected FragmentInstance findDownStreamInstance( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 207c0507093b..d9a08a646ccc 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1026,6 +1026,7 @@ text_compressor=LZ4 # These consistency levels are currently supported: # 1. strong(Default, read from the leader replica) # 2. weak(Read from a random replica) +# 3. follower_read(If there are available follower replicas, select one from any of the follower replicas.) 
# effectiveMode: restart # Datatype: string read_consistency_level=strong diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java index 3fb8fdb4e843..70bfcf1708d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java @@ -21,5 +21,6 @@ public enum ReadConsistencyLevel { STRONG, - WEAK + WEAK, + FOLLOWER_READ } From 2ca4436907a38a6fff3e8de7b70698b8807e55bf Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Fri, 17 Oct 2025 09:00:43 +0800 Subject: [PATCH 2/3] Support a new read_consistency_level --- .../apache/iotdb/confignode/conf/ConfigNodeDescriptor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 0ea7a278732e..9f6d7b58793e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -359,7 +359,9 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio String readConsistencyLevel = properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()); - if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) { + if (readConsistencyLevel.equalsIgnoreCase("strong") + || readConsistencyLevel.equalsIgnoreCase("weak") + || readConsistencyLevel.equalsIgnoreCase("follower_read")) { conf.setReadConsistencyLevel(readConsistencyLevel); } else { throw new IOException( From 5df19d5b477afc03856b3a777a8cdfe477bcb90f Mon Sep 17 00:00:00 2001 From: 
JackieTien97 Date: Fri, 17 Oct 2025 11:51:09 +0800 Subject: [PATCH 3/3] Fix potential divide-by-zero error when only one replica is available --- .../plan/planner/plan/AbstractFragmentParallelPlanner.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index bcc6e83f14c8..5e13c3025332 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -137,6 +137,11 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica } private int getTargetIndex(List availableDataNodes) { + // if only one node is available, just return 0 + if (availableDataNodes.size() == 1) { + return 0; + } + int targetIndex; if (ReadConsistencyLevel.STRONG == this.readConsistencyLevel || queryContext.getSession() == null) {