Skip to content
Open

Sp31 #14

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@
<phase>generate-sources</phase>
<configuration>
<target>
<property name="protobuf.src.dir" location="${basedir}/src/protobuf"/>
<property name="protobuf.src.dir" location="${basedir}/src/proto"/>
<property name="protobuf.build.dir" location="${basedir}/src/main/java"/>
<echo>Building Shuffle Message Protobuf</echo>
<mkdir dir="${protobuf.build.dir}"/>
Expand Down
32 changes: 27 additions & 5 deletions src/main/java/com/oppo/shuttle/rss/ShuffleServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class ShuffleServerConfig {

public static final int DEFAULT_MAX_OPEN_FILES = 60000;
private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
private int maxNumPartitions = 8000;
private boolean enableWhiteListCheck = false;

private long idleTimeoutMillis = Math.max(Constants.CLI_CONN_IDLE_TIMEOUT_MS + TimeUnit.MINUTES.toMillis(1),
Constants.SERVER_CONNECTION_IDLE_TIMEOUT_MILLIS_DEFAULT);
Expand All @@ -117,7 +119,7 @@ public class ShuffleServerConfig {
private long updateDelay = 30 * 1000L;
private long appControlInterval = 3600 * 1000L;
private int numAppResourcePerInterval = 20;
private int appNamePreLen = 25;
private int appNamePreLen = 100;
private long blackListRefreshInterval = 300 * 1000L;
private long clearShuffleDirInterval = 300 * 1000L;

Expand Down Expand Up @@ -166,7 +168,7 @@ public void setBaseConnections(int baseConnections) {
private int baseConnections = 5000;
private long connectionTimeoutInterval = 5 * 60 * 1000L;
private long connectRetryInterval = 10 * 1000L;
private long flowControlBuildIdTimeout = 60 * 1000L;
private long flowControlBuildIdTimeout = 30 * 1000L;

public long getConnectionTimeoutInterval() {
return connectionTimeoutInterval;
Expand Down Expand Up @@ -334,6 +336,8 @@ public static ShuffleServerConfig buildFromArgs(String[] args) throws IOExceptio
options.addOption("flowControlBuildIdTimeout", true, "build connection id timeout, default 60000ms");
options.addOption("appNamePreLen", true, "App control pre length");
options.addOption("maxOpenFiles", true, "Maximum number of open files");
options.addOption("maxNumPartitions", true, "Maximum number of partitions");
options.addOption("enableWhiteListCheck", true, "Whether to enable app whitelist check");

CommandLineParser parser = new BasicParser();
HelpFormatter formatter = new HelpFormatter();
Expand Down Expand Up @@ -364,8 +368,8 @@ public static ShuffleServerConfig buildFromArgs(String[] args) throws IOExceptio
serverConfig.heartBeatThreads = Integer.parseInt(cmd.getOptionValue("heartBeatThreads", "5"));
serverConfig.masterPort = Integer.parseInt(cmd.getOptionValue("masterPort", "19189"));
serverConfig.workerPunishMills = Integer.parseInt(cmd.getOptionValue("workerPunishMills", "300000"));
serverConfig.workerCheckInterval = Integer.parseInt(cmd.getOptionValue("workerCheckInterval", "15000"));
serverConfig.maxThroughputPerMin = Long.parseLong(cmd.getOptionValue("maxThroughputPerMin", "4294967296"));
serverConfig.workerCheckInterval = Integer.parseInt(cmd.getOptionValue("workerCheckInterval", "60000"));
serverConfig.maxThroughputPerMin = Long.parseLong(cmd.getOptionValue("maxThroughputPerMin", "8589934592"));
serverConfig.maxHoldDataSize = Long.parseLong(cmd.getOptionValue("maxHoldDataSize", "21474836480"));
serverConfig.maxFlowControlTimes = Integer.parseInt(cmd.getOptionValue("maxFlowControlTimes", "10"));
serverConfig.shuffleProcessThreads = Integer.parseInt(cmd.getOptionValue("nettyWorkerThreads", "32"));
Expand Down Expand Up @@ -401,8 +405,12 @@ public static ShuffleServerConfig buildFromArgs(String[] args) throws IOExceptio
serverConfig.blackListRefreshInterval = Long.parseLong(cmd.getOptionValue("blackListRefreshInterval", "300000"));
serverConfig.filterExcludes = cmd.getOptionValue("filterExcludes", "ors2,livy-session");
serverConfig.flowControlBuildIdTimeout = Long.parseLong(cmd.getOptionValue("flowControlBuildIdTimeout", "60000"));
serverConfig.appNamePreLen = Integer.parseInt(cmd.getOptionValue("appNamePreLen", "25"));
serverConfig.appNamePreLen = Integer.parseInt(cmd.getOptionValue("appNamePreLen", "100"));
serverConfig.maxOpenFiles = Integer.parseInt(cmd.getOptionValue("maxOpenFiles", String.valueOf(serverConfig.maxOpenFiles)));
serverConfig.maxNumPartitions = Integer.parseInt(cmd.getOptionValue("maxNumPartitions",
String.valueOf(serverConfig.maxNumPartitions)));
serverConfig.enableWhiteListCheck = Boolean.parseBoolean(cmd.getOptionValue("enableWhiteListCheck",
String.valueOf(serverConfig.enableWhiteListCheck)));

serverConfig.storage = new ShuffleFileStorage(serverConfig.rootDir);
return serverConfig;
Expand Down Expand Up @@ -569,6 +577,18 @@ public void setMaxOpenFiles(int maxOpenFiles) {
this.maxOpenFiles = maxOpenFiles;
}

public int getMaxNumPartitions() {
return maxNumPartitions;
}

public boolean isEnableWhiteListCheck() {
return enableWhiteListCheck;
}

public void setEnableWhiteListCheck(boolean enableWhiteListCheck) {
this.enableWhiteListCheck = enableWhiteListCheck;
}

public String getShuffleMasterConfig() {
StringBuilder sb = new StringBuilder("ShuffleMasterConfig{useEpoll=").append(useEpoll)
.append(", masterPort=").append(masterPort)
Expand All @@ -583,6 +603,8 @@ public String getShuffleMasterConfig() {
.append(", networkTimeout=").append(networkTimeout)
.append(", networkRetries=").append(networkRetries)
.append(", dispatchStrategy=").append(dispatchStrategy)
.append(", maxNumPartitions=").append(maxNumPartitions)
.append(", enableWhiteListCheck=").append(enableWhiteListCheck)
.append("}");
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public void unregisterFromMaster(ShuffleMessage.ShuffleWorkerUnregisterRequest s

}

public String masterAddress() {
if (ors2MasterClient.getChannel() == null) {
return null;
} else {
return "" + ors2MasterClient.getChannel().remoteAddress();
}
}
@Override
public void close() {
ors2MasterClient.close();
Expand Down
7 changes: 0 additions & 7 deletions src/main/java/com/oppo/shuttle/rss/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

public class Constants {
public static final int CHECK_SUM_SEQID = -1;
public static final int SHUFFLE_DATA_DUMP_THRESHOLD = 1 * 1024 *1024;
public static final int SHUFFLE_INDEX_COUNT_DUMP_THRESHOLD = 16 * 1024;
public static final int SHUFFLE_DATA_DUMPER_THREADS = 16;
public static final int STAGE_FINALIZED_CHECK_INTERVAL_MILLIS = 1000;
public static final long EMPTY_CHECKSUM_DEFAULT = -1L;
Expand All @@ -33,12 +31,8 @@ public class Constants {
public static final String DEFAULT_SUCCESS = "success";
public static final String SHUFFLE_FILE_PREFIX = "shuffle_";
public static final String SHUFFLE_DATA_FILE_POSTFIX = ".dat";
public static final String SHUFFLE_INDEX_FILE_POSTFIX = ".idx";
public static final String SHUFFLE_FINAL_DATA_FILE_POSTFIX = ".dat_final";
public static final String SHUFFLE_FINAL_INDEX_FILE_POSTFIX = ".idx_final";
public static final String SHUFFLE_FINAL_FILE_POSTFIX = "_final";
public static final String SHUFFLE_DECODER_NAME = "shuffle-decoder";
public static final String BUILD_CONN_HANDLER_NAME = "build-connection-handler";

/**
* Build version constants config
Expand All @@ -59,7 +53,6 @@ public class Constants {

public static final int SERVER_SHUTDOWN_PRIORITY = 1000;
public static final int MASTER_HTTP_SERVER_THREADS = 1;
public static final int MASTER_WORKER_STATUS_UPDATE_DELAY_MS = 30000;
public static final long SERVER_CONNECTION_IDLE_TIMEOUT_MILLIS_DEFAULT = 2 * 60 * 60 * 1000L;
public static final long CLI_CONN_IDLE_TIMEOUT_MS = 3 * 60 * 1000L;

Expand Down
42 changes: 13 additions & 29 deletions src/main/java/com/oppo/shuttle/rss/execution/FlowController.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,19 @@ public class FlowController {
private int barrierConnPerPriorityLevel;
private AtomicLong usedMemorySize = new AtomicLong();
private AtomicLong usedMemoryLastUpdateTimeStamp = new AtomicLong();
private final long CONFIGURED_MAX_JVM_MEMORY = CommonUtils.getJvmConfigMaxMemory();
private final int MEMORY_UPDATE_INTERVAL_MIL = 15 * 1000; // 5s
private final int MAX_RETRY_INDEX = 10;

// value
private Map<Integer, ConnectionValue> allConnections = new HashMap<>();
private final Map<Integer, ConnectionValue> allConnections = new HashMap<>();
private final BitSet allocateConnection;
private volatile int usedConnections = 0;

// a background executor service doing clean up timeout connections
private final ScheduledExecutorService clearTimeoutExecutorService = new DefaultEventLoop();

private double lastBusyControlNumber = 0;

public FlowController(ShuffleServerConfig serverConfig) {
this.serverConfig = serverConfig;
this.memoryContolSizeThreshold = serverConfig.getMemoryControlSizeThreshold();
Expand All @@ -87,18 +88,21 @@ public FlowController(ShuffleServerConfig serverConfig) {
} catch (Exception e) {
logger.warn("Exception occur in clearTimeOutConnections", e);
}
}, 60, 60, TimeUnit.SECONDS);
}, 30, 30, TimeUnit.SECONDS);

clearTimeoutExecutorService.scheduleAtFixedRate(() -> {
try {
double usedMemory = Ors2MetricsConstants.bufferedDataSize.get();
double nowBusyControl = Ors2MetricsConstants.busyControlTimes.get();
double diff = nowBusyControl - lastBusyControlNumber;
lastBusyControlNumber = nowBusyControl;

if (usedMemory > 1024 || usedConnections > 0) {
logger.info("flow-monitor, used memory {}MB, remaining memory {} mb; used connection {}, remaining connection {}.",
logger.info("flow-monitor, used memory {}MB, used connection {}, busyControl {}(interval {}s).",
Ors2MetricsExport.toMb(usedMemory),
Ors2MetricsExport.toMb(memoryContolSizeThreshold - usedMemory),
usedConnections,
totalConnections - usedConnections);
diff,
MEMORY_UPDATE_INTERVAL_MIL / 1000);
}
} catch (Exception e) {
logger.warn("Exception occur in clearTimeOutConnections", e);
Expand All @@ -112,32 +116,14 @@ public boolean memoryFlowControlByMetrics() {

if (memoryContolSizeThreshold < usedMemory
|| memoryControlRatioThreshold <= usedMemory / memoryContolSizeThreshold) {
logger.info("Memory flow control by metrics, usedMemory:{}, memoryContolSizeThreshold: {}, memoryControlRatioThreshold:{}",
logger.debug("Memory flow control by metrics, usedMemory:{}, memoryContolSizeThreshold: {}, memoryControlRatioThreshold:{}",
usedMemory, memoryContolSizeThreshold, memoryControlRatioThreshold);
return true;
}
return false;
}


public boolean memoryFlowControl() {
long nowMillSec = System.currentTimeMillis();
if (nowMillSec - usedMemoryLastUpdateTimeStamp.get() >= MEMORY_UPDATE_INTERVAL_MIL) {
long usedMemorySize = CommonUtils.getUsedMemory();
this.usedMemorySize.set(usedMemorySize);
this.usedMemoryLastUpdateTimeStamp.set(nowMillSec);
}

long usedMemory = this.usedMemorySize.get();
if (CONFIGURED_MAX_JVM_MEMORY < usedMemory
|| memoryControlRatioThreshold <= (double)usedMemory/CONFIGURED_MAX_JVM_MEMORY) {
logger.info("Memory flow control, usedMemory:{}, CONFIGURED_MAX_JVM_MEMORY: {}, memoryControlRatioThreshold:{}",
usedMemory, CONFIGURED_MAX_JVM_MEMORY, memoryControlRatioThreshold);
return true;
}
return false;
}

public boolean busyFlowControl(int usedConnections, int jobPriority, int retryIdx) {
retryIdx = Math.min(retryIdx, MAX_RETRY_INDEX);
if (usedConnections <= baseConnections +
Expand All @@ -159,17 +145,15 @@ public boolean busyFlowControl(int usedConnections, int jobPriority, int retryId
*/
public int buildConnection(Pair<Integer, Long> connIdValue, int jobPriority, int retryIdx) {
if (memoryFlowControlByMetrics()) {
logger.info("MemoryFlowControl jobPriority:{}, retryIdx:{}, runGcImmediately!", jobPriority, retryIdx);
//SystemUtils.runGcImmediately();
logger.debug("MemoryFlowControl jobPriority:{}, retryIdx:{}, runGcImmediately!", jobPriority, retryIdx);
Ors2MetricsConstants.memoryControlTimes.inc();
return MessageConstants.FLOW_CONTROL_MEMORY;
}

try {
synchronized (allocateConnection) {

if (busyFlowControl(usedConnections, jobPriority, retryIdx)) {
logger.info("BusyFlowControl jobPriority:{}, retryIdx:{}, usedConnections:{}, baseConnections: {}, freeConnections: {}" +
logger.debug("BusyFlowControl jobPriority:{}, retryIdx:{}, usedConnections:{}, baseConnections: {}, freeConnections: {}" +
" barrierConnPerPriorityLevel:{}",
jobPriority, retryIdx, allocateConnection.cardinality(), baseConnections, totalConnections - usedConnections, barrierConnPerPriorityLevel);
Ors2MetricsConstants.busyControlTimes.inc();
Expand Down
Loading