diff --git a/spec/std/fiber_spec.cr b/spec/std/fiber_spec.cr index 5c85d1a7475d..f22f52cc82db 100644 --- a/spec/std/fiber_spec.cr +++ b/spec/std/fiber_spec.cr @@ -1,4 +1,5 @@ require "spec" +require "wait_group" describe Fiber do it "#resumable?" do @@ -18,4 +19,61 @@ describe Fiber do resumable.should be_false end + + describe ".sleep" do + it "expires" do + cancelation_token = nil + channel = Channel(Fiber::TimeoutResult).new + + spawn do + result = Fiber.sleep(10.milliseconds) { |token| cancelation_token = token } + channel.send(result) + end + + channel.receive.should eq(Fiber::TimeoutResult::EXPIRED) + end + + it "is canceled" do + cancelation_token = nil + channel = Channel(Fiber::TimeoutResult).new + + fiber = spawn do + result = Fiber.sleep(1.second) { |token| cancelation_token = token } + channel.send(result) + end + + until cancelation_token + Fiber.yield + end + + if fiber.resolve_timer?(cancelation_token.not_nil!) + fiber.enqueue + end + + channel.receive.should eq(Fiber::TimeoutResult::CANCELED) + end + + it "expires or is canceled" do + 20.times do + WaitGroup.wait do |wg| + cancelation_token = nil + + suspended_fiber = wg.spawn do + Fiber.sleep(10.milliseconds) do |token| + # save the token so another fiber can try to cancel the timer + cancelation_token = token + end + end + + sleep rand(9..11).milliseconds + + # let's try to cancel the timer + if suspended_fiber.resolve_timer?(cancelation_token.not_nil!) + # canceled: we must enqueue the fiber + suspended_fiber.enqueue + end + end + end + end + end end diff --git a/src/concurrent.cr b/src/concurrent.cr index 58efd182a927..4160504bd71d 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -26,7 +26,7 @@ end # fibers might start their execution. def sleep(time : Time::Span) : Nil Crystal.trace :sched, "sleep", for: time - Crystal::EventLoop.current.sleep(time) + Fiber.sleep(time) { } end # Blocks the current fiber forever. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index fb7ab98bfa3a..a613bee154a9 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -81,8 +81,12 @@ abstract class Crystal::EventLoop # time in parallel, but this assumption may change in the future! abstract def interrupt : Nil - # Suspend the current fiber for *duration*. - abstract def sleep(duration : Time::Span) : Nil + # Suspends the current fiber until the absolute *time* is reached (as per the + # monotonic clock). Must resolve the timeout using `Fiber.resolve_timer?` and + # *token* before resuming the fiber. + # + # Returns true if the timer expired, false if it was canceled. + abstract def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool # Create a new resume event for a fiber. # diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index a78721ba4372..59e7cd0d7f39 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -143,7 +143,12 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop fiber = timer.value.fiber case timer.value.type - in .sleep?, .timeout? + in .sleep? + # the timer might have been canceled already, and we must synchronize with + # the resumed `#timeout` fiber; by rule we must always resume the fiber, + # regardless of whether we resolve the timeout or not. + timer.value.timed_out! if fiber.resolve_timer?(timer.value.cancelation_token) + in .timeout? timer.value.timed_out! in .select_timeout? return unless select_action = fiber.timeout_select_action @@ -168,10 +173,11 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end end - protected def delete_timer(timer : Pointer(Timer)) : Nil + protected def delete_timer(timer : Pointer(Timer)) : Bool @mutex.synchronize do - _, was_next_ready = @timers.delete(timer) + dequeued, was_next_ready = @timers.delete(timer) rearm_waitable_timer(@timers.next_ready?, interruptible: false) if was_next_ready + dequeued end end @@ -195,25 +201,31 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end end - def sleep(duration : Time::Span) : Nil - timer = Timer.new(:sleep, Fiber.current, duration) + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + timer = Timer.new(:sleep, Fiber.current) + timer.wake_at = time + timer.cancelation_token = token add_timer(pointerof(timer)) + Fiber.suspend - # safety check - return if timer.timed_out? + unless timer.timed_out? || delete_timer(pointerof(timer)) + # the timeout was canceled while another thread dequeued the timer while + # running the event loop: we must synchronize with #process_timer + # (otherwise *event* might go out of scope); by rule the timer will always + # enqueue the fiber and we must suspend again. + Fiber.suspend + end - # try to avoid a double resume if possible, but another thread might be - # running the evloop and dequeue the event in parallel, so a "can't resume - # dead fiber" can still happen in a MT execution context. - delete_timer(pointerof(timer)) - raise "BUG: #{timer.fiber} called sleep but was manually resumed before the timer expired!" + timer.timed_out? end # Suspend the current fiber for *duration* and returns true if the timer # expired and false if the fiber was resumed early. # # Specific to IOCP to handle IO timeouts. + # + # TODO: use sleep(time, token) instead def timeout(duration : Time::Span) : Bool event = Fiber.current.resume_event event.add(duration) diff --git a/src/crystal/event_loop/iocp/timer.cr b/src/crystal/event_loop/iocp/timer.cr index 3c667f026599..60eb02abd962 100644 --- a/src/crystal/event_loop/iocp/timer.cr +++ b/src/crystal/event_loop/iocp/timer.cr @@ -19,6 +19,8 @@ struct Crystal::EventLoop::IOCP::Timer # trigger. Nil for IO events without a timeout. getter! wake_at : Time::Span + property! cancelation_token : Fiber::CancelationToken + # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index d0eef0f88425..84c967d000ba 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -50,9 +50,37 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop_exit end - def sleep(duration : ::Time::Span) : Nil - Fiber.current.resume_event.add(duration) + class SleepData + property fiber : Fiber + property cancelation_token : Fiber::CancelationToken + property? expired : Bool = false + + def initialize(@fiber, @cancelation_token) + end + end + + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + arg = SleepData.new(Fiber.current, token) + + event = event_base.new_event(-1, LibEvent2::EventFlags::None, arg) do |s, flags, data| + d = data.as(SleepData) + f = d.fiber + if f.resolve_timer?(d.cancelation_token) + d.expired = true + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end + end + event.add(time - Time.monotonic) + Fiber.suspend + + event.delete unless arg.expired? + arg.expired? end # Create a new resume event for a fiber (sleep). diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 61c10fe85567..5d31cfd256b6 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -141,19 +141,23 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # fiber interface, see Crystal::EventLoop - def sleep(duration : ::Time::Span) : Nil - event = Event.new(:sleep, Fiber.current, timeout: duration) + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + event = Event.new(:sleep, Fiber.current) + event.wake_at = time + event.cancelation_token = token add_timer(pointerof(event)) + Fiber.suspend - # safety check - return if event.timed_out? + unless event.timed_out? || delete_timer(pointerof(event)) + # the timeout was canceled while another thread dequeued the timer while + # running the event loop: we must synchronize with #process_timer + # (otherwise *event* might go out of scope); by rule the timer will always + # enqueue the fiber and we must suspend again. + Fiber.suspend + end - # try to avoid a double resume if possible, but another thread might be - # running the evloop and dequeue the event in parallel, so a "can't resume - # dead fiber" can still happen in a MT execution context. - delete_timer(pointerof(event)) - raise "BUG: #{event.fiber} called sleep but was manually resumed before the timer expired!" + event.timed_out? end def create_timeout_event(fiber : Fiber) : FiberEvent @@ -607,7 +611,10 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop return unless select_action.time_expired? fiber.@timeout_event.as(FiberEvent).clear when .sleep? - event.value.timed_out! + # the timer might have been canceled already, and we must synchronize with + # the resumed `#timeout` fiber; by rule we must always resume the fiber, + # regardless of whether we resolve the timeout or not. + event.value.timed_out! if fiber.resolve_timer?(event.value.cancelation_token) else raise RuntimeError.new("BUG: unexpected event in timers: #{event.value}%s\n") end diff --git a/src/crystal/event_loop/polling/event.cr b/src/crystal/event_loop/polling/event.cr index 93caf843b049..6f4a1d49ec85 100644 --- a/src/crystal/event_loop/polling/event.cr +++ b/src/crystal/event_loop/polling/event.cr @@ -33,6 +33,8 @@ struct Crystal::EventLoop::Polling::Event # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false + property! cancelation_token : Fiber::CancelationToken + # The event can be added to `Waiters` lists. include PointerLinkedList::Node diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 19b2f5e65868..bd8383bb79ee 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -17,8 +17,8 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt") end - def sleep(duration : ::Time::Span) : Nil - raise NotImplementedError.new("Crystal::Wasi::EventLoop.sleep") + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + raise NotImplementedError.new("Crystal::Wasi::EventLoop#sleep") end # Creates a timeout_event. diff --git a/src/fiber.cr b/src/fiber.cr index d56ad81ec380..64d56d3ea0fd 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -300,6 +300,119 @@ class Fiber Fiber.current.cancel_timeout end + struct CancelationToken + # :nodoc: + getter value : UInt32 + + def initialize(@value : UInt32) + end + end + + enum TimeoutResult + EXPIRED + CANCELED + end + + private TIMEOUT_FLAG = 1_u32 # bit 1 states if the timeout is set or unset + private TIMEOUT_COUNTER = 2_u32 # bits 2..32 is a counter to avoid ABA issues + @timer = Atomic(UInt32).new(0_u32) + + # Suspends the current `Fiber` for *duration*. + # + # Yields a `CancelationToken` before suspending the fiber. The token can be + # used to manually cancel the timer before *duration* expires to resume the + # fiber before *duration*. See `#resolve_timer?` for details. + # + # The fiber will be resumed once, either because the timer expired (*duration* + # has elapsed) or because the timer has been canceled. The returned + # `TimeoutResult` can be used to determine what happened and act accordingly, + # for example do some cleanup or raise an exception if the timer expired. + # + # ``` + # result = Fiber.sleep(5.seconds) do |cancelation_token| + # enqueue_waiter(Fiber.current, cancelation_token) + # end + # + # if result.expired? + # dequeue_waiter(Fiber.current) + # end + # ``` + def self.sleep(duration : Time::Span, & : CancelationToken ->) : TimeoutResult + sleep(until: Time.monotonic + duration) { |token| yield token } + end + + # Identical to `.sleep` but suspends the fiber until an absolute time, as per + # the monotonic clock, is reached. + # + # For example, we can retry something until 5 seconds have elapsed: + # + # ``` + # time = Time.monotonic + 5.seconds + # loop do + # break if try_something? + # result = Fiber.sleep(until: time) { |token| add_waiter(token) } + # raise "timeout" if result.expired? + # end + # ``` + def self.sleep(*, until time : Time::Span, & : CancelationToken ->) : TimeoutResult + token = Fiber.current.new_cancelation_token + yield token + result = Crystal::EventLoop.current.sleep(time, token) + result ? TimeoutResult::EXPIRED : TimeoutResult::CANCELED + end + + # Sets the timeout flag and increments the counter to avoid ABA issues with + # parallel threads trying to resolve the timer while the timer was unset then + # set again (new timer). Since the current fiber is the only one that can set + # the timer, we can merely set the atomic (no need for CAS). + protected def new_cancelation_token : CancelationToken + value = (@timer.get(:relaxed) | TIMEOUT_FLAG) &+ TIMEOUT_COUNTER + @timer.set(value, :relaxed) + CancelationToken.new(value) + end + + # Tries to resolve a sleeping `Fiber` using the cancelation *token*. + # + # Returns true when the timer has been resolved, false otherwise. + # + # On success, the caller owns the fiber and must eventually enqueue it. + # Failing to do so means that the fiber will never be resumed. + # + # On failure, the caller must skip the fiber. Trying to enqueue the fiber + # would lead to resume the fiber twice. + # + # ``` + # require "wait_group" + # + # WaitGroup.wait do |wg| + # cancelation_token = nil + # + # suspended_fiber = wg.spawn do + # result = Fiber.sleep(5.seconds) do |token| + # # save the token so another fiber can try to cancel + # cancelation_token = token + # end + # + # # prints either EXPIRED or CANCELED + # puts result + # end + # + # sleep rand(4..6).seconds + # + # # let's try to cancel + # if suspended_fiber.resolve_timer?(cancelation_token.not_nil!) + # # canceled: we must enqueue the fiber + # suspended_fiber.enqueue + # else + # # expired: do nothing + # end + # end + # ``` + def resolve_timer?(token : CancelationToken) : Bool + _, success = @timer.compare_and_set(token.value, token.value & ~TIMEOUT_FLAG, :relaxed, :relaxed) + success + end + # Yields to the scheduler and allows it to swap execution to other # waiting fibers. # @@ -333,7 +446,7 @@ class Fiber # TODO: Fiber switching and evloop for wasm32 {% unless flag?(:wasi) %} - Crystal::EventLoop.current.sleep(0.seconds) + sleep(0.seconds) { } {% end %} end