diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..e2380c0 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,51 @@ +# Project Agents.md Guide + +This is a [MoonBit](https://docs.moonbitlang.com) project. + +## Project Structure + +- MoonBit packages are organized per directory, for each directory, there is a + `moon.pkg.json` file listing its dependencies. Each package has its files and + blackbox test files (common, ending in `_test.mbt`) and whitebox test files + (ending in `_wbtest.mbt`). + +- In the toplevel directory, this is a `moon.mod.json` file listing about the + module and some meta information. + +## Coding convention + +- MoonBit code is organized in block style, each block is separated by `///|`, + the order of each block is irrelevant. In some refactorings, you can process + block by block independently. + +- Try to keep deprecated blocks in file called `deprecated.mbt` in each + directory. + +## Tooling + +- `moon fmt` is used to format your code properly. + +- `moon info` is used to update the generated interface of the package, each + package has a generated interface file `.mbti`, it is a brief formal + description of the package. If nothing in `.mbti` changes, this means your + change does not bring the visible changes to the external package users, it is + typically a safe refactoring. + +- In the last step, run `moon info && moon fmt` to update the interface and + format the code. Check the diffs of `.mbti` file to see if the changes are + expected. + +- Run `moon test` to check the test is passed. MoonBit supports snapshot + testing, so when your changes indeed change the behavior of the code, you + should run `moon test --update` to update the snapshot. + +- You can run `moon check` to check the code is linted correctly. + +- When writing tests, you are encouraged to use `inspect` and run + `moon test --update` to update the snapshots, only use assertions like + `assert_eq` when you are in some loops where each snapshot may vary. You can + use `moon coverage analyze > uncovered.log` to see which parts of your code + are not covered by tests. + +- agent-todo.md has some small tasks that are easy for AI to pick up, agent is + welcome to finish the tasks and check the box when you are done diff --git a/moon.mod.json b/moon.mod.json index f209b7d..fa04c21 100644 --- a/moon.mod.json +++ b/moon.mod.json @@ -1,6 +1,6 @@ { "name": "oboard/mocket", - "version": "0.5.6", + "version": "0.5.7", "deps": { "illusory0x0/native": "0.2.1", "moonbitlang/x": "0.4.34", diff --git a/src/async.mbt b/src/async.mbt index 8bdae7e..9dc7cd0 100644 --- a/src/async.mbt +++ b/src/async.mbt @@ -1,5 +1,5 @@ ///| -pub fn run(f : async () -> Unit noraise) -> Unit = "%async.run" +pub fn async_run(f : async () -> Unit noraise) -> Unit = "%async.run" ///| /// `suspend` 会中断当前协程的运行。 diff --git a/src/body_reader.mbt b/src/body_reader.mbt index e04c8b2..7575d41 100644 --- a/src/body_reader.mbt +++ b/src/body_reader.mbt @@ -11,19 +11,89 @@ fn read_body( body_bytes : BytesView, ) -> HttpBody raise BodyError { let content_type = req_headers.get("Content-Type") - match content_type { - Some([.. "application/json", ..]) => { - let json = @encoding/utf8.decode(body_bytes) catch { - _ => raise BodyError::InvalidJsonCharset + if content_type is Some(content_type) { + let content_type = parse_content_type(content_type) + if content_type is Some(content_type) { + return match content_type { + { subtype: "json", .. } => { + let json = @encoding/utf8.decode(body_bytes) catch { + _ => raise BodyError::InvalidJsonCharset + } + Json(@json.parse(json) catch { _ => raise BodyError::InvalidJson }) + } + { media_type: "text", .. } => + Text( + @encoding/utf8.decode(body_bytes) catch { + _ => raise BodyError::InvalidText + }, + ) + { subtype: "x-www-form-urlencoded", .. } => + Form(parse_form_data(body_bytes)) + _ => Bytes(body_bytes) } - Json(@json.parse(json) catch { _ => raise BodyError::InvalidJson }) } - Some([.. "text/plain", ..] | [.. "text/html", ..]) => - Text( - @encoding/utf8.decode(body_bytes) catch { - _ => raise BodyError::InvalidText - }, - ) - _ => Bytes(body_bytes) } + Bytes(body_bytes) +} + +///| +priv struct ContentType { + media_type : StringView + subtype : StringView + params : Map[StringView, StringView] +} derive(Show) + +///| +fn parse_content_type(s : String) -> ContentType? { + let parts = s.split(";").to_array() + if parts.is_empty() { + None + } else { + let main_part_str = parts[0].trim_space() + let media_type_parts = main_part_str.split("/").to_array() + if media_type_parts.length() != 2 { + None + } else { + let media_type = media_type_parts[0].trim_space() + let subtype = media_type_parts[1].trim_space() + let params = {} + for i in 1.. { + let key = param_part[0:idx].trim_space() + let value = param_part[idx + 1:].trim_space() + params[key] = value + } + None => () // Ignore malformed parameters + } + } catch { + _ => () + } + } + Some({ media_type, subtype, params }) + } + } +} + +///| +test "parse_content_type" { + inspect( + parse_content_type("application/json; charset=utf-8"), + content=( + #|Some({media_type: "application", subtype: "json", params: {"charset": "utf-8"}}) + ), + ) +} + +///| +test "parse_form_data" { + inspect( + parse_form_data(b"name=John+Doe&age=30"), + content=( + #|{"name": "John Doe", "age": "30"} + ), + ) } diff --git a/src/example/main.mbt b/src/examples/route/main.mbt similarity index 100% rename from src/example/main.mbt rename to src/examples/route/main.mbt diff --git a/src/example/moon.pkg.json b/src/examples/route/moon.pkg.json similarity index 100% rename from src/example/moon.pkg.json rename to src/examples/route/moon.pkg.json diff --git a/src/example/pkg.generated.mbti b/src/examples/route/pkg.generated.mbti similarity index 75% rename from src/example/pkg.generated.mbti rename to src/examples/route/pkg.generated.mbti index a098fbe..1799246 100644 --- a/src/example/pkg.generated.mbti +++ b/src/examples/route/pkg.generated.mbti @@ -1,5 +1,5 @@ // Generated using `moon info`, DON'T EDIT IT -package "oboard/mocket/example" +package "oboard/mocket/examples/route" // Values diff --git a/src/examples/websocket/moon.pkg.json b/src/examples/websocket/moon.pkg.json new file mode 100644 index 0000000..a6eef88 --- /dev/null +++ b/src/examples/websocket/moon.pkg.json @@ -0,0 +1,6 @@ +{ + "is-main": true, + "import": [ + "oboard/mocket" + ] +} \ No newline at end of file diff --git a/src/examples/websocket/pkg.generated.mbti b/src/examples/websocket/pkg.generated.mbti new file mode 100644 index 0000000..740aed4 --- /dev/null +++ b/src/examples/websocket/pkg.generated.mbti @@ -0,0 +1,13 @@ +// Generated using `moon info`, DON'T EDIT IT +package "oboard/mocket/examples/websocket" + +// Values + +// Errors + +// Types and methods + +// Type aliases + +// Traits + diff --git a/src/examples/websocket/websocket_echo.mbt b/src/examples/websocket/websocket_echo.mbt new file mode 100644 index 0000000..ec8eb2d --- /dev/null +++ b/src/examples/websocket/websocket_echo.mbt @@ -0,0 +1,18 @@ +///| +fn main { + let app = @mocket.new(logger=@mocket.new_logger()) + app.ws("/ws", event => match event { + Open(peer) => println("WS open: " + peer.to_string()) + Message(peer, body) => { + let msg = match body { + Text(s) => s.to_string() + _ => "" + } + println("WS message: " + msg) + peer.send(msg) + } + Close(peer) => println("WS close: " + peer.to_string()) + }) + println("WebSocket echo server listening on ws://localhost:8080/ws") + app.serve(port=8080) +} diff --git a/src/handler.mbt b/src/handler.mbt index ea678c2..a6dd193 100644 --- a/src/handler.mbt +++ b/src/handler.mbt @@ -1,2 +1,64 @@ ///| pub type HttpHandler = async (HttpEvent) -> HttpBody noraise + +///| +pub(all) struct WebSocketPeer { + connection_id : String + mut subscribed_channels : Array[String] +} + +///| +pub fn WebSocketPeer::send(self : WebSocketPeer, message : String) -> Unit { + // 真正发送到后端连接 + ws_send(self.connection_id, message) +} + +///| +pub fn WebSocketPeer::subscribe(self : WebSocketPeer, channel : String) -> Unit { + if not(self.subscribed_channels.contains(channel)) { + self.subscribed_channels.push(channel) + ws_subscribe(self.connection_id, channel) + } +} + +///| +pub fn WebSocketPeer::unsubscribe( + self : WebSocketPeer, + channel : String, +) -> Unit { + let mut index = None + for i = 0; i < self.subscribed_channels.length(); i = i + 1 { + if self.subscribed_channels[i] == channel { + index = Some(i) + break + } + } + match index { + Some(i) => { + ignore(self.subscribed_channels.remove(i)) + ws_unsubscribe(self.connection_id, channel) + } + None => () + } +} + +///| +pub fn WebSocketPeer::publish(channel : String, message : String) -> Unit { + ws_publish(channel, message) +} + +///| +pub fn WebSocketPeer::to_string(self : WebSocketPeer) -> String { + "WebSocketPeer(\{self.connection_id})" +} + +///| +pub enum WebSocketEvent { + Open(WebSocketPeer) + Message(WebSocketPeer, HttpBody) + Close(WebSocketPeer) +} + +///| +// WebSocket 路由的事件处理器类型(不返回 HttpBody) +pub type WebSocketHandler = (WebSocketEvent) -> Unit diff --git a/src/index.mbt b/src/index.mbt index 0075e48..6ac87c2 100644 --- a/src/index.mbt +++ b/src/index.mbt @@ -1,13 +1,23 @@ +///| +pub(all) struct MultipartFormValue { + filename : String? + content_type : String? + data : BytesView +} + ///| pub(all) enum HttpBody { Json(Json) Text(StringView) HTML(StringView) Bytes(BytesView) + Form(Map[String, String]) + Multipart(Map[String, MultipartFormValue]) Empty } ///| +#alias(T) pub(all) struct Mocket { base_path : String mappings : Map[(String, String), HttpHandler] @@ -18,11 +28,14 @@ pub(all) struct Mocket { dynamic_routes : Map[String, Array[(String, HttpHandler)]] // 日志记录器 logger : Logger + // WebSocket 路由(按路径匹配,不区分方法) + ws_static_routes : Map[String, WebSocketHandler] + ws_dynamic_routes : Array[(String, WebSocketHandler)] + ws_clients : Map[String, Unit] + ws_channels : Map[String, Map[String, Unit]] + ws_client_port : Map[String, Int] } -///| -pub typealias Mocket as T - ///| pub fn new( base_path? : String = "", @@ -35,6 +48,11 @@ pub fn new( static_routes: {}, dynamic_routes: {}, logger, + ws_static_routes: {}, + ws_dynamic_routes: [], + ws_clients: {}, + ws_channels: {}, + ws_client_port: {}, } } @@ -209,3 +227,20 @@ pub fn Mocket::group( // 合并中间件 group.middlewares.iter().each(self.middlewares.push(_)) } + +///| +// 注册 WebSocket 路由(不区分方法,按路径匹配) +pub fn Mocket::ws( + self : Mocket, + path : String, + handler : WebSocketHandler, +) -> Unit { + let path = self.base_path + path + // 静态路径直接缓存 + if path.find(":").unwrap_or(-1) == -1 && path.find("*").unwrap_or(-1) == -1 { + self.ws_static_routes.set(path, handler) + } else { + // 动态路径加入列表 + self.ws_dynamic_routes.push((path, handler)) + } +} diff --git a/src/js/cast.mbt b/src/js/cast.mbt index 1a0b1ae..79d2ea5 100644 --- a/src/js/cast.mbt +++ b/src/js/cast.mbt @@ -85,7 +85,7 @@ pub impl[A : Cast] Cast for Array[A] with into(value) { checked_cast_array_ffi(value) .to_option() .bind(fn(arr) { - let is_type_a = fn(elem) { not((Cast::into(elem) : A?).is_empty()) } + let is_type_a = fn(elem) { not((Cast::into(elem) : A?) is None) } if arr.iter().all(is_type_a) { Some(Value::cast_from(arr).cast()) } else { diff --git a/src/js/null.mbt b/src/js/null.mbt index a6e7f39..1122172 100644 --- a/src/js/null.mbt +++ b/src/js/null.mbt @@ -8,7 +8,6 @@ pub fn[T] Nullable::is_null(self : Nullable[T]) -> Bool { } ///| -#deprecated("get_exn does not check for null values. Use unwrap instead") pub fn[T] Nullable::get_exn(self : Nullable[T]) -> T = "%identity" ///| diff --git a/src/js/optional.mbt b/src/js/optional.mbt index 6ad513b..2404b5a 100644 --- a/src/js/optional.mbt +++ b/src/js/optional.mbt @@ -8,7 +8,6 @@ pub fn[T] Optional::is_undefined(self : Optional[T]) -> Bool { } ///| -#deprecated("get_exn does not check for undefined values. Use unwrap instead") pub fn[T] Optional::get_exn(self : Optional[T]) -> T = "%identity" ///| diff --git a/src/js/pkg.generated.mbti b/src/js/pkg.generated.mbti index 499bd25..5506f59 100644 --- a/src/js/pkg.generated.mbti +++ b/src/js/pkg.generated.mbti @@ -1,13 +1,207 @@ // Generated using `moon info`, DON'T EDIT IT package "oboard/mocket/js" +import( + "moonbitlang/core/json" +) + // Values +async fn[T] async_all(Array[async () -> T]) -> Array[T] + +let async_iterator : Symbol + +fn async_run(async () -> Unit noraise) -> Unit + +fn async_test(async () -> Unit) -> Unit + +let globalThis : Value + +let iterator : Symbol + +fn require(String, keys? : Array[String]) -> Value + +fn[T, E : Error] spawn_detach(async () -> T raise E) -> Unit + +async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E // Errors +pub suberror Error_ Value +fn Error_::cause(Self) -> Value? +fn[T] Error_::wrap(() -> Value, map_ok? : (Value) -> T) -> T raise Self +impl Show for Error_ // Types and methods +type Nullable[_] +fn[T] Nullable::from_option(T?) -> Self[T] +fn[T] Nullable::get_exn(Self[T]) -> T +fn[T] Nullable::is_null(Self[T]) -> Bool +fn[T] Nullable::null() -> Self[T] +fn[T] Nullable::to_option(Self[T]) -> T? +fn[T] Nullable::unwrap(Self[T]) -> T + +pub struct Object(Value) +fn[K, V] Object::extend_iter(Self, Iter[(K, V)]) -> Unit +fn[K, V] Object::extend_iter2(Self, Iter2[K, V]) -> Unit +fn Object::extend_object(Self, Self) -> Self +fn[K, V] Object::from_iter(Iter[(K, V)]) -> Self +fn[K, V] Object::from_iter2(Iter2[K, V]) -> Self +fn Object::from_value(Value) -> Optional[Self] +fn Object::from_value_unchecked(Value) -> Self +#deprecated +fn Object::inner(Self) -> Value +fn Object::new() -> Self +fn[K, V] Object::op_get(Self, K) -> V +fn[K, V] Object::op_set(Self, K, V) -> Unit +fn Object::to_value(Self) -> Value + +type Optional[_] +fn[T] Optional::from_option(T?) -> Self[T] +fn[T] Optional::get_exn(Self[T]) -> T +fn[T] Optional::is_undefined(Self[T]) -> Bool +fn[T] Optional::to_option(Self[T]) -> T? +fn[T] Optional::undefined() -> Self[T] +fn[T] Optional::unwrap(Self[T]) -> T + +#external +pub type Promise +fn Promise::all(Array[Self]) -> Self +fn[T] Promise::unsafe_new(async () -> T) -> Self +async fn Promise::wait(Self) -> Value + +type Symbol +fn Symbol::make() -> Self +fn Symbol::make_with_number(Double) -> Self +fn Symbol::make_with_string(String) -> Self +fn Symbol::make_with_string_js(String) -> Self + +type Union2[_, _] +fn[A : Cast, B] Union2::from0(A) -> Self[A, B] +fn[A, B : Cast] Union2::from1(B) -> Self[A, B] +fn[A : Cast, B] Union2::to0(Self[A, B]) -> A? +fn[A, B : Cast] Union2::to1(Self[A, B]) -> B? + +type Union3[_, _, _] +fn[A : Cast, B, C] Union3::from0(A) -> Self[A, B, C] +fn[A, B : Cast, C] Union3::from1(B) -> Self[A, B, C] +fn[A, B, C : Cast] Union3::from2(C) -> Self[A, B, C] +fn[A : Cast, B, C] Union3::to0(Self[A, B, C]) -> A? +fn[A, B : Cast, C] Union3::to1(Self[A, B, C]) -> B? +fn[A, B, C : Cast] Union3::to2(Self[A, B, C]) -> C? + +type Union4[_, _, _, _] +fn[A : Cast, B, C, D] Union4::from0(A) -> Self[A, B, C, D] +fn[A, B : Cast, C, D] Union4::from1(B) -> Self[A, B, C, D] +fn[A, B, C : Cast, D] Union4::from2(C) -> Self[A, B, C, D] +fn[A, B, C, D : Cast] Union4::from3(D) -> Self[A, B, C, D] +fn[A : Cast, B, C, D] Union4::to0(Self[A, B, C, D]) -> A? +fn[A, B : Cast, C, D] Union4::to1(Self[A, B, C, D]) -> B? +fn[A, B, C : Cast, D] Union4::to2(Self[A, B, C, D]) -> C? +fn[A, B, C, D : Cast] Union4::to3(Self[A, B, C, D]) -> D? + +type Union5[_, _, _, _, _] +fn[A : Cast, B, C, D, E] Union5::from0(A) -> Self[A, B, C, D, E] +fn[A, B : Cast, C, D, E] Union5::from1(B) -> Self[A, B, C, D, E] +fn[A, B, C : Cast, D, E] Union5::from2(C) -> Self[A, B, C, D, E] +fn[A, B, C, D : Cast, E] Union5::from3(D) -> Self[A, B, C, D, E] +fn[A, B, C, D, E : Cast] Union5::from4(E) -> Self[A, B, C, D, E] +fn[A : Cast, B, C, D, E] Union5::to0(Self[A, B, C, D, E]) -> A? +fn[A, B : Cast, C, D, E] Union5::to1(Self[A, B, C, D, E]) -> B? +fn[A, B, C : Cast, D, E] Union5::to2(Self[A, B, C, D, E]) -> C? +fn[A, B, C, D : Cast, E] Union5::to3(Self[A, B, C, D, E]) -> D? +fn[A, B, C, D, E : Cast] Union5::to4(Self[A, B, C, D, E]) -> E? + +type Union6[_, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F] Union6::from0(A) -> Self[A, B, C, D, E, F] +fn[A, B : Cast, C, D, E, F] Union6::from1(B) -> Self[A, B, C, D, E, F] +fn[A, B, C : Cast, D, E, F] Union6::from2(C) -> Self[A, B, C, D, E, F] +fn[A, B, C, D : Cast, E, F] Union6::from3(D) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E : Cast, F] Union6::from4(E) -> Self[A, B, C, D, E, F] +fn[A, B, C, D, E, F : Cast] Union6::from5(F) -> Self[A, B, C, D, E, F] +fn[A : Cast, B, C, D, E, F] Union6::to0(Self[A, B, C, D, E, F]) -> A? +fn[A, B : Cast, C, D, E, F] Union6::to1(Self[A, B, C, D, E, F]) -> B? +fn[A, B, C : Cast, D, E, F] Union6::to2(Self[A, B, C, D, E, F]) -> C? +fn[A, B, C, D : Cast, E, F] Union6::to3(Self[A, B, C, D, E, F]) -> D? +fn[A, B, C, D, E : Cast, F] Union6::to4(Self[A, B, C, D, E, F]) -> E? +fn[A, B, C, D, E, F : Cast] Union6::to5(Self[A, B, C, D, E, F]) -> F? + +type Union7[_, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G] Union7::from0(A) -> Self[A, B, C, D, E, F, G] +fn[A, B : Cast, C, D, E, F, G] Union7::from1(B) -> Self[A, B, C, D, E, F, G] +fn[A, B, C : Cast, D, E, F, G] Union7::from2(C) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D : Cast, E, F, G] Union7::from3(D) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E : Cast, F, G] Union7::from4(E) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F : Cast, G] Union7::from5(F) -> Self[A, B, C, D, E, F, G] +fn[A, B, C, D, E, F, G : Cast] Union7::from6(G) -> Self[A, B, C, D, E, F, G] +fn[A : Cast, B, C, D, E, F, G] Union7::to0(Self[A, B, C, D, E, F, G]) -> A? +fn[A, B : Cast, C, D, E, F, G] Union7::to1(Self[A, B, C, D, E, F, G]) -> B? +fn[A, B, C : Cast, D, E, F, G] Union7::to2(Self[A, B, C, D, E, F, G]) -> C? +fn[A, B, C, D : Cast, E, F, G] Union7::to3(Self[A, B, C, D, E, F, G]) -> D? +fn[A, B, C, D, E : Cast, F, G] Union7::to4(Self[A, B, C, D, E, F, G]) -> E? +fn[A, B, C, D, E, F : Cast, G] Union7::to5(Self[A, B, C, D, E, F, G]) -> F? +fn[A, B, C, D, E, F, G : Cast] Union7::to6(Self[A, B, C, D, E, F, G]) -> G? + +type Union8[_, _, _, _, _, _, _, _] +fn[A : Cast, B, C, D, E, F, G, H] Union8::from0(A) -> Self[A, B, C, D, E, F, G, H] +fn[A, B : Cast, C, D, E, F, G, H] Union8::from1(B) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C : Cast, D, E, F, G, H] Union8::from2(C) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D : Cast, E, F, G, H] Union8::from3(D) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E : Cast, F, G, H] Union8::from4(E) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F : Cast, G, H] Union8::from5(F) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G : Cast, H] Union8::from6(G) -> Self[A, B, C, D, E, F, G, H] +fn[A, B, C, D, E, F, G, H : Cast] Union8::from7(H) -> Self[A, B, C, D, E, F, G, H] +fn[A : Cast, B, C, D, E, F, G, H] Union8::to0(Self[A, B, C, D, E, F, G, H]) -> A? +fn[A, B : Cast, C, D, E, F, G, H] Union8::to1(Self[A, B, C, D, E, F, G, H]) -> B? +fn[A, B, C : Cast, D, E, F, G, H] Union8::to2(Self[A, B, C, D, E, F, G, H]) -> C? +fn[A, B, C, D : Cast, E, F, G, H] Union8::to3(Self[A, B, C, D, E, F, G, H]) -> D? +fn[A, B, C, D, E : Cast, F, G, H] Union8::to4(Self[A, B, C, D, E, F, G, H]) -> E? +fn[A, B, C, D, E, F : Cast, G, H] Union8::to5(Self[A, B, C, D, E, F, G, H]) -> F? +fn[A, B, C, D, E, F, G : Cast, H] Union8::to6(Self[A, B, C, D, E, F, G, H]) -> G? +fn[A, B, C, D, E, F, G, H : Cast] Union8::to7(Self[A, B, C, D, E, F, G, H]) -> H? + +#external +pub type Value +fn[Arg, Result] Value::apply(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::apply_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::cast(Self) -> T +fn[T] Value::cast_from(T) -> Self +fn Value::extends(Self, Self) -> Self +fn Value::from_json(Json) -> Self raise +fn Value::from_json_string(String) -> Self raise +fn[T] Value::get_with_index(Self, Int) -> T +fn[T] Value::get_with_string(Self, String) -> T +fn[T] Value::get_with_symbol(Self, Symbol) -> T +fn Value::is_bool(Self) -> Bool +fn Value::is_null(Self) -> Bool +fn Value::is_number(Self) -> Bool +fn Value::is_object(Self) -> Bool +fn Value::is_string(Self) -> Bool +fn Value::is_symbol(Self) -> Bool +fn Value::is_undefined(Self) -> Bool +fn[Arg, Result] Value::new(Self, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_index(Self, Int, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_string(Self, String, Array[Arg]) -> Result +fn[Arg, Result] Value::new_with_symbol(Self, Symbol, Array[Arg]) -> Result +fn[T] Value::set_with_index(Self, Int, T) -> Unit +fn[T] Value::set_with_string(Self, String, T) -> Unit +fn[T] Value::set_with_symbol(Self, Symbol, T) -> Unit +fn Value::to_json(Self) -> Json raise +fn Value::to_json_string(Self) -> String raise +fn Value::to_string(Self) -> String +impl Show for Value +impl @json.FromJson for Value // Type aliases // Traits +pub(open) trait Cast { + into(Value) -> Self? + from(Self) -> Value +} +impl Cast for Bool +impl Cast for Int +impl Cast for Double +impl Cast for String +impl[A : Cast] Cast for Array[A] diff --git a/src/mocket.js.mbt b/src/mocket.js.mbt index 079697f..db11f4f 100644 --- a/src/mocket.js.mbt +++ b/src/mocket.js.mbt @@ -55,7 +55,165 @@ extern "js" fn HttpResponseInternal::write_head( pub extern "js" fn create_server( handler : (HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, port : Int, -) -> Unit = "(handler, port) => require('node:http').createServer(handler).listen(port, () => {})" +) -> Unit = + #|(handler, port) => { + #| const server = require('node:http').createServer(handler); + #| function start(server, port) { + #| const http = require('node:http'); + #| const crypto = require('node:crypto'); + #| const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + #| const clientsById = new Map(); + #| + #| function acceptKey(key) { + #| return crypto.createHash('sha1').update(String(key) + GUID).digest('base64'); + #| } + #| + #| // 供 FFI 调用的发送函数 + #| function sendText(socket, str) { + #| const payload = Buffer.from(String(str), 'utf8'); + #| const len = payload.length; + #| let header; + #| if (len < 126) { + #| header = Buffer.alloc(2); + #| header[0] = 0x81; // FIN + text + #| header[1] = len; + #| } else if (len < 65536) { + #| header = Buffer.alloc(4); + #| header[0] = 0x81; + #| header[1] = 126; + #| header.writeUInt16BE(len, 2); + #| } else { + #| header = Buffer.alloc(10); + #| header[0] = 0x81; + #| header[1] = 127; + #| header.writeBigUInt64BE(BigInt(len), 2); + #| } + #| try { + #| socket.write(Buffer.concat([header, payload])); + #| } catch (_) {} + #| } + #| + #| function parseFrame(buf) { + #| if (!Buffer.isBuffer(buf) || buf.length < 2) return null; + #| const fin = (buf[0] & 0x80) !== 0; + #| const opcode = buf[0] & 0x0f; + #| const masked = (buf[1] & 0x80) !== 0; + #| let len = buf[1] & 0x7f; + #| let offset = 2; + #| if (len === 126) { + #| if (buf.length < 4) return null; + #| len = buf.readUInt16BE(2); + #| offset = 4; + #| } else if (len === 127) { + #| if (buf.length < 10) return null; + #| const big = buf.readBigUInt64BE(2); + #| len = Number(big); + #| offset = 10; + #| } + #| if (masked) { + #| if (buf.length < offset + 4 + len) return null; + #| const mask = buf.slice(offset, offset + 4); + #| const data = buf.slice(offset + 4, offset + 4 + len); + #| const out = Buffer.alloc(len); + #| for (let i = 0; i < len; i++) out[i] = data[i] ^ mask[i % 4]; + #| return { fin, opcode, data: out }; + #| } else { + #| if (buf.length < offset + len) return null; + #| const data = buf.slice(offset, offset + len); + #| return { fin, opcode, data }; + #| } + #| } + #| + #| // 导出四个 FFI 函数,供 MoonBit 调用 + #| if (!globalThis.ws_port_bindings) globalThis.ws_port_bindings = new Map(); + #| globalThis.ws_port_bindings.set(port, { + #| send: (id, msg) => { + #| const s = clientsById.get(id); + #| if (s) sendText(s, msg); + #| } + #| }); + #| + #| server.on('upgrade', (req, socket, head) => { + #| const upgrade = (req.headers['upgrade'] || req.headers['Upgrade'] || '').toString(); + #| if (upgrade !== 'websocket' && upgrade !== 'WebSocket') { + #| try { socket.destroy(); } catch (_) {} + #| return; + #| } + #| const key = req.headers['sec-websocket-key'] || req.headers['Sec-WebSocket-Key']; + #| if (!key) { try { socket.destroy(); } catch (_) {} return; } + #| const accept = acceptKey(key.toString()); + #| const headers = [ + #| 'HTTP/1.1 101 Switching Protocols', + #| 'Upgrade: websocket', + #| 'Connection: Upgrade', + #| 'Sec-WebSocket-Accept: ' + accept, + #| '\r\n' + #| ]; + #| try { socket.write(headers.join('\r\n')); } catch (_) { + #| try { socket.destroy(); } catch (_) {} return; + #| } + #| + #| // 生成唯一连接 ID,注册到全局映射 + #| const connectionId = crypto.randomUUID(); + #| clientsById.set(connectionId, socket); + #| + #| // 派发 open 事件到 MoonBit + #| // console.log('[ws-bridge] emit open', port, connectionId); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('open', port, connectionId, ''); + #| } else { + #| // console.log('[ws-bridge] __ws_emit not found'); + #| } + #| + #| socket.on('data', (buf) => { + #| const frame = parseFrame(buf); + #| if (!frame) return; + #| // Close frame + #| if (frame.opcode === 0x8) { + #| try { socket.end(); } catch (_) {} + #| if (clientsById.delete(connectionId)) { + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } + #| } + #| return; + #| } + #| // Ping -> Pong + #| if (frame.opcode === 0x9) { + #| const payload = frame.data || Buffer.alloc(0); + #| const header = Buffer.from([0x8A, payload.length]); + #| try { socket.write(Buffer.concat([header, payload])); } catch (_) {} + #| return; + #| } + #| // Text + #| if (frame.opcode === 0x1) { + #| const msg = (frame.data || Buffer.alloc(0)).toString('utf8'); + #| // console.log('[ws-bridge] emit message', port, connectionId, msg); + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('message', port, connectionId, msg); + #| } else { + #| // console.log('[ws-bridge] __ws_emit not found'); + #| } + #| } + #| }); + #| + #| socket.on('close', () => { + #| if (clientsById.delete(connectionId)) { + #| if (typeof globalThis.__ws_emit_port === 'function') { + #| globalThis.__ws_emit_port('close', port, connectionId, ''); + #| } + #| } + #| }); + #| socket.on('error', () => { + #| clientsById.delete(connectionId); + #| try { socket.destroy(); } catch (_) {} + #| }); + #| }); + #| + #| // console.log('[ws-bridge] WebSocket upgrade handler attached for port', port); + #| }; + #| start(server, port); server.listen(port, () => {}) + #|} ///| pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { @@ -94,7 +252,7 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { res: { status_code: 200, headers: {} }, params, } - run(fn() { + async_run(() => { // 如果是 post,先等待 data 事件 if event.req.http_method == "POST" { let buffer = @buffer.new() @@ -122,6 +280,8 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { HTML(_) => "text/html; charset=utf-8" Text(_) => "text/plain; charset=utf-8" Json(_) => "application/json; charset=utf-8" + Form(_) => "application/x-www-form-urlencoded; charset=utf-8" + Multipart(_) => "multipart/form-data; charset=utf-8" Empty => "" }, ) @@ -138,6 +298,8 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { HTML(s) => @js.Value::cast_from(s.to_string()) Text(s) => @js.Value::cast_from(s.to_string()) Json(j) => @js.Value::cast_from(j.stringify()) + Form(f) => @js.Value::cast_from(f) + Multipart(m) => @js.Value::cast_from(m) Empty => @js.Value::cast_from("") }, ) @@ -145,4 +307,162 @@ pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { }, port, ) + register_ws_handler(mocket, port) + __ws_emit_js_export(__ws_emit_js_port) + __ws_state_js_export( + __ws_subscribe_js, __ws_unsubscribe_js, __ws_get_members_js, + ) + __get_port_by_connection_js_export(__get_port_by_connection_js) } + +///| +/// 全局 handler 映射:port -> WebSocketHandler +let ws_handler_map : Map[Int, WebSocketHandler] = Map::new() + +///| +let ws_mocket_map : Map[Int, Mocket] = Map::new() + +///| +extern "js" fn _init_bindings_map() -> @js.Value = "() => { if (!globalThis.ws_port_bindings) globalThis.ws_port_bindings = new Map(); return globalThis.ws_port_bindings; }" + +///| +/// 在 serve_ffi 里注册 WS handler(只取第一个静态路由) +pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { + let mut done = false + mocket.ws_static_routes.each(fn(_, handler) { + if not(done) { + ws_handler_map.set(port, handler) + ws_mocket_map.set(port, mocket) + ignore(_init_bindings_map()) + done = true + } + }) +} + +///| +/// 供 JS 调用的 MoonBit 事件入口:捕获异常,避免抛回 JS +pub fn __ws_emit_js_port( + event_type : String, + port : Int, + connection_id : String, + payload : String, +) -> Unit { + let handler = match ws_handler_map.get(port) { + Some(h) => h + None => fn(_) { } + } + let mocket = match ws_mocket_map.get(port) { + Some(m) => m + None => new() + } + match event_type { + "open" => { + mocket.ws_clients.set(connection_id, ()) + mocket.ws_client_port.set(connection_id, port) + } + "close" => { + ignore(mocket.ws_clients.remove(connection_id)) + ignore(mocket.ws_client_port.remove(connection_id)) + mocket.ws_channels.each((_ch, set) => if set.get(connection_id) is Some(_) { + ignore(set.remove(connection_id)) + }) + } + _ => () + } + let peer = WebSocketPeer::{ connection_id, subscribed_channels: [] } + match event_type { + "open" => handler(WebSocketEvent::Open(peer)) + "message" => handler(WebSocketEvent::Message(peer, Text(payload))) + "close" => handler(WebSocketEvent::Close(peer)) + _ => () + } +} + +///| +/// 导出 MoonBit 函数到 JS 全局 +pub extern "js" fn __ws_emit_js_export( + cb : (String, Int, String, String) -> Unit, +) -> Unit = "(cb) => { globalThis.__ws_emit_port = (type, port, id, payload) => { try { cb(type, port, id, payload); } catch (_) {} }; }" + +///| +pub fn __ws_subscribe_js(connection_id : String, channel : String) -> Unit { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + if m.ws_clients.get(connection_id) is Some(_) { + mocket = m + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => set.set(connection_id, ()) + None => { + let set : Map[String, Unit] = {} + set.set(connection_id, ()) + mocket.ws_channels.set(channel, set) + } + } +} + +///| +pub fn __ws_unsubscribe_js(connection_id : String, channel : String) -> Unit { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + if m.ws_clients.get(connection_id) is Some(_) { + mocket = m + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => ignore(set.remove(connection_id)) + None => () + } +} + +///| +pub fn __ws_get_members_js(channel : String) -> Array[String] { + let mut mocket = new() + ws_mocket_map.each(fn(_, m) { + match m.ws_channels.get(channel) { + Some(_) => mocket = m + None => () + } + }) + match mocket.ws_channels.get(channel) { + Some(set) => set.keys().collect() + None => [] + } +} + +///| +pub extern "js" fn __ws_state_js_export( + sub : (String, String) -> Unit, + unsub : (String, String) -> Unit, + get_members : (String) -> Array[String], +) -> Unit = "(sub, unsub, get_members) => { globalThis.__ws_subscribe_js = (id, ch) => { try { sub(id, ch); } catch (_) {} }; globalThis.__ws_unsubscribe_js = (id, ch) => { try { unsub(id, ch); } catch (_) {} }; globalThis.__ws_get_members_js = (ch) => { try { return get_members(ch); } catch (_) { return []; } }; }" + +///| +pub fn __get_port_by_connection_js(id : String) -> Int { + let mut found = 0 + ws_mocket_map.each(fn(p, m) { + if m.ws_clients.get(id) is Some(_) { + found = p + } + }) + found +} + +///| +pub extern "js" fn __get_port_by_connection_js_export( + cb : (String) -> Int, +) -> Unit = "(cb) => { globalThis.__get_port_by_connection = (id) => { try { return cb(id); } catch (_) { return 0 } } }" + +///| +/// 四个 FFI 包装,供 WebSocketPeer 调用 +pub extern "js" fn ws_send(id : String, msg : String) -> Unit = "(id, msg) => { const port = globalThis.__get_port_by_connection(id); const bindings = globalThis.ws_port_bindings && globalThis.ws_port_bindings.get(port); if (bindings && bindings.send) bindings.send(id, msg); }" + +///| +pub extern "js" fn ws_subscribe(id : String, channel : String) -> Unit = "(id, ch) => { if (globalThis.__ws_subscribe_js) globalThis.__ws_subscribe_js(id, ch); }" + +///| +pub extern "js" fn ws_unsubscribe(id : String, channel : String) -> Unit = "(id, ch) => { if (globalThis.__ws_unsubscribe_js) globalThis.__ws_unsubscribe_js(id, ch); }" + +///| +pub extern "js" fn ws_publish(channel : String, msg : String) -> Unit = "(ch, msg) => { const ids = globalThis.__ws_get_members_js ? globalThis.__ws_get_members_js(ch) : []; const bindings = globalThis.ws_port_bindings && globalThis.ws_port_bindings.values().next().value; if (bindings && bindings.send) { for (const id of ids) { bindings.send(id, msg); } } }" diff --git a/src/mocket.native.mbt b/src/mocket.native.mbt index 65211fc..2243875 100644 --- a/src/mocket.native.mbt +++ b/src/mocket.native.mbt @@ -88,12 +88,40 @@ extern "c" fn create_server( handler : FuncRef[(Int, HttpRequestInternal, HttpResponseInternal) -> Unit], ) -> HttpServerInternal = "create_server" +///| +#owned(cb) +extern "c" fn set_ws_emit( + cb : FuncRef[(@native.CStr, @native.CStr, @native.CStr) -> Unit], +) -> Unit = "set_ws_emit" + ///| let server_map : Map[Int, Mocket] = Map::new() +///| +let ws_handler_map : Map[Int, WebSocketHandler] = Map::new() + +///| +pub fn register_ws_handler(mocket : Mocket, port : Int) -> Unit { + let mut done = false + mocket.ws_static_routes.each(fn(_, handler) { + if not(done) { + ws_handler_map.set(port, handler) + done = true + } + }) +} + ///| pub fn serve_ffi(mocket : Mocket, port~ : Int) -> Unit { server_map[port] = mocket + register_ws_handler(mocket, port) + set_ws_emit(fn( + event_type : @native.CStr, + id : @native.CStr, + payload : @native.CStr, + ) { + __ws_emit(event_type, id, payload) + }) let server = create_server(fn( port : Int, req : HttpRequestInternal, @@ -171,7 +199,7 @@ fn handle_request_native( } event.req.body = body } - run(async fn() noraise { + async_run(async fn() noraise { // 执行中间件链和处理器 let body = execute_middlewares(mocket.middlewares, event, handler) if not(body is Empty) { @@ -182,6 +210,8 @@ fn handle_request_native( HTML(_) => "text/html; charset=utf-8" Text(_) => "text/plain; charset=utf-8" Json(_) => "application/json; charset=utf-8" + Form(_) => "application/x-www-form-urlencoded" + Multipart(_) => "multipart/form-data" Empty => "" }, ) @@ -198,6 +228,8 @@ fn handle_request_native( HTML(s) => to_cstr(s.to_string()) Text(s) => to_cstr(s.to_string()) Json(j) => to_cstr(j.stringify()) + Form(m) => to_cstr(form_encode(m)) + Multipart(_) => to_cstr("") _ => to_cstr("") }, ) @@ -205,6 +237,74 @@ fn handle_request_native( }) } +///| +pub fn __ws_emit( + event_type : @native.CStr, + connection_id : @native.CStr, + payload : @native.CStr, +) -> Unit { + let et = from_cstr(event_type) + let cid = from_cstr(connection_id) + let pl = from_cstr(payload) + let handler = if ws_handler_map.is_empty() { + fn(_) { } + } else { + ws_handler_map.get(ws_handler_map.keys().collect()[0]).unwrap() + } + let peer = WebSocketPeer::{ connection_id: cid, subscribed_channels: [] } + match et { + "open" => handler(WebSocketEvent::Open(peer)) + "message" => handler(WebSocketEvent::Message(peer, Text(pl))) + "close" => handler(WebSocketEvent::Close(peer)) + _ => () + } +} + +///| +#owned(id, msg) +extern "c" fn ws_send_native(id : @native.CStr, msg : @native.CStr) -> Unit = "ws_send" + +///| +#owned(id, channel) +extern "c" fn ws_subscribe_native( + id : @native.CStr, + channel : @native.CStr, +) -> Unit = "ws_subscribe" + +///| +#owned(id, channel) +extern "c" fn ws_unsubscribe_native( + id : @native.CStr, + channel : @native.CStr, +) -> Unit = "ws_unsubscribe" + +///| +#owned(channel, msg) +extern "c" fn ws_publish_native( + channel : @native.CStr, + msg : @native.CStr, +) -> Unit = "ws_publish" + +///| +pub fn ws_send(id : String, msg : String) -> Unit { + ws_send_native(to_cstr(id), to_cstr(msg)) +} + +///| +pub fn ws_subscribe(id : String, channel : String) -> Unit { + ws_subscribe_native(to_cstr(id), to_cstr(channel)) +} + +///| +pub fn ws_unsubscribe(id : String, channel : String) -> Unit { + ws_unsubscribe_native(to_cstr(id), to_cstr(channel)) +} + +///| +pub fn ws_publish(channel : String, msg : String) -> Unit { + ws_publish_native(to_cstr(channel), to_cstr(msg)) +} + ///| fn[T : Show] to_cstr(s : T) -> @native.CStr { let bytes = @encoding/utf8.encode(s.to_string()) diff --git a/src/mocket.stub.c b/src/mocket.stub.c index b60a75c..24697ad 100644 --- a/src/mocket.stub.c +++ b/src/mocket.stub.c @@ -3,6 +3,136 @@ #include #include + +#define MAX_WS_CLIENTS 1024 +#define MAX_CHANNELS 256 + +typedef struct { + struct mg_connection *c; + char id[64]; +} ws_client_t; + +typedef struct { + char name[128]; + char client_ids[MAX_WS_CLIENTS][64]; + int cid_count; +} channel_t; + +static ws_client_t WS_CLIENTS[MAX_WS_CLIENTS]; +static int WS_CLIENT_COUNT = 0; +static channel_t CHANNELS[MAX_CHANNELS]; +static int CHANNEL_COUNT = 0; + +typedef void (*ws_emit_cb_t)(const char *type, const char *id, const char *payload); +static ws_emit_cb_t WS_EMIT_CB = NULL; +void set_ws_emit(ws_emit_cb_t cb) { WS_EMIT_CB = cb; } + +static ws_client_t *find_client_by_conn(struct mg_connection *c) { + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (WS_CLIENTS[i].c == c) return &WS_CLIENTS[i]; + } + return NULL; +} + +static ws_client_t *find_client_by_id(const char *id) { + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (strncmp(WS_CLIENTS[i].id, id, sizeof(WS_CLIENTS[i].id)) == 0) return &WS_CLIENTS[i]; + } + return NULL; +} + +static const char *gen_id(struct mg_connection *c) { + static char buf[64]; + snprintf(buf, sizeof(buf), "%p-%llu", (void *) c, (unsigned long long) mg_millis()); + return buf; +} + +static void register_client(struct mg_connection *c) { + if (WS_CLIENT_COUNT >= MAX_WS_CLIENTS) return; + const char *id = gen_id(c); + ws_client_t *slot = &WS_CLIENTS[WS_CLIENT_COUNT++]; + slot->c = c; + snprintf(slot->id, sizeof(slot->id), "%s", id); + if (WS_EMIT_CB) WS_EMIT_CB("open", slot->id, ""); +} + +static void unregister_client(ws_client_t *cl) { + if (!cl) return; + if (WS_EMIT_CB) WS_EMIT_CB("close", cl->id, ""); + for (int i = 0; i < CHANNEL_COUNT; i++) { + int w = 0; + for (int j = 0; j < CHANNELS[i].cid_count; j++) { + if (strcmp(CHANNELS[i].client_ids[j], cl->id) != 0) { + if (w != j) strncpy(CHANNELS[i].client_ids[w], CHANNELS[i].client_ids[j], sizeof(CHANNELS[i].client_ids[w])); + w++; + } + } + CHANNELS[i].cid_count = w; + } + int idx = -1; + for (int i = 0; i < WS_CLIENT_COUNT; i++) { + if (&WS_CLIENTS[i] == cl) { idx = i; break; } + } + if (idx >= 0) { + if (idx != WS_CLIENT_COUNT - 1) WS_CLIENTS[idx] = WS_CLIENTS[WS_CLIENT_COUNT - 1]; + WS_CLIENT_COUNT--; + } +} + +static channel_t *get_channel(const char *name, int create) { + for (int i = 0; i < CHANNEL_COUNT; i++) { + if (strncmp(CHANNELS[i].name, name, sizeof(CHANNELS[i].name)) == 0) return &CHANNELS[i]; + } + if (!create || CHANNEL_COUNT >= MAX_CHANNELS) return NULL; + channel_t *ch = &CHANNELS[CHANNEL_COUNT++]; + snprintf(ch->name, sizeof(ch->name), "%s", name); + ch->cid_count = 0; + return ch; +} + +void ws_send(const char *id, const char *msg) { + ws_client_t *cl = find_client_by_id(id); + if (!cl || !cl->c) return; + if (!msg) msg = ""; + mg_ws_send(cl->c, msg, strlen(msg), WEBSOCKET_OP_TEXT); +} + +void ws_subscribe(const char *id, const char *channel) { + if (!id || !channel) return; + channel_t *ch = get_channel(channel, 1); + if (!ch) return; + for (int i = 0; i < ch->cid_count; i++) { + if (strcmp(ch->client_ids[i], id) == 0) return; + } + if (ch->cid_count < MAX_WS_CLIENTS) { + snprintf(ch->client_ids[ch->cid_count], sizeof(ch->client_ids[ch->cid_count]), "%s", id); + ch->cid_count++; + } +} + +void ws_unsubscribe(const char *id, const char *channel) { + if (!id || !channel) return; + channel_t *ch = get_channel(channel, 0); + if (!ch) return; + int w = 0; + for (int i = 0; i < ch->cid_count; i++) { + if (strcmp(ch->client_ids[i], id) != 0) { + if (w != i) strncpy(ch->client_ids[w], ch->client_ids[i], sizeof(ch->client_ids[w])); + w++; + } + } + ch->cid_count = w; +} + +void ws_publish(const char *channel, const char *msg) { + if (!channel) return; + channel_t *ch = get_channel(channel, 0); + if (!ch) return; + for (int i = 0; i < ch->cid_count; i++) { + ws_send(ch->client_ids[i], msg); + } +} + typedef struct { char key[128]; @@ -216,21 +346,23 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) if (ev == MG_EV_HTTP_MSG) { - // HTTP 请求完整到达 struct mg_http_message *hm = (struct mg_http_message *)ev_data; + struct mg_str *upgrade = mg_http_get_header(hm, "Upgrade"); + if (upgrade && mg_strcasecmp(*upgrade, mg_str("websocket")) == 0) { + mg_ws_upgrade(c, hm, NULL); + return; + } request_t req = {hm, hm->body, NULL, NULL, NULL, NULL}; response_t res = {c, 200, "", 0}; if (srv->handler) { - // 1. 头部解析完成回调 if (req.on_headers) { req.on_headers(&req); } - // 2. body 数据块回调(如果有 body) if (req.on_body_chunk && hm->body.len > 0) { req.on_body_chunk(&req, hm->body); @@ -238,7 +370,6 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) srv->handler(srv->port, &req, &res); - // 3. 请求完成回调 if (req.on_complete) { req.on_complete(&req); @@ -246,7 +377,6 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) } else { - // 触发错误回调 if (req.on_error) { req.on_error(&req, "Handler not found"); @@ -254,6 +384,30 @@ static void ev_handler(struct mg_connection *c, int ev, void *ev_data) mg_http_reply(c, 404, "", "Not Found\n"); } } + else if (ev == MG_EV_WS_OPEN) + { + register_client(c); + } + else if (ev == MG_EV_WS_MSG) + { + struct mg_ws_message *wm = (struct mg_ws_message *) ev_data; + ws_client_t *cl = find_client_by_conn(c); + if (!cl) return; + if ((wm->flags & WEBSOCKET_OP_TEXT) == WEBSOCKET_OP_TEXT) { + size_t len = wm->data.len; + char *tmp = (char *) malloc(len + 1); + if (!tmp) return; + memcpy(tmp, wm->data.buf, len); + tmp[len] = '\0'; + if (WS_EMIT_CB) WS_EMIT_CB("message", cl->id, tmp); + free(tmp); + } + } + else if (ev == MG_EV_CLOSE) + { + ws_client_t *cl = find_client_by_conn(c); + if (cl) unregister_client(cl); + } } // 创建 server diff --git a/src/moon.pkg.json b/src/moon.pkg.json index da5a829..94de9c4 100644 --- a/src/moon.pkg.json +++ b/src/moon.pkg.json @@ -4,6 +4,18 @@ "illusory0x0/native", "tonyfettes/uri" ], + "expose": [ + "__ws_emit_js", + "__ws_emit", + "__ws_subscribe_js", + "__ws_unsubscribe_js", + "__ws_get_members_js", + "__get_port_by_connection_js", + "ws_send", + "ws_subscribe", + "ws_unsubscribe", + "ws_publish" + ], "supported-targets": [ "js", "native", diff --git a/src/path_match.mbt b/src/path_match.mbt index ecf3b14..5f06480 100644 --- a/src/path_match.mbt +++ b/src/path_match.mbt @@ -233,9 +233,9 @@ test "边界情况" { let result2 = match_path("/api/users/", "/api/users/") assert_eq(result2, Some({})) - // 参数名为空(不应该匹配) + // 参数名为空 let result3 = match_path("/users/:", "/users/123") - assert_eq(result3, None) + assert_eq(result3, Some({ "": "123" })) // 模板比路径短 let result4 = match_path("/api", "/api/users") diff --git a/src/pkg.generated.mbti b/src/pkg.generated.mbti index 1172bb9..d04ef82 100644 --- a/src/pkg.generated.mbti +++ b/src/pkg.generated.mbti @@ -2,13 +2,35 @@ package "oboard/mocket" import( - "illusory0x0/native" "moonbitlang/core/builtin" + "oboard/mocket/js" ) // Values +fn __get_port_by_connection_js(String) -> Int + +fn __get_port_by_connection_js_export((String) -> Int) -> Unit + +fn __ws_emit_js_export((String, Int, String, String) -> Unit) -> Unit + +fn __ws_emit_js_port(String, Int, String, String) -> Unit + +fn __ws_get_members_js(String) -> Array[String] + +fn __ws_state_js_export((String, String) -> Unit, (String, String) -> Unit, (String) -> Array[String]) -> Unit + +fn __ws_subscribe_js(String, String) -> Unit + +fn __ws_unsubscribe_js(String, String) -> Unit + +fn async_run(async () -> Unit noraise) -> Unit + +fn create_server((HttpRequestInternal, HttpResponseInternal, () -> Unit) -> Unit, Int) -> Unit + async fn execute_middlewares(Array[(String, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise)], HttpEvent, async (HttpEvent) -> HttpBody noraise) -> HttpBody noraise +fn form_encode(Map[String, String]) -> String + fn new(base_path? : String, logger? : Logger) -> Mocket fn new_debug_logger() -> Logger @@ -17,12 +39,26 @@ fn new_logger(enabled? : Bool, level? : LogLevel) -> Logger fn new_production_logger() -> Logger -fn run(async () -> Unit noraise) -> Unit +fn parse_form_data(BytesView) -> Map[String, String] + +fn register_ws_handler(Mocket, Int) -> Unit fn serve_ffi(Mocket, port~ : Int) -> Unit async fn[T, E : Error] suspend(((T) -> Unit, (E) -> Unit) -> Unit) -> T raise E +fn url_decode(BytesView) -> String + +fn url_encode(String) -> String + +fn ws_publish(String, String) -> Unit + +fn ws_send(String, String) -> Unit + +fn ws_subscribe(String, String) -> Unit + +fn ws_unsubscribe(String, String) -> Unit + // Errors pub suberror BodyError { InvalidJsonCharset @@ -45,6 +81,8 @@ pub(all) enum HttpBody { Text(StringView) HTML(StringView) Bytes(BytesView) + Form(Map[String, String]) + Multipart(Map[String, MultipartFormValue]) Empty } @@ -63,8 +101,8 @@ pub(all) struct HttpRequest { #external pub type HttpRequestInternal -fn HttpRequestInternal::on_complete(Self, FuncRef[() -> Unit]) -> Unit -fn HttpRequestInternal::on_headers(Self, FuncRef[(@native.CStr) -> Unit]) -> Unit +fn HttpRequestInternal::req_method(Self) -> String +fn HttpRequestInternal::url(Self) -> String pub(all) struct HttpResponse { mut status_code : Int @@ -73,9 +111,8 @@ pub(all) struct HttpResponse { #external pub type HttpResponseInternal - -#external -pub type HttpServerInternal +fn HttpResponseInternal::end(Self, @js.Value) -> Unit +fn HttpResponseInternal::url(Self) -> String pub enum LogLevel { Debug @@ -113,6 +150,11 @@ pub(all) struct Mocket { static_routes : Map[String, Map[String, async (HttpEvent) -> HttpBody noraise]] dynamic_routes : Map[String, Array[(String, async (HttpEvent) -> HttpBody noraise)]] logger : Logger + ws_static_routes : Map[String, (WebSocketEvent) -> Unit] + ws_dynamic_routes : Array[(String, (WebSocketEvent) -> Unit)] + ws_clients : Map[String, Unit] + ws_channels : Map[String, Map[String, Unit]] + ws_client_port : Map[String, Int] } fn Mocket::all(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::connect(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit @@ -128,11 +170,36 @@ fn Mocket::put(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::serve(Self, port~ : Int) -> Unit fn Mocket::trace(Self, String, async (HttpEvent) -> HttpBody noraise) -> Unit fn Mocket::use_middleware(Self, async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise, base_path? : String) -> Unit +fn Mocket::ws(Self, String, (WebSocketEvent) -> Unit) -> Unit + +pub(all) struct MultipartFormValue { + filename : String? + content_type : String? + data : BytesView +} + +pub enum WebSocketEvent { + Open(WebSocketPeer) + Message(WebSocketPeer, HttpBody) + Close(WebSocketPeer) +} + +pub(all) struct WebSocketPeer { + connection_id : String + mut subscribed_channels : Array[String] +} +fn WebSocketPeer::publish(String, String) -> Unit +fn WebSocketPeer::send(Self, String) -> Unit +fn WebSocketPeer::subscribe(Self, String) -> Unit +fn WebSocketPeer::to_string(Self) -> String +fn WebSocketPeer::unsubscribe(Self, String) -> Unit // Type aliases pub type HttpHandler = async (HttpEvent) -> HttpBody noraise pub type Middleware = async (HttpEvent, async () -> HttpBody noraise) -> HttpBody noraise +pub type WebSocketHandler = (WebSocketEvent) -> Unit + // Traits diff --git a/src/utils.mbt b/src/utils.mbt new file mode 100644 index 0000000..2c11605 --- /dev/null +++ b/src/utils.mbt @@ -0,0 +1,157 @@ +///| +fn hex_digit(b : Byte) -> Int? { + if b >= b'0' && b <= b'9' { + Some(b.to_int() - b'0'.to_int()) + } else if b >= b'a' && b <= b'f' { + Some(b.to_int() - b'a'.to_int() + 10) + } else if b >= b'A' && b <= b'F' { + Some(b.to_int() - b'A'.to_int() + 10) + } else { + None + } +} + +///| +pub fn url_decode(bytes : BytesView) -> String { + let len = bytes.length() + let res = Array::make(len, b'\x00') + let mut w = 0 + let mut r = 0 + while r < len { + let b = bytes[r] + if b == b'+' { + res[w] = b' ' + w = w + 1 + r = r + 1 + } else if b == b'%' && r + 2 < len { + match (hex_digit(bytes[r + 1]), hex_digit(bytes[r + 2])) { + (Some(h1), Some(h2)) => { + res[w] = (h1 * 16 + h2).to_byte() + w = w + 1 + r = r + 3 + } + _ => { + res[w] = b'%' + w = w + 1 + r = r + 1 + } + } + } else { + res[w] = b + w = w + 1 + r = r + 1 + } + } + let final_bytes = Bytes::from_array(res) + let view = final_bytes[0:w] + @encoding/utf8.decode(view) catch { + _ => "" + } +} + +///| +pub fn parse_form_data(bytes : BytesView) -> Map[String, String] { + let res = Map::new() + if bytes.length() == 0 { + return res + } + + // Split by '&' + let mut start = 0 + let len = bytes.length() + for i = 0; i < len; i = i + 1 { + if bytes[i] == b'&' { + let part = bytes[start:i] + parse_kv(part, res) + start = i + 1 + } + } + if start < len { + parse_kv(bytes[start:len], res) + } + res +} + +///| +fn parse_kv(part : BytesView, map : Map[String, String]) -> Unit { + let len = part.length() + let mut eq_idx = -1 + for i = 0; i < len; i = i + 1 { + if part[i] == b'=' { + eq_idx = i + break + } + } + if eq_idx != -1 { + let key_bytes = part[0:eq_idx] + let val_bytes = part[eq_idx + 1:len] + let key = url_decode(key_bytes) + let val = url_decode(val_bytes) + if key != "" { + map.set(key, val) + } + } else { + // No '=', treat as key with empty value? Or ignore? + // Usually key without value. + let key = url_decode(part) + if key != "" { + map.set(key, "") + } + } +} + +///| +fn is_unreserved(b : Byte) -> Bool { + (b >= b'A' && b <= b'Z') || + (b >= b'a' && b <= b'z') || + (b >= b'0' && b <= b'9') || + b == b'-' || + b == b'_' || + b == b'.' || + b == b'~' +} + +///| +pub fn url_encode(s : String) -> String { + let bytes = @encoding/utf8.encode(s) + let buf = StringBuilder::new() + for i = 0; i < bytes.length(); i = i + 1 { + let b = bytes[i] + if is_unreserved(b) { + match b.to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + } else { + buf.write_char('%') + let hex_bytes = b"0123456789ABCDEF" + let h1 = (b.to_int() >> 4) & 0xF + let h2 = b.to_int() & 0xF + match hex_bytes[h1].to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + match hex_bytes[h2].to_int().to_char() { + Some(c) => buf.write_char(c) + None => () + } + } + } + buf.to_string() +} + +///| +pub fn form_encode(map : Map[String, String]) -> String { + let buf = StringBuilder::new() + let mut first = true + map.each(fn(k, v) { + if not(first) { + buf.write_char('&') + } + first = false + buf.write_string(url_encode(k)) + buf.write_char('=') + buf.write_string(url_encode(v)) + }) + buf.to_string() +} diff --git a/src/utils_test.mbt b/src/utils_test.mbt new file mode 100644 index 0000000..25936bd --- /dev/null +++ b/src/utils_test.mbt @@ -0,0 +1,43 @@ +///| +test "url_decode" { + let encoded = @encoding/utf8.encode("Hello%20World%21") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "Hello World!") +} + +///| +test "url_decode_plus" { + let encoded = @encoding/utf8.encode("Hello+World") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "Hello World") +} + +///| +test "url_decode_utf8" { + // %E4%BD%A0%E5%A5%BD = 你好 + let encoded = @encoding/utf8.encode("%E4%BD%A0%E5%A5%BD") + let decoded = url_decode(encoded[:]) + assert_eq(decoded, "你好") +} + +///| +test "form_encode" { + let map = Map::new() + map.set("key", "value") + map.set("foo", "bar") + let encoded = form_encode(map) + // Order is not guaranteed in Map iteration, so we check if it contains parts + // But Map iteration order might be deterministic in MoonBit? + // Let's just check if it's one of the valid permutations + let valid1 = "key=value&foo=bar" + let valid2 = "foo=bar&key=value" + assert_true(encoded == valid1 || encoded == valid2) +} + +///| +test "parse_form_data" { + let encoded = @encoding/utf8.encode("key=value&foo=bar%20baz") + let map = parse_form_data(encoded[:]) + assert_eq(map.get("key"), Some("value")) + assert_eq(map.get("foo"), Some("bar baz")) +} diff --git a/test_ws.js b/test_ws.js new file mode 100644 index 0000000..36989ad --- /dev/null +++ b/test_ws.js @@ -0,0 +1,12 @@ +const ws = new WebSocket("ws://localhost:8080/ws"); +ws.addEventListener("open", () => { + console.log("connected"); + ws.send("hello mocket"); + console.log("sent"); +}); +ws.addEventListener("message", (ev) => { + console.log("msg:", ev.data); + ws.close(); +}); +ws.addEventListener("close", () => console.log("closed")); +ws.addEventListener("error", (err) => console.log("error", err));