Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/rage-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def self.events
Rage::Events
end

# Shorthand to access {Rage::SSE Rage::SSE}.
# @return [Rage::SSE]
def self.sse
Rage::SSE
end

# Configure routes for the Rage application.
# @return [Rage::Router::DSL::Handler]
# @example
Expand Down Expand Up @@ -185,6 +191,8 @@ module ActiveRecord
autoload :OpenAPI, "rage/openapi/openapi"
autoload :Deferred, "rage/deferred/deferred"
autoload :Events, "rage/events/events"
autoload :SSE, "rage/sse/sse"
autoload :PubSub, "rage/pubsub/pubsub"
end

module RageController
Expand Down
18 changes: 16 additions & 2 deletions lib/rage/controller/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ def session
# @example
# render plain: "hello world", status: 201
# @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`.
def render(json: nil, plain: nil, status: nil)
def render(json: nil, plain: nil, sse: nil, status: nil)
raise "Render was called multiple times in this action" if @__rendered
@__rendered = true

if json || plain
@__body << if json
json.is_a?(String) ? json : json.to_json
else
headers["content-type"] = "text/plain; charset=utf-8"
@__headers["content-type"] = "text/plain; charset=utf-8"
plain.to_s
end

Expand All @@ -514,6 +514,20 @@ def render(json: nil, plain: nil, status: nil)
status
end
end

if sse
raise "already rendered" unless @__body.empty?

unless @__env["rack.upgrade?"] == :sse
@__status = 406
@__body << "Bad Request: Expected an SSE connection"
return
end

@__env["rack.upgrade"] = Rage::SSE::Application.new(sse)
@__status = 0 # TODO render 204
@__headers["content-type"] = "text/event-stream"
end
end

# Send a response with no body.
Expand Down
2 changes: 1 addition & 1 deletion lib/rage/fiber_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def block(_blocker, timeout = nil)
unless fulfilled
fulfilled = true
::Iodine.defer { ::Iodine.unsubscribe(channel) }
f.resume
f.resume if f.alive?
end
end

Expand Down
23 changes: 23 additions & 0 deletions lib/rage/pubsub/adapters/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

class Rage::PubSub::Adapters::Base
def pick_a_worker(&block)
_lock, lock_path = Tempfile.new.yield_self { |file| [file, file.path] }

caller = -> do
if File.new(lock_path).flock(File::LOCK_EX | File::LOCK_NB)
if Rage.logger.debug?
puts "INFO: #{Process.pid} is managing #{self.class.name.split("::").last} subscriptions."
end
block.call
end
end

# TODO: move to root
if Iodine.running?
caller.call
else
Iodine.on_state(:on_start, &caller)
end
end
end
128 changes: 128 additions & 0 deletions lib/rage/pubsub/adapters/redis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# frozen_string_literal: true

require "securerandom"

if !defined?(RedisClient)
fail <<~ERR

Redis adapter depends on the `redis-client` gem. Add the following line to your Gemfile:
gem "redis-client"

ERR
end

class Rage::PubSub::Adapters::Redis < Rage::PubSub::Adapters::Base
DEFAULT_REDIS_OPTIONS = { reconnect_attempts: [0.05, 0.1, 0.5] }
REDIS_MIN_VERSION_SUPPORTED = Gem::Version.create(6)

def initialize(stream_name, broadcaster, config)
@redis_stream = if (prefix = config.delete(:channel_prefix))
"#{prefix}:#{stream_name}"
else
stream_name
end

@broadcaster = broadcaster
@redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(config))
@server_uuid = SecureRandom.uuid

redis_version = get_redis_version
if redis_version < REDIS_MIN_VERSION_SUPPORTED
raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}."
end

@trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid

pick_a_worker { poll }
end

def publish(stream_name, data)
message_uuid = SecureRandom.uuid

publish_redis.call(
"XADD",
@redis_stream,
trimming_method, "~", trimming_value,
"*",
"1", stream_name,
"2", data.to_json,
"3", @server_uuid,
"4", message_uuid
)
end

private

def publish_redis
@publish_redis ||= @redis_config.new_client
end

def trimming_method
@trimming_strategy == :maxlen ? "MAXLEN" : "MINID"
end

def trimming_value
@trimming_strategy == :maxlen ? "10000" : ((Time.now.to_f - 5 * 60) * 1000).to_i
end

def get_redis_version
service_redis = @redis_config.new_client
version = service_redis.call("INFO").match(/redis_version:([[:graph:]]+)/)[1]

Gem::Version.create(version)

rescue RedisClient::Error => e
puts "FATAL: Couldn't connect to Redis - all broadcasts will be limited to the current server."
puts e.backtrace.join("\n")
REDIS_MIN_VERSION_SUPPORTED

ensure
service_redis.close
end

def error_backoff_intervals
@error_backoff_intervals ||= Enumerator.new do |y|
y << 0.2 << 0.5 << 1 << 2 << 5
loop { y << 10 }
end
end

def poll
unless Fiber.scheduler
Fiber.set_scheduler(Rage::FiberScheduler.new)
end

Iodine.on_state(:start_shutdown) do
@stopping = true
end

Fiber.schedule do
read_redis = @redis_config.new_client
last_id = (Time.now.to_f * 1000).to_i
last_message_uuid = nil

loop do
data = read_redis.blocking_call(5, "XREAD", "COUNT", "100", "BLOCK", "5000", "STREAMS", @redis_stream, last_id)

if data
data[@redis_stream].each do |id, (_, stream_name, _, serialized_data, _, broadcaster_uuid, _, message_uuid)|
if broadcaster_uuid != @server_uuid && message_uuid != last_message_uuid
@broadcaster.broadcast(stream_name, JSON.parse(serialized_data))
end

last_id = id
last_message_uuid = message_uuid
end
end

rescue RedisClient::Error => e
Rage.logger.error("Subscriber error: #{e.message} (#{e.class})")
sleep error_backoff_intervals.next
rescue => e
@stopping ? break : raise(e)
else
error_backoff_intervals.rewind
end
end
end
end
6 changes: 6 additions & 0 deletions lib/rage/pubsub/pubsub.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module Rage::PubSub
module Adapters
autoload :Base, "rage/pubsub/adapters/base"
autoload :Redis, "rage/pubsub/adapters/redis"
end
end
4 changes: 4 additions & 0 deletions lib/rage/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ def route_uri_pattern
end
end

def sse?
@env["rack.upgrade?"] == :sse
end

private

def rack_request
Expand Down
58 changes: 58 additions & 0 deletions lib/rage/sse/application.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

# @private
class Rage::SSE::Application
def initialize(stream)
@stream = stream

@type = if stream.is_a?(Enumerator)
@streamer = create_enum_streamer
:enum
elsif stream.is_a?(Proc)
@streamer = create_proc_streamer
:proc
elsif stream.is_a?(Rage::SSE::Stream)
:stream
else
:object
end
end

def on_open(connection)
case @type
when :enum, :proc
@streamer.resume(connection)
when :stream
connection.subscribe("sse:#{@stream.id}") # TODO: hash? # TODO: broadcast right away?
when :object
connection.write(Rage::SSE.__serialize(@stream))
connection.close
end
end

private

def create_enum_streamer
Fiber.schedule do
connection = Fiber.yield

@stream.each do |event|
break if !connection.open?
connection.write(Rage::SSE.__serialize(event)) if event
end
rescue => e
Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
ensure
connection.close
end
end

def create_proc_streamer
Fiber.schedule do
connection = Fiber.yield
@stream.call(Rage::SSE::ConnectionProxy.new(connection))
rescue => e
Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
end
end
end
34 changes: 34 additions & 0 deletions lib/rage/sse/connection_proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

class Rage::SSE::ConnectionProxy
def initialize(connection)
@connection = connection
end

def write(data)
raise IOError, "closed stream" unless @connection.open?
@connection.write(data)
end

alias_method :<<, :write

def close
@connection.close
end

alias_method :close_write, :close

def closed?
[email protected]?
end

def flush
raise IOError, "closed stream" unless @connection.open?
end

def read(...)
end

def close_read
end
end
22 changes: 22 additions & 0 deletions lib/rage/sse/message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# A class representing an SSE event. Use it to specify the `id`, `event`, and `retry` fields in an SSE.
#
# @!attribute id
# @return [String] The `id` field of the SSE event.
# @!attribute event
# @return [String] The `event` field of the SSE event.
# @!attribute retry
# @return [Integer] The `retry` field of the SSE event, in milliseconds.
# @!attribute data
# @return [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON.
Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do
def to_s
str = ""

str << "id: #{id}\n" if id
str << "event: #{event}\n" if event
str << "retry: #{self.retry}\n" if self.retry
str << "data: #{data.is_a?(String) ? data : data.to_json}\n" if data # TODO: multiline

str + "\n"
end
end
Loading