[Experimental] Worker based expiration strategy. #447

Draft
wants to merge 1 commit into
base: main
13 changes: 13 additions & 0 deletions bin/expiration_worker
@@ -0,0 +1,13 @@
#!/usr/bin/env ruby

require 'identity_cache'

# This is meant to run as a long-lived process, along these lines:
# parse args from the command line / env, then call
#
# IdentityCache::BinlogExpirationWorker.new(*args).run
Contributor

In a previous hack day project @pushrax and I explored identity cache binlog based expiration. It consisted of two parts, one part to create expiration rules from the cached associations (https://github.com/Shopify/shopify/compare/identity_cache_expiry_rules) and a binlog reader that used those rules to invalidate the cache (https://github.com/Shopify/binlog-cache-invalidator).
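
For readers who have not seen the linked repos, here is a minimal sketch of what "expiration rules" derived from cached associations could look like. It is not taken from either repository: the table names, the key format, and the `keys_to_expire` helper are all hypothetical and only illustrate the idea of mapping a changed row to the cache keys it affects.

```ruby
# Hypothetical sketch: names and key format are illustrative, not identity_cache's.
# Each table maps to a list of rules; a rule turns a changed row into a cache key.
EXPIRY_RULES = {
  "products" => [
    ->(row) { "IDC:blob:Product:#{row.fetch('id')}" },
  ],
  "variants" => [
    ->(row) { "IDC:blob:Variant:#{row.fetch('id')}" },
    # Assumes variants are embedded in the parent product's cached blob,
    # so a variant change must also expire the parent.
    ->(row) { "IDC:blob:Product:#{row.fetch('product_id')}" },
  ],
}.freeze

# Returns every cache key affected by a change to `row` in `table`.
def keys_to_expire(table, row)
  EXPIRY_RULES.fetch(table, []).map { |rule| rule.call(row) }.uniq
end

keys = keys_to_expire("variants", { "id" => 7, "product_id" => 3 })
```

A binlog reader would call something like `keys_to_expire` for each row event and delete the returned keys.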

Contributor Author

Oh cool, thanks @dylanahsmith! @ignacio-chiazzo and I might take a stab at a second experiment using CDC instead of raw logs, but I think the code/principles can be adapted.

Contributor

If CDC is more likely to be delayed (because the message needs to flow from the binlog to Kafka and then to the consumer), wouldn't we benefit more from getting closer to the metal and consuming from the raw binlog?

@insom insom Apr 2, 2020

Did someone say CDC? 😁 anyway we have an SLO [REDACTED] -- TL;DR is that the Kafka+Sieve parts add around 500ms on top of the MySQL replication lag. The MySQL replication lag is by far the single biggest contributor to the delay. We intend to make CDC read from writers to remove the MySQL replication lag, but we're not there yet.

This is the steady state. If there were an incident or a shop move, then (for a given shop's data) you would see delayed records. That said, we're solving a bunch of hard problems for you, so doing things "raw" means having to keep state on the MySQL schema and (depending on your service) shop moves, etc.

Contributor Author

> If CDC is more likely to be delayed (because the message needs to flow from the binlog to Kafka and then to the consumer), wouldn't we benefit more from getting closer to the metal and consuming from the raw binlog?

Why is that a problem? I think the fundamental problem with IDC is the assumption of freshness.

#
# BinlogExpirationWorker should be able to receive binlog events
# and expire the cache based on those events
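
The worker described in these comments is not implemented in this PR. A minimal sketch of the loop it implies might look like the following. The class name, the event shape (`:table`, `:id`), and the key format are assumptions for illustration only; a real worker would read from the binlog or a CDC stream and use identity_cache's own key generation.

```ruby
# Hypothetical sketch: not the PR's BinlogExpirationWorker.
class SketchBinlogExpirationWorker
  def initialize(events, cache)
    @events = events # any Enumerable of row-change events (assumed shape)
    @cache = cache   # anything responding to #delete(key)
  end

  # Consumes every event and deletes the cache key it maps to.
  def run
    @events.each do |event|
      @cache.delete(cache_key_for(event))
    end
  end

  private

  # Assumed key format, for illustration only.
  def cache_key_for(event)
    "#{event.fetch(:table)}:#{event.fetch(:id)}"
  end
end

# A plain Hash stands in for the cache here.
cache = { "products:1" => "blob", "products:2" => "blob" }
events = [{ table: "products", id: 1 }]
SketchBinlogExpirationWorker.new(events, cache).run
```

Passing the event source and cache in as arguments keeps the loop testable without a database or a memcached instance.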


32 changes: 32 additions & 0 deletions lib/identity_cache.rb
@@ -40,6 +40,8 @@
require "identity_cache/fallback_fetcher"
require 'identity_cache/without_primary_index'
require 'identity_cache/with_primary_index'
require 'identity_cache/noop_expirator'
require 'identity_cache/inline_expirator'

module IdentityCache
extend ActiveSupport::Concern
@@ -50,13 +52,22 @@ module IdentityCache
BATCH_SIZE = 1000
DELETED = :idc_cached_deleted
DELETED_TTL = 1000
EXPIRATION_STRATEGIES = { inline: InlineExpirator, worker: NoopExpirator }.freeze

class AlreadyIncludedError < StandardError; end
class AssociationError < StandardError; end
class InverseAssociationError < StandardError; end
class UnsupportedScopeError < StandardError; end
class UnsupportedAssociationError < StandardError; end
class DerivedModelError < StandardError; end
class ExpirationStrategyNotFound < StandardError
  attr_reader :strategy

  def initialize(strategy = nil)
    # Store the strategy so the attr_reader above actually returns it.
    @strategy = strategy
    super("#{strategy} is not a valid expiration strategy")
  end
end


mattr_accessor :cache_namespace
self.cache_namespace = "IDC:#{CACHE_VERSION}:"
@@ -67,6 +78,9 @@ class DerivedModelError < StandardError; end
mattr_accessor :fetch_read_only_records
self.fetch_read_only_records = true

mattr_accessor :expiration_strategy
self.expiration_strategy = :inline

class << self
include IdentityCache::CacheHash

@@ -132,6 +146,24 @@ def unmap_cached_nil_for(value)
value == IdentityCache::CACHED_NIL ? nil : value
end

# Switches strategy and drops the memoized expirator so the next call rebuilds it.
def reset_expiration_strategy(strategy)
  @expirator = nil
  self.expiration_strategy = strategy
end

# Returns the memoized expirator for the configured strategy.
def expirator
  @expirator ||= begin
    strategy = expiration_strategy
    klass = EXPIRATION_STRATEGIES.fetch(strategy) do
      raise ExpirationStrategyNotFound.new(strategy)
    end
    klass.new
  end
end


# Same as +fetch+, except that it will try a collection of keys, using the
# multiget operation of the cache adaptor.
#
1 change: 1 addition & 0 deletions lib/identity_cache/cache_key_loader.rb
@@ -31,6 +31,7 @@ def load(cache_fetcher, db_key)
db_value = nil

cache_value = IdentityCache.fetch(cache_key) do
# Log the cache key; self.name would only print the enclosing module's name.
IdentityCache.logger.debug "Resolving miss key=#{cache_key} db_key=#{db_key}"
db_value = cache_fetcher.load_one_from_db(db_key)
cache_fetcher.cache_encode(db_value)
end
5 changes: 3 additions & 2 deletions lib/identity_cache/cached/attribute.rb
@@ -37,12 +37,13 @@ def fetch(db_key)
 def expire(record)
   unless record.send(:was_new_record?)
     old_key = old_cache_key(record)
-    IdentityCache.cache.delete(old_key)
+    IdentityCache.expirator.expire(old_key)
   end

   unless record.destroyed?
     new_key = new_cache_key(record)
     if new_key != old_key
-      IdentityCache.cache.delete(new_key)
+      IdentityCache.expirator.expire(new_key)
     end
   end
 end
4 changes: 2 additions & 2 deletions lib/identity_cache/cached/primary_index.rb
@@ -43,8 +43,8 @@ def fetch_multi(ids)
end

 def expire(id)
-  id = cast_id(id)
-  IdentityCache.cache.delete(cache_key(id))
+  key = cache_key(cast_id(id))
+  IdentityCache.expirator.expire(key)
 end

def cache_key(id)
8 changes: 8 additions & 0 deletions lib/identity_cache/inline_expirator.rb
@@ -0,0 +1,8 @@
module IdentityCache
  class InlineExpirator
    def expire(key)
      IdentityCache.logger.debug "Expiring key=#{key}"
      IdentityCache.cache.delete(key)
    end
  end
end
7 changes: 7 additions & 0 deletions lib/identity_cache/noop_expirator.rb
@@ -0,0 +1,7 @@
module IdentityCache
  class NoopExpirator
    def expire(*)
      # No-op: with the :worker strategy, expiration is left to an external worker.
    end
  end
end
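
To make the difference between the two strategies concrete, here is a self-contained sketch of the same lookup pattern outside identity_cache. The `SketchCache` module, the `expirator_for` helper, and the Hash-backed store are stand-ins invented for this example. Unlike the PR's expirators, these take the store as a constructor argument so the snippet can run on its own.

```ruby
# Self-contained sketch; the names are stand-ins, not identity_cache's API.
module SketchCache
  class InlineExpirator
    def initialize(store)
      @store = store
    end

    # Deletes the key immediately, in the caller's process.
    def expire(key)
      @store.delete(key)
    end
  end

  class NoopExpirator
    def initialize(_store = nil); end

    def expire(*)
      # Intentionally empty: a separate worker is expected to expire the key.
    end
  end

  STRATEGIES = { inline: InlineExpirator, worker: NoopExpirator }.freeze

  # Looks up the strategy and raises on an unknown name.
  def self.expirator_for(strategy, store)
    klass = STRATEGIES.fetch(strategy) do
      raise ArgumentError, "#{strategy} is not a valid expiration strategy"
    end
    klass.new(store)
  end
end

store = { "a" => 1, "b" => 2 }
SketchCache.expirator_for(:inline, store).expire("a") # removes "a"
SketchCache.expirator_for(:worker, store).expire("b") # leaves "b" in place
```

With `:worker`, the entry stays cached until something else removes it, which is why that strategy only makes sense alongside a running expiration worker.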
20 changes: 20 additions & 0 deletions test/identity_cache_test.rb
@@ -26,4 +26,24 @@ def test_should_use_cache_in_transaction
assert_equal false, IdentityCache.should_use_cache?
end
end

def test_should_use_inline_expirator_by_default
  assert_instance_of IdentityCache::InlineExpirator, IdentityCache.expirator
end

def test_should_be_able_to_set_expirator_to_worker
  with_expiration_strategy(:worker) do
    assert_instance_of IdentityCache::NoopExpirator, IdentityCache.expirator
  end
end

def test_should_raise_when_expiration_strategy_is_not_supported
  error = assert_raises(IdentityCache::ExpirationStrategyNotFound) do
    with_expiration_strategy(:hope) do
      IdentityCache.expirator
    end
  end

  assert_equal "hope is not a valid expiration strategy", error.message
end
end
10 changes: 10 additions & 0 deletions test/test_helper.rb
@@ -49,6 +49,16 @@ def teardown
teardown_models
end

# Runs the block under new_strategy, then restores the previous strategy.
def with_expiration_strategy(new_strategy)
  old_strategy = IdentityCache.expiration_strategy
  IdentityCache.reset_expiration_strategy(new_strategy)
  yield
ensure
  IdentityCache.reset_expiration_strategy(old_strategy)
end

private

def create(class_symbol)