Skip to content

Add CAS support #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion TODO.org
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@
** TODO add AS::Cache wrappper instead of Rails class
** TODO Add resiliency around the client
** TODO Implement connection pool
** TODO CAS
** TODO add a better pipeline API
28 changes: 26 additions & 2 deletions ext/memcached/memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const char *MEMCACHED_ERROR_NAMES[] = {
"ClientError", // MEMCACHED_CLIENT_ERROR
"ServerError", // MEMCACHED_SERVER_ERROR
"StdError", // MEMCACHED_ERROR
"DataExist", // MEMCACHED_DATA_EXISTS
NULL, // MEMCACHED_DATA_EXISTS
"DataDoesNotExist", // MEMCACHED_DATA_DOES_NOT_EXIST
NULL, // MEMCACHED_NOTSTORED
NULL, // MEMCACHED_STORED
Expand Down Expand Up @@ -175,6 +175,28 @@ rb_connection_flush(VALUE self)
return (rc == MEMCACHED_SUCCESS) ? Qtrue : Qfalse;
}

static VALUE
rb_connection_cas(VALUE self, VALUE rb_key, VALUE rb_value, VALUE rb_ttl, VALUE rb_flags, VALUE rb_cas)
{
memcached_st *mc;
memcached_return_t rc;

UnwrapMemcached(self, mc);
Check_Type(rb_key, T_STRING);
Check_Type(rb_value, T_STRING);
Check_Type(rb_ttl, T_FIXNUM);
Check_Type(rb_flags, T_FIXNUM);
Check_Type(rb_cas, T_FIXNUM);

rc = memcached_cas(mc,
RSTRING_PTR(rb_key), RSTRING_LEN(rb_key),
RSTRING_PTR(rb_value), RSTRING_LEN(rb_value),
FIX2INT(rb_ttl), FIX2INT(rb_flags), FIX2INT(rb_cas)
);

rb_memcached_return(rc);
}

static VALUE
rb_connection_set(VALUE self, VALUE rb_key, VALUE rb_value, VALUE rb_ttl, VALUE rb_flags)
{
Expand Down Expand Up @@ -236,8 +258,9 @@ rb_connection__mget_callback(const memcached_st *mc, memcached_result_st *result
memcached_result_value(result),
memcached_result_length(result));
uint32_t ret_flags = memcached_result_flags(result);
uint64_t cas = memcached_result_cas(result);

rb_hash_aset(rb_result, rb_key, rb_ary_new3(2, rb_value, INT2FIX(ret_flags)));
rb_hash_aset(rb_result, rb_key, rb_ary_new3(3, rb_value, INT2FIX(ret_flags), INT2FIX(cas)));
return MEMCACHED_SUCCESS;
}

Expand Down Expand Up @@ -530,6 +553,7 @@ void Init_memcached(void)
rb_define_method(rb_cConnection, "servers", rb_connection_servers, 0);
rb_define_method(rb_cConnection, "flush", rb_connection_flush, 0);
rb_define_method(rb_cConnection, "set", rb_connection_set, 4);
rb_define_method(rb_cConnection, "cas", rb_connection_cas, 5);
rb_define_method(rb_cConnection, "get", rb_connection_get, 1);
rb_define_method(rb_cConnection, "get_multi", rb_connection_get_multi, 1);
rb_define_method(rb_cConnection, "delete", rb_connection_delete, 1);
Expand Down
29 changes: 29 additions & 0 deletions lib/memcached/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,35 @@ def get_multi(keys, raw: nil)
hash
end

def cas(keys, ttl: @default_ttl, raw: nil)
responses = connection.get_multi(keys)
return false if responses.empty?

values_hash = if raw
responses.transform_values(&:first)
else
hash = responses.dup
hash.each do |key, (raw_value, flags)|
hash[key] = @codec.decode(key, raw_value, flags)
end
end
values_hash = yield values_hash

success = true
responses.each do |key, (_orig_value, flags, cas)|
if values_hash.key?(key)
new_value = values_hash[key]
unless raw
new_value, flags = @codec.encode(key, new_value, flags)
end
success &= connection.cas(key, new_value, ttl, flags, cas)
else
success = false
end
end
success
end

def delete(key)
connection.delete(key)
end
Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def teardown

def cache
return @cache if @cache
@cache = Memcached::Client.new(@servers, hash: :default, distribution: :modula, prefix_key: @prefix_key)
@cache = Memcached::Client.new(@servers, hash: :default, distribution: :modula, prefix_key: @prefix_key, support_cas: true)
end

def binary_protocol_cache
Expand Down
52 changes: 52 additions & 0 deletions test/unit/client_cas_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require 'test_helper'

class ClientCASTest < BaseTest

def test_simple_cas
cache.set(key, "foo")
assert_equal "foo", cache.get(key)
result = cache.cas([key]) do
{ key => @value }
end
assert_equal true, result
assert_equal @value, cache.get(key)
end

def test_cas_conflict
cache.set(key, "foo")
assert_equal "foo", cache.get(key)
result = cache.cas([key]) do
cache.set(key, "bar")
{ key => @value }
end
assert_equal false, result
assert_equal "bar", cache.get(key)
end

def test_cas_miss
result = cache.cas([key]) do
flunk "CAS block shouldn't be called on miss"
end
assert_equal false, result
assert_nil cache.get(key)
end

def test_cas_changed_keys
cache.set(key, "foo")
result = cache.cas([key]) do
{ "#{key}:new" => "bar" }
end
assert_equal false, result
assert_nil cache.get("#{key}:new")
end

def test_cas_partial_miss
cache.set(key, "foo")
result = cache.cas([key, "missing:#{key}"]) do |values|
assert_equal({ key => "foo" }, values)
{ key => "bar" }
end
assert_equal true, result
assert_equal "bar", cache.get(key)
end
end