Skip to content

Commit

Permalink
Introduce a distributed kv database in preparation for future.
Browse files Browse the repository at this point in the history
Signed-off-by: EdmondFrank <[email protected]>
  • Loading branch information
EdmondFrank committed Oct 21, 2024
1 parent 84833e6 commit d8047f9
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export MQ_CONNECTION=amqp://admin:admin@localhost
export REDIS_URL=redis://redis:6379/1
export REDIS_CHANNEL_PREFIX=compass-web-service

## Riak Host
export RIAK_HOST=192.168.240.31
export RIAK_PORT=8087

## Action cable
# export ACTION_CABLE_URL=ws://localhost:28080
# export ACTION_CABLE_ALLOWED_REQUEST_ORIGINS=http:\/\/localhost*
Expand Down
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ gem 'redis'

gem 'redis-rails'

# Use Riak Client
gem 'riak-client', github: 'spreedly/riak-ruby-client', tag: 'v2.6.2'

# Provides a low-level time-based throttle.
gem 'prorate'

Expand Down
16 changes: 16 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ GIT
omniauth (>= 1.5, < 3.0)
omniauth-oauth2 (>= 1.4.0, < 2.0)

GIT
remote: https://github.com/spreedly/riak-ruby-client.git
revision: 055164c697cbf21ed7b7d1415a984cdca5765c86
tag: v2.6.2
specs:
riak-client (2.6.2)
beefcake (~> 1.1)
cert_validator (~> 0.0.1)
i18n (>= 0.6)
innertube (~> 1.0)
multi_json (~> 1.0)

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -115,6 +127,7 @@ GEM
aws-eventstream (~> 1, >= 1.0.2)
base64 (0.2.0)
bcrypt (3.1.18)
beefcake (1.2.0)
bigdecimal (3.1.8)
bindata (2.5.0)
bindex (0.8.1)
Expand Down Expand Up @@ -147,6 +160,7 @@ GEM
image_processing (~> 1.1)
marcel (~> 1.0.0)
ssrf_filter (~> 1.0)
cert_validator (0.0.1)
childprocess (4.1.0)
coderay (1.1.3)
colorize (0.8.1)
Expand Down Expand Up @@ -274,6 +288,7 @@ GEM
mini_magick (>= 4.9.5, < 5)
ruby-vips (>= 2.0.17, < 3)
iniparse (1.5.0)
innertube (1.1.0)
io-console (0.7.2)
irb (1.11.2)
rdoc
Expand Down Expand Up @@ -965,6 +980,7 @@ DEPENDENCIES
redis-rails
redis-session-store
rest-client
riak-client!
roo
rspec-rails (~> 5.0.0)
rubocop
Expand Down
10 changes: 1 addition & 9 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module CompassWebService
class Application < Rails::Application
# Initialize configuration defaults for originally generated Rails version.
config.load_defaults 7.0
config.autoload_lib(ignore: ["assets", "tasks", "generators"])

# Configuration for the application, engines, and railties goes here.
#
Expand All @@ -29,15 +30,6 @@ class Application < Rails::Application
namespace: 'cache'
}

config.session_store :redis_session_store,
key: 'session',
redis: {
expire_after: 1.day, # cookie expiration
ttl: 1.day, # Redis expiration, defaults to 'expire_after'
key_prefix: "#{ENV['DEFAULT_HOST']}:session:",
url: ENV.fetch('REDIS_URL') { 'redis://redis:6379/1' },
}

# Set Sneakers as the back-end for Active Job.
config.active_job.queue_adapter = :sneakers

Expand Down
9 changes: 9 additions & 0 deletions config/environments/development.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@
# [Blocked host] clear the entire whitelist, which lets through requests for all hostnames.
config.hosts.clear

config.session_store :redis_session_store,
key: 'session',
redis: {
expire_after: 1.day, # cookie expiration
ttl: 1.day, # Redis expiration, defaults to 'expire_after'
key_prefix: "#{ENV['DEFAULT_HOST']}:session:",
url: ENV.fetch('REDIS_URL') { 'redis://redis:6379/1' },
}

# enable lograge

config.lograge.enabled = true
Expand Down
10 changes: 10 additions & 0 deletions config/environments/production.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@

config.action_mailer.perform_caching = false

config.session_store :riak_session_store,
key: 'session',
riak: {
expire_after: 1.day, # cookie expiration
host: ENV.fetch('RIAK_HOST') { 'localhost' },
pb_port: ENV.fetch('RIAK_PORT') { 8087 },
key_prefix: "#{ENV['DEFAULT_HOST']}:session:"
},
serializer: :json

# Ignore bad email addresses and do not raise email delivery errors.
# Set this to true and configure the email server for immediate delivery to raise delivery errors.
# config.action_mailer.raise_delivery_errors = false
Expand Down
215 changes: 215 additions & 0 deletions lib/riak_session_store.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
require 'riak'

# Riak session storage for Rails, and for Rails only. Derived from
# the MemCacheStore code, simply dropping in Riak instead.
class RiakSessionStore < ActionDispatch::Session::AbstractSecureStore
# Rails 3.1 and beyond defines the constant elsewhere
unless defined?(ENV_SESSION_OPTIONS_KEY)
ENV_SESSION_OPTIONS_KEY = if Rack.release.split('.').first.to_i > 1
Rack::RACK_SESSION_OPTIONS
else
Rack::Session::Abstract::ENV_SESSION_OPTIONS_KEY
end
end

USE_INDIFFERENT_ACCESS = defined?(ActiveSupport).freeze
# ==== Options
# * +:key+ - Same as with the other cookie stores, key name
# * +:riak+ - A hash with riak-specific options
# * +:on_riak_down:+ - Called with err, env, and SID on Errno::ECONNREFUSED
# * +:on_session_load_error:+ - Called with err and SID on Marshal.load fail
# * +:serializer:+ - Serializer to use on session data, default is :marshal.
#
# ==== Examples
#
# Rails.application.config.session_store :riak_session_store,
# key: 'your_session_key',
# riak: {
# expire_after: 120.minutes,
# host: 'localhost',
# pb_port: 8087,
# bucket: 'sessions',
# bucket_type: 'leveled'
# },
# on_riak_down: ->(*a) { logger.error("Riak down! #{a.inspect}") },
# serializer: :hybrid # migrate from Marshal to JSON
#
def initialize(app, options = {})
super

@default_options[:namespace] = 'rack:session'
options[:bucket] ||= 'sessions'
options[:bucket_type] ||= 'default'
@default_options.merge!(options[:riak] || {})
init_options = options[:riak]&.reject { |k, _v| %i[expire_after key_prefix bucket bucket_type].include?(k) } || {}
@riak = init_options[:client] || Riak::Client.new(init_options)
@bucket = @riak.bucket_type(options[:bucket_type]).bucket(options[:bucket])
@on_riak_down = options[:on_riak_down]
@serializer = determine_serializer(options[:serializer])
@on_session_load_error = options[:on_session_load_error]
verify_handlers!
end

attr_accessor :on_riak_down, :on_session_load_error

private

attr_reader :riak, :bucket, :key, :default_options, :serializer

# overrides method defined in rack to actually verify session existence
# Prevents needless new sessions from being created in scenario where
# user HAS session id, but it already expired, or is invalid for some
# other reason, and session was accessed only for reading.
def session_exists?(env)
value = current_session_id(env)

!!(
value && !value.empty? &&
key_exists?(value)
)
rescue RuntimeError => e
on_riak_down.call(e, env, value) if on_riak_down

true
end

def key_exists?(value)
bucket.exists?(prefixed(value))
end

def verify_handlers!
%w(on_riak_down on_session_load_error).each do |h|
next unless (handler = public_send(h)) && !handler.respond_to?(:call)

raise ArgumentError, "#{h} handler is not callable"
end
end

def prefixed(sid)
"#{default_options[:key_prefix]}#{sid}"
end

def session_default_values
[generate_sid, USE_INDIFFERENT_ACCESS ? {}.with_indifferent_access : {}]
end

def get_session(env, sid)
sid && (session = load_session_from_riak(sid)) ? [sid, session] : session_default_values
rescue RuntimeError => e
on_riak_down.call(e, env, sid) if on_riak_down
session_default_values
end
alias find_session get_session

def load_session_from_riak(sid)
begin
data = bucket.get(prefixed(sid))
session_data = data ? decode(data.data) : nil
if session_data &&
(session_data['timestamp'].to_i + session_data['expiry'].to_i) > Time.now.to_i
session_data['data']
else
destroy_session_from_sid(sid, drop: true)
end
rescue StandardError => e
destroy_session_from_sid(sid, drop: true)
on_session_load_error.call(e, sid) if on_session_load_error
nil
end
end

def decode(data)
session = serializer.load(data)
USE_INDIFFERENT_ACCESS ? session.with_indifferent_access : session
end

def set_session(env, sid, session_data, options = nil)
expiry = get_expiry(env, options)
session = bucket.new(prefixed(sid))
session.data = encode({ 'timestamp' => Time.now.to_i, 'expiry' => expiry.to_i, 'data' => session_data })
session.content_type = determine_content_type(options[:serializer])
session.store
sid
rescue RuntimeError => e
on_riak_down.call(e, env, sid) if on_riak_down
false
end
alias write_session set_session

def get_expiry(env, options)
session_storage_options = options || env.fetch(ENV_SESSION_OPTIONS_KEY, {})
session_storage_options[:ttl] || session_storage_options[:expire_after]
end

def encode(session_data)
serializer.dump(session_data)
end

def destroy_session(env, sid, options)
destroy_session_from_sid(sid, (options || {}).to_hash.merge(env: env))
end
alias delete_session destroy_session

def destroy(env)
if env['rack.request.cookie_hash'] &&
(sid = env['rack.request.cookie_hash'][key])
sid = Rack::Session::SessionId.new(sid)
destroy_session_from_sid(sid, drop: true, env: env)
end
false
end

def destroy_session_from_sid(sid, options = {})
bucket.delete(prefixed(sid))
(options || {})[:drop] ? nil : generate_sid
rescue RuntimeError => e
on_riak_down.call(e, options[:env] || {}, sid) if on_riak_down
end

def determine_serializer(serializer)
serializer ||= :marshal
case serializer
when :marshal then Marshal
when :json then JsonSerializer
when :hybrid then HybridSerializer
else serializer
end
end

def determine_content_type(serializer)
serializer ||= :marshal
case serializer
when :marshal then 'application/x-ruby-marshal'
when :json then 'application/json'
else 'application/x-ruby-marshal'
end
end

# Uses built-in JSON library to encode/decode session
class JsonSerializer
def self.load(value)
JSON.parse(value, quirks_mode: true)
end

def self.dump(value)
JSON.generate(value, quirks_mode: true)
end
end

# Transparently migrates existing session values from Marshal to JSON
class HybridSerializer < JsonSerializer
MARSHAL_SIGNATURE = "\x04\x08".freeze

def self.load(value)
if needs_migration?(value)
Marshal.load(value)
else
super
end
end

def self.needs_migration?(value)
value.start_with?(MARSHAL_SIGNATURE)
end
end
end

0 comments on commit d8047f9

Please sign in to comment.