From 7dad1b595d4ebc1b660754403f331b2248741494 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 7 Aug 2025 16:28:03 +0200 Subject: [PATCH 1/6] Add `Crystal::EventLoop#timeout(time, token)` --- src/crystal/event_loop.cr | 7 +++++ src/crystal/event_loop/iocp.cr | 35 ++++++++++++++++++++++--- src/crystal/event_loop/iocp/timer.cr | 2 ++ src/crystal/event_loop/libevent.cr | 33 +++++++++++++++++++++++ src/crystal/event_loop/polling.cr | 24 +++++++++++++++++ src/crystal/event_loop/polling/event.cr | 3 +++ src/crystal/event_loop/wasi.cr | 4 +++ 7 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index fb7ab98bfa3a..3eb3b871d514 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -84,6 +84,13 @@ abstract class Crystal::EventLoop # Suspend the current fiber for *duration*. abstract def sleep(duration : Time::Span) : Nil + # Suspend the current fiber until the absolute *time* is reached (as per the + # monotonic clock). Must resolve the timeout using `Fiber.resolve_timeout?` + # and *token* before resuming the fiber. + # + # Returns true if the timer expired, false otherwise. + abstract def timeout(time : Time::Span, token : Fiber::TimeoutToken) : Bool + # Create a new resume event for a fiber. # # NOTE: optional. diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index a78721ba4372..d31bf829cd3c 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -143,8 +143,17 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop fiber = timer.value.fiber case timer.value.type - in .sleep?, .timeout? + in .sleep? timer.value.timed_out! + in .timeout? + if token = timer.value.timeout_token? + # the timeout might have been canceled already, and we must synchronize + # with the resumed `#timeout` fiber; by rule we must always resume the + # fiber, regardless of whether we resolve the timeout or not. + timer.value.timed_out! 
if fiber.resolve_timeout?(token) + else + timer.value.timed_out! + end in .select_timeout? return unless select_action = fiber.timeout_select_action fiber.timeout_select_action = nil @@ -168,10 +177,11 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end end - protected def delete_timer(timer : Pointer(Timer)) : Nil + protected def delete_timer(timer : Pointer(Timer)) : Bool @mutex.synchronize do - _, was_next_ready = @timers.delete(timer) + dequeued, was_next_ready = @timers.delete(timer) rearm_waitable_timer(@timers.next_ready?, interruptible: false) if was_next_ready + dequeued end end @@ -210,6 +220,25 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop raise "BUG: #{timer.fiber} called sleep but was manually resumed before the timer expired!" end + def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + timer = Timer.new(:timeout, Fiber.current) + timer.wake_at = time + timer.timeout_token = token + add_timer(pointerof(timer)) + + Fiber.suspend + + unless timer.timed_out? || delete_timer(pointerof(timer)) + # the timeout was canceled while another thread dequeued the timer while + # running the event loop: we must synchronize with #process_timer + # (otherwise *timer* might go out of scope); by rule the timer will always + # enqueue the fiber and we must suspend again. + Fiber.suspend + end + + timer.timed_out? + end + # Suspend the current fiber for *duration* and returns true if the timer # expired and false if the fiber was resumed early. # diff --git a/src/crystal/event_loop/iocp/timer.cr b/src/crystal/event_loop/iocp/timer.cr index 3c667f026599..ff8bcda3b754 100644 --- a/src/crystal/event_loop/iocp/timer.cr +++ b/src/crystal/event_loop/iocp/timer.cr @@ -19,6 +19,8 @@ struct Crystal::EventLoop::IOCP::Timer # trigger. Nil for IO events without a timeout. getter! wake_at : Time::Span + property! timeout_token : Fiber::TimeoutToken + # True if an IO event has timed out (i.e. we're past `#wake_at`). getter?
timed_out : Bool = false diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index d0eef0f88425..9025305df118 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -55,6 +55,39 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop Fiber.suspend end + class TimeoutData + property fiber : Fiber + property timeout_token : Fiber::TimeoutToken + property? expired : Bool = false + + def initialize(@fiber, @timeout_token) + end + end + + def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + arg = TimeoutData.new(Fiber.current, token) + + event = event_base.new_event(-1, LibEvent2::EventFlags::None, arg) do |s, flags, data| + d = data.as(TimeoutData) + f = d.fiber + if f.resolve_timeout?(d.timeout_token) + d.expired = true + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end + end + event.add(time - Time.monotonic) + + Fiber.suspend + + event.delete unless arg.expired? + arg.expired? + end + # Create a new resume event for a fiber (sleep). def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 61c10fe85567..41ce558fcbcb 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -156,6 +156,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop raise "BUG: #{event.fiber} called sleep but was manually resumed before the timer expired!" end + def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + event = Event.new(:timeout, Fiber.current) + event.wake_at = time + event.timeout_token = token + add_timer(pointerof(event)) + + Fiber.suspend + + unless event.timed_out? 
|| delete_timer(pointerof(event)) + # the timeout was canceled while another thread dequeued the timer while + # running the event loop: we must synchronize with #process_timer + # (otherwise *event* might go out of scope); by rule the timer will always + # enqueue the fiber and we must suspend again. + Fiber.suspend + end + + event.timed_out? + end + def create_timeout_event(fiber : Fiber) : FiberEvent FiberEvent.new(:select_timeout, fiber) end @@ -606,6 +625,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop fiber.timeout_select_action = nil return unless select_action.time_expired? fiber.@timeout_event.as(FiberEvent).clear + when .timeout? + # the timeout might have been canceled already, and we must synchronize + # with the resumed `#timeout` fiber; by rule we must always resume the + # fiber, regardless of whether we resolve the timeout or not. + event.value.timed_out! if fiber.resolve_timeout?(event.value.timeout_token) when .sleep? event.value.timed_out! else diff --git a/src/crystal/event_loop/polling/event.cr b/src/crystal/event_loop/polling/event.cr index 93caf843b049..ee3e474bac2e 100644 --- a/src/crystal/event_loop/polling/event.cr +++ b/src/crystal/event_loop/polling/event.cr @@ -14,6 +14,7 @@ struct Crystal::EventLoop::Polling::Event IoWrite Sleep SelectTimeout + Timeout end getter type : Type @@ -33,6 +34,8 @@ struct Crystal::EventLoop::Polling::Event # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false + property! timeout_token : Fiber::TimeoutToken + # The event can be added to `Waiters` lists. 
include PointerLinkedList::Node diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 19b2f5e65868..7eb499572ba4 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -21,6 +21,10 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.sleep") end + def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + raise NotImplementedError.new("Crystal::Wasi::EventLoop#timeout") + end + # Creates a timeout_event. def create_timeout_event(fiber) : Crystal::EventLoop::Event raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_timeout_event") From 72c66d8180081a0b1eef754cc440d572c9b3292b Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 7 Aug 2025 16:28:56 +0200 Subject: [PATCH 2/6] Add `Fiber.timeout` and `#resolve_timeout?` Also `Fiber::TimeoutToken` and `Fiber::TimeoutResult`. --- src/fiber.cr | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/src/fiber.cr b/src/fiber.cr index d56ad81ec380..69f24ac7b4eb 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -300,6 +300,123 @@ class Fiber Fiber.current.cancel_timeout end + struct TimeoutToken + # :nodoc: + getter value : UInt32 + + def initialize(@value : UInt32) + end + end + + enum TimeoutResult + EXPIRED + CANCELED + end + + private TIMEOUT_FLAG = 1_u32 + private TIMEOUT_COUNTER = 2_u32 + + @timeout = Atomic(UInt32).new(0_u32) + + # Suspends the current `Fiber` for *duration*. + # + # Yields a `TimeoutToken` before suspending the fiber. The token is required + # to manually cancel the timeout before *duration* expires. See + # `#resolve_timeout?` for details. + # + # The fiber will be automatically resumed after *duration* has elapsed, but it + # may be resumed earlier if the timeout has been manually canceled, yet the + # fiber will only ever be resumed once. 
The returned `TimeoutResult` can be + # used to determine what happened and act accordingly, for example do some + # cleanup or raise an exception if the timeout expired. + # + # ``` + # result = Fiber.timeout(5.seconds) do |cancelation_token| + # enqueue_waiter(Fiber.current, cancelation_token) + # end + # + # if result.expired? + # dequeue_waiter(Fiber.current) + # end + # ``` + # + # Consider `::sleep` if you don't need to cancel the timeout. + def self.timeout(duration : Time::Span, & : TimeoutToken ->) : TimeoutResult + timeout(until: Time.monotonic + duration) { |token| yield token } + end + + # Identical to `.timeout` but suspending the fiber until an absolute time, as + # per the monotonic clock, is reached. + # + # For example, we can retry something until 5 seconds have elapsed: + # + # ``` + # time = Time.monotonic + 5.seconds + # loop do + # break if try_something? + # result = Fiber.timeout(until: time) { |token| add_waiter(token) } + # raise "timeout" if result.expired? + # end + # ``` + def self.timeout(*, until time : Time::Span, & : TimeoutToken ->) : TimeoutResult + token = Fiber.current.create_timeout + yield token + result = Crystal::EventLoop.current.timeout(time, token) + result ? TimeoutResult::EXPIRED : TimeoutResult::CANCELED + end + + # Sets the timeout flag and increments the counter to avoid ABA issues with + # parallel threads trying to resolve the timeout while the timeout was unset + # then set again (new timeout). Since the current fiber is the only one that + # can set the timeout, we can merely set the atomic (no need for CAS). + protected def create_timeout : TimeoutToken + value = (@timeout.get(:relaxed) | TIMEOUT_FLAG) &+ TIMEOUT_COUNTER + @timeout.set(value, :relaxed) + TimeoutToken.new(value) + end + + # Tries to resolve the timeout previously set on `Fiber` using the cancelation + # *token*. See `Fiber.timeout` for details on setting the timeout. + # + # Returns true when the timeout has been resolved, false otherwise. 
+ # + # The caller that succeeded to resolve the timeout owns the fiber and must + # eventually enqueue it. Failing to do so means that the fiber will never be + # resumed. + # + # A caller that failed to resolve the timeout must skip the fiber. Trying to + # enqueue the fiber would lead the fiber to be resumed twice! + # + # ``` + # require "wait_group" + # + # WaitGroup.wait do |wg| + # cancelation_token = nil + # + # suspended_fiber = wg.spawn do + # result = Fiber.timeout(5.seconds) do |token| + # # save the token so another fiber can try to cancel the timeout + # cancelation_token = token + # end + # + # # prints either EXPIRED or CANCELED + # puts result + # end + # + # sleep rand(4..6).seconds + # + # # let's try to cancel the timeout + # if suspended_fiber.resolve_timeout?(cancelation_token.not_nil!) + # # canceled: we must enqueue the fiber + # suspended_fiber.enqueue + # end + # end + # ``` + def resolve_timeout?(token : TimeoutToken) : Bool + _, success = @timeout.compare_and_set(token.value, token.value & ~TIMEOUT_FLAG, :relaxed, :relaxed) + success + end + # Yields to the scheduler and allows it to swap execution to other # waiting fibers. # From 841f33d38962c67c3940ae236ffe17fb6e8436ee Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 8 Aug 2025 19:47:40 +0200 Subject: [PATCH 3/6] Suggestions from @straight-shoota --- src/crystal/event_loop.cr | 4 ++-- src/crystal/event_loop/iocp.cr | 2 +- src/crystal/event_loop/libevent.cr | 2 +- src/crystal/event_loop/polling.cr | 2 +- src/crystal/event_loop/wasi.cr | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 3eb3b871d514..7e38b4862cca 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -88,8 +88,8 @@ abstract class Crystal::EventLoop # monotonic clock). Must resolve the timeout using `Fiber.resolve_timeout?` # and *token* before resuming the fiber. 
# - # Returns true if the timer expired, false otherwise. - abstract def timeout(time : Time::Span, token : Fiber::TimeoutToken) : Bool + # Returns true if the timer expired, false if it was canceled. + abstract def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool # Create a new resume event for a fiber. # diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index d31bf829cd3c..7a0fdc5e7f1c 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -220,7 +220,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop raise "BUG: #{timer.fiber} called sleep but was manually resumed before the timer expired!" end - def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool timer = Timer.new(:timeout, Fiber.current) timer.wake_at = time timer.timeout_token = token diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 9025305df118..11a838e0f2d0 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -64,7 +64,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end - def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool arg = TimeoutData.new(Fiber.current, token) event = event_base.new_event(-1, LibEvent2::EventFlags::None, arg) do |s, flags, data| diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 41ce558fcbcb..9a78d9cd81ff 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -156,7 +156,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop raise "BUG: #{event.fiber} called sleep but was manually resumed before the timer expired!" 
end - def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool event = Event.new(:timeout, Fiber.current) event.wake_at = time event.timeout_token = token diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 7eb499572ba4..037216b345b7 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -21,7 +21,7 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.sleep") end - def timeout(time : ::Time::Span, token : Fiber::TimeoutToken) : Bool + def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool raise NotImplementedError.new("Crystal::Wasi::EventLoop#timeout") end From a9919abc57758af170f311e9aff88b2110f605bc Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 8 Aug 2025 19:57:17 +0200 Subject: [PATCH 4/6] Add simple specs --- spec/std/fiber_spec.cr | 58 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/spec/std/fiber_spec.cr b/spec/std/fiber_spec.cr index 5c85d1a7475d..f0f6d86daf8d 100644 --- a/spec/std/fiber_spec.cr +++ b/spec/std/fiber_spec.cr @@ -1,4 +1,5 @@ require "spec" +require "wait_group" describe Fiber do it "#resumable?" 
do @@ -18,4 +19,61 @@ describe Fiber do resumable.should be_false end + + describe ".timeout" do + it "expires" do + cancelation_token = nil + channel = Channel(Fiber::TimeoutResult).new + + fiber = spawn do + result = Fiber.timeout(10.milliseconds) { |token| cancelation_token = token } + channel.send(result) + end + + channel.receive.should eq(Fiber::TimeoutResult::EXPIRED) + end + + it "is canceled" do + cancelation_token = nil + channel = Channel(Fiber::TimeoutResult).new + + fiber = spawn do + result = Fiber.timeout(1.second) { |token| cancelation_token = token } + channel.send(result) + end + + until cancelation_token + Fiber.yield + end + + if fiber.resolve_timeout?(cancelation_token.not_nil!) + fiber.enqueue + end + + channel.receive.should eq(Fiber::TimeoutResult::CANCELED) + end + + it "expires or is canceled" do + 20.times do + WaitGroup.wait do |wg| + cancelation_token = nil + + suspended_fiber = wg.spawn do + Fiber.timeout(10.milliseconds) do |token| + # save the token so another fiber can try to cancel the timeout + cancelation_token = token + end + end + + sleep rand(9..11).milliseconds + + # let's try to cancel the timeout + if suspended_fiber.resolve_timeout?(cancelation_token.not_nil!) 
+ # canceled: we must enqueue the fiber + suspended_fiber.enqueue + end + end + end + end + end end From 097893c6a9e4de841589e80843bab4d2681a29f0 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 26 Aug 2025 12:19:36 +0200 Subject: [PATCH 5/6] Proposed changes from RFC 14 comments - `Fiber::CancelationToken` instead of `TimeoutToken` so we can internally reuse the mechanism for select regardless of the presence of a timeout action - `Fiber.sleep(Time::Span, & : CancelationToken -> TimeoutResult)` instead of `.timeout` - `Fiber#resolve_timer?(token : CancelationToken) : Bool` instead of `#resolve_timeout?` - `::sleep(Time::Span)` becomes a shortcut for `Fiber.sleep(Time::Span) { }` - `Crystal::EventLoop#sleep(Time::Span, CancelationToken)` to replace the existing method instead of introducing a new `#timeout` method --- spec/std/fiber_spec.cr | 4 +- src/concurrent.cr | 2 +- src/crystal/event_loop.cr | 11 ++-- src/crystal/event_loop/iocp.cr | 37 +++-------- src/crystal/event_loop/iocp/timer.cr | 2 +- src/crystal/event_loop/libevent.cr | 19 ++---- src/crystal/event_loop/polling.cr | 31 ++------- src/crystal/event_loop/polling/event.cr | 3 +- src/crystal/event_loop/wasi.cr | 8 +-- src/fiber.cr | 88 ++++++++++++------------- 10 files changed, 77 insertions(+), 128 deletions(-) diff --git a/spec/std/fiber_spec.cr b/spec/std/fiber_spec.cr index f0f6d86daf8d..9b6190bfae0e 100644 --- a/spec/std/fiber_spec.cr +++ b/spec/std/fiber_spec.cr @@ -46,7 +46,7 @@ describe Fiber do Fiber.yield end - if fiber.resolve_timeout?(cancelation_token.not_nil!) + if fiber.resolve_timer?(cancelation_token.not_nil!) fiber.enqueue end @@ -68,7 +68,7 @@ describe Fiber do sleep rand(9..11).milliseconds # let's try to cancel the timeout - if suspended_fiber.resolve_timeout?(cancelation_token.not_nil!) + if suspended_fiber.resolve_timer?(cancelation_token.not_nil!) 
# canceled: we must enqueue the fiber suspended_fiber.enqueue end diff --git a/src/concurrent.cr b/src/concurrent.cr index 58efd182a927..4160504bd71d 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -26,7 +26,7 @@ end # fibers might start their execution. def sleep(time : Time::Span) : Nil Crystal.trace :sched, "sleep", for: time - Crystal::EventLoop.current.sleep(time) + Fiber.sleep(time) { } end # Blocks the current fiber forever. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 7e38b4862cca..a613bee154a9 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -81,15 +81,12 @@ abstract class Crystal::EventLoop # time in parallel, but this assumption may change in the future! abstract def interrupt : Nil - # Suspend the current fiber for *duration*. - abstract def sleep(duration : Time::Span) : Nil - - # Suspend the current fiber until the absolute *time* is reached (as per the - # monotonic clock). Must resolve the timeout using `Fiber.resolve_timeout?` - # and *token* before resuming the fiber. + # Suspends the current fiber until the absolute *time* is reached (as per the + # monotonic clock). Must resolve the timer using `Fiber#resolve_timer?` and + # *token* before resuming the fiber. # # Returns true if the timer expired, false if it was canceled. - abstract def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool + abstract def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool # Create a new resume event for a fiber. # diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 7a0fdc5e7f1c..59e7cd0d7f39 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -144,16 +144,12 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop case timer.value.type in .sleep? - timer.value.timed_out!
+ # the timer might have been canceled already, and we must synchronize with + # the resumed `#sleep` fiber; by rule we must always resume the fiber, + # regardless of whether we resolve the timer or not. + timer.value.timed_out! if fiber.resolve_timer?(timer.value.cancelation_token) in .timeout? - if token = timer.value.timeout_token? - # the timeout might have been canceled already, and we must synchronize - # with the resumed `#timeout` fiber; by rule we must always resume the - # fiber, regardless of whether we resolve the timeout or not. - timer.value.timed_out! if fiber.resolve_timeout?(token) - else - timer.value.timed_out! - end + timer.value.timed_out! in .select_timeout? return unless select_action = fiber.timeout_select_action fiber.timeout_select_action = nil @@ -205,25 +201,10 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end end - def sleep(duration : Time::Span) : Nil - timer = Timer.new(:sleep, Fiber.current, duration) - add_timer(pointerof(timer)) - Fiber.suspend - - # safety check - return if timer.timed_out? - - # try to avoid a double resume if possible, but another thread might be - # running the evloop and dequeue the event in parallel, so a "can't resume - # dead fiber" can still happen in a MT execution context. - delete_timer(pointerof(timer)) - raise "BUG: #{timer.fiber} called sleep but was manually resumed before the timer expired!" - end - - def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool - timer = Timer.new(:timeout, Fiber.current) + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + timer = Timer.new(:sleep, Fiber.current) timer.wake_at = time - timer.timeout_token = token + timer.cancelation_token = token add_timer(pointerof(timer)) Fiber.suspend @@ -243,6 +224,8 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop # expired and false if the fiber was resumed early. # # Specific to IOCP to handle IO timeouts.
+ # + # TODO: use sleep(time, token) instead def timeout(duration : Time::Span) : Bool event = Fiber.current.resume_event event.add(duration) diff --git a/src/crystal/event_loop/iocp/timer.cr b/src/crystal/event_loop/iocp/timer.cr index ff8bcda3b754..60eb02abd962 100644 --- a/src/crystal/event_loop/iocp/timer.cr +++ b/src/crystal/event_loop/iocp/timer.cr @@ -19,7 +19,7 @@ struct Crystal::EventLoop::IOCP::Timer # trigger. Nil for IO events without a timeout. getter! wake_at : Time::Span - property! timeout_token : Fiber::TimeoutToken + property! cancelation_token : Fiber::CancelationToken # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 11a838e0f2d0..84c967d000ba 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -50,27 +50,22 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop_exit end - def sleep(duration : ::Time::Span) : Nil - Fiber.current.resume_event.add(duration) - Fiber.suspend - end - - class TimeoutData + class SleepData property fiber : Fiber - property timeout_token : Fiber::TimeoutToken + property cancelation_token : Fiber::CancelationToken property? 
expired : Bool = false - def initialize(@fiber, @timeout_token) + def initialize(@fiber, @cancelation_token) end end - def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool - arg = TimeoutData.new(Fiber.current, token) + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + arg = SleepData.new(Fiber.current, token) event = event_base.new_event(-1, LibEvent2::EventFlags::None, arg) do |s, flags, data| - d = data.as(TimeoutData) + d = data.as(SleepData) f = d.fiber - if f.resolve_timeout?(d.timeout_token) + if f.resolve_timer?(d.cancelation_token) d.expired = true {% if flag?(:execution_context) %} event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 9a78d9cd81ff..5d31cfd256b6 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -141,25 +141,10 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # fiber interface, see Crystal::EventLoop - def sleep(duration : ::Time::Span) : Nil - event = Event.new(:sleep, Fiber.current, timeout: duration) - add_timer(pointerof(event)) - Fiber.suspend - - # safety check - return if event.timed_out? - - # try to avoid a double resume if possible, but another thread might be - # running the evloop and dequeue the event in parallel, so a "can't resume - # dead fiber" can still happen in a MT execution context. - delete_timer(pointerof(event)) - raise "BUG: #{event.fiber} called sleep but was manually resumed before the timer expired!" 
- end - - def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool - event = Event.new(:timeout, Fiber.current) + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + event = Event.new(:sleep, Fiber.current) event.wake_at = time - event.timeout_token = token + event.cancelation_token = token add_timer(pointerof(event)) Fiber.suspend @@ -625,13 +610,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop fiber.timeout_select_action = nil return unless select_action.time_expired? fiber.@timeout_event.as(FiberEvent).clear - when .timeout? - # the timeout might have been canceled already, and we must synchronize - # with the resumed `#timeout` fiber; by rule we must always resume the - # fiber, regardless of whether we resolve the timeout or not. - event.value.timed_out! if fiber.resolve_timeout?(event.value.timeout_token) when .sleep? - event.value.timed_out! + # the timer might have been canceled already, and we must synchronize with + # the resumed `#sleep` fiber; by rule we must always resume the fiber, + # regardless of whether we resolve the timer or not. + event.value.timed_out! if fiber.resolve_timer?(event.value.cancelation_token) else raise RuntimeError.new("BUG: unexpected event in timers: #{event.value}%s\n") end diff --git a/src/crystal/event_loop/polling/event.cr b/src/crystal/event_loop/polling/event.cr index ee3e474bac2e..6f4a1d49ec85 100644 --- a/src/crystal/event_loop/polling/event.cr +++ b/src/crystal/event_loop/polling/event.cr @@ -14,7 +14,6 @@ struct Crystal::EventLoop::Polling::Event IoWrite Sleep SelectTimeout - Timeout end getter type : Type @@ -34,7 +33,7 @@ struct Crystal::EventLoop::Polling::Event # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false - property! timeout_token : Fiber::TimeoutToken + property! cancelation_token : Fiber::CancelationToken # The event can be added to `Waiters` lists.
include PointerLinkedList::Node diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 037216b345b7..bd8383bb79ee 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -17,12 +17,8 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt") end - def sleep(duration : ::Time::Span) : Nil - raise NotImplementedError.new("Crystal::Wasi::EventLoop.sleep") - end - - def timeout(until time : Time::Span, token : Fiber::TimeoutToken) : Bool - raise NotImplementedError.new("Crystal::Wasi::EventLoop#timeout") + def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool + raise NotImplementedError.new("Crystal::Wasi::EventLoop#sleep") end # Creates a timeout_event. diff --git a/src/fiber.cr b/src/fiber.cr index 69f24ac7b4eb..64d56d3ea0fd 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -300,7 +300,7 @@ class Fiber Fiber.current.cancel_timeout end - struct TimeoutToken + struct CancelationToken # :nodoc: getter value : UInt32 @@ -313,25 +313,23 @@ class Fiber CANCELED end - private TIMEOUT_FLAG = 1_u32 - private TIMEOUT_COUNTER = 2_u32 - - @timeout = Atomic(UInt32).new(0_u32) + private TIMEOUT_FLAG = 1_u32 # bit 1 states if the timeout is set or unset + private TIMEOUT_COUNTER = 2_u32 # bits 2..32 are a counter to avoid ABA issues + @timer = Atomic(UInt32).new(0_u32) # Suspends the current `Fiber` for *duration*. # - # Yields a `TimeoutToken` before suspending the fiber. The token is required - # to manually cancel the timeout before *duration* expires. See - # `#resolve_timeout?` for details. + # Yields a `CancelationToken` before suspending the fiber. The token can be + # used to manually cancel the timer in order to resume the fiber before + # *duration* has elapsed. See `#resolve_timer?` for details.
# - # The fiber will be automatically resumed after *duration* has elapsed, but it - # may be resumed earlier if the timeout has been manually canceled, yet the - # fiber will only ever be resumed once. The returned `TimeoutResult` can be - # used to determine what happened and act accordingly, for example do some - # cleanup or raise an exception if the timeout expired. + # The fiber will be resumed once, either because the timer expired (*duration* + # has elapsed) or because the timer has been canceled. The returned + # `TimeoutResult` can be used to determine what happened and act accordingly, + # for example do some cleanup or raise an exception if the timer expired. # # ``` - # result = Fiber.timeout(5.seconds) do |cancelation_token| + # result = Fiber.sleep(5.seconds) do |cancelation_token| # enqueue_waiter(Fiber.current, cancelation_token) # end # @@ -339,14 +337,12 @@ class Fiber # dequeue_waiter(Fiber.current) # end # ``` - # - # Consider `::sleep` if you don't need to cancel the timeout. - def self.timeout(duration : Time::Span, & : TimeoutToken ->) : TimeoutResult - timeout(until: Time.monotonic + duration) { |token| yield token } + def self.sleep(duration : Time::Span, & : CancelationToken ->) : TimeoutResult + sleep(until: Time.monotonic + duration) { |token| yield token } end - # Identical to `.timeout` but suspending the fiber until an absolute time, as - # per the monotonic clock, is reached. + # Identical to `.sleep` but suspends the fiber until an absolute time, as per + # the monotonic clock, is reached. # # For example, we can retry something until 5 seconds have elapsed: # @@ -354,38 +350,36 @@ class Fiber # time = Time.monotonic + 5.seconds # loop do # break if try_something? - # result = Fiber.timeout(until: time) { |token| add_waiter(token) } + # result = Fiber.sleep(until: time) { |token| add_waiter(token) } # raise "timeout" if result.expired? 
# end # ``` - def self.timeout(*, until time : Time::Span, & : TimeoutToken ->) : TimeoutResult - token = Fiber.current.create_timeout + def self.sleep(*, until time : Time::Span, & : CancelationToken ->) : TimeoutResult + token = Fiber.current.new_cancelation_token yield token - result = Crystal::EventLoop.current.timeout(time, token) + result = Crystal::EventLoop.current.sleep(time, token) result ? TimeoutResult::EXPIRED : TimeoutResult::CANCELED end # Sets the timeout flag and increments the counter to avoid ABA issues with - # parallel threads trying to resolve the timeout while the timeout was unset - # then set again (new timeout). Since the current fiber is the only one that - # can set the timeout, we can merely set the atomic (no need for CAS). - protected def create_timeout : TimeoutToken - value = (@timeout.get(:relaxed) | TIMEOUT_FLAG) &+ TIMEOUT_COUNTER - @timeout.set(value, :relaxed) - TimeoutToken.new(value) + # parallel threads trying to resolve the timer while the timer was unset then + # set again (new timer). Since the current fiber is the only one that can set + # the timer, we can merely set the atomic (no need for CAS). + protected def new_cancelation_token : CancelationToken + value = (@timer.get(:relaxed) | TIMEOUT_FLAG) &+ TIMEOUT_COUNTER + @timer.set(value, :relaxed) + CancelationToken.new(value) end - # Tries to resolve the timeout previously set on `Fiber` using the cancelation - # *token*. See `Fiber.timeout` for details on setting the timeout. + # Tries to resolve a sleeping `Fiber` using the cancelation *token*. # - # Returns true when the timeout has been resolved, false otherwise. + # Returns true when the timer has been resolved, false otherwise. # - # The caller that succeeded to resolve the timeout owns the fiber and must - # eventually enqueue it. Failing to do so means that the fiber will never be - # resumed. + # On success, the caller owns the fiber and must eventually enqueue it. 
+ # Failing to do so means that the fiber will never be resumed. # - # A caller that failed to resolve the timeout must skip the fiber. Trying to - # enqueue the fiber would lead the fiber to be resumed twice! + # On failure, the caller must skip the fiber. Trying to enqueue the fiber + # would lead to the fiber being resumed twice. # # ``` # require "wait_group" @@ -394,8 +388,8 @@ class Fiber # cancelation_token = nil # # suspended_fiber = wg.spawn do - # result = Fiber.timeout(5.seconds) do |token| - # # save the token so another fiber can try to cancel the timeout + # result = Fiber.sleep(5.seconds) do |token| + # # save the token so another fiber can try to cancel # cancelation_token = token # end # @@ -405,15 +399,17 @@ class Fiber # # sleep rand(4..6).seconds # - # # let's try to cancel the timeout - # if suspended_fiber.resolve_timeout?(cancelation_token.not_nil!) + # # let's try to cancel + # if suspended_fiber.resolve_timer?(cancelation_token.not_nil!) # # canceled: we must enqueue the fiber # suspended_fiber.enqueue + # else + # # expired: do nothing # end # end # ``` - def resolve_timeout?(token : TimeoutToken) : Bool - _, success = @timeout.compare_and_set(token.value, token.value & ~TIMEOUT_FLAG, :relaxed, :relaxed) + def resolve_timer?(token : CancelationToken) : Bool + _, success = @timer.compare_and_set(token.value, token.value & ~TIMEOUT_FLAG, :relaxed, :relaxed) success end @@ -450,7 +446,7 @@ class Fiber # TODO: Fiber switching and evloop for wasm32 {% unless flag?(:wasi) %} - Crystal::EventLoop.current.sleep(0.seconds) + sleep(0.seconds) { } {% end %} end From 97697d954493a814f588c44c5f4ecfd2c07582cf Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 26 Aug 2025 12:27:54 +0200 Subject: [PATCH 6/6] Fix: spec + ameba issue --- spec/std/fiber_spec.cr | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spec/std/fiber_spec.cr b/spec/std/fiber_spec.cr index 9b6190bfae0e..f22f52cc82db 100644 --- a/spec/std/fiber_spec.cr
+++ b/spec/std/fiber_spec.cr @@ -20,13 +20,13 @@ describe Fiber do resumable.should be_false end - describe ".timeout" do + describe ".sleep" do it "expires" do cancelation_token = nil channel = Channel(Fiber::TimeoutResult).new - fiber = spawn do - result = Fiber.timeout(10.milliseconds) { |token| cancelation_token = token } + spawn do + result = Fiber.sleep(10.milliseconds) { |token| cancelation_token = token } channel.send(result) end @@ -38,7 +38,7 @@ describe Fiber do channel = Channel(Fiber::TimeoutResult).new fiber = spawn do - result = Fiber.timeout(1.second) { |token| cancelation_token = token } + result = Fiber.sleep(1.second) { |token| cancelation_token = token } channel.send(result) end @@ -59,15 +59,15 @@ describe Fiber do cancelation_token = nil suspended_fiber = wg.spawn do - Fiber.timeout(10.milliseconds) do |token| - # save the token so another fiber can try to cancel the timeout + Fiber.sleep(10.milliseconds) do |token| + # save the token so another fiber can try to cancel the timer cancelation_token = token end end sleep rand(9..11).milliseconds - # let's try to cancel the timeout + # let's try to cancel the timer if suspended_fiber.resolve_timer?(cancelation_token.not_nil!) # canceled: we must enqueue the fiber suspended_fiber.enqueue