From bc06e229d20c6a69bc228c4e2414882e10195692 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Thu, 9 Jun 2022 15:08:52 +0100 Subject: [PATCH] eio_linux: add support for integration with Async --- lib_eio/unix/eio_unix.ml | 8 ++++++++ lib_eio/unix/eio_unix.mli | 9 +++++++++ lib_eio_linux/eio_linux.ml | 31 +++++++++++++++++++++++++++++-- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 1a2fc4120..bd714204e 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -3,11 +3,19 @@ module Effect = Eio.Private.Effect module Private = struct type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty + module Async = struct + type t = { + lock : unit -> unit; + unlock : unit -> unit; + } + end + type _ Effect.t += | Await_readable : Unix.file_descr -> unit Effect.t | Await_writable : Unix.file_descr -> unit Effect.t | Get_system_clock : Eio.Time.clock Effect.t | Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> < Eio.Flow.two_way; Eio.Flow.close > Effect.t + | Set_async_integration : Async.t option -> unit Effect.t end let await_readable fd = Effect.perform (Private.Await_readable fd) diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index 115fc971a..410d4b9dc 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -53,12 +53,21 @@ module Private : sig type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty (** See {!FD}. *) + (** For Async_eio integration, we need to give Eio access to Async's lock. *) + module Async : sig + type t = { + lock : unit -> unit; + unlock : unit -> unit; + } + end + type _ Effect.t += | Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *) | Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *) | Get_system_clock : Eio.Time.clock Effect.t (** See {!sleep} *) | Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> < Eio.Flow.two_way; Eio.Flow.close > Effect.t (** See {!FD.as_socket} *) + | Set_async_integration : Async.t option -> unit Effect.t end module Ctf = Ctf_unix diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 63302acbf..ce8782b5b 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -156,6 +156,9 @@ type t = { sleep_q: Zzz.t; mutable io_jobs: int; + + (* Optional integration with the Async library. *) + mutable async : Eio_unix.Private.Async.t option; } let wake_buffer = @@ -476,7 +479,15 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) Ctf.(note_hiatus Wait_for_work); - let result = Uring.wait ?timeout uring in + let result = + match st.async with + | None -> Uring.wait ?timeout uring + | Some async -> + async.unlock (); + let r = Uring.wait ?timeout uring in + async.lock (); + r + in Ctf.note_resume system_thread; Atomic.set st.need_wakeup false; Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) @@ -1194,7 +1205,19 @@ let rec run ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fal let io_q = Queue.create () in let mem_q = Queue.create () in let eventfd = FD.placeholder ~seekable:false ~close_unix:false in - let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; io_jobs = 0 } in + let st = { + mem; + uring; + run_q; + eventfd_mutex; + io_q; + mem_q; + eventfd; + need_wakeup = Atomic.make false; + sleep_q; + io_jobs = 0; + async = None; + } in Log.debug (fun l -> l "starting main thread"); let rec fork ~new_fiber:fiber fn = let open Effect.Deep in @@ -1288,6 +1311,10 @@ let rec run ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fal let fd = FD.of_unix ~sw ~seekable:false ~close_unix fd in continue k (flow fd :> < Eio.Flow.two_way; Eio.Flow.close >) ) + | Eio_unix.Private.Set_async_integration async -> Some (fun k -> + st.async <- async; + continue k () + ) | Low_level.Alloc -> Some (fun k -> match st.mem with | None -> continue k None