diff --git a/pom.xml b/pom.xml
index f16996e..3c4887a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,8 +10,8 @@
UTF-8
- 11
- 11
+ 17
+ 17
@@ -41,5 +41,11 @@
commons-cli
1.5.0
+
+
+ io.netty
+ netty-all
+ 5.0.0.Alpha1
+
diff --git a/src/main/java/top/guoziyang/mydb/backend/Launcher.java b/src/main/java/top/guoziyang/mydb/backend/Launcher.java
index eb79e35..344f91e 100644
--- a/src/main/java/top/guoziyang/mydb/backend/Launcher.java
+++ b/src/main/java/top/guoziyang/mydb/backend/Launcher.java
@@ -7,6 +7,7 @@
import org.apache.commons.cli.ParseException;
import top.guoziyang.mydb.backend.dm.DataManager;
+import top.guoziyang.mydb.backend.server.NettyServer;
import top.guoziyang.mydb.backend.server.Server;
import top.guoziyang.mydb.backend.tbm.TableManager;
import top.guoziyang.mydb.backend.tm.TransactionManager;
@@ -17,7 +18,7 @@
public class Launcher {
- public static final int port = 9999;
+ public static final int port = 7777;
public static final long DEFALUT_MEM = (1<<20)*64;
public static final long KB = 1 << 10;
@@ -52,12 +53,18 @@ private static void createDB(String path) {
dm.close();
}
+
+ /**
+ * 改用Netty服务端
+ * 作者:RioAngele
+ * 时间:2023.5.23
+ */
private static void openDB(String path, long mem) {
TransactionManager tm = TransactionManager.open(path);
DataManager dm = DataManager.open(path, mem, tm);
VersionManager vm = new VersionManagerImpl(tm, dm);
TableManager tbm = TableManager.open(path, vm, dm);
- new Server(port, tbm).start();
+ new NettyServer(port, tbm).start();
}
private static long parseMem(String memStr) {
diff --git a/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java b/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java
index 42fc076..9145518 100644
--- a/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java
+++ b/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java
@@ -86,7 +86,7 @@ private static void redoTranscations(TransactionManager tm, Logger lg, PageCache
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
- doInsertLog(pc, log, REDO);
+ doInsertLog(pc, li, REDO);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
@@ -240,4 +240,27 @@ private static void doInsertLog(PageCache pc, byte[] log, int flag) {
pg.release();
}
}
+
+ /*
+ * 重载doInsertLog(),减少log重复解析消耗
+ * 作者:RioAngele
+ * 时间:2023.5.23
+ */
+
+ private static void doInsertLog(PageCache pc,InsertLogInfo li , int flag) {
+ Page pg = null;
+ try {
+ pg = pc.getPage(li.pgno);
+ } catch(Exception e) {
+ Panic.panic(e);
+ }
+ try {
+ if(flag == UNDO) {
+ DataItem.setDataItemRawInvalid(li.raw);
+ }
+ PageX.recoverInsert(pg, li.raw, li.offset);
+ } finally {
+ pg.release();
+ }
+ }
}
diff --git a/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java b/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java
index 9e8a836..ca9d1a7 100644
--- a/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java
+++ b/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java
@@ -41,11 +41,14 @@ public class PageCacheImpl extends AbstractCache implements PageCache {
this.fileLock = new ReentrantLock();
this.pageNumbers = new AtomicInteger((int)length / PAGE_SIZE);
}
-
+/*
+ * 改用重载方法flush()
+ */
public int newPage(byte[] initData) {
int pgno = pageNumbers.incrementAndGet();
- Page pg = new PageImpl(pgno, initData, null);
- flush(pg);
+ // Page pg = new PageImpl(pgno, initData, null);
+ // flush(pg);
+ flush(pgno, initData);
return pgno;
}
@@ -106,6 +109,28 @@ private void flush(Page pg) {
}
}
+ /*
+ * 重载 flush()方法
+ * 作者:RioAngle
+ * 时间:2023.5.23
+ */
+
+ private void flush(int pgno,byte[] data) {
+ long offset = pageOffset(pgno);
+
+ fileLock.lock();
+ try {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ fc.position(offset);
+ fc.write(buf);
+ fc.force(false);
+ } catch(IOException e) {
+ Panic.panic(e);
+ } finally {
+ fileLock.unlock();
+ }
+ }
+
public void truncateByBgno(int maxPgno) {
long size = pageOffset(maxPgno + 1);
try {
diff --git a/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java b/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java
new file mode 100644
index 0000000..681d110
--- /dev/null
+++ b/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java
@@ -0,0 +1,69 @@
+package top.guoziyang.mydb.backend.server;
+
+
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import top.guoziyang.mydb.backend.tbm.TableManager;
+import top.guoziyang.mydb.transport.DecoderHandler;
+import top.guoziyang.mydb.transport.EncoderHandler;
+
+
+/**
+ * netty服务端
+ * 作者:RioAngele
+ * 时间:2023.5.23
+ */
+
+public class NettyServer {
+
+ int PORT ;
+ TableManager tbm;
+ public NettyServer(int PORT, TableManager tbm){
+ this.PORT=PORT;
+ this.tbm=tbm;
+ }
+
+
+
+ public void start() {
+
+ EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new EncoderHandler())
+ .addLast(new DecoderHandler())
+ .addLast(new NettyServerHandler(tbm));
+ }
+ });
+
+ ChannelFuture f = b.bind(PORT).sync();
+ f.channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ System.out.println("fail to start!!!");
+ } finally {
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+
+
+}
+
diff --git a/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java b/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java
new file mode 100644
index 0000000..738ff68
--- /dev/null
+++ b/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java
@@ -0,0 +1,45 @@
+package top.guoziyang.mydb.backend.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import top.guoziyang.mydb.backend.tbm.TableManager;
+import top.guoziyang.mydb.transport.Encoder;
+import top.guoziyang.mydb.transport.Package;
+import top.guoziyang.mydb.transport.Packager;
+
+
+/**
+ * netty服务端Handler
+ * 作者:RioAngele
+ * 时间:2023.5.23
+ */
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+ private TableManager tbm;
+
+ public NettyServerHandler(TableManager tbm) {
+ this.tbm = tbm;
+ }
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ Package pck=(Package) msg;
+ Executor exe = new Executor(tbm);
+ byte[] result = null;
+ Exception e = null;
+ try {
+ result = exe.execute(pck.getData());
+ } catch (Exception e1) {
+ e = e1;
+ e.printStackTrace();
+ exe.close();
+ }
+ Package pkg=new Package(result,e);
+ ctx.writeAndFlush(pkg);
+ }
+}
diff --git a/src/main/java/top/guoziyang/mydb/client/Launcher.java b/src/main/java/top/guoziyang/mydb/client/Launcher.java
index 86e1c97..f1f14f7 100644
--- a/src/main/java/top/guoziyang/mydb/client/Launcher.java
+++ b/src/main/java/top/guoziyang/mydb/client/Launcher.java
@@ -8,14 +8,21 @@
import top.guoziyang.mydb.transport.Packager;
import top.guoziyang.mydb.transport.Transporter;
+
+/**
+* 改用Netty客户端
+* 作者:RioAngele
+* 时间:2023.5.23
+*/
public class Launcher {
- public static void main(String[] args) throws UnknownHostException, IOException {
- Socket socket = new Socket("127.0.0.1", 9999);
- Encoder e = new Encoder();
- Transporter t = new Transporter(socket);
- Packager packager = new Packager(t, e);
+ public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
+// Socket socket = new Socket("127.0.0.1", 9999);
+// Encoder e = new Encoder();
+// Transporter t = new Transporter(socket);
+// Packager packager = new Packager(t, e);
- Client client = new Client(packager);
+// Client client = new Client(packager);
+ NettyClient client=new NettyClient();
Shell shell = new Shell(client);
shell.run();
}
diff --git a/src/main/java/top/guoziyang/mydb/client/NettyClient.java b/src/main/java/top/guoziyang/mydb/client/NettyClient.java
new file mode 100644
index 0000000..d3be99a
--- /dev/null
+++ b/src/main/java/top/guoziyang/mydb/client/NettyClient.java
@@ -0,0 +1,84 @@
+package top.guoziyang.mydb.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import top.guoziyang.mydb.transport.DecoderHandler;
+import top.guoziyang.mydb.transport.EncoderHandler;
+import top.guoziyang.mydb.transport.Package;
+
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+* Netty客户端
+* 作者:RioAngele
+* 时间:2023.5.23
+*/
+public final class NettyClient {
+
+ private final Bootstrap bootstrap;
+ private final EventLoopGroup eventLoopGroup;
+ public Channel channel;
+ public static CompletableFuture resultFuture;
+
+ public NettyClient() throws InterruptedException {
+ CompletableFuture resultFuture= new CompletableFuture<>();
+ eventLoopGroup = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup)
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new EncoderHandler())
+ .addLast(new DecoderHandler())
+ .addLast(new NettyClientHandler());
+ }
+ });
+ channel=bootstrap.connect("127.0.0.1",7777).sync().channel();
+ }
+
+
+
+ public byte[] execute(byte[] sh) throws Exception {
+ resultFuture=new CompletableFuture();
+ Package pkg=new Package(sh,null);
+ if (channel.isActive()) {
+ channel.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
+ if (future.isSuccess()) {
+ System.out.println("success to send");
+ } else {
+ System.out.println("fail to send");
+ future.channel().close();
+ }
+ });
+ } else {
+ throw new IllegalStateException();
+ }
+ Package resPkg=null;
+ while(!resultFuture.isDone()){
+
+
+ }
+ resPkg=resultFuture.get();
+ if(resPkg.getErr() != null) {
+ throw resPkg.getErr();
+ }
+ return resPkg.getData();
+ }
+
+ public void close() {
+ eventLoopGroup.shutdownGracefully();
+ }
+}
+
diff --git a/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java b/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java
new file mode 100644
index 0000000..f683e01
--- /dev/null
+++ b/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java
@@ -0,0 +1,24 @@
+package top.guoziyang.mydb.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import top.guoziyang.mydb.transport.Package;
+
+
+/**
+* 改用Netty客户端Handler
+* 作者:RioAngele
+* 时间:2023.5.23
+*/
+public class NettyClientHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ NettyClient.resultFuture.complete((Package) msg);
+ }
+}
diff --git a/src/main/java/top/guoziyang/mydb/client/Shell.java b/src/main/java/top/guoziyang/mydb/client/Shell.java
index 9e65179..cc66074 100644
--- a/src/main/java/top/guoziyang/mydb/client/Shell.java
+++ b/src/main/java/top/guoziyang/mydb/client/Shell.java
@@ -2,10 +2,16 @@
import java.util.Scanner;
+
+/**
+* 改用Netty客户端
+* 作者:RioAngele
+* 时间:2023.5.23
+*/
public class Shell {
- private Client client;
+ private NettyClient client;
- public Shell(Client client) {
+ public Shell(NettyClient client) {
this.client = client;
}
diff --git a/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java b/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java
new file mode 100644
index 0000000..8c708c9
--- /dev/null
+++ b/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java
@@ -0,0 +1,26 @@
+package top.guoziyang.mydb.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import java.util.List;
+
+
+/**
+* 解码Handler,将字节流解码为Package
+* 作者:RioAngele
+* 时间:2023.5.23
+*/
+public class DecoderHandler extends ByteToMessageDecoder{
+ Encoder encoder=new Encoder();
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List