diff --git a/Makefile b/Makefile index f4db3b7..bea2a2e 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,2 @@ all: - dune build + dune runtest diff --git a/README.md b/README.md index fb3b5b8..c990103 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,18 @@ # Lwt_eio - run Lwt code from within Eio -Status: experimental - -Lwt_eio is an Lwt engine that uses Eio. +Lwt_eio is a Lwt engine that uses Eio. It can be used to run Lwt and Eio code in a single domain. +It allows converting existing code to Eio incrementally. See `lib/lwt_eio.mli` for the API. -# Examples +## Examples There are two example programs: -- `simple.ml` runs an Lwt thread and an Eio fibre, communicating over a pair of streams. +- `simple.ml` runs a Lwt thread and an Eio fibre, communicating over a pair of streams. - `chat.ml` runs a chat server. - An Eio fibre accepts (loopback) connections on port 8001, while an Lwt thread accepts connections on port 8002. + An Eio fibre accepts (loopback) connections on port 8001, while a Lwt thread accepts connections on port 8002. All clients on either port see the room history, which consists of all messages sent by any client as well as join/leave events. ``` @@ -66,10 +65,225 @@ Hello from client B Both clients should show all messages. -# Limitations +## Limitations - Lwt code can only run in a single domain, and using `Lwt_eio` does not change this. You can only run Lwt code in the domain that ran `Lwt_eio.with_event_loop`. - `Lwt_eio` does not make your Lwt programs run faster than before. - Lwt jobs are still run by Lwt, and do not take advantage of Eio's io_uring support, for example. + Lwt jobs are still run by Lwt, and do not take advantage of Eio's `io_uring` support, for example. + +## Porting a Lwt Application to Eio + +This guide will show how to migrate an existing Lwt application or library to Eio. +We'll start with this Lwt program, which reads in a list of lines, sorts them, +and writes the result to stdout: + +```ocaml +# #require "lwt.unix";; +# open Lwt.Syntax;; + +# let process_lines src fn = + let stream = Lwt_io.read_lines src in + let* lines = Lwt_stream.to_list stream in + let* lines = fn lines in + let rec write = function + | [] -> Lwt.return_unit + | x :: xs -> + let* () = Lwt_io.(write_line stdout) x in + write xs + in + let* () = write lines in + Lwt_io.(flush stdout);; +val process_lines : + Lwt_io.input_channel -> (string list -> string list Lwt.t) -> unit Lwt.t = + + +# let sort src = + process_lines src @@ fun lines -> + let* () = Lwt.pause () in (* Simulate async work *) + Lwt.return (List.sort String.compare lines);; +val sort : Lwt_io.input_channel -> unit Lwt.t = + +# Lwt_main.run begin + let input = Lwt_io.(of_bytes ~mode:input) + (Lwt_bytes.of_bytes (Bytes.of_string "b\na\nd\nc\n")) in + sort input + end;; +a +b +c +d +- : unit = () +``` + +The first step is to replace `Lwt_main.run`, and check that the program still works: + +```ocaml +# #require "eio_main";; +# #require "lwt_eio";; +# open Eio.Std;; + +# Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + Lwt_eio.Promise.await_lwt begin + let input = Lwt_io.(of_bytes ~mode:input) + (Lwt_bytes.of_bytes (Bytes.of_string "b\na\nd\nc\n")) in + sort input + end;; +a +b +c +d +- : unit = () +``` + +Here, we're using the Eio event loop instead of the normal Lwt one, +but everything else stays the same. + +When I first tried this, it failed with `Fatal error: exception Unhandled` +because I'd forgotten to flush stdout in the Lwt code. +That meant that `sort` returned before Lwt had completely finished and then it +tried to flush lazily after the Eio loop had finished, which is an error. + +We can now start converting code to Eio. +There are several places we could start. +Here we'll begin with the `process_lines` function. +We'll take an Eio flow instead of a Lwt_io input: + +```ocaml +# let process_lines src fn = + let* lines = + Lwt_eio.run_eio @@ fun () -> + Eio.Buf_read.of_flow src ~max_size:max_int + |> Eio.Buf_read.lines + |> List.of_seq + in + let* lines = fn lines in + let rec write = function + | [] -> Lwt.return_unit + | x :: xs -> + let* () = Lwt_io.(write_line stdout) x in + write xs + in + let* () = write lines in + Lwt_io.(flush stdout);; +val process_lines : + #Eio.Flow.read -> (string list -> string list Lwt.t) -> unit Lwt.t = +``` + +Note that `process_lines` is still a Lwt function, +but it now uses `run_eio` internally to read from the input using Eio. + +**Warning:** It's important not to call Eio functions directly from Lwt, but instead wrap such code with `run_eio`. +If you replace the `Lwt_eio.run_eio @@ fun () ->` line with `Lwt.return @@` +then it will appear to work in simple cases, but it will act as a blocking read. +It's similar to trying to turn a blocking call like `Stdlib.input_line` into an asynchronous one +using `Lwt.return`. It doesn't actually make it concurrent. + +We can now test it using an Eio flow: + +```ocaml +# let sort src = + process_lines src @@ fun lines -> + let* () = Lwt.pause () in (* Simulate async work *) + Lwt.return (List.sort String.compare lines);; +val sort : #Eio.Flow.read -> unit Lwt.t = + +# Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + Lwt_eio.Promise.await_lwt begin + sort (Eio.Flow.string_source "b\na\nd\nc\n") + end;; +a +b +c +d +- : unit = () +``` + +Let's finish converting `process_lines`: + +```ocaml +# let process_lines ~src ~dst fn = + Eio.Buf_read.of_flow src ~max_size:max_int + |> Eio.Buf_read.lines + |> List.of_seq + |> fn + |> List.iter (fun line -> + Eio.Flow.copy_string (line ^ "\n") dst + );; +val process_lines : + src:#Eio.Flow.read -> + dst:#Eio.Flow.write -> (string list -> string list) -> unit = +``` + +Now `process_lines` is an Eio function. The `Lwt.t` types have disappeared from its signature. + +Note that we now take an extra `dst` argument for the output: +Eio functions should always receive access to external resources explicitly. + +To use the new version, we'll have to update `sort` to wrap its Lwt callback: + +```ocaml +# let sort ~src ~dst = + process_lines ~src ~dst @@ fun lines -> + Lwt_eio.Promise.await_lwt begin + let* () = Lwt.pause () in (* Simulate async work *) + Lwt.return (List.sort String.compare lines) + end;; +val sort : src:#Eio.Flow.read -> dst:#Eio.Flow.write -> unit = +``` + +`sort` itself now looks like a normal Eio function from its signature. +We can therefore now call it directly from Eio: + +```ocaml +# Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + sort + ~src:(Eio.Flow.string_source "b\na\nd\nc\n") + ~dst:env#stdout;; +a +b +c +d +- : unit = () +``` + +Finally, we can convert `sort`'s callback to Eio code and drop the use of `Lwt` and `Lwt_eio` completely: + +```ocaml +# let sort ~src ~dst = + process_lines ~src ~dst @@ fun lines -> + Fibre.yield (); + List.sort String.compare lines;; +val sort : src:#Eio.Flow.read -> dst:#Eio.Flow.write -> unit = + +# Eio_main.run @@ fun env -> + sort + ~src:(Eio.Flow.string_source "b\na\nd\nc\n") + ~dst:env#stdout;; +a +b +c +d +- : unit = () +``` + +Key points: + +- Start by replacing `Lwt_main.run` while keeping the rest of the code the same. + +- Update your program piece by piece, using `Lwt_eio` when moving between Eio and Lwt contexts. + +- Never call Eio code directly from Lwt code. Wrap it with `Lwt_eio.run_eio`. + Simply wrapping the result of an Eio call with `Lwt.return` is NOT safe. + +- You don't have to do the conversion in any particular order. + +- You may need to make other changes to your API. In particular: + + - External resources (such as `stdout`, the network and the filesystem) should be passed as inputs to Eio code. + + - Take a `Switch.t` argument if your function creates fibres or file handles that out-live the function. diff --git a/dune b/dune index 3591ad3..c9cef08 100644 --- a/dune +++ b/dune @@ -1,3 +1,7 @@ (executables (names simple chat) (libraries eio_main lwt_eio logs.fmt ctf.unix)) + +(mdx + (packages lwt_eio) + (files README.md)) diff --git a/dune-project b/dune-project index 5a3195c..f3d3c2d 100644 --- a/dune-project +++ b/dune-project @@ -13,4 +13,6 @@ (depends eio lwt + (mdx (and (>= 1.10.0) :with-test)) (eio_main :with-test))) +(using mdx 0.1) diff --git a/lib/lwt_eio.ml b/lib/lwt_eio.ml index 6198827..d5c17f3 100644 --- a/lib/lwt_eio.ml +++ b/lib/lwt_eio.ml @@ -20,7 +20,7 @@ let notify () = Lazy.force !ready let fork_with_cancel ~sw fn = let cancel = ref None in Fibre.fork_sub ~sw ~on_error:ignore_cancel (fun sw -> - cancel := Some (lazy (Switch.fail sw Cancel)); + cancel := Some (lazy (try Switch.fail sw Cancel with Invalid_argument _ -> ())); fn () ); (* The forked fibre runs first, so [cancel] must be set by now. *) @@ -29,7 +29,9 @@ let fork_with_cancel ~sw fn = let make_engine ~sw ~clock = object inherit Lwt_engine.abstract - method private cleanup = Switch.fail sw Exit + method private cleanup = + try Switch.fail sw Exit + with Invalid_argument _ -> () (* Already destroyed *) method private register_readable fd callback = fork_with_cancel ~sw @@ fun () -> diff --git a/lwt_eio.opam b/lwt_eio.opam index ed3260e..b658b73 100644 --- a/lwt_eio.opam +++ b/lwt_eio.opam @@ -12,6 +12,7 @@ depends: [ "dune" {>= "2.9"} "eio" "lwt" + "mdx" {>= "1.10.0" & with-test} "eio_main" {with-test} "odoc" {with-doc} ]