Skip to content

Commit ecdf736

Browse files
committed
improve code; add send control; add example
1 parent 00a5e67 commit ecdf736

18 files changed

+705
-44
lines changed

kcp-example/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>kcp-parent</artifactId>
77
<groupId>io.jpower.kcp</groupId>
8-
<version>1.0.0-SNAPSHOT</version>
8+
<version>1.1.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.jpower.kcp.example.rtt;
2+
3+
import io.jpower.kcp.netty.ChannelOptionHelper;
4+
import io.jpower.kcp.netty.UkcpChannel;
5+
import io.jpower.kcp.netty.UkcpChannelOption;
6+
import io.jpower.kcp.netty.UkcpClientChannel;
7+
import io.netty.bootstrap.Bootstrap;
8+
import io.netty.channel.ChannelFuture;
9+
import io.netty.channel.ChannelInitializer;
10+
import io.netty.channel.ChannelPipeline;
11+
import io.netty.channel.EventLoopGroup;
12+
import io.netty.channel.nio.NioEventLoopGroup;
13+
14+
/**
15+
* @author <a href="mailto:[email protected]">szh</a>
16+
*/
17+
public class KcpRttClient {
18+
19+
static final int CONV = Integer.parseInt(System.getProperty("conv", "10"));
20+
static final String HOST = System.getProperty("host", "127.0.0.1");
21+
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
22+
static final int SIZE = Integer.parseInt(System.getProperty("size", "200"));
23+
static final int COUNT = Integer.parseInt(System.getProperty("count", "300"));
24+
static final int RTT_INTERVAL = Integer.parseInt(System.getProperty("rttInterval", "20"));
25+
26+
public static void main(String[] args) throws Exception {
27+
// Configure the client.
28+
EventLoopGroup group = new NioEventLoopGroup();
29+
try {
30+
Bootstrap b = new Bootstrap();
31+
b.group(group)
32+
.channel(UkcpClientChannel.class)
33+
.handler(new ChannelInitializer<UkcpChannel>() {
34+
@Override
35+
public void initChannel(UkcpChannel ch) throws Exception {
36+
ChannelPipeline p = ch.pipeline();
37+
p.addLast(new KcpRttClientHandler(COUNT));
38+
}
39+
});
40+
ChannelOptionHelper.nodelay(b, true, 20, 2, true)
41+
.option(UkcpChannelOption.UKCP_MTU, 512);
42+
43+
// Start the client.
44+
ChannelFuture f = b.connect(HOST, PORT).sync();
45+
46+
// Wait until the connection is closed.
47+
f.channel().closeFuture().sync();
48+
} finally {
49+
// Shut down the event loop to terminate all threads.
50+
group.shutdownGracefully();
51+
}
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package io.jpower.kcp.example.rtt;
2+
3+
import io.jpower.kcp.netty.UkcpChannel;
4+
import io.netty.buffer.ByteBuf;
5+
import io.netty.buffer.Unpooled;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.ScheduledExecutorService;
13+
import java.util.concurrent.ScheduledFuture;
14+
import java.util.concurrent.TimeUnit;
15+
16+
/**
17+
* @author <a href="mailto:[email protected]">szh</a>
18+
*/
19+
public class KcpRttClientHandler extends ChannelInboundHandlerAdapter {
20+
21+
private static final Logger log = LoggerFactory.getLogger(KcpRttClientHandler.class);
22+
23+
private final ByteBuf data;
24+
25+
private int[] rtts;
26+
27+
private volatile int count;
28+
29+
private ScheduledExecutorService scheduleSrv;
30+
31+
private ScheduledFuture<?> future = null;
32+
33+
private final long startTime;
34+
35+
/**
36+
* Creates a client-side handler.
37+
*/
38+
public KcpRttClientHandler(int count) {
39+
data = Unpooled.buffer(KcpRttClient.SIZE);
40+
for (int i = 0; i < data.capacity(); i++) {
41+
data.writeByte((byte) i);
42+
}
43+
44+
rtts = new int[count];
45+
for (int i = 0; i < rtts.length; i++) {
46+
rtts[i] = -1;
47+
}
48+
startTime = System.currentTimeMillis();
49+
scheduleSrv = Executors.newSingleThreadScheduledExecutor();
50+
}
51+
52+
@Override
53+
public void channelActive(ChannelHandlerContext ctx) {
54+
UkcpChannel kcpCh = (UkcpChannel) ctx.channel();
55+
kcpCh.conv(KcpRttClient.CONV); // set conv
56+
57+
future = scheduleSrv.scheduleWithFixedDelay(new Runnable() {
58+
@Override
59+
public void run() {
60+
ctx.write(rttMsg(++count));
61+
if (count >= rtts.length) {
62+
// finish
63+
future.cancel(true);
64+
ctx.write(rttMsg(-1));
65+
66+
}
67+
ctx.flush();
68+
}
69+
}, KcpRttClient.RTT_INTERVAL, KcpRttClient.RTT_INTERVAL, TimeUnit.MILLISECONDS);
70+
}
71+
72+
/**
73+
* count+timestamp+dataLen+data
74+
*
75+
* @param count
76+
* @return
77+
*/
78+
public ByteBuf rttMsg(int count) {
79+
ByteBuf buf = Unpooled.buffer(10);
80+
buf.writeShort(count);
81+
buf.writeInt((int) (System.currentTimeMillis() - startTime));
82+
int dataLen = data.readableBytes();
83+
buf.writeShort(dataLen);
84+
buf.writeBytes(data, data.readerIndex(), dataLen);
85+
86+
return buf;
87+
}
88+
89+
@Override
90+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
91+
scheduleSrv.shutdown();
92+
scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
93+
94+
int sum = 0;
95+
for (int rtt : rtts) {
96+
sum += rtt;
97+
}
98+
log.info("average: {}", (sum / rtts.length));
99+
}
100+
101+
@Override
102+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
103+
ByteBuf buf = (ByteBuf) msg;
104+
int curCount = buf.readShort();
105+
106+
if (curCount == -1) {
107+
scheduleSrv.schedule(new Runnable() {
108+
@Override
109+
public void run() {
110+
ctx.close();
111+
}
112+
}, 3, TimeUnit.SECONDS);
113+
} else {
114+
int idx = curCount - 1;
115+
long time = buf.readInt();
116+
if (rtts[idx] != -1) {
117+
log.error("???");
118+
}
119+
//log.info("rcv count {} {}", curCount, System.currentTimeMillis());
120+
rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
121+
122+
log.info("rtt {}: {}", curCount, rtts[idx]);
123+
}
124+
125+
buf.release();
126+
}
127+
128+
@Override
129+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
130+
// Close the connection when an exception is raised.
131+
log.error("exceptionCaught", cause);
132+
ctx.close();
133+
}
134+
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.jpower.kcp.example.rtt;
2+
3+
import io.jpower.kcp.example.echo.EchoServerHandler;
4+
import io.jpower.kcp.netty.ChannelOptionHelper;
5+
import io.jpower.kcp.netty.UkcpChannel;
6+
import io.jpower.kcp.netty.UkcpChannelOption;
7+
import io.jpower.kcp.netty.UkcpServerChannel;
8+
import io.netty.bootstrap.UkcpServerBootstrap;
9+
import io.netty.channel.ChannelFuture;
10+
import io.netty.channel.ChannelInitializer;
11+
import io.netty.channel.ChannelPipeline;
12+
import io.netty.channel.EventLoopGroup;
13+
import io.netty.channel.nio.NioEventLoopGroup;
14+
15+
/**
16+
* @author <a href="mailto:[email protected]">szh</a>
17+
*/
18+
public class KcpRttServer {
19+
20+
static final int CONV = Integer.parseInt(System.getProperty("conv", "10"));
21+
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
22+
23+
public static void main(String[] args) throws Exception {
24+
// Configure the server.
25+
EventLoopGroup group = new NioEventLoopGroup();
26+
try {
27+
UkcpServerBootstrap b = new UkcpServerBootstrap();
28+
b.group(group)
29+
.channel(UkcpServerChannel.class)
30+
.childHandler(new ChannelInitializer<UkcpChannel>() {
31+
@Override
32+
public void initChannel(UkcpChannel ch) throws Exception {
33+
ChannelPipeline p = ch.pipeline();
34+
p.addLast(new KcpRttServerHandler());
35+
}
36+
});
37+
ChannelOptionHelper.nodelay(b, true, 20, 2, true)
38+
.childOption(UkcpChannelOption.UKCP_MTU, 512);
39+
40+
// Start the server.
41+
ChannelFuture f = b.bind(PORT).sync();
42+
43+
// Wait until the server socket is closed.
44+
f.channel().closeFuture().sync();
45+
} finally {
46+
// Shut down all event loops to terminate all threads.
47+
group.shutdownGracefully();
48+
}
49+
}
50+
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.jpower.kcp.example.rtt;
2+
3+
import io.jpower.kcp.example.echo.EchoServerHandler;
4+
import io.jpower.kcp.netty.UkcpChannel;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/**
12+
* @author <a href="mailto:[email protected]">szh</a>
13+
*/
14+
public class KcpRttServerHandler extends ChannelInboundHandlerAdapter {
15+
16+
private static Logger log = LoggerFactory.getLogger(EchoServerHandler.class);
17+
18+
@Override
19+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
20+
UkcpChannel kcpCh = (UkcpChannel) ctx.channel();
21+
kcpCh.conv(KcpRttServer.CONV);
22+
}
23+
24+
@Override
25+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
26+
ByteBuf buf = (ByteBuf) msg;
27+
short curCount = buf.getShort(buf.readerIndex());
28+
ctx.writeAndFlush(msg);
29+
30+
if (curCount == -1) {
31+
ctx.close();
32+
}
33+
}
34+
35+
@Override
36+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
37+
// Close the connection when an exception is raised.
38+
log.error("exceptionCaught", cause);
39+
ctx.close();
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.jpower.kcp.example.rtt;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.*;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.channel.socket.nio.NioSocketChannel;
8+
9+
/**
10+
* @author <a href="mailto:[email protected]">szh</a>
11+
*/
12+
public class TcpRttClient {
13+
14+
static final String HOST = System.getProperty("host", "127.0.0.1");
15+
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
16+
static final int SIZE = Integer.parseInt(System.getProperty("size", "200"));
17+
static final int COUNT = Integer.parseInt(System.getProperty("count", "300"));
18+
static final int RTT_INTERVAL = Integer.parseInt(System.getProperty("rttInterval", "20"));
19+
20+
public static void main(String[] args) throws Exception {
21+
// Configure the client.
22+
EventLoopGroup group = new NioEventLoopGroup();
23+
try {
24+
Bootstrap b = new Bootstrap();
25+
b.group(group)
26+
.channel(NioSocketChannel.class)
27+
.handler(new ChannelInitializer<SocketChannel>() {
28+
@Override
29+
public void initChannel(SocketChannel ch) throws Exception {
30+
ChannelPipeline p = ch.pipeline();
31+
p.addLast(new TcpRttDecoder())
32+
.addLast(new TcpRttClientHandler(COUNT));
33+
}
34+
}).option(ChannelOption.TCP_NODELAY, true);
35+
36+
// Start the client.
37+
ChannelFuture f = b.connect(HOST, PORT).sync();
38+
39+
// Wait until the connection is closed.
40+
f.channel().closeFuture().sync();
41+
} finally {
42+
// Shut down the event loop to terminate all threads.
43+
group.shutdownGracefully();
44+
}
45+
}
46+
47+
}

0 commit comments

Comments
 (0)