From 9188d864ac35db81ee645f4f0953ae2052016ffc Mon Sep 17 00:00:00 2001 From: zhf Date: Thu, 22 Nov 2012 18:04:04 +0800 Subject: [PATCH] add lab4 --- GNUmakefile | 2 +- handle.cc | 109 +++++++++++++++++++++++++++++++++++++++++++ handle.h | 79 +++++++++++++++++++++++++++++++ lock_client_cache.cc | 58 +++++++++++++++++++++++ lock_client_cache.h | 40 ++++++++++++++++ lock_protocol.h | 10 ++++ lock_server_cache.cc | 40 ++++++++++++++++ lock_server_cache.h | 22 +++++++++ lock_smain.cc | 2 +- lock_tester.cc | 7 +-- rpc/connection.cc | 6 ++- rpc/connection.h | 4 +- rpc/rpc.h | 2 +- tprintf.h | 10 ++++ 14 files changed, 382 insertions(+), 9 deletions(-) create mode 100644 handle.cc create mode 100644 handle.h create mode 100644 lock_client_cache.cc create mode 100644 lock_client_cache.h create mode 100644 lock_server_cache.cc create mode 100644 lock_server_cache.h create mode 100644 tprintf.h diff --git a/GNUmakefile b/GNUmakefile index ef29753..28f864e 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -1,4 +1,4 @@ -LAB=3 +LAB=4 SOL=0 RPC=./rpc LAB2GE=$(shell expr $(LAB) \>\= 2) diff --git a/handle.cc b/handle.cc new file mode 100644 index 0000000..143691e --- /dev/null +++ b/handle.cc @@ -0,0 +1,109 @@ +#include "handle.h" +#include +#include "tprintf.h" + +handle_mgr mgr; + +handle::handle(std::string m) +{ + h = mgr.get_handle(m); +} + +rpcc * +handle::safebind() +{ + if (!h) + return NULL; + ScopedLock ml(&h->cl_mutex); + if (h->del) + return NULL; + if (h->cl) + return h->cl; + sockaddr_in dstsock; + make_sockaddr(h->m.c_str(), &dstsock); + rpcc *cl = new rpcc(dstsock); + tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); + int ret; + // handle class has to tolerate lossy network, since we may test + // students' lab with RPC_LOSSY=5 from lab 1 to lab 5 + ret = cl->bind(); + if (ret < 0) { + tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); + delete cl; + h->del = true; + } else { + tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); + h->cl = cl; + } + return h->cl; +} + +handle::~handle() +{ + if (h) mgr.done_handle(h); +} + +handle_mgr::handle_mgr() +{ + VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0); +} + +struct hinfo * +handle_mgr::get_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + struct hinfo *h = 0; + if (hmap.find(m) == hmap.end()) { + h = new hinfo; + h->cl = NULL; + h->del = false; + h->refcnt = 1; + h->m = m; + pthread_mutex_init(&h->cl_mutex, NULL); + hmap[m] = h; + } else if (!hmap[m]->del) { + h = hmap[m]; + h->refcnt ++; + } + return h; +} + +void +handle_mgr::done_handle(struct hinfo *h) +{ + ScopedLock ml(&handle_mutex); + h->refcnt--; + if (h->refcnt == 0 && h->del) + delete_handle_wo(h->m); +} + +void +handle_mgr::delete_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + delete_handle_wo(m); +} + +// Must be called with handle_mutex locked. +void +handle_mgr::delete_handle_wo(std::string m) +{ + if (hmap.find(m) == hmap.end()) { + tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); + } else { + tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), + hmap[m]->refcnt); + struct hinfo *h = hmap[m]; + if (h->refcnt == 0) { + if (h->cl) { + h->cl->cancel(); + delete h->cl; + } + pthread_mutex_destroy(&h->cl_mutex); + hmap.erase(m); + delete h; + } else { + h->del = true; + } + } +} diff --git a/handle.h b/handle.h new file mode 100644 index 0000000..ecd8884 --- /dev/null +++ b/handle.h @@ -0,0 +1,79 @@ +// manage a cache of RPC connections. +// assuming cid is a std::string holding the +// host:port of the RPC server you want +// to talk to: +// +// handle h(cid); +// rpcc *cl = h.safebind(); +// if(cl){ +// ret = cl->call(...); +// } else { +// bind() failed +// } +// +// if the calling program has not contacted +// cid before, safebind() will create a new +// connection, call bind(), and return +// an rpcc*, or 0 if bind() failed. if the +// program has previously contacted cid, +// safebind() just returns the previously +// created rpcc*. best not to hold any +// mutexes while calling safebind(). + +#ifndef handle_h +#define handle_h + +#include +#include +#include "rpc.h" + +struct hinfo { + rpcc *cl; + int refcnt; + bool del; + std::string m; + pthread_mutex_t cl_mutex; +}; + +class handle { + private: + struct hinfo *h; + public: + handle(std::string m); + ~handle(); + /* safebind will try to bind with the rpc server on the first call. + * Since bind may block, the caller probably should not hold a mutex + * when calling safebind. + * + * return: + * if the first safebind succeeded, all later calls would return + * a rpcc object; otherwise, all later calls would return NULL. + * + * Example: + * handle h(dst); + * XXX_protocol::status ret; + * if (h.safebind()) { + * ret = h.safebind()->call(...); + * } + * if (!h.safebind() || ret != XXX_protocol::OK) { + * // handle failure + * } + */ + rpcc *safebind(); +}; + +class handle_mgr { + private: + pthread_mutex_t handle_mutex; + std::map hmap; + public: + handle_mgr(); + struct hinfo *get_handle(std::string m); + void done_handle(struct hinfo *h); + void delete_handle(std::string m); + void delete_handle_wo(std::string m); +}; + +extern class handle_mgr mgr; + +#endif diff --git a/lock_client_cache.cc b/lock_client_cache.cc new file mode 100644 index 0000000..c68ccb3 --- /dev/null +++ b/lock_client_cache.cc @@ -0,0 +1,58 @@ +// RPC stubs for clients to talk to lock_server, and cache the locks +// see lock_client.cache.h for protocol details. + +#include "lock_client_cache.h" +#include "rpc.h" +#include +#include +#include +#include "tprintf.h" + + +lock_client_cache::lock_client_cache(std::string xdst, + class lock_release_user *_lu) + : lock_client(xdst), lu(_lu) +{ + rpcs *rlsrpc = new rpcs(0); + rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache::revoke_handler); + rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache::retry_handler); + + const char *hname; + hname = "127.0.0.1"; + std::ostringstream host; + host << hname << ":" << rlsrpc->port(); + id = host.str(); +} + +lock_protocol::status +lock_client_cache::acquire(lock_protocol::lockid_t lid) +{ + int ret = lock_protocol::OK; + return lock_protocol::OK; +} + +lock_protocol::status +lock_client_cache::release(lock_protocol::lockid_t lid) +{ + return lock_protocol::OK; + +} + +rlock_protocol::status +lock_client_cache::revoke_handler(lock_protocol::lockid_t lid, + int &) +{ + int ret = rlock_protocol::OK; + return ret; +} + +rlock_protocol::status +lock_client_cache::retry_handler(lock_protocol::lockid_t lid, + int &) +{ + int ret = rlock_protocol::OK; + return ret; +} + + + diff --git a/lock_client_cache.h b/lock_client_cache.h new file mode 100644 index 0000000..fec8577 --- /dev/null +++ b/lock_client_cache.h @@ -0,0 +1,40 @@ +// lock client interface. + +#ifndef lock_client_cache_h + +#define lock_client_cache_h + +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "lock_client.h" +#include "lang/verify.h" + +// Classes that inherit lock_release_user can override dorelease so that +// that they will be called when lock_client releases a lock. +// You will not need to do anything with this class until Lab 5. +class lock_release_user { + public: + virtual void dorelease(lock_protocol::lockid_t) = 0; + virtual ~lock_release_user() {}; +}; + +class lock_client_cache : public lock_client { + private: + class lock_release_user *lu; + int rlock_port; + std::string hostname; + std::string id; + public: + lock_client_cache(std::string xdst, class lock_release_user *l = 0); + virtual ~lock_client_cache() {}; + lock_protocol::status acquire(lock_protocol::lockid_t); + lock_protocol::status release(lock_protocol::lockid_t); + rlock_protocol::status revoke_handler(lock_protocol::lockid_t, + int &); + rlock_protocol::status retry_handler(lock_protocol::lockid_t, + int &); +}; + + +#endif diff --git a/lock_protocol.h b/lock_protocol.h index 5321279..a327c11 100644 --- a/lock_protocol.h +++ b/lock_protocol.h @@ -10,6 +10,7 @@ class lock_protocol { enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR }; typedef int status; typedef unsigned long long lockid_t; + typedef unsigned long long xid_t; enum rpc_numbers { acquire = 0x7001, release, @@ -17,4 +18,13 @@ class lock_protocol { }; }; +class rlock_protocol { + public: + enum xxstatus { OK, RPCERR }; + typedef int status; + enum rpc_numbers { + revoke = 0x8001, + retry = 0x8002 + }; +}; #endif diff --git a/lock_server_cache.cc b/lock_server_cache.cc new file mode 100644 index 0000000..2ee466b --- /dev/null +++ b/lock_server_cache.cc @@ -0,0 +1,40 @@ +// the caching lock server implementation + +#include "lock_server_cache.h" +#include +#include +#include +#include +#include "lang/verify.h" +#include "handle.h" +#include "tprintf.h" + + +lock_server_cache::lock_server_cache() +{ +} + + +int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id, + int &) +{ + lock_protocol::status ret = lock_protocol::OK; + return ret; +} + +int +lock_server_cache::release(lock_protocol::lockid_t lid, std::string id, + int &r) +{ + lock_protocol::status ret = lock_protocol::OK; + return ret; +} + +lock_protocol::status +lock_server_cache::stat(lock_protocol::lockid_t lid, int &r) +{ + tprintf("stat request\n"); + r = nacquire; + return lock_protocol::OK; +} + diff --git a/lock_server_cache.h b/lock_server_cache.h new file mode 100644 index 0000000..3c710b5 --- /dev/null +++ b/lock_server_cache.h @@ -0,0 +1,22 @@ +#ifndef lock_server_cache_h +#define lock_server_cache_h + +#include + +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "lock_server.h" + + +class lock_server_cache { + private: + int nacquire; + public: + lock_server_cache(); + lock_protocol::status stat(lock_protocol::lockid_t, int &); + int acquire(lock_protocol::lockid_t, std::string id, int &); + int release(lock_protocol::lockid_t, std::string id, int &); +}; + +#endif diff --git a/lock_smain.cc b/lock_smain.cc index 72dac4a..4ea74e8 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -2,7 +2,7 @@ #include #include #include -#include "lock_server.h" +#include "lock_server_cache.h" #include "jsl_log.h" diff --git a/lock_tester.cc b/lock_tester.cc index f847f32..672fe49 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -11,11 +11,12 @@ #include #include #include "lang/verify.h" +#include "lock_client_cache.h" // must be >= 2 int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. std::string dst; -lock_client **lc = new lock_client * [nt]; +lock_client_cache **lc = new lock_client_cache * [nt]; lock_protocol::lockid_t a = 1; lock_protocol::lockid_t b = 2; lock_protocol::lockid_t c = 3; @@ -171,8 +172,8 @@ main(int argc, char *argv[]) } VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0); - printf("simple lock client\n"); - for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst); + printf("cache lock client\n"); + for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache(dst); if(!test || test == 1){ test1(); diff --git a/rpc/connection.cc b/rpc/connection.cc index 94b9194..c22ad45 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -326,7 +326,11 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) VERIFY(0); } - jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port, + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); + + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, sin.sin_port); if (pipe(pipe_) < 0) { diff --git a/rpc/connection.h b/rpc/connection.h index 2c84daf..da48cf4 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -73,10 +73,10 @@ class tcpsconn { public: tcpsconn(chanmgr *m1, int port, int lossytest=0); ~tcpsconn(); - + inline int port() { return port_; } void accept_conn(); private: - + int port_; pthread_mutex_t m_; pthread_t th_; int pipe_[2]; diff --git a/rpc/rpc.h b/rpc/rpc.h index 999810c..899aa72 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -347,7 +347,7 @@ class rpcs : public chanmgr { public: rpcs(unsigned int port, int counts=0); ~rpcs(); - + inline int port() { return listener_->port(); } //RPC handler for clients binding int rpcbind(int a, int &r); diff --git a/tprintf.h b/tprintf.h new file mode 100644 index 0000000..d884046 --- /dev/null +++ b/tprintf.h @@ -0,0 +1,10 @@ +#ifndef TPRINTF_H +#define TPRINTF_H + +#define tprintf(args...) do { \ + struct timeval tv; \ + gettimeofday(&tv, 0); \ + printf("%lu:\t", tv.tv_sec * 1000 + tv.tv_usec / 1000);\ + printf(args); \ + } while (0); +#endif