Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,9 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio

String readConsistencyLevel =
properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel());
if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) {
if (readConsistencyLevel.equalsIgnoreCase("strong")
|| readConsistencyLevel.equalsIgnoreCase("weak")
|| readConsistencyLevel.equalsIgnoreCase("follower_read")) {
conf.setReadConsistencyLevel(readConsistencyLevel);
} else {
throw new IOException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3342,8 +3342,13 @@ public ReadConsistencyLevel getReadConsistencyLevel() {
public void setReadConsistencyLevel(String readConsistencyLevel) {
if ("weak".equalsIgnoreCase(readConsistencyLevel)) {
this.readConsistencyLevel = ReadConsistencyLevel.WEAK;
} else {
} else if ("strong".equalsIgnoreCase(readConsistencyLevel)) {
this.readConsistencyLevel = ReadConsistencyLevel.STRONG;
} else if ("follower_read".equalsIgnoreCase(readConsistencyLevel)) {
this.readConsistencyLevel = ReadConsistencyLevel.FOLLOWER_READ;
} else {
throw new IllegalArgumentException(
String.format("Unknown readConsistencyLevel %s", readConsistencyLevel));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica
throw new IllegalArgumentException(
String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
}
boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel;

// When planning fragment onto specific DataNode, the DataNode whose endPoint is in
// black list won't be considered because it may have connection issue now.
Expand All @@ -133,13 +132,34 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica
if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) {
LOGGER.info("available replicas: {}", availableDataNodes);
}
int targetIndex = getTargetIndex(availableDataNodes);
return availableDataNodes.get(targetIndex);
}

private int getTargetIndex(List<TDataNodeLocation> availableDataNodes) {
// if only one node is available, just return 0
if (availableDataNodes.size() == 1) {
return 0;
}

int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
if (ReadConsistencyLevel.STRONG == this.readConsistencyLevel
|| queryContext.getSession() == null) {
targetIndex = 0;
} else {
} else if (ReadConsistencyLevel.WEAK == this.readConsistencyLevel) {
targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
} else if (ReadConsistencyLevel.FOLLOWER_READ == this.readConsistencyLevel) {
// The first available data node is always leader which is guaranteed by ConfigNode and
// PartitionFetcher in DataNode
// We only need to randomly choose any one from [1, availableDataNodes.size()).
// SessionId is a unchanged long value for each connection, so we can use that as random seed
targetIndex =
(int) (queryContext.getSession().getSessionId() % (availableDataNodes.size() - 1)) + 1;
} else {
throw new IllegalArgumentException(
String.format("Unknown readConsistencyLevel %s", readConsistencyLevel));
}
return availableDataNodes.get(targetIndex);
return targetIndex;
}

protected FragmentInstance findDownStreamInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ text_compressor=LZ4
# These consistency levels are currently supported:
# 1. strong(Default, read from the leader replica)
# 2. weak(Read from a random replica)
# 3. follower_read(If there are available follower replicas, select one from any of the follower replicas.)
# effectiveMode: restart
# Datatype: string
read_consistency_level=strong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@

public enum ReadConsistencyLevel {
STRONG,
WEAK
WEAK,
FOLLOWER_READ
}
Loading