diff --git a/src/main/java/com/mds/dubbo/handler/DubboProxyServer.java b/src/main/java/com/mds/dubbo/handler/DubboProxyServer.java index fa77e47..94209b1 100644 --- a/src/main/java/com/mds/dubbo/handler/DubboProxyServer.java +++ b/src/main/java/com/mds/dubbo/handler/DubboProxyServer.java @@ -24,7 +24,6 @@ protected ChannelInitializer channelInitializer() { protected void initChannel(Channel ch) { ch.pipeline().addLast("dubbo-decoder",new DubboRequestDecoder()); ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, 10000, TimeUnit.MILLISECONDS)); - ch.pipeline().addLast("session-handler", new SessionHandler()); ch.pipeline().addLast("frontend-handler",new FrontendHandler(serverConfig.getBackend())); } diff --git a/src/main/java/com/mds/dubbo/handler/FrontendHandler.java b/src/main/java/com/mds/dubbo/handler/FrontendHandler.java index 039969a..1af6d59 100644 --- a/src/main/java/com/mds/dubbo/handler/FrontendHandler.java +++ b/src/main/java/com/mds/dubbo/handler/FrontendHandler.java @@ -22,8 +22,6 @@ public class FrontendHandler extends ChannelInboundHandlerAdapter { private final List appInfoList; - private static final Map channelMap = new ConcurrentHashMap<>(16); - public FrontendHandler(List appInfoList) { this.appInfoList = appInfoList; } @@ -88,16 +86,13 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { } else if (body instanceof BodyHeartBeat) { CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponent(true, dubboPacket.getDubboRequestHeader().getHeaderBytes()); - log.info("心跳包"); - SessionManager sessionManager = SessionManager.getInstance(); - Channel channel = ctx.channel(); - sessionManager.renew(channel); + log.debug("heartbeat:{}", ctx.channel()); ctx.writeAndFlush(compositeByteBuf).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - log.info("success"); + log.debug("write heartbeat success"); ctx.channel().read(); } else { - log.info("fail"); + log.info("write heartbeat fail"); future.channel().close(); } });; diff --git a/src/main/java/com/mds/dubbo/handler/SessionHandler.java b/src/main/java/com/mds/dubbo/handler/SessionHandler.java deleted file mode 100644 index 2497aaf..0000000 --- a/src/main/java/com/mds/dubbo/handler/SessionHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.mds.dubbo.handler; - -import com.mds.dubbo.session.SessionManager; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.HashedWheelTimer; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.TimeUnit; - -/** - * session处理器 - * - * @author baoyoujia - * @date 2024/6/13 - */ -@Slf4j -public class SessionHandler extends ChannelInboundHandlerAdapter { - - private final HashedWheelTimer timer = new HashedWheelTimer(); - - @Override - public void channelActive(ChannelHandlerContext ctx) { - log.debug("SessionHandler channel is active: {}", ctx.channel().remoteAddress()); - Channel channel = ctx.channel(); - SessionManager sessionManager = SessionManager.getInstance(); - // 180秒 - sessionManager.addSession(channel); - scheduleTimeoutCheck(channel); - ctx.fireChannelActive(); - } - - /** - * 超时检查 - * @param channel - */ - private void scheduleTimeoutCheck(Channel channel) { - SessionManager sessionManager = SessionManager.getInstance(); - timer.newTimeout(timeout -> { - if (!sessionManager.exist(channel)) { - return; - } - Long expireTime = sessionManager.getSession(channel); - long currentTime = System.currentTimeMillis(); - log.warn("channel:{}, current time : {}, expireTime : {}", channel, currentTime, expireTime); - if (currentTime - expireTime > TimeUnit.SECONDS.toMillis(180)) { - // 已经超时,准备关闭 - log.warn("已经超时,准备关闭"); - // 超时,关闭Channel - channel.close(); - sessionManager.removeSession(channel); - } else { - // 重新启动定时 - // 重新安排定时检查 - scheduleTimeoutCheck(channel); - } - }, 30, TimeUnit.SECONDS); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log.debug("SessionHandler channelRead msg {}",msg); - ctx.fireChannelRead(msg); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - Channel channel = ctx.channel(); - log.debug("SessionHandler channel is inActive: {}, id:{}", channel.remoteAddress(), channel.id().asLongText()); - SessionManager sessionManager = SessionManager.getInstance(); - sessionManager.removeSession(channel); - timer.stop(); - } -} diff --git a/src/main/java/com/mds/dubbo/session/SessionManager.java b/src/main/java/com/mds/dubbo/session/SessionManager.java index a34c8d4..85a49d9 100644 --- a/src/main/java/com/mds/dubbo/session/SessionManager.java +++ b/src/main/java/com/mds/dubbo/session/SessionManager.java @@ -20,22 +20,11 @@ public class SessionManager { private SessionManager() { } - /** - * consumer与proxy间的连接,保存channel和过期时间 - */ - private final Map sessionRegistry = new ConcurrentHashMap<>(); - /** * 保存proxy和provide之间的连接,应用名和channel */ private final Map readyRegistry = new ConcurrentHashMap<>(16); - - public void renew(Channel channel) { - log.info("channelId:{}, renew: {}", channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)); - addSession(channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)); - } - private static class Singleton { static SessionManager instance = new SessionManager(); } @@ -56,27 +45,4 @@ public boolean existConnection(String appName) { return readyRegistry.containsKey(appName); } - - - public void addSession(Channel channel) { - log.info("新的客户端加入:{}", channel); - sessionRegistry.put(channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)); - } - - public void addSession(Channel channel, Long expireTime) { - sessionRegistry.put(channel, expireTime); - } - - public void removeSession(Channel channel) { - sessionRegistry.remove(channel); - - } - - public Long getSession(Channel channel) { - return sessionRegistry.get(channel); - } - - public boolean exist(Channel channel) { - return sessionRegistry.containsKey(channel); - } }