Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions client-adapter/launcher/src/main/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,34 @@ in
exit;;
esac

JavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'`
str=`file -L $JAVA | grep 64-bit`
JAVA_VERSION=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'`

JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
if [ $JavaVersion -ge 11 ] ; then
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow"
JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

if [ $JAVA_VERSION -ge 11 ] ; then
#JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time "
JAVA_OPTS="$JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:file=$base/logs/adapter/gc.log::filecount=5,filesize=32M"
else
#JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"
JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=32M"
JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:-PrintHeapAtGC -XX:+PrintGCApplicationStoppedTime"
JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:-PrintReferenceGC -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
JAVA_OPTS="$JAVA_OPTS -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 -XX:+SafepointTimeout -XX:SafepointTimeoutDelay=2000"
JAVA_OPTS="$JAVA_OPTS -XX:+UseCountedLoopSafepoints -XX:+DisableExplicitGC -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses"
fi

if [ -n "$str" ]; then
if [ $JavaVersion -ge 11 ] ; then
OS_ARCH=`file -L $JAVA | grep 64-bit`
if [ -n "$OS_ARCH" ]; then
if [ $JAVA_VERSION -ge 11 ] ; then
# For G1
JAVA_OPTS="-server -Xms2g -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS"
else
JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"
# JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"
## JDK 1.8 2c4g [-XX:ConcGCThreads=1 -XX:ParallelGCThreads=2], 4c8g [-XX:ConcGCThreads=2 -XX:ParallelGCThreads=4]
JAVA_OPTS="$JAVA_OPTS -XX:InitialRAMPercentage=45.0 -XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=45.0"
JAVA_OPTS="$JAVA_OPTS -XX:-UseParallelGC -XX:+UseConcMarkSweepGC -XX:ConcGCThreads=2 -XX:ParallelGCThreads=4"
JAVA_OPTS="$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
JAVA_OPTS="-server -XX:NewRatio=1 -XX:SurvivorRatio=2 -XX:MetaspaceSize=96m -XX:MaxMetaspaceSize=128m "
fi
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS"
Expand Down
31 changes: 21 additions & 10 deletions deployer/src/main/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,35 @@ in
exit;;
esac

JavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'`
str=`file -L $JAVA | grep 64-bit`
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
JAVA_VERSION=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'`

if [ $JavaVersion -ge 11 ] ; then
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow"
JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

if [ $JAVA_VERSION -ge 11 ] ; then
#JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time "
JAVA_OPTS="$JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:file=$base/logs/adapter/gc.log::filecount=5,filesize=32M"
else
#JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"
JAVA_OPTS="$JAVA_OPTS -XX:+AggressiveOpts -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=32M"
JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:-PrintHeapAtGC -XX:+PrintGCApplicationStoppedTime"
JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:-PrintReferenceGC -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
JAVA_OPTS="$JAVA_OPTS -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 -XX:+SafepointTimeout -XX:SafepointTimeoutDelay=2000"
JAVA_OPTS="$JAVA_OPTS -XX:+UseCountedLoopSafepoints -XX:+DisableExplicitGC -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses"
fi

if [ -n "$str" ]; then
if [ $JavaVersion -ge 11 ] ; then
OS_ARCH=`file -L $JAVA | grep 64-bit`

if [ -n "$OS_ARCH" ]; then
if [ $JAVA_VERSION -ge 11 ] ; then
# For G1
JAVA_OPTS="-server -Xms2g -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS"
else
JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"
# JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"
## JDK 1.8 2c4g [-XX:ConcGCThreads=1 -XX:ParallelGCThreads=2], 4c8g [-XX:ConcGCThreads=2 -XX:ParallelGCThreads=4]
JAVA_OPTS="$JAVA_OPTS -XX:InitialRAMPercentage=45.0 -XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=45.0"
JAVA_OPTS="$JAVA_OPTS -XX:-UseParallelGC -XX:+UseConcMarkSweepGC -XX:ConcGCThreads=2 -XX:ParallelGCThreads=4"
JAVA_OPTS="$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
JAVA_OPTS="-server -XX:NewRatio=1 -XX:SurvivorRatio=2 -XX:MetaspaceSize=96m -XX:MaxMetaspaceSize=128m "
fi
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,127 +31,114 @@ public SessionHandler(CanalAdmin canalAdmin){
}

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
logger.info("message receives in session handler...");
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
try {
String action = null;
String message = null;
String destination = null;
switch (packet.getType()) {
case SERVER:
ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody());
action = serverAdmin.getAction();
switch (action) {
ServerAdmin sa = ServerAdmin.parseFrom(packet.getBody());
logger.info("received {} SERVER request in session handler...", sa.getAction());
byte[] svrResp = null;
switch (sa.getAction()) {
case "check":
message = canalAdmin.check() ? "1" : "0";
svrResp = AdminNettyUtils.ackPacket(canalAdmin.check() ? "1" : "0");
break;
case "start":
message = canalAdmin.start() ? "1" : "0";
svrResp = AdminNettyUtils.ackPacket(canalAdmin.start() ? "1" : "0");
break;
case "stop":
message = canalAdmin.stop() ? "1" : "0";
svrResp = AdminNettyUtils.ackPacket(canalAdmin.stop() ? "1" : "0");
break;
case "restart":
message = canalAdmin.restart() ? "1" : "0";
svrResp = AdminNettyUtils.ackPacket(canalAdmin.restart() ? "1" : "0");
break;
case "list":
message = canalAdmin.getRunningInstances();
svrResp = AdminNettyUtils.ackPacket(canalAdmin.getRunningInstances());
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
svrResp = AdminNettyUtils.errorPacket(301, "ServerAdmin action: " + sa.getAction() + " is unknown");
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
AdminNettyUtils.write(ctx.getChannel(), svrResp);
break;

case INSTANCE:
InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody());
destination = instanceAdmin.getDestination();
action = instanceAdmin.getAction();
switch (action) {
InstanceAdmin ia = InstanceAdmin.parseFrom(packet.getBody());
logger.info("received {} INSTANCE request in session handler...", ia.getAction());
byte[] instResp = null;
switch (ia.getAction()) {
case "check":
message = canalAdmin.checkInstance(destination) ? "1" : "0";
instResp = AdminNettyUtils.ackPacket(canalAdmin.checkInstance(ia.getDestination()) ? "1" : "0");
break;
case "start":
message = canalAdmin.startInstance(destination) ? "1" : "0";
instResp = AdminNettyUtils.ackPacket(canalAdmin.startInstance(ia.getDestination()) ? "1" : "0");
break;
case "stop":
message = canalAdmin.stopInstance(destination) ? "1" : "0";
instResp = AdminNettyUtils.ackPacket(canalAdmin.stopInstance(ia.getDestination()) ? "1" : "0");
break;
case "release":
message = canalAdmin.releaseInstance(destination) ? "1" : "0";
instResp = AdminNettyUtils.ackPacket(canalAdmin.releaseInstance(ia.getDestination()) ? "1" : "0");
break;
case "restart":
message = canalAdmin.restartInstance(destination) ? "1" : "0";
instResp = AdminNettyUtils.ackPacket(canalAdmin.restartInstance(ia.getDestination()) ? "1" : "0");
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
instResp = AdminNettyUtils.errorPacket(301, "InstanceAdmin action: " + ia.getAction() + " is unknown");
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
AdminNettyUtils.write(ctx.getChannel(), instResp);
break;

case LOG:
LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody());
action = logAdmin.getAction();
destination = logAdmin.getDestination();
String type = logAdmin.getType();
String file = logAdmin.getFile();
int count = logAdmin.getCount();
switch (type) {
LogAdmin la = LogAdmin.parseFrom(packet.getBody());
logger.info("received {} LOG request in session handler...", la.getType());
byte[] logResp = null;
switch (la.getType()) {
case "server":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listCanalLog();
if ("list".equalsIgnoreCase(la.getAction())) {
logResp = AdminNettyUtils.ackPacket(canalAdmin.listCanalLog());
} else {
message = canalAdmin.canalLog(count);
logResp = AdminNettyUtils.ackPacket(canalAdmin.canalLog(la.getCount()));
}
break;
case "instance":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listInstanceLog(destination);
if ("list".equalsIgnoreCase(la.getAction())) {
logResp = AdminNettyUtils.ackPacket(canalAdmin.listInstanceLog(la.getDestination()));
} else {
message = canalAdmin.instanceLog(destination, file, count);
logResp = AdminNettyUtils.ackPacket(canalAdmin.instanceLog(la.getDestination(), la.getFile(), la.getCount()));
}
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
logResp = AdminNettyUtils.errorPacket(301, "LogAdmin type: " + la.getType() + " is unknown");
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
AdminNettyUtils.write(ctx.getChannel(), logResp);
break;

default:
byte[] errorBytes = AdminNettyUtils.errorPacket(300,
MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
logResp = AdminNettyUtils.errorPacket(300, "packet type: " + packet.getType() + " is NOT supported!");
AdminNettyUtils.write(ctx.getChannel(), logResp);
break;
}

} catch (Throwable exception) {
byte[] errorBytes = AdminNettyUtils.errorPacket(400,
MessageFormatter.format("something goes wrong with channel:{}, exception={}",
ctx.getChannel(),
ExceptionUtils.getStackTrace(exception)).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
String error = "something goes wrong with channel: " + ctx.getChannel() + ", exception: "+ ExceptionUtils.getStackTrace(exception);
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.errorPacket(400, error));
}
}

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
logger.error("something goes wrong with channel:{}, exception={}",
ctx.getChannel(),
ExceptionUtils.getStackTrace(e.getCause()));

ctx.getChannel().close();
}

public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
}

public void setCanalAdmin(CanalAdmin canalAdmin) {
this.canalAdmin = canalAdmin;
}
// public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
// logger.error("something goes wrong with channel:{}, exception={}",
// ctx.getChannel(),
// ExceptionUtils.getStackTrace(e.getCause()));
// ctx.getChannel().close();
// }
//
// public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// }
//
// public void setCanalAdmin(CanalAdmin canalAdmin) {
// this.canalAdmin = canalAdmin;
// }

}
Loading
Loading