Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/java/com/mds/dubbo/handler/DubboProxyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ protected ChannelInitializer<Channel> channelInitializer() {
protected void initChannel(Channel ch) {
ch.pipeline().addLast("dubbo-decoder",new DubboRequestDecoder());
ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, 10000, TimeUnit.MILLISECONDS));
ch.pipeline().addLast("session-handler", new SessionHandler());
ch.pipeline().addLast("frontend-handler",new FrontendHandler(serverConfig.getBackend()));

}
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/com/mds/dubbo/handler/FrontendHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ public class FrontendHandler extends ChannelInboundHandlerAdapter {

private final List<AppInfo> appInfoList;

private static final Map<String,Channel> channelMap = new ConcurrentHashMap<>(16);

public FrontendHandler(List<AppInfo> appInfoList) {
this.appInfoList = appInfoList;
}
Expand Down Expand Up @@ -88,16 +86,13 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
} else if (body instanceof BodyHeartBeat) {
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, dubboPacket.getDubboRequestHeader().getHeaderBytes());
log.info("心跳包");
SessionManager sessionManager = SessionManager.getInstance();
Channel channel = ctx.channel();
sessionManager.renew(channel);
log.debug("heartbeat:{}", ctx.channel());
ctx.writeAndFlush(compositeByteBuf).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("success");
log.debug("write heartbeat success");
ctx.channel().read();
} else {
log.info("fail");
log.info("write heartbeat fail");
future.channel().close();
}
});;
Expand Down
75 changes: 0 additions & 75 deletions src/main/java/com/mds/dubbo/handler/SessionHandler.java

This file was deleted.

34 changes: 0 additions & 34 deletions src/main/java/com/mds/dubbo/session/SessionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,11 @@ public class SessionManager {
private SessionManager() {
}

/**
* consumer与proxy间的连接,保存channel和过期时间
*/
private final Map<Channel, Long> sessionRegistry = new ConcurrentHashMap<>();

/**
* 保存proxy和provide之间的连接,应用名和channel
*/
private final Map<String,Channel> readyRegistry = new ConcurrentHashMap<>(16);


public void renew(Channel channel) {
log.info("channelId:{}, renew: {}", channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3));
addSession(channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3));
}

private static class Singleton {
static SessionManager instance = new SessionManager();
}
Expand All @@ -56,27 +45,4 @@ public boolean existConnection(String appName) {
return readyRegistry.containsKey(appName);
}



public void addSession(Channel channel) {
log.info("新的客户端加入:{}", channel);
sessionRegistry.put(channel, System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3));
}

public void addSession(Channel channel, Long expireTime) {
sessionRegistry.put(channel, expireTime);
}

public void removeSession(Channel channel) {
sessionRegistry.remove(channel);

}

public Long getSession(Channel channel) {
return sessionRegistry.get(channel);
}

public boolean exist(Channel channel) {
return sessionRegistry.containsKey(channel);
}
}