Skip to content

Commit b206d8d

Browse files
committed
Handle TCP (run libuv loop) in a background thread
1 parent 90426fc commit b206d8d

4 files changed

Lines changed: 169 additions & 33 deletions

File tree

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ jobs:
2626
run: npx haxe tests.hxml
2727
- name: run test
2828
run: hl test.hl
29+
timeout-minutes: 3

tests.hxml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
-cp tests
22
-main Test
3+
--define no-deprecation-warnings
34
--macro nullSafety("weblink._internal.ds", StrictThreaded)
45
-hl test.hl

weblink/Weblink.hx

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package weblink;
22

33
import haxe.http.HttpMethod;
4+
import sys.net.Host;
45
import weblink.Handler;
56
import weblink._internal.Server;
67
import weblink._internal.ds.RadixTree;
@@ -12,7 +13,9 @@ import weblink.security.OAuth.OAuthEndpoints;
1213
using haxe.io.Path;
1314

1415
class Weblink {
16+
/** The internal web server. **/
1517
public var server:Null<Server>;
18+
1619
public var routeTree:RadixTree<Handler>;
1720

1821
private var middlewareToChain:Array<Middleware> = [];
@@ -85,8 +88,9 @@ class Weblink {
8588

8689
public function listen(port:Int, blocking:Bool = true) {
8790
this.pathNotFound = chainMiddleware(this.pathNotFound);
88-
server = new Server(port, this);
89-
server.update(blocking);
91+
92+
final server = this.server = new Server(this);
93+
server.start(new Host("0.0.0.0"), port, blocking ? BlockUntilClosed : BlockUntilReady);
9094
}
9195

9296
public function serve(path:String = "", dir:String = "", cors:String = "*") {
@@ -97,7 +101,7 @@ class Weblink {
97101
}
98102

99103
public function close() {
100-
server.close();
104+
this.server.closeSync();
101105
}
102106

103107
/**

weblink/_internal/Server.hx

Lines changed: 160 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,58 @@
11
package weblink._internal;
22

3-
import haxe.MainLoop;
3+
import haxe.EntryPoint;
4+
import haxe.Exception;
5+
import haxe.Timer;
46
import haxe.io.Bytes;
5-
import hl.uv.Stream;
7+
import hl.Gc;
8+
import hl.uv.Loop;
69
import sys.net.Host;
10+
import sys.thread.EventLoop;
11+
import sys.thread.Lock;
12+
import sys.thread.Thread;
713
import weblink._internal.Socket;
814

915
class Server extends SocketServer {
16+
/**
17+
Is the server currently running?
18+
**/
19+
public var running:Bool;
1020

11-
var parent:Weblink;
12-
public var running:Bool = true;
13-
var loop:hl.uv.Loop;
14-
15-
public function new(port:Int, parent:Weblink) {
16-
// sockets = [];
17-
loop = hl.uv.Loop.getDefault();
18-
super(loop);
19-
bind(new Host("0.0.0.0"), port);
20-
noDelay(true);
21-
listen(100, function() {
22-
final stream = accept();
23-
final socket:Socket = cast stream;
21+
private final parent:Weblink;
22+
private final uvLoop:Loop;
23+
private var serverThread:Null<Thread>;
24+
private var helperTimer:Null<Timer>;
25+
26+
public function new(app:Weblink) {
27+
this.uvLoop = @:privateAccess Loop.default_loop(); // don't register MainLoop event
28+
super(this.uvLoop);
29+
30+
this.parent = app;
31+
this.running = false;
32+
}
33+
34+
public function start(host:Host, port:Int, model:StartModel) {
35+
final lock = new Lock();
36+
37+
// Prepare the libuv TCP socket
38+
super.bind(host, port);
39+
super.noDelay(true);
40+
41+
// Configure new connection callback
42+
super.listen(100, function() {
43+
Gc.blocking(false);
44+
45+
final client = this.accept();
46+
47+
// Register a handler for incoming data (HTTP/1.1 specific)
2448
var request:Null<Request> = null;
49+
client.readStart(function(data:Null<Bytes>) @:privateAccess {
50+
Gc.blocking(false);
2551

26-
stream.readStart(function(data:Null<Bytes>) @:privateAccess {
2752
if (data == null) { // EOF
2853
request = null;
29-
stream.close();
54+
client.close();
55+
Gc.blocking(true);
3056
return;
3157
}
3258

@@ -35,8 +61,9 @@ class Server extends SocketServer {
3561
request = new Request(lines);
3662

3763
if (request.pos >= request.length) {
38-
complete(request, socket);
64+
complete(request, cast client);
3965
request = null;
66+
Gc.blocking(true);
4067
return;
4168
}
4269
} else {
@@ -45,28 +72,97 @@ class Server extends SocketServer {
4572
request.pos += length;
4673

4774
if (request.pos >= request.length) {
48-
complete(request, socket);
75+
complete(request, cast client);
4976
request = null;
77+
Gc.blocking(true);
5078
return;
5179
}
5280
}
5381

5482
if (request.chunked) {
5583
request.chunk(data.toString());
5684
if (request.chunkSize == 0) {
57-
complete(request, socket);
85+
complete(request, cast client);
5886
request = null;
87+
Gc.blocking(true);
5988
return;
6089
}
6190
}
6291

6392
if (request.method != Post && request.method != Put) {
64-
complete(request, socket);
93+
complete(request, cast client);
6594
request = null;
6695
}
96+
97+
Gc.blocking(true);
6798
});
99+
100+
// Hashlink libuv bindings only allow for filesystem and TCP connection events.
101+
// We use the fact that a new connection is opened to trigger Haxe's event loop.
102+
// We have to run it on the same thread
103+
// in case some of the events call (non-thread safe) libuv APIs.
104+
final currentThread = Thread.current();
105+
final events = currentThread.events;
106+
try {
107+
events.progress();
108+
} catch (e) {
109+
trace(e.details());
110+
}
111+
112+
Gc.blocking(true);
68113
});
69-
this.parent = parent;
114+
115+
// Create a thread to run the server's event loop
116+
final serverThread = this.serverThread = Thread.create(() -> {
117+
final currentThread = Thread.current();
118+
#if (haxe_ver >= 4.3) currentThread.setName("TCP listener"); #end
119+
120+
// If we simply called Thread.createWithEventLoop up here,
121+
// the thread would not stop after this block,
122+
// but would continue running through the registered events.
123+
// This way, setting Haxe's loop manually,
124+
// our thread is guaranteed to eventually terminate.
125+
Reflect.setProperty(currentThread, "events", new EventLoop());
126+
127+
this.running = true;
128+
if (model == BlockUntilReady) {
129+
lock.release();
130+
}
131+
132+
Gc.blocking(true);
133+
this.uvLoop.run(Default);
134+
Gc.blocking(false);
135+
136+
if (model == BlockUntilClosed) {
137+
lock.release();
138+
}
139+
});
140+
141+
// Create thread #2 which will periodically wake up thread #1 with TCP connections.
142+
// When the server gets no traffic, this has two side effects:
143+
// 1) the Haxe event loop is still run,
144+
// 2) the GC can still collect garbage.
145+
if (port != 0) {
146+
// Of course, this trick only works if we know the port:
147+
// unfortunately, we cannot get it from a running server
148+
Thread.createWithEventLoop(() -> {
149+
#if (haxe_ver >= 4.3) Thread.current().setName("Timer sch. hack"); #end
150+
final host = new Host("127.0.0.1");
151+
final timer = this.helperTimer = new Timer(557);
152+
timer.run = () -> {
153+
final socket = new sys.net.Socket();
154+
final _ = @:privateAccess sys.net.Socket.socket_connect(socket.__s, host.ip, port);
155+
socket.close(); // Immediately close not to eat up too much resources
156+
};
157+
});
158+
}
159+
160+
// Prevent the process from exiting
161+
final mainThread = @:privateAccess EntryPoint.mainThread;
162+
mainThread.events.promise();
163+
164+
// Wait until the server is either ready or closed
165+
lock.wait();
70166
}
71167

72168
private function complete(request:Request, socket:Socket) {
@@ -96,16 +192,50 @@ class Server extends SocketServer {
96192
}
97193
}
98194

195+
@:deprecated("Updates are now done in the background. You can try to remove this call.")
99196
public function update(blocking:Bool = true) {
100-
do {
101-
@:privateAccess MainLoop.tick(); // for timers
102-
loop.run(NoWait);
103-
} while (running && blocking);
197+
// Pretend to block not to change the method's semantics
198+
final lock = new Lock();
199+
while (this.running && blocking) {
200+
lock.wait(0.1);
201+
}
104202
}
105203

106-
override function close(?callb:() -> Void) {
107-
super.close(callb);
108-
loop.stop();
109-
running = false;
204+
/**
205+
Closes this server.
206+
**/
207+
public function closeSync() {
208+
final serverThread = this.serverThread;
209+
this.serverThread = null;
210+
if (serverThread != null) {
211+
final lock = new Lock();
212+
serverThread.events.run(() -> {
213+
this.close(() -> {
214+
this.uvLoop.stop();
215+
this.running = false;
216+
217+
final helperTimer = this.helperTimer;
218+
this.helperTimer = null;
219+
if (helperTimer != null) {
220+
helperTimer.stop();
221+
}
222+
223+
lock.release();
224+
225+
// Allow the app to exit
226+
final mainThread = @:privateAccess EntryPoint.mainThread;
227+
mainThread.events.runPromised(() -> {});
228+
});
229+
});
230+
lock.wait(10.0);
231+
}
110232
}
111233
}
234+
235+
private enum abstract StartModel(Bool) {
236+
/** The `start()` call will return when the server is ready to accept connections. **/
237+
public var BlockUntilReady = false;
238+
239+
/** The `start()` call will return when the server is closed. **/
240+
public var BlockUntilClosed = true;
241+
}

0 commit comments

Comments
 (0)