diff --git a/client-adapter/launcher/src/main/bin/startup.sh b/client-adapter/launcher/src/main/bin/startup.sh index 6023a2e04d..67bad9d3cd 100644 --- a/client-adapter/launcher/src/main/bin/startup.sh +++ b/client-adapter/launcher/src/main/bin/startup.sh @@ -59,24 +59,34 @@ in exit;; esac -JavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'` -str=`file -L $JAVA | grep 64-bit` +JAVA_VERSION=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'` -JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs" -if [ $JavaVersion -ge 11 ] ; then +JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow" +JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs" + +if [ $JAVA_VERSION -ge 11 ] ; then #JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time " - JAVA_OPTS="$JAVA_OPTS" + JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:file=$base/logs/adapter/gc.log::filecount=5,filesize=32M" else - #JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime" - JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution" + JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=32M" + JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:-PrintHeapAtGC -XX:+PrintGCApplicationStoppedTime" + JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:-PrintReferenceGC -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution" + JAVA_OPTS="$JAVA_OPTS -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 -XX:+SafepointTimeout -XX:SafepointTimeoutDelay=2000" + JAVA_OPTS="$JAVA_OPTS -XX:+UseCountedLoopSafepoints -XX:+DisableExplicitGC -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses" fi -if [ -n "$str" ]; then - if [ $JavaVersion -ge 11 ] ; then +OS_ARCH=`file -L $JAVA | grep 64-bit` +if [ -n "$OS_ARCH" ]; then + if [ $JAVA_VERSION -ge 11 ] ; then # For G1 JAVA_OPTS="-server -Xms2g -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS" else - JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS" + # JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS" + ## JDK 1.8 2c4g [-XX:ConcGCThreads=1 -XX:ParallelGCThreads=2], 4c8g [-XX:ConcGCThreads=2 -XX:ParallelGCThreads=4] + JAVA_OPTS="$JAVA_OPTS -XX:InitialRAMPercentage=45.0 -XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=45.0" + JAVA_OPTS="$JAVA_OPTS -XX:-UseParallelGC -XX:+UseConcMarkSweepGC -XX:ConcGCThreads=2 -XX:ParallelGCThreads=4" + JAVA_OPTS="$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70" + JAVA_OPTS="-server -XX:NewRatio=1 -XX:SurvivorRatio=2 -XX:MetaspaceSize=96m -XX:MaxMetaspaceSize=128m " fi else JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS" diff --git a/deployer/src/main/bin/startup.sh b/deployer/src/main/bin/startup.sh index 07236abc11..7f893fcd59 100644 --- a/deployer/src/main/bin/startup.sh +++ b/deployer/src/main/bin/startup.sh @@ -83,24 +83,35 @@ in exit;; esac -JavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'` -str=`file -L $JAVA | grep 64-bit` -JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs" +JAVA_VERSION=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'` -if [ $JavaVersion -ge 11 ] ; then +JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow" +JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs" + +if [ $JAVA_VERSION -ge 11 ] ; then #JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time " - JAVA_OPTS="$JAVA_OPTS" + JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:file=$base/logs/adapter/gc.log::filecount=5,filesize=32M" else - #JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime" - JAVA_OPTS="$JAVA_OPTS -XX:+AggressiveOpts -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution" + JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=32M" + JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:-PrintHeapAtGC -XX:+PrintGCApplicationStoppedTime" + JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:-PrintReferenceGC -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution" + JAVA_OPTS="$JAVA_OPTS -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 -XX:+SafepointTimeout -XX:SafepointTimeoutDelay=2000" + JAVA_OPTS="$JAVA_OPTS -XX:+UseCountedLoopSafepoints -XX:+DisableExplicitGC -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses" fi -if [ -n "$str" ]; then - if [ $JavaVersion -ge 11 ] ; then +OS_ARCH=`file -L $JAVA | grep 64-bit` + +if [ -n "$OS_ARCH" ]; then + if [ $JAVA_VERSION -ge 11 ] ; then # For G1 JAVA_OPTS="-server -Xms2g -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS" else - JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS" + # JAVA_OPTS="-server -Xms2g -Xmx3g -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS" + ## JDK 1.8 2c4g [-XX:ConcGCThreads=1 -XX:ParallelGCThreads=2], 4c8g [-XX:ConcGCThreads=2 -XX:ParallelGCThreads=4] + JAVA_OPTS="$JAVA_OPTS -XX:InitialRAMPercentage=45.0 -XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=45.0" + JAVA_OPTS="$JAVA_OPTS -XX:-UseParallelGC -XX:+UseConcMarkSweepGC -XX:ConcGCThreads=2 -XX:ParallelGCThreads=4" + JAVA_OPTS="$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70" + JAVA_OPTS="-server -XX:NewRatio=1 -XX:SurvivorRatio=2 -XX:MetaspaceSize=96m -XX:MaxMetaspaceSize=128m " fi else JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS" diff --git a/server/src/main/java/com/alibaba/otter/canal/admin/handler/SessionHandler.java b/server/src/main/java/com/alibaba/otter/canal/admin/handler/SessionHandler.java index ed5abea361..eead88a37d 100644 --- a/server/src/main/java/com/alibaba/otter/canal/admin/handler/SessionHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/admin/handler/SessionHandler.java @@ -31,127 +31,114 @@ public SessionHandler(CanalAdmin canalAdmin){ } public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - logger.info("message receives in session handler..."); ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array()); try { - String action = null; - String message = null; - String destination = null; switch (packet.getType()) { case SERVER: - ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody()); - action = serverAdmin.getAction(); - switch (action) { + ServerAdmin sa = ServerAdmin.parseFrom(packet.getBody()); + logger.info("received {} SERVER request in session handler...", sa.getAction()); + byte[] svrResp = null; + switch (sa.getAction()) { case "check": - message = canalAdmin.check() ? "1" : "0"; + svrResp = AdminNettyUtils.ackPacket(canalAdmin.check() ? "1" : "0"); break; case "start": - message = canalAdmin.start() ? "1" : "0"; + svrResp = AdminNettyUtils.ackPacket(canalAdmin.start() ? "1" : "0"); break; case "stop": - message = canalAdmin.stop() ? "1" : "0"; + svrResp = AdminNettyUtils.ackPacket(canalAdmin.stop() ? "1" : "0"); break; case "restart": - message = canalAdmin.restart() ? "1" : "0"; + svrResp = AdminNettyUtils.ackPacket(canalAdmin.restart() ? "1" : "0"); break; case "list": - message = canalAdmin.getRunningInstances(); + svrResp = AdminNettyUtils.ackPacket(canalAdmin.getRunningInstances()); break; default: - byte[] errorBytes = AdminNettyUtils.errorPacket(301, - MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage()); - AdminNettyUtils.write(ctx.getChannel(), errorBytes); + svrResp = AdminNettyUtils.errorPacket(301, "ServerAdmin action: " + sa.getAction() + " is unknown"); break; } - AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message)); + AdminNettyUtils.write(ctx.getChannel(), svrResp); break; + case INSTANCE: - InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody()); - destination = instanceAdmin.getDestination(); - action = instanceAdmin.getAction(); - switch (action) { + InstanceAdmin ia = InstanceAdmin.parseFrom(packet.getBody()); + logger.info("received {} INSTANCE request in session handler...", ia.getAction()); + byte[] instResp = null; + switch (ia.getAction()) { case "check": - message = canalAdmin.checkInstance(destination) ? "1" : "0"; + instResp = AdminNettyUtils.ackPacket(canalAdmin.checkInstance(ia.getDestination()) ? "1" : "0"); break; case "start": - message = canalAdmin.startInstance(destination) ? "1" : "0"; + instResp = AdminNettyUtils.ackPacket(canalAdmin.startInstance(ia.getDestination()) ? "1" : "0"); break; case "stop": - message = canalAdmin.stopInstance(destination) ? "1" : "0"; + instResp = AdminNettyUtils.ackPacket(canalAdmin.stopInstance(ia.getDestination()) ? "1" : "0"); break; case "release": - message = canalAdmin.releaseInstance(destination) ? "1" : "0"; + instResp = AdminNettyUtils.ackPacket(canalAdmin.releaseInstance(ia.getDestination()) ? "1" : "0"); break; case "restart": - message = canalAdmin.restartInstance(destination) ? "1" : "0"; + instResp = AdminNettyUtils.ackPacket(canalAdmin.restartInstance(ia.getDestination()) ? "1" : "0"); break; default: - byte[] errorBytes = AdminNettyUtils.errorPacket(301, - MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage()); - AdminNettyUtils.write(ctx.getChannel(), errorBytes); + instResp = AdminNettyUtils.errorPacket(301, "InstanceAdmin action: " + ia.getAction() + " is unknown"); break; } - AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message)); + AdminNettyUtils.write(ctx.getChannel(), instResp); break; + case LOG: - LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody()); - action = logAdmin.getAction(); - destination = logAdmin.getDestination(); - String type = logAdmin.getType(); - String file = logAdmin.getFile(); - int count = logAdmin.getCount(); - switch (type) { + LogAdmin la = LogAdmin.parseFrom(packet.getBody()); + logger.info("received {} LOG request in session handler...", la.getType()); + byte[] logResp = null; + switch (la.getType()) { case "server": - if ("list".equalsIgnoreCase(action)) { - message = canalAdmin.listCanalLog(); + if ("list".equalsIgnoreCase(la.getAction())) { + logResp = AdminNettyUtils.ackPacket(canalAdmin.listCanalLog()); } else { - message = canalAdmin.canalLog(count); + logResp = AdminNettyUtils.ackPacket(canalAdmin.canalLog(la.getCount())); } break; case "instance": - if ("list".equalsIgnoreCase(action)) { - message = canalAdmin.listInstanceLog(destination); + if ("list".equalsIgnoreCase(la.getAction())) { + logResp = AdminNettyUtils.ackPacket(canalAdmin.listInstanceLog(la.getDestination())); } else { - message = canalAdmin.instanceLog(destination, file, count); + logResp = AdminNettyUtils.ackPacket(canalAdmin.instanceLog(la.getDestination(), la.getFile(), la.getCount())); } break; default: - byte[] errorBytes = AdminNettyUtils.errorPacket(301, - MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage()); - AdminNettyUtils.write(ctx.getChannel(), errorBytes); + logResp = AdminNettyUtils.errorPacket(301, "LogAdmin type: " + la.getType() + " is unknown"); break; } - AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message)); + AdminNettyUtils.write(ctx.getChannel(), logResp); break; + default: - byte[] errorBytes = AdminNettyUtils.errorPacket(300, - MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage()); - AdminNettyUtils.write(ctx.getChannel(), errorBytes); + logResp = AdminNettyUtils.errorPacket(300, "packet type: " + packet.getType() + " is NOT supported!"); + AdminNettyUtils.write(ctx.getChannel(), logResp); break; } + } catch (Throwable exception) { - byte[] errorBytes = AdminNettyUtils.errorPacket(400, - MessageFormatter.format("something goes wrong with channel:{}, exception={}", - ctx.getChannel(), - ExceptionUtils.getStackTrace(exception)).getMessage()); - AdminNettyUtils.write(ctx.getChannel(), errorBytes); + String error = "something goes wrong with channel: " + ctx.getChannel() + ", exception: "+ ExceptionUtils.getStackTrace(exception); + AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.errorPacket(400, error)); } } - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - logger.error("something goes wrong with channel:{}, exception={}", - ctx.getChannel(), - ExceptionUtils.getStackTrace(e.getCause())); - - ctx.getChannel().close(); - } - - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - } - - public void setCanalAdmin(CanalAdmin canalAdmin) { - this.canalAdmin = canalAdmin; - } +// public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { +// logger.error("something goes wrong with channel:{}, exception={}", +// ctx.getChannel(), +// ExceptionUtils.getStackTrace(e.getCause())); +// ctx.getChannel().close(); +// } +// +// public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { +// } +// +// public void setCanalAdmin(CanalAdmin canalAdmin) { +// this.canalAdmin = canalAdmin; +// } } diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java index f553ddfb51..5abff97700 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java @@ -52,19 +52,15 @@ public SessionHandler(CanalServerWithEmbedded embeddedServer){ @SuppressWarnings({ "deprecation" }) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - logger.info("message receives in session handler..."); long start = System.nanoTime(); ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array()); - ClientIdentity clientIdentity = null; try { switch (packet.getType()) { case SUBSCRIPTION: Sub sub = Sub.parseFrom(packet.getBody()); if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) { - clientIdentity = new ClientIdentity(sub.getDestination(), - Short.valueOf(sub.getClientId()), - sub.getFilter()); + ClientIdentity clientIdentity = new ClientIdentity(sub.getDestination(), Short.parseShort(sub.getClientId()), sub.getFilter()); MDC.put("destination", clientIdentity.getDestination()); // 尝试启动,如果已经启动,忽略 @@ -84,8 +80,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex ackBytes.length, System.nanoTime() - start)); } else { - byte[] errorBytes = NettyUtils.errorPacket(401, - MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(401, "destination or clientId is null. Sub: " + sub); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(sub.getDestination(), @@ -99,12 +94,12 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex case UNSUBSCRIPTION: Unsub unsub = Unsub.parseFrom(packet.getBody()); if (StringUtils.isNotEmpty(unsub.getDestination()) && StringUtils.isNotEmpty(unsub.getClientId())) { - clientIdentity = new ClientIdentity(unsub.getDestination(), - Short.valueOf(unsub.getClientId()), - unsub.getFilter()); + ClientIdentity clientIdentity = new ClientIdentity(unsub.getDestination(), Short.parseShort(unsub.getClientId()), unsub.getFilter()); MDC.put("destination", clientIdentity.getDestination()); + embeddedServer.unsubscribe(clientIdentity); stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭 + byte[] ackBytes = NettyUtils.ackPacket(); NettyUtils.write(ctx.getChannel(), ackBytes, @@ -114,8 +109,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex ackBytes.length, System.nanoTime() - start)); } else { - byte[] errorBytes = NettyUtils.errorPacket(401, - MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage()); + String error = "destination or clientId is null. Unsub: " + unsub; + byte[] errorBytes = NettyUtils.errorPacket(401, error); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(unsub.getDestination(), @@ -129,8 +124,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex case GET: Get get = CanalPacket.Get.parseFrom(packet.getBody()); if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) { - clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId())); + ClientIdentity clientIdentity = new ClientIdentity(get.getDestination(), Short.parseShort(get.getClientId())); MDC.put("destination", clientIdentity.getDestination()); + Message message = null; // if (get.getAutoAck()) { @@ -147,10 +143,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize()); } else { TimeUnit unit = convertTimeUnit(get.getUnit()); - message = embeddedServer.getWithoutAck(clientIdentity, - get.getFetchSize(), - get.getTimeout(), - unit); + message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit); } // } @@ -165,7 +158,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry); } messageSize += dataSize; - messageSize += 1 * rowEntries.size(); + // messageSize += 1 * rowEntries.size(); + messageSize += rowEntries.size(); + // packet size int size = 0; size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, @@ -173,6 +168,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex size += com.google.protobuf.CodedOutputStream.computeTagSize(5) + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize; + // recyle bytes // ByteBuffer byteBuffer = (ByteBuffer) // ctx.getAttachment(); @@ -210,9 +206,6 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex // NettyUtils.write(ctx.getChannel(), byteBuffer, // null); } else { - Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); - packetBuilder.setType(PacketType.MESSAGES).setVersion(NettyUtils.VERSION); - Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder(); messageBuilder.setBatchId(message.getId()); if (message.getId() != -1) { @@ -224,9 +217,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } } } - byte[] body = packetBuilder.setBody(messageBuilder.build().toByteString()) - .build() - .toByteArray(); + + Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); + packetBuilder.setType(PacketType.MESSAGES).setVersion(NettyUtils.VERSION); + byte[] body = packetBuilder.setBody(messageBuilder.build().toByteString()).build().toByteArray(); NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(), get, packet.getType(), @@ -235,8 +229,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex message.getId() == -1));// 输出数据 } } else { - byte[] errorBytes = NettyUtils.errorPacket(401, - MessageFormatter.format("destination or clientId is null", get.toString()).getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(401, "destination or clientId is null. Get: " + get); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(get.getDestination(), @@ -249,11 +242,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex break; case CLIENTACK: ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody()); - MDC.put("destination", ack.getDestination()); if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) { + MDC.put("destination", ack.getDestination()); if (ack.getBatchId() == 0L) { - byte[] errorBytes = NettyUtils.errorPacket(402, - MessageFormatter.format("batchId should assign value", ack.toString()).getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(402, "batchId should assign value. Ack: "+ack); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(), @@ -265,7 +257,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之 // donothing } else { - clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId())); + ClientIdentity clientIdentity = new ClientIdentity(ack.getDestination(), Short.parseShort(ack.getClientId())); embeddedServer.ack(clientIdentity, ack.getBatchId()); new ChannelFutureAggregator(ack.getDestination(), ack, @@ -274,8 +266,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex System.nanoTime() - start).operationComplete(null); } } else { - byte[] errorBytes = NettyUtils.errorPacket(401, - MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(401, "destination or clientId is null. Ack: " + ack); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(), @@ -288,25 +279,23 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex break; case CLIENTROLLBACK: ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody()); - MDC.put("destination", rollback.getDestination()); - if (StringUtils.isNotEmpty(rollback.getDestination()) - && StringUtils.isNotEmpty(rollback.getClientId())) { - clientIdentity = new ClientIdentity(rollback.getDestination(), - Short.valueOf(rollback.getClientId())); + if (StringUtils.isNotEmpty(rollback.getDestination()) && StringUtils.isNotEmpty(rollback.getClientId())) { + ClientIdentity clientIdentity = new ClientIdentity(rollback.getDestination(), Short.parseShort(rollback.getClientId())); + MDC.put("destination", rollback.getDestination()); + if (rollback.getBatchId() == 0L) { embeddedServer.rollback(clientIdentity);// 回滚所有批次 } else { embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次 } + new ChannelFutureAggregator(rollback.getDestination(), rollback, packet.getType(), 0, System.nanoTime() - start).operationComplete(null); } else { - byte[] errorBytes = NettyUtils.errorPacket(401, - MessageFormatter.format("destination or clientId is null", rollback.toString()) - .getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(401, "destination or clientId is null. Rollback: " + rollback); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(rollback.getDestination(), @@ -318,42 +307,38 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } break; default: - byte[] errorBytes = NettyUtils.errorPacket(400, - MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage()); + byte[] errorBytes = NettyUtils.errorPacket(400, "packet type: " + packet.getType() + " is NOT supported!"); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel() .getRemoteAddress() .toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400)); break; } } catch (Throwable exception) { - byte[] errorBytes = NettyUtils.errorPacket(400, - MessageFormatter.format("something goes wrong with channel:{}, exception={}", - ctx.getChannel(), - ExceptionUtils.getStackTrace(exception)).getMessage()); + String error = "something goes wrong with channel: " + ctx.getChannel() + ", exception: " + ExceptionUtils.getStackTrace(exception); + byte[] errorBytes = NettyUtils.errorPacket(400, error); NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel() - .getRemoteAddress() - .toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400)); + .getRemoteAddress().toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400)); } finally { MDC.remove("destination"); } } - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - logger.error("something goes wrong with channel:{}, exception={}", - ctx.getChannel(), - ExceptionUtils.getStackTrace(e.getCause())); +// public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { +// logger.error("something goes wrong with channel:{}, exception={}", +// ctx.getChannel(), +// ExceptionUtils.getStackTrace(e.getCause())); +// +// ctx.getChannel().close(); +// } - ctx.getChannel().close(); - } - - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - // logger.info("remove binding subscription value object if any..."); - // ClientIdentity clientIdentity = (ClientIdentity) ctx.getAttachment(); - // // 如果唯一的订阅者都取消了订阅,直接关闭服务,针对内部版本模式下可以减少资源浪费 - // if (clientIdentity != null) { - // stopCanalInstanceIfNecessary(clientIdentity); - // } - } +// public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { +// // logger.info("remove binding subscription value object if any..."); +// // ClientIdentity clientIdentity = (ClientIdentity) ctx.getAttachment(); +// // // 如果唯一的订阅者都取消了订阅,直接关闭服务,针对内部版本模式下可以减少资源浪费 +// // if (clientIdentity != null) { +// // stopCanalInstanceIfNecessary(clientIdentity); +// // } +// } private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) { List clientIdentitys = embeddedServer.listAllSubscribe(clientIdentity.getDestination()); @@ -386,8 +371,8 @@ private TimeUnit convertTimeUnit(int unit) { } } - public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) { - this.embeddedServer = embeddedServer; - } +// public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) { +// this.embeddedServer = embeddedServer; +// } }