Skip to content

Commit

Permalink
use a waiting queue to store more waiting clients
Browse files Browse the repository at this point in the history
  • Loading branch information
zhf committed Nov 27, 2012
1 parent c04c8ac commit da10282
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 26 deletions.
12 changes: 6 additions & 6 deletions lock_client_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@ lock_client_cache::wait_for_lock(lock_protocol::lockid_t lid){

lock_protocol::status
lock_client_cache::acquire_from_server(lock_protocol::lockid_t lid){
lock_protocol::status ret = lock_protocol::RETRY;
int r;
tprintf("acquire from server: lid=>%llu tid=>%lu id=>%s\n", lid, pthread_self(), id.c_str());
lock_protocol::status ret = cl->call(lock_protocol::acquire, lid, id, r);

// using a particular pthread_cond to replace ?
while(ret == lock_protocol::RETRY){
pthread_mutex_lock(&lock_mutex);
tprintf("acquire from server: lid=>%llu tid=>%lu id=>%s\n", lid, pthread_self(), id.c_str());
if(locks[lid].state == lock_client_info::LOCKED){
pthread_mutex_unlock(&lock_mutex);
return lock_protocol::OK;
}else{
pthread_mutex_unlock(&lock_mutex);
}

int r;
ret = cl->call(lock_protocol::acquire, lid, id, r);
}

//cache this lock
pthread_mutex_lock(&lock_mutex);
locks[lid].state = lock_client_info::LOCKED;
Expand Down
1 change: 0 additions & 1 deletion lock_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "lock_client.h"
#include "lang/verify.h"

#include <map>
#include <set>

class lock_client_info{
Expand Down
46 changes: 28 additions & 18 deletions lock_server_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <unistd.h>
#include <arpa/inet.h>
#include "lang/verify.h"
#include "handle.h"
#include "tprintf.h"


Expand All @@ -15,6 +14,14 @@ lock_server_cache::lock_server_cache()
VERIFY(pthread_mutex_init(&mutex, NULL) == 0);
}

void
lock_server_cache::revoke_owner(lock_protocol::lockid_t lid, handle &h){
rpcc* cl = h.safebind();
if(cl){
int r;
cl->call(rlock_protocol::revoke, lid, r);
}
}

int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id,
int &)
Expand All @@ -28,23 +35,17 @@ int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id,
VERIFY(pthread_mutex_unlock(&mutex) == 0);
return lock_protocol::OK;
}else{
if(locks[lid].owner == id){
VERIFY(pthread_mutex_unlock(&mutex) == 0);
return lock_protocol::OK;
}else if(locks[lid].waiting.size() > 0){ // there is alreay one client waiting for this lock
VERIFY(pthread_mutex_unlock(&mutex) == 0);
locks[lid].waiting.push(id);
if(locks[lid].waiting.size() > 1){
tprintf("waiting size is: %u\n", locks[lid].waiting.size());
VERIFY(pthread_mutex_unlock(&mutex) == 0);
return lock_protocol::RETRY;
}else if(locks[lid].waiting.size() == 0){ // lock is own by other client, but no client is waiting for this lock
locks[lid].waiting = id;
}else{
// send a revoke to the owner of this lock
handle h(locks[lid].owner);
VERIFY(pthread_mutex_unlock(&mutex) == 0);

rpcc* cl = h.safebind();
if(cl){
int r;
cl->call(rlock_protocol::revoke, lid, r);
}
revoke_owner(lid, h);
return lock_protocol::RETRY;
}
}
Expand All @@ -63,19 +64,28 @@ lock_server_cache::release(lock_protocol::lockid_t lid, std::string id,
locks.erase(lid);
VERIFY(pthread_mutex_unlock(&mutex) == 0);
}else{
// send a retry to the waiting client
handle h(locks[lid].waiting);
// send a retry to the first waiting client
handle h(locks[lid].waiting.front());
VERIFY(pthread_mutex_unlock(&mutex) == 0);

rpcc* cl = h.safebind();
if(cl){
int r;
cl->call(rlock_protocol::retry, lid, r);
VERIFY(pthread_mutex_lock(&mutex) == 0);
locks[lid].owner = locks[lid].waiting;
locks[lid].waiting = "";
locks[lid].owner = locks[lid].waiting.front();
locks[lid].waiting.pop();
tprintf("CHANGE WAITING TO OWNER NEW OWNER: %s\n", locks[lid].owner.c_str());
VERIFY(pthread_mutex_unlock(&mutex) == 0);

// if there are other clients waiting, send a reovke to the owner
if(locks[lid].waiting.size() > 0){
handle hh(locks[lid].owner);
VERIFY(pthread_mutex_unlock(&mutex) == 0);
revoke_owner(lid, hh);
}else{
VERIFY(pthread_mutex_unlock(&mutex) == 0);
}

}
}
return lock_protocol::OK;
Expand Down
5 changes: 4 additions & 1 deletion lock_server_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
#include "lock_protocol.h"
#include "rpc.h"
#include "lock_server.h"
#include "handle.h"

class lock_info{
public:
std::string owner;
std::string waiting;
std::queue<std::string> waiting;
};

class lock_server_cache {
Expand All @@ -20,6 +21,8 @@ class lock_server_cache {

std::map<lock_protocol::lockid_t, lock_info> locks;
pthread_mutex_t mutex;

void revoke_owner(lock_protocol::lockid_t, handle &);
public:
lock_server_cache();
lock_protocol::status stat(lock_protocol::lockid_t, int &);
Expand Down

0 comments on commit da10282

Please sign in to comment.