From 4eaa811cb3dd9272c62efb521dc70f511e7b7c2b Mon Sep 17 00:00:00 2001 From: oboard Date: Mon, 17 Nov 2025 01:45:08 +0800 Subject: [PATCH 1/4] feat(websocket): add WebSocket support to mocket framework Implement WebSocket server functionality including: - WebSocket route registration and handling - Peer management with subscribe/publish capabilities - Native and JS backend implementations - Example echo server and test client - Version bump to 0.5.7 --- moon.mod.json | 2 +- src/{example => examples/route}/main.mbt | 0 src/{example => examples/route}/moon.pkg.json | 0 .../route}/pkg.generated.mbti | 2 +- src/examples/websocket/moon.pkg.json | 6 + src/examples/websocket/pkg.generated.mbti | 13 ++ src/examples/websocket/websocket_echo.mbt | 20 ++ src/handler.mbt | 62 ++++++ src/index.mbt | 22 ++ src/js/pkg.generated.mbti | 196 ++++++++++++++++++ src/mocket.js.mbt | 63 +++++- src/mocket.native.mbt | 96 +++++++++ src/mocket.stub.c | 164 ++++++++++++++- src/moon.pkg.json | 8 + src/pkg.generated.mbti | 50 ++++- src/ws-bridge.js | 183 ++++++++++++++++ test_ws.js | 11 + 17 files changed, 884 insertions(+), 14 deletions(-) rename src/{example => examples/route}/main.mbt (100%) rename src/{example => examples/route}/moon.pkg.json (100%) rename src/{example => examples/route}/pkg.generated.mbti (75%) create mode 100644 src/examples/websocket/moon.pkg.json create mode 100644 src/examples/websocket/pkg.generated.mbti create mode 100644 src/examples/websocket/websocket_echo.mbt create mode 100644 src/ws-bridge.js create mode 100644 test_ws.js diff --git a/moon.mod.json b/moon.mod.json index f209b7d..fa04c21 100644 --- a/moon.mod.json +++ b/moon.mod.json @@ -1,6 +1,6 @@ { "name": "oboard/mocket", - "version": "0.5.6", + "version": "0.5.7", "deps": { "illusory0x0/native": "0.2.1", "moonbitlang/x": "0.4.34", diff --git a/src/example/main.mbt b/src/examples/route/main.mbt similarity index 100% rename from src/example/main.mbt rename to src/examples/route/main.mbt diff --git a/src/example/moon.pkg.json b/src/examples/route/moon.pkg.json similarity index 100% rename from src/example/moon.pkg.json rename to src/examples/route/moon.pkg.json diff --git a/src/example/pkg.generated.mbti b/src/examples/route/pkg.generated.mbti similarity index 75% rename from src/example/pkg.generated.mbti rename to src/examples/route/pkg.generated.mbti index a098fbe..1799246 100644 --- a/src/example/pkg.generated.mbti +++ b/src/examples/route/pkg.generated.mbti @@ -1,5 +1,5 @@ // Generated using `moon info`, DON'T EDIT IT -package "oboard/mocket/example" +package "oboard/mocket/examples/route" // Values diff --git a/src/examples/websocket/moon.pkg.json b/src/examples/websocket/moon.pkg.json new file mode 100644 index 0000000..a6eef88 --- /dev/null +++ b/src/examples/websocket/moon.pkg.json @@ -0,0 +1,6 @@ +{ + "is-main": true, + "import": [ + "oboard/mocket" + ] +} \ No newline at end of file diff --git a/src/examples/websocket/pkg.generated.mbti b/src/examples/websocket/pkg.generated.mbti new file mode 100644 index 0000000..740aed4 --- /dev/null +++ b/src/examples/websocket/pkg.generated.mbti @@ -0,0 +1,13 @@ +// Generated using `moon info`, DON'T EDIT IT +package "oboard/mocket/examples/websocket" + +// Values + +// Errors + +// Types and methods + +// Type aliases + +// Traits + diff --git a/src/examples/websocket/websocket_echo.mbt b/src/examples/websocket/websocket_echo.mbt new file mode 100644 index 0000000..6eb43e8 --- /dev/null +++ b/src/examples/websocket/websocket_echo.mbt @@ -0,0 +1,20 @@ +///| +fn main { + let app = @mocket.new(logger=@mocket.new_logger()) + app.ws("/ws", fn(event) { + match event { + Open(peer) => println("WS open: " + peer.to_string()) + Message(peer, body) => { + let msg = match body { + Text(s) => s.to_string() + _ => "" + } + println("WS message: " + msg) + peer.send(msg) + } + Close(peer) => println("WS close: " + peer.to_string()) + } + }) + println("WebSocket echo server listening on ws://localhost:8080/ws") + @mocket.serve_ffi(app, port=8080) +} diff --git a/src/handler.mbt b/src/handler.mbt index ea678c2..a6dd193 100644 --- a/src/handler.mbt +++ b/src/handler.mbt @@ -1,2 +1,64 @@ ///| pub type HttpHandler = async (HttpEvent) -> HttpBody noraise + +///| +pub(all) struct WebSocketPeer { + connection_id : String + mut subscribed_channels : Array[String] +} + +///| +pub fn WebSocketPeer::send(self : WebSocketPeer, message : String) -> Unit { + // 真正发送到后端连接 + ws_send(self.connection_id, message) +} + +///| +pub fn WebSocketPeer::subscribe(self : WebSocketPeer, channel : String) -> Unit { + if not(self.subscribed_channels.contains(channel)) { + self.subscribed_channels.push(channel) + ws_subscribe(self.connection_id, channel) + } +} + +///| +pub fn WebSocketPeer::unsubscribe( + self : WebSocketPeer, + channel : String, +) -> Unit { + let mut index = None + for i = 0; i < self.subscribed_channels.length(); i = i + 1 { + if self.subscribed_channels[i] == channel { + index = Some(i) + break + } + } + match index { + Some(i) => { + ignore(self.subscribed_channels.remove(i)) + ws_unsubscribe(self.connection_id, channel) + } + None => () + } +} + +///| +pub fn WebSocketPeer::publish(channel : String, message : String) -> Unit { + ws_publish(channel, message) +} + +///| +pub fn WebSocketPeer::to_string(self : WebSocketPeer) -> String { + "WebSocketPeer(\{self.connection_id})" +} + +///| +pub enum WebSocketEvent { + Open(WebSocketPeer) + Message(WebSocketPeer, HttpBody) + Close(WebSocketPeer) +} + +///| +// WebSocket 路由的事件处理器类型(不返回 HttpBody) +pub type WebSocketHandler = (WebSocketEvent) -> Unit diff --git a/src/index.mbt b/src/index.mbt index 0075e48..6ef6497 100644 --- a/src/index.mbt +++ b/src/index.mbt @@ -18,6 +18,9 @@ pub(all) struct Mocket { dynamic_routes : Map[String, Array[(String, HttpHandler)]] // 日志记录器 logger : Logger + // WebSocket 路由(按路径匹配,不区分方法) + ws_static_routes : Map[String, WebSocketHandler] + ws_dynamic_routes : Array[(String, WebSocketHandler)] } ///| @@ -35,6 +38,8 @@ pub fn new( static_routes: {}, dynamic_routes: {}, logger, + ws_static_routes: {}, + ws_dynamic_routes: [], } } @@ -209,3 +214,20 @@ pub fn Mocket::group( // 合并中间件 group.middlewares.iter().each(self.middlewares.push(_)) } + +///| +// 注册 WebSocket 路由(不区分方法,按路径匹配) +pub fn Mocket::ws( + self : Mocket, + path : String, + handler : WebSocketHandler, +) -> Unit { + let path = self.base_path + path + // 静态路径直接缓存 + if path.find(":").unwrap_or(-1) == -1 && path.find("*").unwrap_or(-1) == -1 { + self.ws_static_routes.set(path, handler) + } else { + // 动态路径加入列表 + self.ws_dynamic_routes.push((path, handler)) + } +} diff --git a/src/js/pkg.generated.mbti b/src/js/pkg.generated.mbti index 499bd25..299855e 100644 --- a/src/js/pkg.generated.mbti +++ b/src/js/pkg.generated.mbti @@ -1,13 +1,209 @@ // Generated using `moon info`, DON'T EDIT IT package "oboard/mocket/js" +import( + "moonbitlang/core/json" +) + // Values +async fn[T] async_all(Array[async () -> T]) -> Array[T] + +let async_iterator : Symbol + +fn async_run(async () -> Unit noraise) -> Unit + +fn async_test(async () -> Unit) -> Unit + +let globalThis : Value + +let iterator : Symbol + +fn require(String, keys? : Array[String]) -> Value + +fn[T, E : Error] spawn_detach(async () -> T raise E) -> Unit + +async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E // Errors +pub suberror Error_ Value +fn Error_::cause(Self) -> Value? +fn[T] Error_::wrap(() -> Value, map_ok? : (Value) -> T) -> T raise Self +impl Show for Error_ // Types and methods +type Nullable[_] +fn[T] Nullable::from_option(T?) -> Self[T] +#deprecated +fn[T] Nullable::get_exn(Self[T]) -> T +fn[T] Nullable::is_null(Self[T]) -> Bool +fn[T] Nullable::null() -> Self[T] +fn[T] Nullable::to_option(Self[T]) -> T? +fn[T] Nullable::unwrap(Self[T]) -> T + +pub struct Object(Value) +fn[K, V] Object::extend_iter(Self, Iter[(K, V)]) -> Unit +fn[K, V] Object::extend_iter2(Self, Iter2[K, V]) -> Unit +fn Object::extend_object(Self, Self) -> Self +fn[K, V] Object::from_iter(Iter[(K, V)]) -> Self +fn[K, V] Object::from_iter2(Iter2[K, V]) -> Self +fn Object::from_value(Value) -> Optional[Self] +fn Object::from_value_unchecked(Value) -> Self +#deprecated +fn Object::inner(Self) -> Value +fn Object::new() -> Self +fn[K, V] Object::op_get(Self, K) -> V +fn[K, V] Object::op_set(Self, K, V) -> Unit +fn Object::to_value(Self) -> Value + +type Optional[_] +fn[T] Optional::from_option(T?) -> Self[T] +#deprecated +fn[T] Optional::get_exn(Self[T]) -> T +fn[T] Optional::is_undefined(Self[T]) -> Bool +fn[T] Optional::to_option(Self[T]) -> T? +fn[T] Optional::undefined() -> Self[T] +fn[T] Optional::unwrap(Self[T]) -> T + +#external +pub type Promise +fn Promise::all(Array[Self]) -> Self +fn[T] Promise::unsafe_new(async () -> T) -> Self +async fn Promise::wait(Self) -> Value + +type Symbol +fn Symbol::make() -> Self +fn Symbol::make_with_number(Double) -> Self +fn Symbol::make_with_string(String) -> Self +fn Symbol::make_with_string_js(String) -> Self + +type Union2[_, _] +fn[A : Cast, B] Union2::from0(A) -> Self[A, B] +fn[A, B : Cast] Union2::from1(B) -> Self[A, B] +fn[A : Cast, B] Union2::to0(Self[A, B]) -> A? +fn[A, B : Cast] Union2::to1(Self[A, B]) -> B? + +type Union3[_, _, _] +fn[A : Cast, B, C] Union3::from0(A) -> Self[A, B, C] +fn[A, B : Cast, C] Union3::from1(B) -> Self[A, B, C] +fn[A, B, C : Cast] Union3::from2(C) -> Self[A, B, C] +fn[A : Cast, B, C] Union3::to0(Self[A, B, C]) -> A? +fn[A, B : Cast, C] Union3::to1(Self[A, B, C]) -> B? +fn[A, B, C : Cast] Union3::to2(Self[A, B, C]) -> C? + +type Union4[_, _, _, _] +fn[A : Cast, B, C, D] Union4::from0(A) -> Self[A, B, C, D] +fn[A, B : Cast, C, D] Union4::from1(B) -> Self[A, B, C, D] +fn[A, B, C : Cast, D] Union4::from2(C) -> Self[A, B, C, D] +fn[A, B, C, D : Cast] Union4::from3(D) -> Self[A, B, C, D] +fn[A : Cast, B, C, D] Union4::to0(Self[A, B, C, D]) -> A? +fn[A, B : Cast, C, D] Union4::to1(Self[A, B, C, D]) -> B? +fn[A, B, C : Cast, D] Union4::to2(Self[A, B, C, D]) -> C? +fn[A, B, C, D : Cast] Union4::to3(Self[A, B, C, D]) -> D? + +type Union5[_, _, _, _, _] +fn[A : Cast, B, C, D, E] Union5::from0(A) -> Self[A, B, C, D, E] +fn[A, B : Cast, C, D, E] Union5::from1(B) -> Self[A, B, C, D, E] +fn[A, B, C : Cast, D, E] Union5::from2(C) -> Self[A, B, C, D, E] +fn[A, B, C, D : Cast, E] Union5::from3(D) -> Self[A, B, C, D, E] +fn[A, B, C, D, E : Cast] Union5::from4(E) -> Self[A, B, C, D, E] +fn[A : Cast, B, C, D, E] Union5::to0(Self[A, B, C, D, E]) -> A? +fn[A, B : Cast, C, D, E] Union5::to1(Self[A, B, C, D, E]) -> B? +fn[A, B, C : Cast, D, E] Union5::to2(Self[A, B, C, D, E]) -> C? +fn[A, B, C, D : Cast, E] Union5::to3(Self[A, B, C, D, E]) -> D? +fn[A, B, C, D, E : Cast] Union5::to4(Self[A, B, C, D, E]) -> E? + +type Union6[_, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F] Union6::from0(A) -> Self[A, B, C, D, E, F] +fn[A, B : Cast, C, D, E, F] Union6::from1(B) -> Self[A, B, C, D, E, F] +fn[A, B, C : Cast, D, E, F] Union6::from2(C) -> Self[A, B, C, D, E, F] +fn[A, B, C, D : Cast, E, F] Union6::from3(D) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E : Cast, F] Union6::from4(E) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E, F : Cast] Union6::from5(F) -> Self[A, B, C, D, E, F] +fn[A : Cast, B, C, D, E, F] Union6::to0(Self[A, B, C, D, E, F]) -> A? +fn[A, B : Cast, C, D, E, F] Union6::to1(Self[A, B, C, D, E, F]) -> B? +fn[A, B, C : Cast, D, E, F] Union6::to2(Self[A, B, C, D, E, F]) -> C? +fn[A, B, C, D : Cast, E, F] Union6::to3(Self[A, B, C, D, E, F]) -> D? +fn[A, B, C, D, E : Cast, F] Union6::to4(Self[A, B, C, D, E, F]) -> E? +fn[A, B, C, D, E, F : Cast] Union6::to5(Self[A, B, C, D, E, F]) -> F? + +type Union7[_, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G] Union7::from0(A) -> Self[A, B, C, D, E, F, G] +fn[A, B : Cast, C, D, E, F, G] Union7::from1(B) -> Self[A, B, C, D, E, F, G] +fn[A, B, C : Cast, D, E, F, G] Union7::from2(C) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D : Cast, E, F, G] Union7::from3(D) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E : Cast, F, G] Union7::from4(E) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F : Cast, G] Union7::from5(F) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F, G : Cast] Union7::from6(G) -> Self[A, B, C, D, E, F, G] +fn[A : Cast, B, C, D, E, F, G] Union7::to0(Self[A, B, C, D, E, F, G]) -> A? +fn[A, B : Cast, C, D, E, F, G] Union7::to1(Self[A, B, C, D, E, F, G]) -> B? +fn[A, B, C : Cast, D, E, F, G] Union7::to2(Self[A, B, C, D, E, F, G]) -> C? +fn[A, B, C, D : Cast, E, F, G] Union7::to3(Self[A, B, C, D, E, F, G]) -> D? +fn[A, B, C, D, E : Cast, F, G] Union7::to4(Self[A, B, C, D, E, F, G]) -> E? +fn[A, B, C, D, E, F : Cast, G] Union7::to5(Self[A, B, C, D, E, F, G]) -> F? +fn[A, B, C, D, E, F, G : Cast] Union7::to6(Self[A, B, C, D, E, F, G]) -> G? + +type Union8[_, _, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G, H] Union8::from0(A) -> Self[A, B, C, D, E, F, G, H] +fn[A, B : Cast, C, D, E, F, G, H] Union8::from1(B) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C : Cast, D, E, F, G, H] Union8::from2(C) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D : Cast, E, F, G, H] Union8::from3(D) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E : Cast, F, G, H] Union8::from4(E) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F : Cast, G, H] Union8::from5(F) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G : Cast, H] Union8::from6(G) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G, H : Cast] Union8::from7(H) -> Self[A, B, C, D, E, F, G, H] +fn[A : Cast, B, C, D, E, F, G, H] Union8::to0(Self[A, B, C, D, E, F, G, H]) -> A? +fn[A, B : Cast, C, D, E, F, G, H] Union8::to1(Self[A, B, C, D, E, F, G, H]) -> B? +fn[A, B, C : Cast, D, E, F, G, H] Union8::to2(Self[A, B, C, D, E, F, G, H]) -> C? +fn[A, B, C, D : Cast, E, F, G, H] Union8::to3(Self[A, B, C, D, E, F, G, H]) -> D? +fn[A, B, C, D, E : Cast, F, G, H] Union8::to4(Self[A, B, C, D, E, F, G, H]) -> E? +fn[A, B, C, D, E, F : Cast, G, H] Union8::to5(Self[A, B, C, D, E, F, G, H]) -> F? +fn[A, B, C, D, E, F, G : Cast, H] Union8::to6(Self[A, B, C, D, E, F, G, H]) -> G? +fn[A, B, C, D, E, F, G, H : Cast] Union8::to7(Self[A, B, C, D, E, F, G, H]) -> H? + +#external +pub type Value +fn[Arg, Result] Value::apply(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::cast(Self) -> T +fn[T] Value::cast_from(T) -> Self +fn Value::extends(Self, Self) -> Self +fn Value::from_json(Json) -> Self raise +fn Value::from_json_string(String) -> Self raise +fn[T] Value::get_with_index(Self, Int) -> T +fn[T] Value::get_with_string(Self, String) -> T +fn[T] Value::get_with_symbol(Self, Symbol) -> T +fn Value::is_bool(Self) -> Bool +fn Value::is_null(Self) -> Bool +fn Value::is_number(Self) -> Bool +fn Value::is_object(Self) -> Bool +fn Value::is_string(Self) -> Bool +fn Value::is_symbol(Self) -> Bool +fn Value::is_undefined(Self) -> Bool +fn[Arg, Result] Value::new(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::set_with_index(Self, Int, T) -> Unit +fn[T] Value::set_with_string(Self, String, T) -> Unit +fn[T] Value::set_with_symbol(Self, Symbol, T) -> Unit +fn Value::to_json(Self) -> Json raise +fn Value::to_json_string(Self) -> String raise +fn Value::to_string(Self) -> String +impl Show for Value +impl @json.FromJson for Value // Type aliases // Traits +pub(open) trait Cast { + into(Value) -> Self? + from(Self) -> Value +} +impl Cast for Bool +impl Cast for Int +impl Cast for Double +impl Cast for String +impl[A : Cast] Cast for Array[A] diff --git a/src/mocket.js.mbt b/src/mocket.js.mbt index 079697f..b28edd0 100644 --- a/src/mocket.js.mbt +++ b/src/mocket.js.mbt @@ -55,7 +55,7 @@ extern "js" fn HttpResponseInternal::write_head( pub extern "js" fn create_server( handler : (HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, port : Int, -) -> Unit = "(handler, port) => require('node:http').createServer(handler).listen(port, () => {})" +) -> Unit = "(handler, port) => { const server = require('node:http').createServer(handler); globalThis.MOCKET_HTTP_SERVER = server; require('./ws-bridge.js')(); server.listen(port, () => {}) }" ///| pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { @@ -145,4 +145,65 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { }, port, ) + register_ws_handler(mocket, port) + __ws_emit_js_export() } + +///| +/// 全局 handler 映射:port -> WebSocketHandler +let ws_handler_map : Map[Int, WebSocketHandler] = Map::new() + +///| +/// 在 serve_ffi 里注册 WS handler(只取第一个静态路由) +pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { + let mut done = false + mocket.ws_static_routes.each(fn(_, handler) { + if not(done) { + ws_handler_map.set(port, handler) + done = true + } + }) +} + +///| +/// 供 JS 调用的 MoonBit 事件入口:捕获异常,避免抛回 JS +pub fn __ws_emit_js( + event_type : String, + connection_id : String, + payload : String, +) -> Unit { + // 拿第一个 handler(例子只注册一个端口) + let handler = if ws_handler_map.is_empty() { + fn(_) { } + } else { + ws_handler_map.get(ws_handler_map.keys().collect()[0]).unwrap() + } + let peer = WebSocketPeer::{ connection_id, subscribed_channels: [] } + match event_type { + "open" => handler(WebSocketEvent::Open(peer)) + "message" => handler(WebSocketEvent::Message(peer, Text(payload))) + "close" => handler(WebSocketEvent::Close(peer)) + _ => () + } +} + +///| +/// 导出 MoonBit 函数到 JS 全局 +pub extern "js" fn __ws_emit_js_export() -> Unit = "globalThis.__ws_emit = (type, id, payload) => { try { __ws_emit_js(type, id, payload); } catch (_) {} };" + +///| +/// 在 JS 端启动 WebSocket 服务器(监听同端口,复用 HTTP 服务器的 server 实例) +pub extern "js" fn start_ws_server() -> Unit = "(function() { const http = require('node:http'); const crypto = require('node:crypto'); const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; if (!globalThis.WS_SERVER_STARTED) { globalThis.WS_SERVER_STARTED = true; const server = globalThis.MOCKET_HTTP_SERVER; if (!server) return; function acceptKey(key) { return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); } function sendText(socket, str) { const payload = Buffer.from(String(str), 'utf8'); const len = payload.length; let header; if (len < 126) { header = Buffer.alloc(2); header[0] = 0x81; header[1] = len; } else if (len < 65536) { header = Buffer.alloc(4); header[0] = 0x81; header[1] = 126; header.writeUInt16BE(len, 2); } else { header = Buffer.alloc(10); header[0] = 0x81; header[1] = 127; header.writeBigUInt64BE(BigInt(len), 2); } try { socket.write(Buffer.concat([header, payload])); } catch (_) {} } function parseFrame(buf) { if (!Buffer.isBuffer(buf) || buf.length < 2) return null; const fin = (buf[0] & 0x80) !== 0; const opcode = buf[0] & 0x0f; const masked = (buf[1] & 0x80) !== 0; let len = buf[1] & 0x7f; let offset = 2; if (len === 126) { if (buf.length < 4) return null; len = buf.readUInt16BE(2); offset = 4; } else if (len === 127) { if (buf.length < 10) return null; const big = buf.readBigUInt64BE(2); len = Number(big); offset = 10; } if (masked) { if (buf.length < offset + 4 + len) return null; const mask = buf.slice(offset, offset + 4); const data = buf.slice(offset + 4, offset + 4 + len); const out = Buffer.alloc(len); for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; return { fin, opcode, data: out }; } else { if (buf.length < offset + len) return null; const data = buf.slice(offset, offset + len); return { fin, opcode, data }; } } server.on('upgrade', (req, socket, head) => { const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { try { socket.destroy(); } catch (_) {} return; } const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; if (!key) { try { socket.destroy(); } catch (_) {} return; } const accept = acceptKey(key.toString()); const headers = ['HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: ' + accept, '\r\n']; try { socket.write(headers.join('\r\n')); } catch (_) { try { socket.destroy(); } catch (_) {} return; } const connectionId = crypto.randomUUID(); globalThis.WS.clientsById.set(connectionId, socket); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('open', connectionId); } socket.on('data', (buf) => { const frame = parseFrame(buf); if (!frame) return; if (frame.opcode === 0x8) { try { socket.end(); } catch (_) {} globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('close', connectionId); } return; } if (frame.opcode === 0x9) { const payload = frame.data || Buffer.alloc(0); const header = Buffer.from([0x8A, payload.length]); try { socket.write(Buffer.concat([header, payload])); } catch (_) {} return; } if (frame.opcode === 0x1) { const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('message', connectionId, msg); } } }); socket.on('close', () => { globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('close', connectionId); } }); socket.on('error', () => { globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); try { socket.destroy(); } catch (_) {} }); }); } })()" + +///| +/// 四个 FFI 包装,供 WebSocketPeer 调用 +pub extern "js" fn ws_send(id : String, msg : String) -> Unit = "globalThis.ws_send" + +///| +pub extern "js" fn ws_subscribe(id : String, channel : String) -> Unit = "globalThis.ws_subscribe" + +///| +pub extern "js" fn ws_unsubscribe(id : String, channel : String) -> Unit = "globalThis.ws_unsubscribe" + +///| +pub extern "js" fn ws_publish(channel : String, msg : String) -> Unit = "globalThis.ws_publish" diff --git a/src/mocket.native.mbt b/src/mocket.native.mbt index 65211fc..a6c99ab 100644 --- a/src/mocket.native.mbt +++ b/src/mocket.native.mbt @@ -88,12 +88,40 @@ extern "c" fn create_server( handler : FuncRef[(Int, HttpRequestInternal, HttpResponseInternal) -> Unit], ) -> HttpServerInternal = "create_server" +///| +#owned(cb) +extern "c" fn set_ws_emit( + cb : FuncRef[(@native.CStr, @native.CStr, @native.CStr) -> Unit], +) -> Unit = "set_ws_emit" + ///| let server_map : Map[Int, Mocket] = Map::new() +///| +let ws_handler_map : Map[Int, WebSocketHandler] = Map::new() + +///| +pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { + let mut done = false + mocket.ws_static_routes.each(fn(_, handler) { + if not(done) { + ws_handler_map.set(port, handler) + done = true + } + }) +} + ///| pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { server_map[port] = mocket + register_ws_handler(mocket, port) + set_ws_emit(fn( + event_type : @native.CStr, + id : @native.CStr, + payload : @native.CStr, + ) { + __ws_emit(event_type, id, payload) + }) let server = create_server(fn( port : Int, req : HttpRequestInternal, @@ -205,6 +233,74 @@ fn handle_request_native( }) } +///| +pub fn __ws_emit( + event_type : @native.CStr, + connection_id : @native.CStr, + payload : @native.CStr, +) -> Unit { + let et = from_cstr(event_type) + let cid = from_cstr(connection_id) + let pl = from_cstr(payload) + let handler = if ws_handler_map.is_empty() { + fn(_) { } + } else { + ws_handler_map.get(ws_handler_map.keys().collect()[0]).unwrap() + } + let peer = WebSocketPeer::{ connection_id: cid, subscribed_channels: [] } + match et { + "open" => handler(WebSocketEvent::Open(peer)) + "message" => handler(WebSocketEvent::Message(peer, Text(pl))) + "close" => handler(WebSocketEvent::Close(peer)) + _ => () + } +} + +///| +#owned(id, msg) +extern "c" fn ws_send_native(id : @native.CStr, msg : @native.CStr) -> Unit = "ws_send" + +///| +#owned(id, channel) +extern "c" fn ws_subscribe_native( + id : @native.CStr, + channel : @native.CStr, +) -> Unit = "ws_subscribe" + +///| +#owned(id, channel) +extern "c" fn ws_unsubscribe_native( + id : @native.CStr, + channel : @native.CStr, +) -> Unit = "ws_unsubscribe" + +///| +#owned(channel, msg) +extern "c" fn ws_publish_native( + channel : @native.CStr, + msg : @native.CStr, +) -> Unit = "ws_publish" + +///| +pub fn ws_send(id : String, msg : String) -> Unit { + ws_send_native(to_cstr(id), to_cstr(msg)) +} + +///| +pub fn ws_subscribe(id : String, channel : String) -> Unit { + ws_subscribe_native(to_cstr(id), to_cstr(channel)) +} + +///| +pub fn ws_unsubscribe(id : String, channel : String) -> Unit { + ws_unsubscribe_native(to_cstr(id), to_cstr(channel)) +} + +///| +pub fn ws_publish(channel : String, msg : String) -> Unit { + ws_publish_native(to_cstr(channel), to_cstr(msg)) +} + ///| fn[T : Show] to_cstr(s : T) -> @native.CStr { let bytes = @encoding/utf8.encode(s.to_string()) diff --git a/src/mocket.stub.c b/src/mocket.stub.c index b60a75c..24697ad 100644 --- a/src/mocket.stub.c +++ b/src/mocket.stub.c @@ -3,6 +3,136 @@ #include #include + +#define MAX_WS_CLIENTS 1024 +#define MAX_CHANNELS 256 + +typedef struct { + struct mg_connection *c; + char id[64]; +} ws_client_t; + +typedef struct { + char name[128]; + char client_ids[MAX_WS_CLIENTS][64]; + int cid_count; +} channel_t; + +static ws_client_t WS_CLIENTS[MAX_WS_CLIENTS]; +static int WS_CLIENT_COUNT = 0; +static channel_t CHANNELS[MAX_CHANNELS]; +static int CHANNEL_COUNT = 0; + +typedef void (*ws_emit_cb_t)(const char *type, const char *id, const char *payload); +static ws_emit_cb_t WS_EMIT_CB = NULL; +void set_ws_emit(ws_emit_cb_t cb) { WS_EMIT_CB = cb; } + +static ws_client_t *find_client_by_conn(struct mg_connection *c) { + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (WS_CLIENTS[i].c == c) return &WS_CLIENTS[i]; + } + return NULL; +} + +static ws_client_t *find_client_by_id(const char *id) { + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (strncmp(WS_CLIENTS[i].id, id, sizeof(WS_CLIENTS[i].id)) == 0) return &WS_CLIENTS[i]; + } + return NULL; +} + +static const char *gen_id(struct mg_connection *c) { + static char buf[64]; + snprintf(buf, sizeof(buf), "%p-%llu", (void *) c, (unsigned long long) mg_millis()); + return buf; +} + +static void register_client(struct mg_connection *c) { + if (WS_CLIENT_COUNT >= MAX_WS_CLIENTS) return; + const char *id = gen_id(c); + ws_client_t *slot = &WS_CLIENTS[WS_CLIENT_COUNT++]; + slot->c = c; + snprintf(slot->id, sizeof(slot->id), "%s", id); + if (WS_EMIT_CB) WS_EMIT_CB("open", slot->id, ""); +} + +static void unregister_client(ws_client_t *cl) { + if (!cl) return; + if (WS_EMIT_CB) WS_EMIT_CB("close", cl->id, ""); + for (int i = 0; i < CHANNEL_COUNT; i++) { + int w = 0; + for (int j = 0; j < CHANNELS[i].cid_count; j++) { + if (strcmp(CHANNELS[i].client_ids[j], cl->id) != 0) { + if (w != j) strncpy(CHANNELS[i].client_ids[w], CHANNELS[i].client_ids[j], sizeof(CHANNELS[i].client_ids[w])); + w++; + } + } + CHANNELS[i].cid_count = w; + } + int idx = -1; + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (&WS_CLIENTS[i] == cl) { idx = i; break; } + } + if (idx >= 0) { + if (idx != WS_CLIENT_COUNT - 1) WS_CLIENTS[idx] = WS_CLIENTS[WS_CLIENT_COUNT - 1]; + WS_CLIENT_COUNT--; + } +} + +static channel_t *get_channel(const char *name, int create) { + for (int i = 0; i < CHANNEL_COUNT; i++) { + if (strncmp(CHANNELS[i].name, name, sizeof(CHANNELS[i].name)) == 0) return &CHANNELS[i]; + } + if (!create || CHANNEL_COUNT >= MAX_CHANNELS) return NULL; + channel_t *ch = &CHANNELS[CHANNEL_COUNT++]; + snprintf(ch->name, sizeof(ch->name), "%s", name); + ch->cid_count = 0; + return ch; +} + +void ws_send(const char *id, const char *msg) { + ws_client_t *cl = find_client_by_id(id); + if (!cl || !cl->c) return; + if (!msg) msg = ""; + mg_ws_send(cl->c, msg, strlen(msg), WEBSOCKET_OP_TEXT); +} + +void ws_subscribe(const char *id, const char *channel) { + if (!id || !channel) return; + channel_t *ch = get_channel(channel, 1); + if (!ch) return; + for (int i = 0; i < ch->cid_count; i++) { + if (strcmp(ch->client_ids[i], id) == 0) return; + } + if (ch->cid_count < MAX_WS_CLIENTS) { + snprintf(ch->client_ids[ch->cid_count], sizeof(ch->client_ids[ch->cid_count]), "%s", id); + ch->cid_count++; + } +} + +void ws_unsubscribe(const char *id, const char *channel) { + if (!id || !channel) return; + channel_t *ch = get_channel(channel, 0); + if (!ch) return; + int w = 0; + for (int i = 0; i < ch->cid_count; i++) { + if (strcmp(ch->client_ids[i], id) != 0) { + if (w != i) strncpy(ch->client_ids[w], ch->client_ids[i], sizeof(ch->client_ids[w])); + w++; + } + } + ch->cid_count = w; +} + +void ws_publish(const char *channel, const char *msg) { + if (!channel) return; + channel_t *ch = get_channel(channel, 0); + if (!ch) return; + for (int i = 0; i < ch->cid_count; i++) { + ws_send(ch->client_ids[i], msg); + } +} + typedef struct { char key[128]; @@ -216,21 +346,23 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) if (ev == MG_EV_HTTP_MSG) { - // HTTP 请求完整到达 struct mg_http_message *hm = (struct mg_http_message *)ev_data; + struct mg_str *upgrade = mg_http_get_header(hm, "Upgrade"); + if (upgrade && mg_strcasecmp(*upgrade, mg_str("websocket")) == 0) { + mg_ws_upgrade(c, hm, NULL); + return; + } request_t req = {hm, hm->body, NULL, NULL, NULL, NULL}; response_t res = {c, 200, "", 0}; if (srv->handler) { - // 1. 头部解析完成回调 if (req.on_headers) { req.on_headers(&req); } - // 2. body 数据块回调(如果有 body) if (req.on_body_chunk && hm->body.len > 0) { req.on_body_chunk(&req, hm->body); @@ -238,7 +370,6 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) srv->handler(srv->port, &req, &res); - // 3. 请求完成回调 if (req.on_complete) { req.on_complete(&req); @@ -246,7 +377,6 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) } else { - // 触发错误回调 if (req.on_error) { req.on_error(&req, "Handler not found"); @@ -254,6 +384,30 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) mg_http_reply(c, 404, "", "Not Found\n"); } } + else if (ev == MG_EV_WS_OPEN) + { + register_client(c); + } + else if (ev == MG_EV_WS_MSG) + { + struct mg_ws_message *wm = (struct mg_ws_message *) ev_data; + ws_client_t *cl = find_client_by_conn(c); + if (!cl) return; + if ((wm->flags & WEBSOCKET_OP_TEXT) == WEBSOCKET_OP_TEXT) { + size_t len = wm->data.len; + char *tmp = (char *) malloc(len + 1); + if (!tmp) return; + memcpy(tmp, wm->data.buf, len); + tmp[len] = '\0'; + if (WS_EMIT_CB) WS_EMIT_CB("message", cl->id, tmp); + free(tmp); + } + } + else if (ev == MG_EV_CLOSE) + { + ws_client_t *cl = find_client_by_conn(c); + if (cl) unregister_client(cl); + } } // 创建 server diff --git a/src/moon.pkg.json b/src/moon.pkg.json index da5a829..cb8e9cf 100644 --- a/src/moon.pkg.json +++ b/src/moon.pkg.json @@ -4,6 +4,14 @@ "illusory0x0/native", "tonyfettes/uri" ], + "expose": [ + "__ws_emit_js", + "__ws_emit", + "ws_send", + "ws_subscribe", + "ws_unsubscribe", + "ws_publish" + ], "supported-targets": [ "js", "native", diff --git a/src/pkg.generated.mbti b/src/pkg.generated.mbti index 1172bb9..b2dcdfa 100644 --- a/src/pkg.generated.mbti +++ b/src/pkg.generated.mbti @@ -2,11 +2,17 @@ package "oboard/mocket" import( - "illusory0x0/native" "moonbitlang/core/builtin" + "oboard/mocket/js" ) // Values +fn __ws_emit_js(String, String, String) -> Unit + +fn __ws_emit_js_export() -> Unit + +fn create_server((HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, Int) -> Unit + async fn execute_middlewares(Array[(String, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise)], HttpEvent, async (HttpEvent) -> HttpBody noraise) -> HttpBody noraise fn new(base_path? : String, logger? : Logger) -> Mocket @@ -17,12 +23,24 @@ fn new_logger(enabled? : Bool, level? : LogLevel) -> Logger fn new_production_logger() -> Logger +fn register_ws_handler(Mocket, Int) -> Unit + fn run(async () -> Unit noraise) -> Unit fn serve_ffi(Mocket, port~ : Int) -> Unit +fn start_ws_server() -> Unit + async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E +fn ws_publish(String, String) -> Unit + +fn ws_send(String, String) -> Unit + +fn ws_subscribe(String, String) -> Unit + +fn ws_unsubscribe(String, String) -> Unit + // Errors pub suberror BodyError { InvalidJsonCharset @@ -63,8 +81,8 @@ pub(all) struct HttpRequest { #external pub type HttpRequestInternal -fn HttpRequestInternal::on_complete(Self, FuncRef[() -> Unit]) -> Unit -fn HttpRequestInternal::on_headers(Self, FuncRef[(@native.CStr) -> Unit]) -> Unit +fn HttpRequestInternal::req_method(Self) -> String +fn HttpRequestInternal::url(Self) -> String pub(all) struct HttpResponse { mut status_code : Int @@ -73,9 +91,8 @@ pub(all) struct HttpResponse { #external pub type HttpResponseInternal - -#external -pub type HttpServerInternal +fn HttpResponseInternal::end(Self, @js.Value) -> Unit +fn HttpResponseInternal::url(Self) -> String pub enum LogLevel { Debug @@ -113,6 +130,8 @@ pub(all) struct Mocket { static_routes : Map[String, Map[String, async (HttpEvent) -> HttpBody noraise]] dynamic_routes : Map[String, Array[(String, async (HttpEvent) -> HttpBody noraise)]] logger : Logger + ws_static_routes : Map[String, (WebSocketEvent) -> Unit] + ws_dynamic_routes : Array[(String, (WebSocketEvent) -> Unit)] } fn Mocket::all(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::connect(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit @@ -128,11 +147,30 @@ fn Mocket::put(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::serve(Self, port~ : Int) -> Unit fn Mocket::trace(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::use_middleware(Self, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise, base_path? : String) -> Unit +fn Mocket::ws(Self, String, (WebSocketEvent) -> Unit) -> Unit + +pub enum WebSocketEvent { + Open(WebSocketPeer) + Message(WebSocketPeer, HttpBody) + Close(WebSocketPeer) +} + +pub(all) struct WebSocketPeer { + connection_id : String + mut subscribed_channels : Array[String] +} +fn WebSocketPeer::publish(String, String) -> Unit +fn WebSocketPeer::send(Self, String) -> Unit +fn WebSocketPeer::subscribe(Self, String) -> Unit +fn WebSocketPeer::to_string(Self) -> String +fn WebSocketPeer::unsubscribe(Self, String) -> Unit // Type aliases pub type HttpHandler = async (HttpEvent) -> HttpBody noraise pub type Middleware = async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise +pub type WebSocketHandler = (WebSocketEvent) -> Unit + // Traits diff --git a/src/ws-bridge.js b/src/ws-bridge.js new file mode 100644 index 0000000..2456fe5 --- /dev/null +++ b/src/ws-bridge.js @@ -0,0 +1,183 @@ +// ws-bridge.js - WebSocket 升级桥接,供 MoonBit JS 后端调用 +module.exports = function start() { + const http = require('node:http'); + const crypto = require('node:crypto'); + const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + + // 复用全局 HTTP 服务器实例 + const server = globalThis.MOCKET_HTTP_SERVER; + if (!server) { + console.error('[ws-bridge] MOCKET_HTTP_SERVER not found'); + return; + } + + // 全局 WebSocket 状态 + globalThis.WS = { + clientsById: new Map(), // Map + channels: new Map(), // Map> + }; + + function acceptKey(key) { + return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); + } + + // 供 FFI 调用的发送函数 + function sendText(socket, str) { + const payload = Buffer.from(String(str), 'utf8'); + const len = payload.length; + let header; + if (len < 126) { + header = Buffer.alloc(2); + header[0] = 0x81; // FIN + text + header[1] = len; + } else if (len < 65536) { + header = Buffer.alloc(4); + header[0] = 0x81; + header[1] = 126; + header.writeUInt16BE(len, 2); + } else { + header = Buffer.alloc(10); + header[0] = 0x81; + header[1] = 127; + header.writeBigUInt64BE(BigInt(len), 2); + } + try { + socket.write(Buffer.concat([header, payload])); + } catch (_) {} + } + + function parseFrame(buf) { + if (!Buffer.isBuffer(buf) || buf.length < 2) return null; + const fin = (buf[0] & 0x80) !== 0; + const opcode = buf[0] & 0x0f; + const masked = (buf[1] & 0x80) !== 0; + let len = buf[1] & 0x7f; + let offset = 2; + if (len === 126) { + if (buf.length < 4) return null; + len = buf.readUInt16BE(2); + offset = 4; + } else if (len === 127) { + if (buf.length < 10) return null; + const big = buf.readBigUInt64BE(2); + len = Number(big); + offset = 10; + } + if (masked) { + if (buf.length < offset + 4 + len) return null; + const mask = buf.slice(offset, offset + 4); + const data = buf.slice(offset + 4, offset + 4 + len); + const out = Buffer.alloc(len); + for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; + return { fin, opcode, data: out }; + } else { + if (buf.length < offset + len) return null; + const data = buf.slice(offset, offset + len); + return { fin, opcode, data }; + } + } + + // 导出四个 FFI 函数,供 MoonBit 调用 + globalThis.ws_send = (id, msg) => { + const s = globalThis.WS.clientsById.get(id); + if (s) sendText(s, msg); + }; + globalThis.ws_subscribe = (id, ch) => { + let set = globalThis.WS.channels.get(ch); + if (!set) { set = new Set(); globalThis.WS.channels.set(ch, set); } + set.add(id); + }; + globalThis.ws_unsubscribe = (id, ch) => { + const set = globalThis.WS.channels.get(ch); + if (set) { set.delete(id); if (set.size === 0) globalThis.WS.channels.delete(ch); } + }; + globalThis.ws_publish = (ch, msg) => { + const set = globalThis.WS.channels.get(ch); + if (set) { + for (const cid of set) { + const s = globalThis.WS.clientsById.get(cid); + if (s) sendText(s, msg); + } + } + }; + + server.on('upgrade', (req, socket, head) => { + const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); + if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { + try { socket.destroy(); } catch (_) {} + return; + } + const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; + if (!key) { try { socket.destroy(); } catch (_) {} return; } + const accept = acceptKey(key.toString()); + const headers = [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Accept: ' + accept, + '\r\n' + ]; + try { socket.write(headers.join('\r\n')); } catch (_) { + try { socket.destroy(); } catch (_) {} return; + } + + // 生成唯一连接 ID,注册到全局映射 + const connectionId = crypto.randomUUID(); + globalThis.WS.clientsById.set(connectionId, socket); + + // 派发 open 事件到 MoonBit + console.log('[ws-bridge] emit open', connectionId); + if (typeof globalThis.__ws_emit === 'function') { + globalThis.__ws_emit('open', connectionId, ''); + } else { + console.log('[ws-bridge] __ws_emit not found'); + } + + socket.on('data', (buf) => { + const frame = parseFrame(buf); + if (!frame) return; + // Close frame + if (frame.opcode === 0x8) { + try { socket.end(); } catch (_) {} + globalThis.WS.clientsById.delete(connectionId); + for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); + if (typeof globalThis.__ws_emit === 'function') { + globalThis.__ws_emit('close', connectionId, ''); + } + return; + } + // Ping -> Pong + if (frame.opcode === 0x9) { + const payload = frame.data || Buffer.alloc(0); + const header = Buffer.from([0x8A, payload.length]); + try { socket.write(Buffer.concat([header, payload])); } catch (_) {} + return; + } + // Text + if (frame.opcode === 0x1) { + const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); + console.log('[ws-bridge] emit message', connectionId, msg); + if (typeof globalThis.__ws_emit === 'function') { + globalThis.__ws_emit('message', connectionId, msg); + } else { + console.log('[ws-bridge] __ws_emit not found'); + } + } + }); + + socket.on('close', () => { + globalThis.WS.clientsById.delete(connectionId); + for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); + if (typeof globalThis.__ws_emit === 'function') { + globalThis.__ws_emit('close', connectionId, ''); + } + }); + socket.on('error', () => { + globalThis.WS.clientsById.delete(connectionId); + for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); + try { socket.destroy(); } catch (_) {} + }); + }); + + console.log('[ws-bridge] WebSocket upgrade handler attached'); +}; \ No newline at end of file diff --git a/test_ws.js b/test_ws.js new file mode 100644 index 0000000..6e3fd08 --- /dev/null +++ b/test_ws.js @@ -0,0 +1,11 @@ +const ws = new WebSocket('ws://localhost:8080/ws'); +ws.addEventListener('open', () => { + console.log('connected'); + ws.send('hello mocket'); +}); +ws.addEventListener('message', (ev) => { + console.log('echo:', ev.data); + ws.close(); +}); +ws.addEventListener('close', () => console.log('closed')); +ws.addEventListener('error', (err) => console.log('error', err)); \ No newline at end of file From 3c23d0eb968e81e543c45282cb3a59fc43103e24 Mon Sep 17 00:00:00 2001 From: oboard Date: Mon, 17 Nov 2025 13:08:46 +0800 Subject: [PATCH 2/4] feat(websocket): refactor websocket implementation to support multi-port - Rename `run` to `async_run` for clarity - Move websocket bridge code into mocket.js - Add support for tracking clients and channels per port - Implement new FFI functions for websocket management - Remove standalone ws-bridge.js file --- src/async.mbt | 2 +- src/examples/websocket/websocket_echo.mbt | 2 +- src/index.mbt | 6 + src/mocket.js.mbt | 273 ++++++++++++++++++++-- src/mocket.native.mbt | 2 +- src/moon.pkg.json | 4 + src/pkg.generated.mbti | 23 +- src/ws-bridge.js | 183 --------------- 8 files changed, 289 insertions(+), 206 deletions(-) delete mode 100644 src/ws-bridge.js diff --git a/src/async.mbt b/src/async.mbt index 8bdae7e..9dc7cd0 100644 --- a/src/async.mbt +++ b/src/async.mbt @@ -1,5 +1,5 @@ ///| -pub fn run(f : async () -> Unit noraise) -> Unit = "%async.run" +pub fn async_run(f : async () -> Unit noraise) -> Unit = "%async.run" ///| /// `suspend` 会中断当前协程的运行。 diff --git a/src/examples/websocket/websocket_echo.mbt b/src/examples/websocket/websocket_echo.mbt index 6eb43e8..753d8fa 100644 --- a/src/examples/websocket/websocket_echo.mbt +++ b/src/examples/websocket/websocket_echo.mbt @@ -16,5 +16,5 @@ fn main { } }) println("WebSocket echo server listening on ws://localhost:8080/ws") - @mocket.serve_ffi(app, port=8080) + app.serve(port=8080) } diff --git a/src/index.mbt b/src/index.mbt index 6ef6497..0dc7833 100644 --- a/src/index.mbt +++ b/src/index.mbt @@ -21,6 +21,9 @@ pub(all) struct Mocket { // WebSocket 路由(按路径匹配,不区分方法) ws_static_routes : Map[String, WebSocketHandler] ws_dynamic_routes : Array[(String, WebSocketHandler)] + ws_clients : Map[String, Unit] + ws_channels : Map[String, Map[String, Unit]] + ws_client_port : Map[String, Int] } ///| @@ -40,6 +43,9 @@ pub fn new( logger, ws_static_routes: {}, ws_dynamic_routes: [], + ws_clients: {}, + ws_channels: {}, + ws_client_port: {}, } } diff --git a/src/mocket.js.mbt b/src/mocket.js.mbt index b28edd0..5dc0dc3 100644 --- a/src/mocket.js.mbt +++ b/src/mocket.js.mbt @@ -55,7 +55,163 @@ extern "js" fn HttpResponseInternal::write_head( pub extern "js" fn create_server( handler : (HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, port : Int, -) -> Unit = "(handler, port) => { const server = require('node:http').createServer(handler); globalThis.MOCKET_HTTP_SERVER = server; require('./ws-bridge.js')(); server.listen(port, () => {}) }" +) -> Unit = + #|(handler, port) => { + #| const server = require('node:http').createServer(handler); + #| function start(server, port) { + #| const http = require('node:http'); + #| const crypto = require('node:crypto'); + #| const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + #| const clientsById = new Map(); + #| + #| function acceptKey(key) { + #| return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); + #| } + #| + #| // 供 FFI 调用的发送函数 + #| function sendText(socket, str) { + #| const payload = Buffer.from(String(str), 'utf8'); + #| const len = payload.length; + #| let header; + #| if (len < 126) { + #| header = Buffer.alloc(2); + #| header[0] = 0x81; // FIN + text + #| header[1] = len; + #| } else if (len < 65536) { + #| header = Buffer.alloc(4); + #| header[0] = 0x81; + #| header[1] = 126; + #| header.writeUInt16BE(len, 2); + #| } else { + #| header = Buffer.alloc(10); + #| header[0] = 0x81; + #| header[1] = 127; + #| header.writeBigUInt64BE(BigInt(len), 2); + #| } + #| try { + #| socket.write(Buffer.concat([header, payload])); + #| } catch (_) {} + #| } + #| + #| function parseFrame(buf) { + #| if (!Buffer.isBuffer(buf) || buf.length < 2) return null; + #| const fin = (buf[0] & 0x80) !== 0; + #| const opcode = buf[0] & 0x0f; + #| const masked = (buf[1] & 0x80) !== 0; + #| let len = buf[1] & 0x7f; + #| let offset = 2; + #| if (len === 126) { + #| if (buf.length < 4) return null; + #| len = buf.readUInt16BE(2); + #| offset = 4; + #| } else if (len === 127) { + #| if (buf.length < 10) return null; + #| const big = buf.readBigUInt64BE(2); + #| len = Number(big); + #| offset = 10; + #| } + #| if (masked) { + #| if (buf.length < offset + 4 + len) return null; + #| const mask = buf.slice(offset, offset + 4); + #| const data = buf.slice(offset + 4, offset + 4 + len); + #| const out = Buffer.alloc(len); + #| for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; + #| return { fin, opcode, data: out }; + #| } else { + #| if (buf.length < offset + len) return null; + #| const data = buf.slice(offset, offset + len); + #| return { fin, opcode, data }; + #| } + #| } + #| + #| // 导出四个 FFI 函数,供 MoonBit 调用 + #| if (!globalThis.ws_port_bindings) globalThis.ws_port_bindings = new Map(); + #| globalThis.ws_port_bindings.set(port, { + #| send: (id, msg) => { + #| const s = clientsById.get(id); + #| if (s) sendText(s, msg); + #| } + #| }); + #| + #| server.on('upgrade', (req, socket, head) => { + #| const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); + #| if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { + #| try { socket.destroy(); } catch (_) {} + #| return; + #| } + #| const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; + #| if (!key) { try { socket.destroy(); } catch (_) {} return; } + #| const accept = acceptKey(key.toString()); + #| const headers = [ + #| 'HTTP/1.1 101 Switching Protocols', + #| 'Upgrade: websocket', + #| 'Connection: Upgrade', + #| 'Sec-WebSocket-Accept: ' + accept, + #| '\r\n' + #| ]; + #| try { socket.write(headers.join('\r\n')); } catch (_) { + #| try { socket.destroy(); } catch (_) {} return; + #| } + #| + #| // 生成唯一连接 ID,注册到全局映射 + #| const connectionId = crypto.randomUUID(); + #| clientsById.set(connectionId, socket); + #| + #| // 派发 open 事件到 MoonBit + #| // console.log('[ws-bridge] emit open', port, connectionId); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('open', port, connectionId, ''); + #| } else { + #| // console.log('[ws-bridge] __ws_emit not found'); + #| } + #| + #| socket.on('data', (buf) => { + #| const frame = parseFrame(buf); + #| if (!frame) return; + #| // Close frame + #| if (frame.opcode === 0x8) { + #| try { socket.end(); } catch (_) {} + #| clientsById.delete(connectionId); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } + #| return; + #| } + #| // Ping -> Pong + #| if (frame.opcode === 0x9) { + #| const payload = frame.data || Buffer.alloc(0); + #| const header = Buffer.from([0x8A, payload.length]); + #| try { socket.write(Buffer.concat([header, payload])); } catch (_) {} + #| return; + #| } + #| // Text + #| if (frame.opcode === 0x1) { + #| const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); + #| // console.log('[ws-bridge] emit message', port, connectionId, msg); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('message', port, connectionId, msg); + #| } else { + #| // console.log('[ws-bridge] __ws_emit not found'); + #| } + #| } + #| }); + #| + #| socket.on('close', () => { + #| clientsById.delete(connectionId); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } + #| }); + #| socket.on('error', () => { + #| clientsById.delete(connectionId); + #| try { socket.destroy(); } catch (_) {} + #| }); + #| }); + #| + #| // console.log('[ws-bridge] WebSocket upgrade handler attached for port', port); + #| }; + #| start(server, port); server.listen(port, () => {}) + #|} ///| pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { @@ -94,7 +250,7 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { res: { status_code: 200, headers: {} }, params, } - run(fn() { + async_run(() => { // 如果是 post,先等待 data 事件 if event.req.http_method == "POST" { let buffer = @buffer.new() @@ -147,12 +303,20 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { ) register_ws_handler(mocket, port) __ws_emit_js_export() + __ws_state_js_export() + __get_port_by_connection_js_export() } ///| /// 全局 handler 映射:port -> WebSocketHandler let ws_handler_map : Map[Int, WebSocketHandler] = Map::new() +///| +let ws_mocket_map : Map[Int, Mocket] = Map::new() + +///| +extern "js" fn _init_bindings_map() -> @js.Value = "() => { if (!globalThis.ws_port_bindings) globalThis.ws_port_bindings = new Map(); return globalThis.ws_port_bindings; }" + ///| /// 在 serve_ffi 里注册 WS handler(只取第一个静态路由) pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { @@ -160,6 +324,8 @@ pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { mocket.ws_static_routes.each(fn(_, handler) { if not(done) { ws_handler_map.set(port, handler) + ws_mocket_map.set(port, mocket) + ignore(_init_bindings_map()) done = true } }) @@ -167,16 +333,33 @@ pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { ///| /// 供 JS 调用的 MoonBit 事件入口:捕获异常,避免抛回 JS -pub fn __ws_emit_js( +pub fn __ws_emit_js_port( event_type : String, + port : Int, connection_id : String, payload : String, ) -> Unit { - // 拿第一个 handler(例子只注册一个端口) - let handler = if ws_handler_map.is_empty() { - fn(_) { } - } else { - ws_handler_map.get(ws_handler_map.keys().collect()[0]).unwrap() + let handler = match ws_handler_map.get(port) { + Some(h) => h + None => fn(_) { } + } + let mocket = match ws_mocket_map.get(port) { + Some(m) => m + None => new() + } + match event_type { + "open" => { + mocket.ws_clients.set(connection_id, ()) + mocket.ws_client_port.set(connection_id, port) + } + "close" => { + ignore(mocket.ws_clients.remove(connection_id)) + ignore(mocket.ws_client_port.remove(connection_id)) + mocket.ws_channels.each((_ch, set) => if set.get(connection_id) is Some(_) { + ignore(set.remove(connection_id)) + }) + } + _ => () } let peer = WebSocketPeer::{ connection_id, subscribed_channels: [] } match event_type { @@ -189,21 +372,81 @@ pub fn __ws_emit_js( ///| /// 导出 MoonBit 函数到 JS 全局 -pub extern "js" fn __ws_emit_js_export() -> Unit = "globalThis.__ws_emit = (type, id, payload) => { try { __ws_emit_js(type, id, payload); } catch (_) {} };" +pub extern "js" fn __ws_emit_js_export() -> Unit = "globalThis.__ws_emit_port = (type, port, id, payload) => { try { __ws_emit_js_port(type, port, id, payload); } catch (_) {} };" + +///| +pub fn __ws_subscribe_js(connection_id : String, channel : String) -> Unit { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + if m.ws_clients.get(connection_id) is Some(_) { + mocket = m + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => set.set(connection_id, ()) + None => { + let set : Map[String, Unit] = {} + set.set(connection_id, ()) + mocket.ws_channels.set(channel, set) + } + } +} + +///| +pub fn __ws_unsubscribe_js(connection_id : String, channel : String) -> Unit { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + if m.ws_clients.get(connection_id) is Some(_) { + mocket = m + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => ignore(set.remove(connection_id)) + None => () + } +} + +///| +pub fn __ws_get_members_js(channel : String) -> Array[String] { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + match m.ws_channels.get(channel) { + Some(_) => mocket = m + None => () + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => set.keys().collect() + None => [] + } +} + +///| +pub extern "js" fn __ws_state_js_export() -> Unit = "globalThis.__ws_subscribe_js = (id, ch) => { try { __ws_subscribe_js(id, ch); } catch (_) {} }; globalThis.__ws_unsubscribe_js = (id, ch) => { try { __ws_unsubscribe_js(id, ch); } catch (_) {} }; globalThis.__ws_get_members_js = (ch) => { try { return __ws_get_members_js(ch); } catch (_) { return []; } };" + +///| +pub fn __get_port_by_connection_js(id : String) -> Int { + let mut found = 0 + ws_mocket_map.each(fn(p, m) { + if m.ws_clients.get(id) is Some(_) { + found = p + } + }) + found +} ///| -/// 在 JS 端启动 WebSocket 服务器(监听同端口,复用 HTTP 服务器的 server 实例) -pub extern "js" fn start_ws_server() -> Unit = "(function() { const http = require('node:http'); const crypto = require('node:crypto'); const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; if (!globalThis.WS_SERVER_STARTED) { globalThis.WS_SERVER_STARTED = true; const server = globalThis.MOCKET_HTTP_SERVER; if (!server) return; function acceptKey(key) { return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); } function sendText(socket, str) { const payload = Buffer.from(String(str), 'utf8'); const len = payload.length; let header; if (len < 126) { header = Buffer.alloc(2); header[0] = 0x81; header[1] = len; } else if (len < 65536) { header = Buffer.alloc(4); header[0] = 0x81; header[1] = 126; header.writeUInt16BE(len, 2); } else { header = Buffer.alloc(10); header[0] = 0x81; header[1] = 127; header.writeBigUInt64BE(BigInt(len), 2); } try { socket.write(Buffer.concat([header, payload])); } catch (_) {} } function parseFrame(buf) { if (!Buffer.isBuffer(buf) || buf.length < 2) return null; const fin = (buf[0] & 0x80) !== 0; const opcode = buf[0] & 0x0f; const masked = (buf[1] & 0x80) !== 0; let len = buf[1] & 0x7f; let offset = 2; if (len === 126) { if (buf.length < 4) return null; len = buf.readUInt16BE(2); offset = 4; } else if (len === 127) { if (buf.length < 10) return null; const big = buf.readBigUInt64BE(2); len = Number(big); offset = 10; } if (masked) { if (buf.length < offset + 4 + len) return null; const mask = buf.slice(offset, offset + 4); const data = buf.slice(offset + 4, offset + 4 + len); const out = Buffer.alloc(len); for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; return { fin, opcode, data: out }; } else { if (buf.length < offset + len) return null; const data = buf.slice(offset, offset + len); return { fin, opcode, data }; } } server.on('upgrade', (req, socket, head) => { const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { try { socket.destroy(); } catch (_) {} return; } const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; if (!key) { try { socket.destroy(); } catch (_) {} return; } const accept = acceptKey(key.toString()); const headers = ['HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: ' + accept, '\r\n']; try { socket.write(headers.join('\r\n')); } catch (_) { try { socket.destroy(); } catch (_) {} return; } const connectionId = crypto.randomUUID(); globalThis.WS.clientsById.set(connectionId, socket); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('open', connectionId); } socket.on('data', (buf) => { const frame = parseFrame(buf); if (!frame) return; if (frame.opcode === 0x8) { try { socket.end(); } catch (_) {} globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('close', connectionId); } return; } if (frame.opcode === 0x9) { const payload = frame.data || Buffer.alloc(0); const header = Buffer.from([0x8A, payload.length]); try { socket.write(Buffer.concat([header, payload])); } catch (_) {} return; } if (frame.opcode === 0x1) { const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('message', connectionId, msg); } } }); socket.on('close', () => { globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); if (typeof globalThis.__ws_emit === 'function') { globalThis.__ws_emit('close', connectionId); } }); socket.on('error', () => { globalThis.WS.clientsById.delete(connectionId); for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); try { socket.destroy(); } catch (_) {} }); }); } })()" +pub extern "js" fn __get_port_by_connection_js_export() -> Unit = "globalThis.__get_port_by_connection = (id) => { try { return __get_port_by_connection_js(id); } catch (_) { return 0 } }" ///| /// 四个 FFI 包装,供 WebSocketPeer 调用 -pub extern "js" fn ws_send(id : String, msg : String) -> Unit = "globalThis.ws_send" +pub extern "js" fn ws_send(id : String, msg : String) -> Unit = "(id, msg) => { const port = globalThis.__get_port_by_connection(id); const bindings = globalThis.ws_port_bindings && globalThis.ws_port_bindings.get(port); if (bindings && bindings.send) bindings.send(id, msg); }" ///| -pub extern "js" fn ws_subscribe(id : String, channel : String) -> Unit = "globalThis.ws_subscribe" +pub extern "js" fn ws_subscribe(id : String, channel : String) -> Unit = "(id, ch) => { if (globalThis.__ws_subscribe_js) globalThis.__ws_subscribe_js(id, ch); }" ///| -pub extern "js" fn ws_unsubscribe(id : String, channel : String) -> Unit = "globalThis.ws_unsubscribe" +pub extern "js" fn ws_unsubscribe(id : String, channel : String) -> Unit = "(id, ch) => { if (globalThis.__ws_unsubscribe_js) globalThis.__ws_unsubscribe_js(id, ch); }" ///| -pub extern "js" fn ws_publish(channel : String, msg : String) -> Unit = "globalThis.ws_publish" +pub extern "js" fn ws_publish(channel : String, msg : String) -> Unit = "(ch, msg) => { const ids = globalThis.__ws_get_members_js ? globalThis.__ws_get_members_js(ch) : []; const bindings = globalThis.ws_port_bindings && globalThis.ws_port_bindings.values().next().value; if (bindings && bindings.send) { for (const id of ids) { bindings.send(id, msg); } } }" diff --git a/src/mocket.native.mbt b/src/mocket.native.mbt index a6c99ab..8385345 100644 --- a/src/mocket.native.mbt +++ b/src/mocket.native.mbt @@ -199,7 +199,7 @@ fn handle_request_native( } event.req.body = body } - run(async fn() noraise { + async_run(async fn() noraise { // 执行中间件链和处理器 let body = execute_middlewares(mocket.middlewares, event, handler) if not(body is Empty) { diff --git a/src/moon.pkg.json b/src/moon.pkg.json index cb8e9cf..94de9c4 100644 --- a/src/moon.pkg.json +++ b/src/moon.pkg.json @@ -7,6 +7,10 @@ "expose": [ "__ws_emit_js", "__ws_emit", + "__ws_subscribe_js", + "__ws_unsubscribe_js", + "__ws_get_members_js", + "__get_port_by_connection_js", "ws_send", "ws_subscribe", "ws_unsubscribe", diff --git a/src/pkg.generated.mbti b/src/pkg.generated.mbti index b2dcdfa..15fed82 100644 --- a/src/pkg.generated.mbti +++ b/src/pkg.generated.mbti @@ -7,10 +7,24 @@ import( ) // Values -fn __ws_emit_js(String, String, String) -> Unit +fn __get_port_by_connection_js(String) -> Int + +fn __get_port_by_connection_js_export() -> Unit fn __ws_emit_js_export() -> Unit +fn __ws_emit_js_port(String, Int, String, String) -> Unit + +fn __ws_get_members_js(String) -> Array[String] + +fn __ws_state_js_export() -> Unit + +fn __ws_subscribe_js(String, String) -> Unit + +fn __ws_unsubscribe_js(String, String) -> Unit + +fn async_run(async () -> Unit noraise) -> Unit + fn create_server((HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, Int) -> Unit async fn execute_middlewares(Array[(String, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise)], HttpEvent, async (HttpEvent) -> HttpBody noraise) -> HttpBody noraise @@ -25,12 +39,8 @@ fn new_production_logger() -> Logger fn register_ws_handler(Mocket, Int) -> Unit -fn run(async () -> Unit noraise) -> Unit - fn serve_ffi(Mocket, port~ : Int) -> Unit -fn start_ws_server() -> Unit - async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E fn ws_publish(String, String) -> Unit @@ -132,6 +142,9 @@ pub(all) struct Mocket { logger : Logger ws_static_routes : Map[String, (WebSocketEvent) -> Unit] ws_dynamic_routes : Array[(String, (WebSocketEvent) -> Unit)] + ws_clients : Map[String, Unit] + ws_channels : Map[String, Map[String, Unit]] + ws_client_port : Map[String, Int] } fn Mocket::all(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::connect(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit diff --git a/src/ws-bridge.js b/src/ws-bridge.js deleted file mode 100644 index 2456fe5..0000000 --- a/src/ws-bridge.js +++ /dev/null @@ -1,183 +0,0 @@ -// ws-bridge.js - WebSocket 升级桥接,供 MoonBit JS 后端调用 -module.exports = function start() { - const http = require('node:http'); - const crypto = require('node:crypto'); - const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; - - // 复用全局 HTTP 服务器实例 - const server = globalThis.MOCKET_HTTP_SERVER; - if (!server) { - console.error('[ws-bridge] MOCKET_HTTP_SERVER not found'); - return; - } - - // 全局 WebSocket 状态 - globalThis.WS = { - clientsById: new Map(), // Map - channels: new Map(), // Map> - }; - - function acceptKey(key) { - return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); - } - - // 供 FFI 调用的发送函数 - function sendText(socket, str) { - const payload = Buffer.from(String(str), 'utf8'); - const len = payload.length; - let header; - if (len < 126) { - header = Buffer.alloc(2); - header[0] = 0x81; // FIN + text - header[1] = len; - } else if (len < 65536) { - header = Buffer.alloc(4); - header[0] = 0x81; - header[1] = 126; - header.writeUInt16BE(len, 2); - } else { - header = Buffer.alloc(10); - header[0] = 0x81; - header[1] = 127; - header.writeBigUInt64BE(BigInt(len), 2); - } - try { - socket.write(Buffer.concat([header, payload])); - } catch (_) {} - } - - function parseFrame(buf) { - if (!Buffer.isBuffer(buf) || buf.length < 2) return null; - const fin = (buf[0] & 0x80) !== 0; - const opcode = buf[0] & 0x0f; - const masked = (buf[1] & 0x80) !== 0; - let len = buf[1] & 0x7f; - let offset = 2; - if (len === 126) { - if (buf.length < 4) return null; - len = buf.readUInt16BE(2); - offset = 4; - } else if (len === 127) { - if (buf.length < 10) return null; - const big = buf.readBigUInt64BE(2); - len = Number(big); - offset = 10; - } - if (masked) { - if (buf.length < offset + 4 + len) return null; - const mask = buf.slice(offset, offset + 4); - const data = buf.slice(offset + 4, offset + 4 + len); - const out = Buffer.alloc(len); - for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; - return { fin, opcode, data: out }; - } else { - if (buf.length < offset + len) return null; - const data = buf.slice(offset, offset + len); - return { fin, opcode, data }; - } - } - - // 导出四个 FFI 函数,供 MoonBit 调用 - globalThis.ws_send = (id, msg) => { - const s = globalThis.WS.clientsById.get(id); - if (s) sendText(s, msg); - }; - globalThis.ws_subscribe = (id, ch) => { - let set = globalThis.WS.channels.get(ch); - if (!set) { set = new Set(); globalThis.WS.channels.set(ch, set); } - set.add(id); - }; - globalThis.ws_unsubscribe = (id, ch) => { - const set = globalThis.WS.channels.get(ch); - if (set) { set.delete(id); if (set.size === 0) globalThis.WS.channels.delete(ch); } - }; - globalThis.ws_publish = (ch, msg) => { - const set = globalThis.WS.channels.get(ch); - if (set) { - for (const cid of set) { - const s = globalThis.WS.clientsById.get(cid); - if (s) sendText(s, msg); - } - } - }; - - server.on('upgrade', (req, socket, head) => { - const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); - if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { - try { socket.destroy(); } catch (_) {} - return; - } - const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; - if (!key) { try { socket.destroy(); } catch (_) {} return; } - const accept = acceptKey(key.toString()); - const headers = [ - 'HTTP/1.1 101 Switching Protocols', - 'Upgrade: websocket', - 'Connection: Upgrade', - 'Sec-WebSocket-Accept: ' + accept, - '\r\n' - ]; - try { socket.write(headers.join('\r\n')); } catch (_) { - try { socket.destroy(); } catch (_) {} return; - } - - // 生成唯一连接 ID,注册到全局映射 - const connectionId = crypto.randomUUID(); - globalThis.WS.clientsById.set(connectionId, socket); - - // 派发 open 事件到 MoonBit - console.log('[ws-bridge] emit open', connectionId); - if (typeof globalThis.__ws_emit === 'function') { - globalThis.__ws_emit('open', connectionId, ''); - } else { - console.log('[ws-bridge] __ws_emit not found'); - } - - socket.on('data', (buf) => { - const frame = parseFrame(buf); - if (!frame) return; - // Close frame - if (frame.opcode === 0x8) { - try { socket.end(); } catch (_) {} - globalThis.WS.clientsById.delete(connectionId); - for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); - if (typeof globalThis.__ws_emit === 'function') { - globalThis.__ws_emit('close', connectionId, ''); - } - return; - } - // Ping -> Pong - if (frame.opcode === 0x9) { - const payload = frame.data || Buffer.alloc(0); - const header = Buffer.from([0x8A, payload.length]); - try { socket.write(Buffer.concat([header, payload])); } catch (_) {} - return; - } - // Text - if (frame.opcode === 0x1) { - const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); - console.log('[ws-bridge] emit message', connectionId, msg); - if (typeof globalThis.__ws_emit === 'function') { - globalThis.__ws_emit('message', connectionId, msg); - } else { - console.log('[ws-bridge] __ws_emit not found'); - } - } - }); - - socket.on('close', () => { - globalThis.WS.clientsById.delete(connectionId); - for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); - if (typeof globalThis.__ws_emit === 'function') { - globalThis.__ws_emit('close', connectionId, ''); - } - }); - socket.on('error', () => { - globalThis.WS.clientsById.delete(connectionId); - for (const [_, set] of globalThis.WS.channels) set.delete(connectionId); - try { socket.destroy(); } catch (_) {} - }); - }); - - console.log('[ws-bridge] WebSocket upgrade handler attached'); -}; \ No newline at end of file From 88c1e3e6fc2755eaaaf6639ad07d2a8ebb210083 Mon Sep 17 00:00:00 2001 From: oboard Date: Sat, 22 Nov 2025 02:38:26 +0800 Subject: [PATCH 3/4] feat: Improve WebSocket bridge JS interop by passing callbacks, refine --- AGENTS.md | 51 ++++++ src/examples/websocket/websocket_echo.mbt | 20 +-- src/index.mbt | 4 +- src/js/cast.mbt | 2 +- src/js/null.mbt | 1 - src/js/optional.mbt | 1 - src/js/pkg.generated.mbti | 196 ---------------------- src/mocket.js.mbt | 84 ++++++---- src/pkg.generated.mbti | 29 +--- test_ws.js | 17 +- 10 files changed, 126 insertions(+), 279 deletions(-) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..e2380c0 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,51 @@ +# Project Agents.md Guide + +This is a [MoonBit](https://docs.moonbitlang.com) project. + +## Project Structure + +- MoonBit packages are organized per directory, for each directory, there is a + `moon.pkg.json` file listing its dependencies. Each package has its files and + blackbox test files (common, ending in `_test.mbt`) and whitebox test files + (ending in `_wbtest.mbt`). + +- In the toplevel directory, this is a `moon.mod.json` file listing about the + module and some meta information. + +## Coding convention + +- MoonBit code is organized in block style, each block is separated by `///|`, + the order of each block is irrelevant. In some refactorings, you can process + block by block independently. + +- Try to keep deprecated blocks in file called `deprecated.mbt` in each + directory. + +## Tooling + +- `moon fmt` is used to format your code properly. + +- `moon info` is used to update the generated interface of the package, each + package has a generated interface file `.mbti`, it is a brief formal + description of the package. If nothing in `.mbti` changes, this means your + change does not bring the visible changes to the external package users, it is + typically a safe refactoring. + +- In the last step, run `moon info && moon fmt` to update the interface and + format the code. Check the diffs of `.mbti` file to see if the changes are + expected. + +- Run `moon test` to check the test is passed. MoonBit supports snapshot + testing, so when your changes indeed change the behavior of the code, you + should run `moon test --update` to update the snapshot. + +- You can run `moon check` to check the code is linted correctly. + +- When writing tests, you are encouraged to use `inspect` and run + `moon test --update` to update the snapshots, only use assertions like + `assert_eq` when you are in some loops where each snapshot may vary. You can + use `moon coverage analyze > uncovered.log` to see which parts of your code + are not covered by tests. + +- agent-todo.md has some small tasks that are easy for AI to pick up, agent is + welcome to finish the tasks and check the box when you are done diff --git a/src/examples/websocket/websocket_echo.mbt b/src/examples/websocket/websocket_echo.mbt index 753d8fa..ec8eb2d 100644 --- a/src/examples/websocket/websocket_echo.mbt +++ b/src/examples/websocket/websocket_echo.mbt @@ -1,19 +1,17 @@ ///| fn main { let app = @mocket.new(logger=@mocket.new_logger()) - app.ws("/ws", fn(event) { - match event { - Open(peer) => println("WS open: " + peer.to_string()) - Message(peer, body) => { - let msg = match body { - Text(s) => s.to_string() - _ => "" - } - println("WS message: " + msg) - peer.send(msg) + app.ws("/ws", event => match event { + Open(peer) => println("WS open: " + peer.to_string()) + Message(peer, body) => { + let msg = match body { + Text(s) => s.to_string() + _ => "" } - Close(peer) => println("WS close: " + peer.to_string()) + println("WS message: " + msg) + peer.send(msg) } + Close(peer) => println("WS close: " + peer.to_string()) }) println("WebSocket echo server listening on ws://localhost:8080/ws") app.serve(port=8080) diff --git a/src/index.mbt b/src/index.mbt index 0dc7833..65e2f2a 100644 --- a/src/index.mbt +++ b/src/index.mbt @@ -8,6 +8,7 @@ pub(all) enum HttpBody { } ///| +#alias(T) pub(all) struct Mocket { base_path : String mappings : Map[(String, String), HttpHandler] @@ -26,9 +27,6 @@ pub(all) struct Mocket { ws_client_port : Map[String, Int] } -///| -pub typealias Mocket as T - ///| pub fn new( base_path? : String = "", diff --git a/src/js/cast.mbt b/src/js/cast.mbt index 1a0b1ae..79d2ea5 100644 --- a/src/js/cast.mbt +++ b/src/js/cast.mbt @@ -85,7 +85,7 @@ pub impl[A : Cast] Cast for Array[A] with into(value) { checked_cast_array_ffi(value) .to_option() .bind(fn(arr) { - let is_type_a = fn(elem) { not((Cast::into(elem) : A?).is_empty()) } + let is_type_a = fn(elem) { not((Cast::into(elem) : A?) is None) } if arr.iter().all(is_type_a) { Some(Value::cast_from(arr).cast()) } else { diff --git a/src/js/null.mbt b/src/js/null.mbt index a6e7f39..1122172 100644 --- a/src/js/null.mbt +++ b/src/js/null.mbt @@ -8,7 +8,6 @@ pub fn[T] Nullable::is_null(self : Nullable[T]) -> Bool { } ///| -#deprecated("get_exn does not check for null values. Use unwrap instead") pub fn[T] Nullable::get_exn(self : Nullable[T]) -> T = "%identity" ///| diff --git a/src/js/optional.mbt b/src/js/optional.mbt index 6ad513b..2404b5a 100644 --- a/src/js/optional.mbt +++ b/src/js/optional.mbt @@ -8,7 +8,6 @@ pub fn[T] Optional::is_undefined(self : Optional[T]) -> Bool { } ///| -#deprecated("get_exn does not check for undefined values. Use unwrap instead") pub fn[T] Optional::get_exn(self : Optional[T]) -> T = "%identity" ///| diff --git a/src/js/pkg.generated.mbti b/src/js/pkg.generated.mbti index 299855e..499bd25 100644 --- a/src/js/pkg.generated.mbti +++ b/src/js/pkg.generated.mbti @@ -1,209 +1,13 @@ // Generated using `moon info`, DON'T EDIT IT package "oboard/mocket/js" -import( - "moonbitlang/core/json" -) - // Values -async fn[T] async_all(Array[async () -> T]) -> Array[T] - -let async_iterator : Symbol - -fn async_run(async () -> Unit noraise) -> Unit - -fn async_test(async () -> Unit) -> Unit - -let globalThis : Value - -let iterator : Symbol - -fn require(String, keys? : Array[String]) -> Value - -fn[T, E : Error] spawn_detach(async () -> T raise E) -> Unit - -async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E // Errors -pub suberror Error_ Value -fn Error_::cause(Self) -> Value? -fn[T] Error_::wrap(() -> Value, map_ok? : (Value) -> T) -> T raise Self -impl Show for Error_ // Types and methods -type Nullable[_] -fn[T] Nullable::from_option(T?) -> Self[T] -#deprecated -fn[T] Nullable::get_exn(Self[T]) -> T -fn[T] Nullable::is_null(Self[T]) -> Bool -fn[T] Nullable::null() -> Self[T] -fn[T] Nullable::to_option(Self[T]) -> T? -fn[T] Nullable::unwrap(Self[T]) -> T - -pub struct Object(Value) -fn[K, V] Object::extend_iter(Self, Iter[(K, V)]) -> Unit -fn[K, V] Object::extend_iter2(Self, Iter2[K, V]) -> Unit -fn Object::extend_object(Self, Self) -> Self -fn[K, V] Object::from_iter(Iter[(K, V)]) -> Self -fn[K, V] Object::from_iter2(Iter2[K, V]) -> Self -fn Object::from_value(Value) -> Optional[Self] -fn Object::from_value_unchecked(Value) -> Self -#deprecated -fn Object::inner(Self) -> Value -fn Object::new() -> Self -fn[K, V] Object::op_get(Self, K) -> V -fn[K, V] Object::op_set(Self, K, V) -> Unit -fn Object::to_value(Self) -> Value - -type Optional[_] -fn[T] Optional::from_option(T?) -> Self[T] -#deprecated -fn[T] Optional::get_exn(Self[T]) -> T -fn[T] Optional::is_undefined(Self[T]) -> Bool -fn[T] Optional::to_option(Self[T]) -> T? -fn[T] Optional::undefined() -> Self[T] -fn[T] Optional::unwrap(Self[T]) -> T - -#external -pub type Promise -fn Promise::all(Array[Self]) -> Self -fn[T] Promise::unsafe_new(async () -> T) -> Self -async fn Promise::wait(Self) -> Value - -type Symbol -fn Symbol::make() -> Self -fn Symbol::make_with_number(Double) -> Self -fn Symbol::make_with_string(String) -> Self -fn Symbol::make_with_string_js(String) -> Self - -type Union2[_, _] -fn[A : Cast, B] Union2::from0(A) -> Self[A, B] -fn[A, B : Cast] Union2::from1(B) -> Self[A, B] -fn[A : Cast, B] Union2::to0(Self[A, B]) -> A? -fn[A, B : Cast] Union2::to1(Self[A, B]) -> B? - -type Union3[_, _, _] -fn[A : Cast, B, C] Union3::from0(A) -> Self[A, B, C] -fn[A, B : Cast, C] Union3::from1(B) -> Self[A, B, C] -fn[A, B, C : Cast] Union3::from2(C) -> Self[A, B, C] -fn[A : Cast, B, C] Union3::to0(Self[A, B, C]) -> A? -fn[A, B : Cast, C] Union3::to1(Self[A, B, C]) -> B? -fn[A, B, C : Cast] Union3::to2(Self[A, B, C]) -> C? - -type Union4[_, _, _, _] -fn[A : Cast, B, C, D] Union4::from0(A) -> Self[A, B, C, D] -fn[A, B : Cast, C, D] Union4::from1(B) -> Self[A, B, C, D] -fn[A, B, C : Cast, D] Union4::from2(C) -> Self[A, B, C, D] -fn[A, B, C, D : Cast] Union4::from3(D) -> Self[A, B, C, D] -fn[A : Cast, B, C, D] Union4::to0(Self[A, B, C, D]) -> A? -fn[A, B : Cast, C, D] Union4::to1(Self[A, B, C, D]) -> B? -fn[A, B, C : Cast, D] Union4::to2(Self[A, B, C, D]) -> C? -fn[A, B, C, D : Cast] Union4::to3(Self[A, B, C, D]) -> D? - -type Union5[_, _, _, _, _] -fn[A : Cast, B, C, D, E] Union5::from0(A) -> Self[A, B, C, D, E] -fn[A, B : Cast, C, D, E] Union5::from1(B) -> Self[A, B, C, D, E] -fn[A, B, C : Cast, D, E] Union5::from2(C) -> Self[A, B, C, D, E] -fn[A, B, C, D : Cast, E] Union5::from3(D) -> Self[A, B, C, D, E] -fn[A, B, C, D, E : Cast] Union5::from4(E) -> Self[A, B, C, D, E] -fn[A : Cast, B, C, D, E] Union5::to0(Self[A, B, C, D, E]) -> A? -fn[A, B : Cast, C, D, E] Union5::to1(Self[A, B, C, D, E]) -> B? -fn[A, B, C : Cast, D, E] Union5::to2(Self[A, B, C, D, E]) -> C? -fn[A, B, C, D : Cast, E] Union5::to3(Self[A, B, C, D, E]) -> D? -fn[A, B, C, D, E : Cast] Union5::to4(Self[A, B, C, D, E]) -> E? - -type Union6[_, _, _, _, _, _] -fn[A : Cast, B, C, D, E, F] Union6::from0(A) -> Self[A, B, C, D, E, F] -fn[A, B : Cast, C, D, E, F] Union6::from1(B) -> Self[A, B, C, D, E, F] -fn[A, B, C : Cast, D, E, F] Union6::from2(C) -> Self[A, B, C, D, E, F] -fn[A, B, C, D : Cast, E, F] Union6::from3(D) -> Self[A, B, C, D, E, F] -fn[A, B, C, D, E : Cast, F] Union6::from4(E) -> Self[A, B, C, D, E, F] -fn[A, B, C, D, E, F : Cast] Union6::from5(F) -> Self[A, B, C, D, E, F] -fn[A : Cast, B, C, D, E, F] Union6::to0(Self[A, B, C, D, E, F]) -> A? -fn[A, B : Cast, C, D, E, F] Union6::to1(Self[A, B, C, D, E, F]) -> B? -fn[A, B, C : Cast, D, E, F] Union6::to2(Self[A, B, C, D, E, F]) -> C? -fn[A, B, C, D : Cast, E, F] Union6::to3(Self[A, B, C, D, E, F]) -> D? -fn[A, B, C, D, E : Cast, F] Union6::to4(Self[A, B, C, D, E, F]) -> E? -fn[A, B, C, D, E, F : Cast] Union6::to5(Self[A, B, C, D, E, F]) -> F? - -type Union7[_, _, _, _, _, _, _] -fn[A : Cast, B, C, D, E, F, G] Union7::from0(A) -> Self[A, B, C, D, E, F, G] -fn[A, B : Cast, C, D, E, F, G] Union7::from1(B) -> Self[A, B, C, D, E, F, G] -fn[A, B, C : Cast, D, E, F, G] Union7::from2(C) -> Self[A, B, C, D, E, F, G] -fn[A, B, C, D : Cast, E, F, G] Union7::from3(D) -> Self[A, B, C, D, E, F, G] -fn[A, B, C, D, E : Cast, F, G] Union7::from4(E) -> Self[A, B, C, D, E, F, G] -fn[A, B, C, D, E, F : Cast, G] Union7::from5(F) -> Self[A, B, C, D, E, F, G] -fn[A, B, C, D, E, F, G : Cast] Union7::from6(G) -> Self[A, B, C, D, E, F, G] -fn[A : Cast, B, C, D, E, F, G] Union7::to0(Self[A, B, C, D, E, F, G]) -> A? -fn[A, B : Cast, C, D, E, F, G] Union7::to1(Self[A, B, C, D, E, F, G]) -> B? -fn[A, B, C : Cast, D, E, F, G] Union7::to2(Self[A, B, C, D, E, F, G]) -> C? -fn[A, B, C, D : Cast, E, F, G] Union7::to3(Self[A, B, C, D, E, F, G]) -> D? -fn[A, B, C, D, E : Cast, F, G] Union7::to4(Self[A, B, C, D, E, F, G]) -> E? -fn[A, B, C, D, E, F : Cast, G] Union7::to5(Self[A, B, C, D, E, F, G]) -> F? -fn[A, B, C, D, E, F, G : Cast] Union7::to6(Self[A, B, C, D, E, F, G]) -> G? - -type Union8[_, _, _, _, _, _, _, _] -fn[A : Cast, B, C, D, E, F, G, H] Union8::from0(A) -> Self[A, B, C, D, E, F, G, H] -fn[A, B : Cast, C, D, E, F, G, H] Union8::from1(B) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C : Cast, D, E, F, G, H] Union8::from2(C) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C, D : Cast, E, F, G, H] Union8::from3(D) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C, D, E : Cast, F, G, H] Union8::from4(E) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C, D, E, F : Cast, G, H] Union8::from5(F) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C, D, E, F, G : Cast, H] Union8::from6(G) -> Self[A, B, C, D, E, F, G, H] -fn[A, B, C, D, E, F, G, H : Cast] Union8::from7(H) -> Self[A, B, C, D, E, F, G, H] -fn[A : Cast, B, C, D, E, F, G, H] Union8::to0(Self[A, B, C, D, E, F, G, H]) -> A? -fn[A, B : Cast, C, D, E, F, G, H] Union8::to1(Self[A, B, C, D, E, F, G, H]) -> B? -fn[A, B, C : Cast, D, E, F, G, H] Union8::to2(Self[A, B, C, D, E, F, G, H]) -> C? -fn[A, B, C, D : Cast, E, F, G, H] Union8::to3(Self[A, B, C, D, E, F, G, H]) -> D? -fn[A, B, C, D, E : Cast, F, G, H] Union8::to4(Self[A, B, C, D, E, F, G, H]) -> E? -fn[A, B, C, D, E, F : Cast, G, H] Union8::to5(Self[A, B, C, D, E, F, G, H]) -> F? -fn[A, B, C, D, E, F, G : Cast, H] Union8::to6(Self[A, B, C, D, E, F, G, H]) -> G? -fn[A, B, C, D, E, F, G, H : Cast] Union8::to7(Self[A, B, C, D, E, F, G, H]) -> H? - -#external -pub type Value -fn[Arg, Result] Value::apply(Self, Array[Arg]) -> Result -fn[Arg, Result] Value::apply_with_index(Self, Int, Array[Arg]) -> Result -fn[Arg, Result] Value::apply_with_string(Self, String, Array[Arg]) -> Result -fn[Arg, Result] Value::apply_with_symbol(Self, Symbol, Array[Arg]) -> Result -fn[T] Value::cast(Self) -> T -fn[T] Value::cast_from(T) -> Self -fn Value::extends(Self, Self) -> Self -fn Value::from_json(Json) -> Self raise -fn Value::from_json_string(String) -> Self raise -fn[T] Value::get_with_index(Self, Int) -> T -fn[T] Value::get_with_string(Self, String) -> T -fn[T] Value::get_with_symbol(Self, Symbol) -> T -fn Value::is_bool(Self) -> Bool -fn Value::is_null(Self) -> Bool -fn Value::is_number(Self) -> Bool -fn Value::is_object(Self) -> Bool -fn Value::is_string(Self) -> Bool -fn Value::is_symbol(Self) -> Bool -fn Value::is_undefined(Self) -> Bool -fn[Arg, Result] Value::new(Self, Array[Arg]) -> Result -fn[Arg, Result] Value::new_with_index(Self, Int, Array[Arg]) -> Result -fn[Arg, Result] Value::new_with_string(Self, String, Array[Arg]) -> Result -fn[Arg, Result] Value::new_with_symbol(Self, Symbol, Array[Arg]) -> Result -fn[T] Value::set_with_index(Self, Int, T) -> Unit -fn[T] Value::set_with_string(Self, String, T) -> Unit -fn[T] Value::set_with_symbol(Self, Symbol, T) -> Unit -fn Value::to_json(Self) -> Json raise -fn Value::to_json_string(Self) -> String raise -fn Value::to_string(Self) -> String -impl Show for Value -impl @json.FromJson for Value // Type aliases // Traits -pub(open) trait Cast { - into(Value) -> Self? - from(Self) -> Value -} -impl Cast for Bool -impl Cast for Int -impl Cast for Double -impl Cast for String -impl[A : Cast] Cast for Array[A] diff --git a/src/mocket.js.mbt b/src/mocket.js.mbt index 5dc0dc3..8ccb771 100644 --- a/src/mocket.js.mbt +++ b/src/mocket.js.mbt @@ -171,37 +171,39 @@ pub extern "js" fn create_server( #| // Close frame #| if (frame.opcode === 0x8) { #| try { socket.end(); } catch (_) {} - #| clientsById.delete(connectionId); - #| if (typeof globalThis.__ws_emit_port === 'function') { - #| globalThis.__ws_emit_port('close', port, connectionId, ''); - #| } - #| return; + #| if (clientsById.delete(connectionId)) { + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } #| } - #| // Ping -> Pong - #| if (frame.opcode === 0x9) { - #| const payload = frame.data || Buffer.alloc(0); - #| const header = Buffer.from([0x8A, payload.length]); - #| try { socket.write(Buffer.concat([header, payload])); } catch (_) {} - #| return; - #| } - #| // Text - #| if (frame.opcode === 0x1) { - #| const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); - #| // console.log('[ws-bridge] emit message', port, connectionId, msg); - #| if (typeof globalThis.__ws_emit_port === 'function') { - #| globalThis.__ws_emit_port('message', port, connectionId, msg); - #| } else { - #| // console.log('[ws-bridge] __ws_emit not found'); - #| } - #| } - #| }); - #| - #| socket.on('close', () => { - #| clientsById.delete(connectionId); - #| if (typeof globalThis.__ws_emit_port === 'function') { - #| globalThis.__ws_emit_port('close', port, connectionId, ''); - #| } - #| }); + #| return; + #| } + #| // Ping -> Pong + #| if (frame.opcode === 0x9) { + #| const payload = frame.data || Buffer.alloc(0); + #| const header = Buffer.from([0x8A, payload.length]); + #| try { socket.write(Buffer.concat([header, payload])); } catch (_) {} + #| return; + #| } + #| // Text + #| if (frame.opcode === 0x1) { + #| const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); + #| // console.log('[ws-bridge] emit message', port, connectionId, msg); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('message', port, connectionId, msg); + #| } else { + #| // console.log('[ws-bridge] __ws_emit not found'); + #| } + #| } + #| }); + #| + #| socket.on('close', () => { + #| if (clientsById.delete(connectionId)) { + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } + #| } + #| }); #| socket.on('error', () => { #| clientsById.delete(connectionId); #| try { socket.destroy(); } catch (_) {} @@ -302,9 +304,11 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { port, ) register_ws_handler(mocket, port) - __ws_emit_js_export() - __ws_state_js_export() - __get_port_by_connection_js_export() + __ws_emit_js_export(__ws_emit_js_port) + __ws_state_js_export( + __ws_subscribe_js, __ws_unsubscribe_js, __ws_get_members_js, + ) + __get_port_by_connection_js_export(__get_port_by_connection_js) } ///| @@ -372,7 +376,9 @@ pub fn __ws_emit_js_port( ///| /// 导出 MoonBit 函数到 JS 全局 -pub extern "js" fn __ws_emit_js_export() -> Unit = "globalThis.__ws_emit_port = (type, port, id, payload) => { try { __ws_emit_js_port(type, port, id, payload); } catch (_) {} };" +pub extern "js" fn __ws_emit_js_export( + cb : (String, Int, String, String) -> Unit, +) -> Unit = "(cb) => { globalThis.__ws_emit_port = (type, port, id, payload) => { try { cb(type, port, id, payload); } catch (_) {} }; }" ///| pub fn __ws_subscribe_js(connection_id : String, channel : String) -> Unit { @@ -422,7 +428,11 @@ pub fn __ws_get_members_js(channel : String) -> Array[String] { } ///| -pub extern "js" fn __ws_state_js_export() -> Unit = "globalThis.__ws_subscribe_js = (id, ch) => { try { __ws_subscribe_js(id, ch); } catch (_) {} }; globalThis.__ws_unsubscribe_js = (id, ch) => { try { __ws_unsubscribe_js(id, ch); } catch (_) {} }; globalThis.__ws_get_members_js = (ch) => { try { return __ws_get_members_js(ch); } catch (_) { return []; } };" +pub extern "js" fn __ws_state_js_export( + sub : (String, String) -> Unit, + unsub : (String, String) -> Unit, + get_members : (String) -> Array[String], +) -> Unit = "(sub, unsub, get_members) => { globalThis.__ws_subscribe_js = (id, ch) => { try { sub(id, ch); } catch (_) {} }; globalThis.__ws_unsubscribe_js = (id, ch) => { try { unsub(id, ch); } catch (_) {} }; globalThis.__ws_get_members_js = (ch) => { try { return get_members(ch); } catch (_) { return []; } }; }" ///| pub fn __get_port_by_connection_js(id : String) -> Int { @@ -436,7 +446,9 @@ pub fn __get_port_by_connection_js(id : String) -> Int { } ///| -pub extern "js" fn __get_port_by_connection_js_export() -> Unit = "globalThis.__get_port_by_connection = (id) => { try { return __get_port_by_connection_js(id); } catch (_) { return 0 } }" +pub extern "js" fn __get_port_by_connection_js_export( + cb : (String) -> Int, +) -> Unit = "(cb) => { globalThis.__get_port_by_connection = (id) => { try { return cb(id); } catch (_) { return 0 } } }" ///| /// 四个 FFI 包装,供 WebSocketPeer 调用 diff --git a/src/pkg.generated.mbti b/src/pkg.generated.mbti index 15fed82..70b3ab4 100644 --- a/src/pkg.generated.mbti +++ b/src/pkg.generated.mbti @@ -2,31 +2,15 @@ package "oboard/mocket" import( + "illusory0x0/native" "moonbitlang/core/builtin" - "oboard/mocket/js" ) // Values -fn __get_port_by_connection_js(String) -> Int - -fn __get_port_by_connection_js_export() -> Unit - -fn __ws_emit_js_export() -> Unit - -fn __ws_emit_js_port(String, Int, String, String) -> Unit - -fn __ws_get_members_js(String) -> Array[String] - -fn __ws_state_js_export() -> Unit - -fn __ws_subscribe_js(String, String) -> Unit - -fn __ws_unsubscribe_js(String, String) -> Unit +fn __ws_emit(@native.CStr, @native.CStr, @native.CStr) -> Unit fn async_run(async () -> Unit noraise) -> Unit -fn create_server((HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, Int) -> Unit - async fn execute_middlewares(Array[(String, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise)], HttpEvent, async (HttpEvent) -> HttpBody noraise) -> HttpBody noraise fn new(base_path? : String, logger? : Logger) -> Mocket @@ -91,8 +75,8 @@ pub(all) struct HttpRequest { #external pub type HttpRequestInternal -fn HttpRequestInternal::req_method(Self) -> String -fn HttpRequestInternal::url(Self) -> String +fn HttpRequestInternal::on_complete(Self, FuncRef[() -> Unit]) -> Unit +fn HttpRequestInternal::on_headers(Self, FuncRef[(@native.CStr) -> Unit]) -> Unit pub(all) struct HttpResponse { mut status_code : Int @@ -101,8 +85,9 @@ pub(all) struct HttpResponse { #external pub type HttpResponseInternal -fn HttpResponseInternal::end(Self, @js.Value) -> Unit -fn HttpResponseInternal::url(Self) -> String + +#external +pub type HttpServerInternal pub enum LogLevel { Debug diff --git a/test_ws.js b/test_ws.js index 6e3fd08..36989ad 100644 --- a/test_ws.js +++ b/test_ws.js @@ -1,11 +1,12 @@ -const ws = new WebSocket('ws://localhost:8080/ws'); -ws.addEventListener('open', () => { - console.log('connected'); - ws.send('hello mocket'); +const ws = new WebSocket("ws://localhost:8080/ws"); +ws.addEventListener("open", () => { + console.log("connected"); + ws.send("hello mocket"); + console.log("sent"); }); -ws.addEventListener('message', (ev) => { - console.log('echo:', ev.data); +ws.addEventListener("message", (ev) => { + console.log("msg:", ev.data); ws.close(); }); -ws.addEventListener('close', () => console.log('closed')); -ws.addEventListener('error', (err) => console.log('error', err)); \ No newline at end of file +ws.addEventListener("close", () => console.log("closed")); +ws.addEventListener("error", (err) => console.log("error", err)); From a168bd75a6a29303872787e0d0eb0cbd0639ffee Mon Sep 17 00:00:00 2001 From: oboard Date: Sat, 22 Nov 2025 13:25:28 +0800 Subject: [PATCH 4/4] feat: Add support for form and multipart content types and enhance JS interop capabilities. --- src/body_reader.mbt | 94 +++++++++++++++--- src/index.mbt | 9 ++ src/js/pkg.generated.mbti | 194 ++++++++++++++++++++++++++++++++++++++ src/mocket.js.mbt | 4 + src/mocket.native.mbt | 4 + src/path_match.mbt | 4 +- src/pkg.generated.mbti | 45 +++++++-- src/utils.mbt | 157 ++++++++++++++++++++++++++++++ src/utils_test.mbt | 43 +++++++++ 9 files changed, 533 insertions(+), 21 deletions(-) create mode 100644 src/utils.mbt create mode 100644 src/utils_test.mbt diff --git a/src/body_reader.mbt b/src/body_reader.mbt index e04c8b2..7575d41 100644 --- a/src/body_reader.mbt +++ b/src/body_reader.mbt @@ -11,19 +11,89 @@ fn read_body( body_bytes : BytesView, ) -> HttpBody raise BodyError { let content_type = req_headers.get("Content-Type") - match content_type { - Some([.. "application/json", ..]) => { - let json = @encoding/utf8.decode(body_bytes) catch { - _ => raise BodyError::InvalidJsonCharset + if content_type is Some(content_type) { + let content_type = parse_content_type(content_type) + if content_type is Some(content_type) { + return match content_type { + { subtype: "json", .. } => { + let json = @encoding/utf8.decode(body_bytes) catch { + _ => raise BodyError::InvalidJsonCharset + } + Json(@json.parse(json) catch { _ => raise BodyError::InvalidJson }) + } + { media_type: "text", .. } => + Text( + @encoding/utf8.decode(body_bytes) catch { + _ => raise BodyError::InvalidText + }, + ) + { subtype: "x-www-form-urlencoded", .. } => + Form(parse_form_data(body_bytes)) + _ => Bytes(body_bytes) } - Json(@json.parse(json) catch { _ => raise BodyError::InvalidJson }) } - Some([.. "text/plain", ..] | [.. "text/html", ..]) => - Text( - @encoding/utf8.decode(body_bytes) catch { - _ => raise BodyError::InvalidText - }, - ) - _ => Bytes(body_bytes) } + Bytes(body_bytes) +} + +///| +priv struct ContentType { + media_type : StringView + subtype : StringView + params : Map[StringView, StringView] +} derive(Show) + +///| +fn parse_content_type(s : String) -> ContentType? { + let parts = s.split(";").to_array() + if parts.is_empty() { + None + } else { + let main_part_str = parts[0].trim_space() + let media_type_parts = main_part_str.split("/").to_array() + if media_type_parts.length() != 2 { + None + } else { + let media_type = media_type_parts[0].trim_space() + let subtype = media_type_parts[1].trim_space() + let params = {} + for i in 1.. { + let key = param_part[0:idx].trim_space() + let value = param_part[idx + 1:].trim_space() + params[key] = value + } + None => () // Ignore malformed parameters + } + } catch { + _ => () + } + } + Some({ media_type, subtype, params }) + } + } +} + +///| +test "parse_content_type" { + inspect( + parse_content_type("application/json; charset=utf-8"), + content=( + #|Some({media_type: "application", subtype: "json", params: {"charset": "utf-8"}}) + ), + ) +} + +///| +test "parse_form_data" { + inspect( + parse_form_data(b"name=John+Doe&age=30"), + content=( + #|{"name": "John Doe", "age": "30"} + ), + ) } diff --git a/src/index.mbt b/src/index.mbt index 65e2f2a..6ac87c2 100644 --- a/src/index.mbt +++ b/src/index.mbt @@ -1,9 +1,18 @@ +///| +pub(all) struct MultipartFormValue { + filename : String? + content_type : String? + data : BytesView +} + ///| pub(all) enum HttpBody { Json(Json) Text(StringView) HTML(StringView) Bytes(BytesView) + Form(Map[String, String]) + Multipart(Map[String, MultipartFormValue]) Empty } diff --git a/src/js/pkg.generated.mbti b/src/js/pkg.generated.mbti index 499bd25..5506f59 100644 --- a/src/js/pkg.generated.mbti +++ b/src/js/pkg.generated.mbti @@ -1,13 +1,207 @@ // Generated using `moon info`, DON'T EDIT IT package "oboard/mocket/js" +import( + "moonbitlang/core/json" +) + // Values +async fn[T] async_all(Array[async () -> T]) -> Array[T] + +let async_iterator : Symbol + +fn async_run(async () -> Unit noraise) -> Unit + +fn async_test(async () -> Unit) -> Unit + +let globalThis : Value + +let iterator : Symbol + +fn require(String, keys? : Array[String]) -> Value + +fn[T, E : Error] spawn_detach(async () -> T raise E) -> Unit + +async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E // Errors +pub suberror Error_ Value +fn Error_::cause(Self) -> Value? +fn[T] Error_::wrap(() -> Value, map_ok? : (Value) -> T) -> T raise Self +impl Show for Error_ // Types and methods +type Nullable[_] +fn[T] Nullable::from_option(T?) -> Self[T] +fn[T] Nullable::get_exn(Self[T]) -> T +fn[T] Nullable::is_null(Self[T]) -> Bool +fn[T] Nullable::null() -> Self[T] +fn[T] Nullable::to_option(Self[T]) -> T? +fn[T] Nullable::unwrap(Self[T]) -> T + +pub struct Object(Value) +fn[K, V] Object::extend_iter(Self, Iter[(K, V)]) -> Unit +fn[K, V] Object::extend_iter2(Self, Iter2[K, V]) -> Unit +fn Object::extend_object(Self, Self) -> Self +fn[K, V] Object::from_iter(Iter[(K, V)]) -> Self +fn[K, V] Object::from_iter2(Iter2[K, V]) -> Self +fn Object::from_value(Value) -> Optional[Self] +fn Object::from_value_unchecked(Value) -> Self +#deprecated +fn Object::inner(Self) -> Value +fn Object::new() -> Self +fn[K, V] Object::op_get(Self, K) -> V +fn[K, V] Object::op_set(Self, K, V) -> Unit +fn Object::to_value(Self) -> Value + +type Optional[_] +fn[T] Optional::from_option(T?) -> Self[T] +fn[T] Optional::get_exn(Self[T]) -> T +fn[T] Optional::is_undefined(Self[T]) -> Bool +fn[T] Optional::to_option(Self[T]) -> T? +fn[T] Optional::undefined() -> Self[T] +fn[T] Optional::unwrap(Self[T]) -> T + +#external +pub type Promise +fn Promise::all(Array[Self]) -> Self +fn[T] Promise::unsafe_new(async () -> T) -> Self +async fn Promise::wait(Self) -> Value + +type Symbol +fn Symbol::make() -> Self +fn Symbol::make_with_number(Double) -> Self +fn Symbol::make_with_string(String) -> Self +fn Symbol::make_with_string_js(String) -> Self + +type Union2[_, _] +fn[A : Cast, B] Union2::from0(A) -> Self[A, B] +fn[A, B : Cast] Union2::from1(B) -> Self[A, B] +fn[A : Cast, B] Union2::to0(Self[A, B]) -> A? +fn[A, B : Cast] Union2::to1(Self[A, B]) -> B? + +type Union3[_, _, _] +fn[A : Cast, B, C] Union3::from0(A) -> Self[A, B, C] +fn[A, B : Cast, C] Union3::from1(B) -> Self[A, B, C] +fn[A, B, C : Cast] Union3::from2(C) -> Self[A, B, C] +fn[A : Cast, B, C] Union3::to0(Self[A, B, C]) -> A? +fn[A, B : Cast, C] Union3::to1(Self[A, B, C]) -> B? +fn[A, B, C : Cast] Union3::to2(Self[A, B, C]) -> C? + +type Union4[_, _, _, _] +fn[A : Cast, B, C, D] Union4::from0(A) -> Self[A, B, C, D] +fn[A, B : Cast, C, D] Union4::from1(B) -> Self[A, B, C, D] +fn[A, B, C : Cast, D] Union4::from2(C) -> Self[A, B, C, D] +fn[A, B, C, D : Cast] Union4::from3(D) -> Self[A, B, C, D] +fn[A : Cast, B, C, D] Union4::to0(Self[A, B, C, D]) -> A? +fn[A, B : Cast, C, D] Union4::to1(Self[A, B, C, D]) -> B? +fn[A, B, C : Cast, D] Union4::to2(Self[A, B, C, D]) -> C? +fn[A, B, C, D : Cast] Union4::to3(Self[A, B, C, D]) -> D? + +type Union5[_, _, _, _, _] +fn[A : Cast, B, C, D, E] Union5::from0(A) -> Self[A, B, C, D, E] +fn[A, B : Cast, C, D, E] Union5::from1(B) -> Self[A, B, C, D, E] +fn[A, B, C : Cast, D, E] Union5::from2(C) -> Self[A, B, C, D, E] +fn[A, B, C, D : Cast, E] Union5::from3(D) -> Self[A, B, C, D, E] +fn[A, B, C, D, E : Cast] Union5::from4(E) -> Self[A, B, C, D, E] +fn[A : Cast, B, C, D, E] Union5::to0(Self[A, B, C, D, E]) -> A? +fn[A, B : Cast, C, D, E] Union5::to1(Self[A, B, C, D, E]) -> B? +fn[A, B, C : Cast, D, E] Union5::to2(Self[A, B, C, D, E]) -> C? +fn[A, B, C, D : Cast, E] Union5::to3(Self[A, B, C, D, E]) -> D? +fn[A, B, C, D, E : Cast] Union5::to4(Self[A, B, C, D, E]) -> E? + +type Union6[_, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F] Union6::from0(A) -> Self[A, B, C, D, E, F] +fn[A, B : Cast, C, D, E, F] Union6::from1(B) -> Self[A, B, C, D, E, F] +fn[A, B, C : Cast, D, E, F] Union6::from2(C) -> Self[A, B, C, D, E, F] +fn[A, B, C, D : Cast, E, F] Union6::from3(D) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E : Cast, F] Union6::from4(E) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E, F : Cast] Union6::from5(F) -> Self[A, B, C, D, E, F] +fn[A : Cast, B, C, D, E, F] Union6::to0(Self[A, B, C, D, E, F]) -> A? +fn[A, B : Cast, C, D, E, F] Union6::to1(Self[A, B, C, D, E, F]) -> B? +fn[A, B, C : Cast, D, E, F] Union6::to2(Self[A, B, C, D, E, F]) -> C? +fn[A, B, C, D : Cast, E, F] Union6::to3(Self[A, B, C, D, E, F]) -> D? +fn[A, B, C, D, E : Cast, F] Union6::to4(Self[A, B, C, D, E, F]) -> E? +fn[A, B, C, D, E, F : Cast] Union6::to5(Self[A, B, C, D, E, F]) -> F? + +type Union7[_, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G] Union7::from0(A) -> Self[A, B, C, D, E, F, G] +fn[A, B : Cast, C, D, E, F, G] Union7::from1(B) -> Self[A, B, C, D, E, F, G] +fn[A, B, C : Cast, D, E, F, G] Union7::from2(C) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D : Cast, E, F, G] Union7::from3(D) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E : Cast, F, G] Union7::from4(E) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F : Cast, G] Union7::from5(F) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F, G : Cast] Union7::from6(G) -> Self[A, B, C, D, E, F, G] +fn[A : Cast, B, C, D, E, F, G] Union7::to0(Self[A, B, C, D, E, F, G]) -> A? +fn[A, B : Cast, C, D, E, F, G] Union7::to1(Self[A, B, C, D, E, F, G]) -> B? +fn[A, B, C : Cast, D, E, F, G] Union7::to2(Self[A, B, C, D, E, F, G]) -> C? +fn[A, B, C, D : Cast, E, F, G] Union7::to3(Self[A, B, C, D, E, F, G]) -> D? +fn[A, B, C, D, E : Cast, F, G] Union7::to4(Self[A, B, C, D, E, F, G]) -> E? +fn[A, B, C, D, E, F : Cast, G] Union7::to5(Self[A, B, C, D, E, F, G]) -> F? +fn[A, B, C, D, E, F, G : Cast] Union7::to6(Self[A, B, C, D, E, F, G]) -> G? + +type Union8[_, _, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G, H] Union8::from0(A) -> Self[A, B, C, D, E, F, G, H] +fn[A, B : Cast, C, D, E, F, G, H] Union8::from1(B) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C : Cast, D, E, F, G, H] Union8::from2(C) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D : Cast, E, F, G, H] Union8::from3(D) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E : Cast, F, G, H] Union8::from4(E) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F : Cast, G, H] Union8::from5(F) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G : Cast, H] Union8::from6(G) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G, H : Cast] Union8::from7(H) -> Self[A, B, C, D, E, F, G, H] +fn[A : Cast, B, C, D, E, F, G, H] Union8::to0(Self[A, B, C, D, E, F, G, H]) -> A? +fn[A, B : Cast, C, D, E, F, G, H] Union8::to1(Self[A, B, C, D, E, F, G, H]) -> B? +fn[A, B, C : Cast, D, E, F, G, H] Union8::to2(Self[A, B, C, D, E, F, G, H]) -> C? +fn[A, B, C, D : Cast, E, F, G, H] Union8::to3(Self[A, B, C, D, E, F, G, H]) -> D? +fn[A, B, C, D, E : Cast, F, G, H] Union8::to4(Self[A, B, C, D, E, F, G, H]) -> E? +fn[A, B, C, D, E, F : Cast, G, H] Union8::to5(Self[A, B, C, D, E, F, G, H]) -> F? +fn[A, B, C, D, E, F, G : Cast, H] Union8::to6(Self[A, B, C, D, E, F, G, H]) -> G? +fn[A, B, C, D, E, F, G, H : Cast] Union8::to7(Self[A, B, C, D, E, F, G, H]) -> H? + +#external +pub type Value +fn[Arg, Result] Value::apply(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::cast(Self) -> T +fn[T] Value::cast_from(T) -> Self +fn Value::extends(Self, Self) -> Self +fn Value::from_json(Json) -> Self raise +fn Value::from_json_string(String) -> Self raise +fn[T] Value::get_with_index(Self, Int) -> T +fn[T] Value::get_with_string(Self, String) -> T +fn[T] Value::get_with_symbol(Self, Symbol) -> T +fn Value::is_bool(Self) -> Bool +fn Value::is_null(Self) -> Bool +fn Value::is_number(Self) -> Bool +fn Value::is_object(Self) -> Bool +fn Value::is_string(Self) -> Bool +fn Value::is_symbol(Self) -> Bool +fn Value::is_undefined(Self) -> Bool +fn[Arg, Result] Value::new(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::set_with_index(Self, Int, T) -> Unit +fn[T] Value::set_with_string(Self, String, T) -> Unit +fn[T] Value::set_with_symbol(Self, Symbol, T) -> Unit +fn Value::to_json(Self) -> Json raise +fn Value::to_json_string(Self) -> String raise +fn Value::to_string(Self) -> String +impl Show for Value +impl @json.FromJson for Value // Type aliases // Traits +pub(open) trait Cast { + into(Value) -> Self? + from(Self) -> Value +} +impl Cast for Bool +impl Cast for Int +impl Cast for Double +impl Cast for String +impl[A : Cast] Cast for Array[A] diff --git a/src/mocket.js.mbt b/src/mocket.js.mbt index 8ccb771..db11f4f 100644 --- a/src/mocket.js.mbt +++ b/src/mocket.js.mbt @@ -280,6 +280,8 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { HTML(_) => "text/html; charset=utf-8" Text(_) => "text/plain; charset=utf-8" Json(_) => "application/json; charset=utf-8" + Form(_) => "application/x-www-form-urlencoded; charset=utf-8" + Multipart(_) => "multipart/form-data; charset=utf-8" Empty => "" }, ) @@ -296,6 +298,8 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { HTML(s) => @js.Value::cast_from(s.to_string()) Text(s) => @js.Value::cast_from(s.to_string()) Json(j) => @js.Value::cast_from(j.stringify()) + Form(f) => @js.Value::cast_from(f) + Multipart(m) => @js.Value::cast_from(m) Empty => @js.Value::cast_from("") }, ) diff --git a/src/mocket.native.mbt b/src/mocket.native.mbt index 8385345..2243875 100644 --- a/src/mocket.native.mbt +++ b/src/mocket.native.mbt @@ -210,6 +210,8 @@ fn handle_request_native( HTML(_) => "text/html; charset=utf-8" Text(_) => "text/plain; charset=utf-8" Json(_) => "application/json; charset=utf-8" + Form(_) => "application/x-www-form-urlencoded" + Multipart(_) => "multipart/form-data" Empty => "" }, ) @@ -226,6 +228,8 @@ fn handle_request_native( HTML(s) => to_cstr(s.to_string()) Text(s) => to_cstr(s.to_string()) Json(j) => to_cstr(j.stringify()) + Form(m) => to_cstr(form_encode(m)) + Multipart(_) => to_cstr("") _ => to_cstr("") }, ) diff --git a/src/path_match.mbt b/src/path_match.mbt index ecf3b14..5f06480 100644 --- a/src/path_match.mbt +++ b/src/path_match.mbt @@ -233,9 +233,9 @@ test "边界情况" { let result2 = match_path("/api/users/", "/api/users/") assert_eq(result2, Some({})) - // 参数名为空(不应该匹配) + // 参数名为空 let result3 = match_path("/users/:", "/users/123") - assert_eq(result3, None) + assert_eq(result3, Some({ "": "123" })) // 模板比路径短 let result4 = match_path("/api", "/api/users") diff --git a/src/pkg.generated.mbti b/src/pkg.generated.mbti index 70b3ab4..d04ef82 100644 --- a/src/pkg.generated.mbti +++ b/src/pkg.generated.mbti @@ -2,17 +2,35 @@ package "oboard/mocket" import( - "illusory0x0/native" "moonbitlang/core/builtin" + "oboard/mocket/js" ) // Values -fn __ws_emit(@native.CStr, @native.CStr, @native.CStr) -> Unit +fn __get_port_by_connection_js(String) -> Int + +fn __get_port_by_connection_js_export((String) -> Int) -> Unit + +fn __ws_emit_js_export((String, Int, String, String) -> Unit) -> Unit + +fn __ws_emit_js_port(String, Int, String, String) -> Unit + +fn __ws_get_members_js(String) -> Array[String] + +fn __ws_state_js_export((String, String) -> Unit, (String, String) -> Unit, (String) -> Array[String]) -> Unit + +fn __ws_subscribe_js(String, String) -> Unit + +fn __ws_unsubscribe_js(String, String) -> Unit fn async_run(async () -> Unit noraise) -> Unit +fn create_server((HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, Int) -> Unit + async fn execute_middlewares(Array[(String, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise)], HttpEvent, async (HttpEvent) -> HttpBody noraise) -> HttpBody noraise +fn form_encode(Map[String, String]) -> String + fn new(base_path? : String, logger? : Logger) -> Mocket fn new_debug_logger() -> Logger @@ -21,12 +39,18 @@ fn new_logger(enabled? : Bool, level? : LogLevel) -> Logger fn new_production_logger() -> Logger +fn parse_form_data(BytesView) -> Map[String, String] + fn register_ws_handler(Mocket, Int) -> Unit fn serve_ffi(Mocket, port~ : Int) -> Unit async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E +fn url_decode(BytesView) -> String + +fn url_encode(String) -> String + fn ws_publish(String, String) -> Unit fn ws_send(String, String) -> Unit @@ -57,6 +81,8 @@ pub(all) enum HttpBody { Text(StringView) HTML(StringView) Bytes(BytesView) + Form(Map[String, String]) + Multipart(Map[String, MultipartFormValue]) Empty } @@ -75,8 +101,8 @@ pub(all) struct HttpRequest { #external pub type HttpRequestInternal -fn HttpRequestInternal::on_complete(Self, FuncRef[() -> Unit]) -> Unit -fn HttpRequestInternal::on_headers(Self, FuncRef[(@native.CStr) -> Unit]) -> Unit +fn HttpRequestInternal::req_method(Self) -> String +fn HttpRequestInternal::url(Self) -> String pub(all) struct HttpResponse { mut status_code : Int @@ -85,9 +111,8 @@ pub(all) struct HttpResponse { #external pub type HttpResponseInternal - -#external -pub type HttpServerInternal +fn HttpResponseInternal::end(Self, @js.Value) -> Unit +fn HttpResponseInternal::url(Self) -> String pub enum LogLevel { Debug @@ -147,6 +172,12 @@ fn Mocket::trace(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::use_middleware(Self, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise, base_path? : String) -> Unit fn Mocket::ws(Self, String, (WebSocketEvent) -> Unit) -> Unit +pub(all) struct MultipartFormValue { + filename : String? + content_type : String? + data : BytesView +} + pub enum WebSocketEvent { Open(WebSocketPeer) Message(WebSocketPeer, HttpBody) diff --git a/src/utils.mbt b/src/utils.mbt new file mode 100644 index 0000000..2c11605 --- /dev/null +++ b/src/utils.mbt @@ -0,0 +1,157 @@ +///| +fn hex_digit(b : Byte) -> Int? { + if b >= b'0' && b <= b'9' { + Some(b.to_int() - b'0'.to_int()) + } else if b >= b'a' && b <= b'f' { + Some(b.to_int() - b'a'.to_int() + 10) + } else if b >= b'A' && b <= b'F' { + Some(b.to_int() - b'A'.to_int() + 10) + } else { + None + } +} + +///| +pub fn url_decode(bytes : BytesView) -> String { + let len = bytes.length() + let res = Array::make(len, b'\x00') + let mut w = 0 + let mut r = 0 + while r < len { + let b = bytes[r] + if b == b'+' { + res[w] = b' ' + w = w + 1 + r = r + 1 + } else if b == b'%' && r + 2 < len { + match (hex_digit(bytes[r + 1]), hex_digit(bytes[r + 2])) { + (Some(h1), Some(h2)) => { + res[w] = (h1 * 16 + h2).to_byte() + w = w + 1 + r = r + 3 + } + _ => { + res[w] = b'%' + w = w + 1 + r = r + 1 + } + } + } else { + res[w] = b + w = w + 1 + r = r + 1 + } + } + let final_bytes = Bytes::from_array(res) + let view = final_bytes[0:w] + @encoding/utf8.decode(view) catch { + _ => "" + } +} + +///| +pub fn parse_form_data(bytes : BytesView) -> Map[String, String] { + let res = Map::new() + if bytes.length() == 0 { + return res + } + + // Split by '&' + let mut start = 0 + let len = bytes.length() + for i = 0; i < len; i = i + 1 { + if bytes[i] == b'&' { + let part = bytes[start:i] + parse_kv(part, res) + start = i + 1 + } + } + if start < len { + parse_kv(bytes[start:len], res) + } + res +} + +///| +fn parse_kv(part : BytesView, map : Map[String, String]) -> Unit { + let len = part.length() + let mut eq_idx = -1 + for i = 0; i < len; i = i + 1 { + if part[i] == b'=' { + eq_idx = i + break + } + } + if eq_idx != -1 { + let key_bytes = part[0:eq_idx] + let val_bytes = part[eq_idx + 1:len] + let key = url_decode(key_bytes) + let val = url_decode(val_bytes) + if key != "" { + map.set(key, val) + } + } else { + // No '=', treat as key with empty value? Or ignore? + // Usually key without value. + let key = url_decode(part) + if key != "" { + map.set(key, "") + } + } +} + +///| +fn is_unreserved(b : Byte) -> Bool { + (b >= b'A' && b <= b'Z') || + (b >= b'a' && b <= b'z') || + (b >= b'0' && b <= b'9') || + b == b'-' || + b == b'_' || + b == b'.' || + b == b'~' +} + +///| +pub fn url_encode(s : String) -> String { + let bytes = @encoding/utf8.encode(s) + let buf = StringBuilder::new() + for i = 0; i < bytes.length(); i = i + 1 { + let b = bytes[i] + if is_unreserved(b) { + match b.to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + } else { + buf.write_char('%') + let hex_bytes = b"0123456789ABCDEF" + let h1 = (b.to_int() >> 4) & 0xF + let h2 = b.to_int() & 0xF + match hex_bytes[h1].to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + match hex_bytes[h2].to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + } + } + buf.to_string() +} + +///| +pub fn form_encode(map : Map[String, String]) -> String { + let buf = StringBuilder::new() + let mut first = true + map.each(fn(k, v) { + if not(first) { + buf.write_char('&') + } + first = false + buf.write_string(url_encode(k)) + buf.write_char('=') + buf.write_string(url_encode(v)) + }) + buf.to_string() +} diff --git a/src/utils_test.mbt b/src/utils_test.mbt new file mode 100644 index 0000000..25936bd --- /dev/null +++ b/src/utils_test.mbt @@ -0,0 +1,43 @@ +///| +test "url_decode" { + let encoded = @encoding/utf8.encode("Hello%20World%21") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "Hello World!") +} + +///| +test "url_decode_plus" { + let encoded = @encoding/utf8.encode("Hello+World") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "Hello World") +} + +///| +test "url_decode_utf8" { + // %E4%BD%A0%E5%A5%BD = 你好 + let encoded = @encoding/utf8.encode("%E4%BD%A0%E5%A5%BD") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "你好") +} + +///| +test "form_encode" { + let map = Map::new() + map.set("key", "value") + map.set("foo", "bar") + let encoded = form_encode(map) + // Order is not guaranteed in Map iteration, so we check if it contains parts + // But Map iteration order might be deterministic in MoonBit? + // Let's just check if it's one of the valid permutations + let valid1 = "key=value&foo=bar" + let valid2 = "foo=bar&key=value" + assert_true(encoded == valid1 || encoded == valid2) +} + +///| +test "parse_form_data" { + let encoded = @encoding/utf8.encode("key=value&foo=bar%20baz") + let map = parse_form_data(encoded[:]) + assert_eq(map.get("key"), Some("value")) + assert_eq(map.get("foo"), Some("bar baz")) +}