Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ jobs:
run: npx haxe tests.hxml
- name: run test
run: hl test.hl
timeout-minutes: 3
1 change: 1 addition & 0 deletions tests.hxml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-cp tests
-main Test
--define no-deprecation-warnings
--macro nullSafety("weblink._internal.ds", StrictThreaded)
-hl test.hl
10 changes: 7 additions & 3 deletions weblink/Weblink.hx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package weblink;

import haxe.http.HttpMethod;
import sys.net.Host;
import weblink.Handler;
import weblink._internal.Server;
import weblink._internal.ds.RadixTree;
Expand All @@ -12,7 +13,9 @@ import weblink.security.OAuth.OAuthEndpoints;
using haxe.io.Path;

class Weblink {
/** The internal web server. **/
public var server:Null<Server>;

public var routeTree:RadixTree<Handler>;

private var middlewareToChain:Array<Middleware> = [];
Expand Down Expand Up @@ -85,8 +88,9 @@ class Weblink {

public function listen(port:Int, blocking:Bool = true) {
this.pathNotFound = chainMiddleware(this.pathNotFound);
server = new Server(port, this);
server.update(blocking);

final server = this.server = new Server(this);
server.start(new Host("0.0.0.0"), port, blocking ? BlockUntilClosed : BlockUntilReady);
}

public function serve(path:String = "", dir:String = "", cors:String = "*") {
Expand All @@ -97,7 +101,7 @@ class Weblink {
}

public function close() {
server.close();
this.server.closeSync();
}

/**
Expand Down
190 changes: 160 additions & 30 deletions weblink/_internal/Server.hx
Original file line number Diff line number Diff line change
@@ -1,32 +1,58 @@
package weblink._internal;

import haxe.MainLoop;
import haxe.EntryPoint;
import haxe.Exception;
import haxe.Timer;
import haxe.io.Bytes;
import hl.uv.Stream;
import hl.Gc;
import hl.uv.Loop;
import sys.net.Host;
import sys.thread.EventLoop;
import sys.thread.Lock;
import sys.thread.Thread;
import weblink._internal.Socket;

class Server extends SocketServer {
/**
Is the server currently running?
**/
public var running:Bool;

var parent:Weblink;
public var running:Bool = true;
var loop:hl.uv.Loop;

public function new(port:Int, parent:Weblink) {
// sockets = [];
loop = hl.uv.Loop.getDefault();
super(loop);
bind(new Host("0.0.0.0"), port);
noDelay(true);
listen(100, function() {
final stream = accept();
final socket:Socket = cast stream;
private final parent:Weblink;
private final uvLoop:Loop;
private var serverThread:Null<Thread>;
private var helperTimer:Null<Timer>;

public function new(app:Weblink) {
this.uvLoop = @:privateAccess Loop.default_loop(); // don't register MainLoop event
super(this.uvLoop);

this.parent = app;
this.running = false;
}

public function start(host:Host, port:Int, model:StartModel) {
final lock = new Lock();

// Prepare the libuv TCP socket
super.bind(host, port);
super.noDelay(true);

// Configure new connection callback
super.listen(100, function() {
Gc.blocking(false);

final client = this.accept();

// Register a handler for incoming data (HTTP/1.1 specific)
var request:Null<Request> = null;
client.readStart(function(data:Null<Bytes>) @:privateAccess {
Gc.blocking(false);

stream.readStart(function(data:Null<Bytes>) @:privateAccess {
if (data == null) { // EOF
request = null;
stream.close();
client.close();
Gc.blocking(true);
return;
}

Expand All @@ -35,8 +61,9 @@ class Server extends SocketServer {
request = new Request(lines);

if (request.pos >= request.length) {
complete(request, socket);
complete(request, cast client);
request = null;
Gc.blocking(true);
return;
}
} else {
Expand All @@ -45,28 +72,97 @@ class Server extends SocketServer {
request.pos += length;

if (request.pos >= request.length) {
complete(request, socket);
complete(request, cast client);
request = null;
Gc.blocking(true);
return;
}
}

if (request.chunked) {
request.chunk(data.toString());
if (request.chunkSize == 0) {
complete(request, socket);
complete(request, cast client);
request = null;
Gc.blocking(true);
return;
}
}

if (request.method != Post && request.method != Put) {
complete(request, socket);
complete(request, cast client);
request = null;
}

Gc.blocking(true);
});

// Hashlink libuv bindings only allow for filesystem and TCP connection events.
// We use the fact that a new connection is opened to trigger Haxe's event loop.
// We have to run it on the same thread
// in case some of the events call (non-thread safe) libuv APIs.
final currentThread = Thread.current();
final events = currentThread.events;
try {
events.progress();
} catch (e) {
trace(e.details());
}

Gc.blocking(true);
});
this.parent = parent;

// Create a thread to run the server's event loop
final serverThread = this.serverThread = Thread.create(() -> {
final currentThread = Thread.current();
#if (haxe_ver >= 4.3) currentThread.setName("TCP listener"); #end

// If we simply called Thread.createWithEventLoop up here,
// the thread would not stop after this block,
// but would continue running through the registered events.
// This way, setting Haxe's loop manually,
// our thread is guaranteed to eventually terminate.
Reflect.setProperty(currentThread, "events", new EventLoop());

this.running = true;
if (model == BlockUntilReady) {
lock.release();
}

Gc.blocking(true);
this.uvLoop.run(Default);
Gc.blocking(false);

if (model == BlockUntilClosed) {
lock.release();
}
});

// Create thread #2 which will periodically wake up thread #1 with TCP connections.
// When the server gets no traffic, this has two side effects:
// 1) the Haxe event loop is still run,
// 2) the GC can still collect garbage.
if (port != 0) {
// Of course, this trick only works if we know the port:
// unfortunately, we cannot get it from a running server
Thread.createWithEventLoop(() -> {
#if (haxe_ver >= 4.3) Thread.current().setName("Timer sch. hack"); #end
final host = new Host("127.0.0.1");
final timer = this.helperTimer = new Timer(557);
timer.run = () -> {
final socket = new sys.net.Socket();
final _ = @:privateAccess sys.net.Socket.socket_connect(socket.__s, host.ip, port);
socket.close(); // Immediately close not to eat up too much resources
};
});
}

// Prevent the process from exiting
final mainThread = @:privateAccess EntryPoint.mainThread;
mainThread.events.promise();

// Wait until the server is either ready or closed
lock.wait();
}

private function complete(request:Request, socket:Socket) {
Expand Down Expand Up @@ -96,16 +192,50 @@ class Server extends SocketServer {
}
}

@:deprecated("Updates are now done in the background. You can try to remove this call.")
public function update(blocking:Bool = true) {
do {
@:privateAccess MainLoop.tick(); // for timers
loop.run(NoWait);
} while (running && blocking);
// Pretend to block not to change the method's semantics
final lock = new Lock();
while (this.running && blocking) {
lock.wait(0.1);
}
}

override function close(?callb:() -> Void) {
super.close(callb);
loop.stop();
running = false;
/**
Closes this server.
**/
public function closeSync() {
final serverThread = this.serverThread;
this.serverThread = null;
if (serverThread != null) {
final lock = new Lock();
serverThread.events.run(() -> {
this.close(() -> {
this.uvLoop.stop();
this.running = false;

final helperTimer = this.helperTimer;
this.helperTimer = null;
if (helperTimer != null) {
helperTimer.stop();
}

lock.release();

// Allow the app to exit
final mainThread = @:privateAccess EntryPoint.mainThread;
mainThread.events.runPromised(() -> {});
});
});
lock.wait(10.0);
}
}
}

private enum abstract StartModel(Bool) {
/** The `start()` call will return when the server is ready to accept connections. **/
public var BlockUntilReady = false;

/** The `start()` call will return when the server is closed. **/
public var BlockUntilClosed = true;
}