diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f0e29bc..5b0d415 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,3 +26,4 @@ jobs: run: npx haxe tests.hxml - name: run test run: hl test.hl + timeout-minutes: 3 diff --git a/tests.hxml b/tests.hxml index d172e5c..7146bf0 100644 --- a/tests.hxml +++ b/tests.hxml @@ -1,4 +1,5 @@ -cp tests -main Test +--define no-deprecation-warnings --macro nullSafety("weblink._internal.ds", StrictThreaded) -hl test.hl \ No newline at end of file diff --git a/weblink/Weblink.hx b/weblink/Weblink.hx index 1b5783f..cd52c51 100644 --- a/weblink/Weblink.hx +++ b/weblink/Weblink.hx @@ -1,6 +1,7 @@ package weblink; import haxe.http.HttpMethod; +import sys.net.Host; import weblink.Handler; import weblink._internal.Server; import weblink._internal.ds.RadixTree; @@ -12,7 +13,9 @@ import weblink.security.OAuth.OAuthEndpoints; using haxe.io.Path; class Weblink { + /** The internal web server. **/ public var server:Null; + public var routeTree:RadixTree; private var middlewareToChain:Array = []; @@ -85,8 +88,9 @@ class Weblink { public function listen(port:Int, blocking:Bool = true) { this.pathNotFound = chainMiddleware(this.pathNotFound); - server = new Server(port, this); - server.update(blocking); + + final server = this.server = new Server(this); + server.start(new Host("0.0.0.0"), port, blocking ? BlockUntilClosed : BlockUntilReady); } public function serve(path:String = "", dir:String = "", cors:String = "*") { @@ -97,7 +101,7 @@ class Weblink { } public function close() { - server.close(); + this.server.closeSync(); } /** diff --git a/weblink/_internal/Server.hx b/weblink/_internal/Server.hx index 5809e1c..8d75900 100644 --- a/weblink/_internal/Server.hx +++ b/weblink/_internal/Server.hx @@ -1,32 +1,58 @@ package weblink._internal; -import haxe.MainLoop; +import haxe.EntryPoint; +import haxe.Exception; +import haxe.Timer; import haxe.io.Bytes; -import hl.uv.Stream; +import hl.Gc; +import hl.uv.Loop; import sys.net.Host; +import sys.thread.EventLoop; +import sys.thread.Lock; +import sys.thread.Thread; import weblink._internal.Socket; class Server extends SocketServer { + /** + Is the server currently running? + **/ + public var running:Bool; - var parent:Weblink; - public var running:Bool = true; - var loop:hl.uv.Loop; - - public function new(port:Int, parent:Weblink) { - // sockets = []; - loop = hl.uv.Loop.getDefault(); - super(loop); - bind(new Host("0.0.0.0"), port); - noDelay(true); - listen(100, function() { - final stream = accept(); - final socket:Socket = cast stream; + private final parent:Weblink; + private final uvLoop:Loop; + private var serverThread:Null; + private var helperTimer:Null; + + public function new(app:Weblink) { + this.uvLoop = @:privateAccess Loop.default_loop(); // don't register MainLoop event + super(this.uvLoop); + + this.parent = app; + this.running = false; + } + + public function start(host:Host, port:Int, model:StartModel) { + final lock = new Lock(); + + // Prepare the libuv TCP socket + super.bind(host, port); + super.noDelay(true); + + // Configure new connection callback + super.listen(100, function() { + Gc.blocking(false); + + final client = this.accept(); + + // Register a handler for incoming data (HTTP/1.1 specific) var request:Null = null; + client.readStart(function(data:Null) @:privateAccess { + Gc.blocking(false); - stream.readStart(function(data:Null) @:privateAccess { if (data == null) { // EOF request = null; - stream.close(); + client.close(); + Gc.blocking(true); return; } @@ -35,8 +61,9 @@ class Server extends SocketServer { request = new Request(lines); if (request.pos >= request.length) { - complete(request, socket); + complete(request, cast client); request = null; + Gc.blocking(true); return; } } else { @@ -45,8 +72,9 @@ class Server extends SocketServer { request.pos += length; if (request.pos >= request.length) { - complete(request, socket); + complete(request, cast client); request = null; + Gc.blocking(true); return; } } @@ -54,19 +82,87 @@ class Server extends SocketServer { if (request.chunked) { request.chunk(data.toString()); if (request.chunkSize == 0) { - complete(request, socket); + complete(request, cast client); request = null; + Gc.blocking(true); return; } } if (request.method != Post && request.method != Put) { - complete(request, socket); + complete(request, cast client); request = null; } + + Gc.blocking(true); }); + + // Hashlink libuv bindings only allow for filesystem and TCP connection events. + // We use the fact that a new connection is opened to trigger Haxe's event loop. + // We have to run it on the same thread + // in case some of the events call (non-thread safe) libuv APIs. + final currentThread = Thread.current(); + final events = currentThread.events; + try { + events.progress(); + } catch (e) { + trace(e.details()); + } + + Gc.blocking(true); }); - this.parent = parent; + + // Create a thread to run the server's event loop + final serverThread = this.serverThread = Thread.create(() -> { + final currentThread = Thread.current(); + #if (haxe_ver >= 4.3) currentThread.setName("TCP listener"); #end + + // If we simply called Thread.createWithEventLoop up here, + // the thread would not stop after this block, + // but would continue running through the registered events. + // This way, setting Haxe's loop manually, + // our thread is guaranteed to eventually terminate. + Reflect.setProperty(currentThread, "events", new EventLoop()); + + this.running = true; + if (model == BlockUntilReady) { + lock.release(); + } + + Gc.blocking(true); + this.uvLoop.run(Default); + Gc.blocking(false); + + if (model == BlockUntilClosed) { + lock.release(); + } + }); + + // Create thread #2 which will periodically wake up thread #1 with TCP connections. + // When the server gets no traffic, this has two side effects: + // 1) the Haxe event loop is still run, + // 2) the GC can still collect garbage. + if (port != 0) { + // Of course, this trick only works if we know the port: + // unfortunately, we cannot get it from a running server + Thread.createWithEventLoop(() -> { + #if (haxe_ver >= 4.3) Thread.current().setName("Timer sch. hack"); #end + final host = new Host("127.0.0.1"); + final timer = this.helperTimer = new Timer(557); + timer.run = () -> { + final socket = new sys.net.Socket(); + final _ = @:privateAccess sys.net.Socket.socket_connect(socket.__s, host.ip, port); + socket.close(); // Immediately close not to eat up too much resources + }; + }); + } + + // Prevent the process from exiting + final mainThread = @:privateAccess EntryPoint.mainThread; + mainThread.events.promise(); + + // Wait until the server is either ready or closed + lock.wait(); } private function complete(request:Request, socket:Socket) { @@ -96,16 +192,50 @@ class Server extends SocketServer { } } + @:deprecated("Updates are now done in the background. You can try to remove this call.") public function update(blocking:Bool = true) { - do { - @:privateAccess MainLoop.tick(); // for timers - loop.run(NoWait); - } while (running && blocking); + // Pretend to block not to change the method's semantics + final lock = new Lock(); + while (this.running && blocking) { + lock.wait(0.1); + } } - override function close(?callb:() -> Void) { - super.close(callb); - loop.stop(); - running = false; + /** + Closes this server. + **/ + public function closeSync() { + final serverThread = this.serverThread; + this.serverThread = null; + if (serverThread != null) { + final lock = new Lock(); + serverThread.events.run(() -> { + this.close(() -> { + this.uvLoop.stop(); + this.running = false; + + final helperTimer = this.helperTimer; + this.helperTimer = null; + if (helperTimer != null) { + helperTimer.stop(); + } + + lock.release(); + + // Allow the app to exit + final mainThread = @:privateAccess EntryPoint.mainThread; + mainThread.events.runPromised(() -> {}); + }); + }); + lock.wait(10.0); + } } } + +private enum abstract StartModel(Bool) { + /** The `start()` call will return when the server is ready to accept connections. **/ + public var BlockUntilReady = false; + + /** The `start()` call will return when the server is closed. **/ + public var BlockUntilClosed = true; +}