From 7a51b8024f935d07660f8107cb771edb308455fe Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Tue, 19 May 2020 01:30:25 -0700 Subject: [PATCH] Initial checkin --- Makefile | 40 +++++++++ README.md | 2 + XcacheH.cc | 166 ++++++++++++++++++++++++++++++++++++++ XcacheH.hh | 11 +++ XrdOucName2NameXcacheH.cc | 163 +++++++++++++++++++++++++++++++++++++ cacheFileOpr.cc | 62 ++++++++++++++ cacheFileOpr.hh | 11 +++ url2lfn.cc | 16 ++++ url2lfn.hh | 4 + xcacheh.cfg | 33 ++++++++ 10 files changed, 508 insertions(+) create mode 100644 Makefile create mode 100644 README.md create mode 100644 XcacheH.cc create mode 100644 XcacheH.hh create mode 100644 XrdOucName2NameXcacheH.cc create mode 100644 cacheFileOpr.cc create mode 100644 cacheFileOpr.hh create mode 100644 url2lfn.cc create mode 100644 url2lfn.hh create mode 100644 xcacheh.cfg diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3000143 --- /dev/null +++ b/Makefile @@ -0,0 +1,40 @@ +all: XrdName2NameXcacheH.so + +# make sure xrootd-devel, xrootd-server-devel and xrootd-client-devel rpms +# are installed for the needed xrootd header files. + +ifeq ($(strip $(XRD_INC)),) + XRD_INC=/usr/include/xrootd +endif + +ifeq ($(strip $(XRD_LIB)),) + XRD_LIB=/usr/lib64 +endif + +FLAGS=-D_REENTRANT -D_THREAD_SAFE -Wno-deprecated -std=c++0x + +HEADERS=cacheFileOpr.hh url2lfn.hh XcacheH.hh +SOURCES=XrdOucName2NameXcacheH.cc cacheFileOpr.cc url2lfn.cc XcacheH.cc +OBJECTS=XrdOucName2NameXcacheH.o cacheFileOpr.o url2lfn.o XcacheH.o + +DEBUG=-g + +XrdName2NameXcacheH.so: $(OBJECTS) Makefile + g++ ${DEBUG} -shared -fPIC -o $@ $(OBJECTS) -L${XRD_LIB} -L${XRD_LIB}/XrdCl -ldl -lssl -lcurl -lXrdCl -lXrdPosix -lstdc++ + +XrdOucName2NameXcacheH.o: XrdOucName2NameXcacheH.cc ${HEADERS} Makefile + g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $< + +cacheFileOpr.o: cacheFileOpr.cc ${HEADERS} Makefile + g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $< + +url2lfn.o: url2lfn.cc ${HEADERS} Makefile + g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $< + +XcacheH.o: XcacheH.cc ${HEADERS} Makefile + g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $< + +clean: + rm -vf *.{o,so} + + diff --git a/README.md b/README.md new file mode 100644 index 0000000..6a521cd --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +XcacheH is a Xcache plugin that will update cache contents +when the source of data is modified. diff --git a/XcacheH.cc b/XcacheH.cc new file mode 100644 index 0000000..98a52cc --- /dev/null +++ b/XcacheH.cc @@ -0,0 +1,166 @@ +/* + * Author: Wei Yang + * SLAC National Accelerator Laboratory / Stanford University, 2020 + */ + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "url2lfn.hh" +#include "cacheFileOpr.hh" +#include "XrdCl/XrdClURL.hh" +#include "XrdSys/XrdSysError.hh" + +struct httpResp +{ + char *data; + size_t size; +}; + +void cleaner() +{ + std::string cmd = "yes"; + while (! sleep(600)) + { + system(cmd.c_str()); + } +} + +time_t cacheLifeTime; + +static int XcacheH_DBG = 1; + +void XcacheHInit(XrdSysError* eDest, + const std::string myName, + time_t cacheLifeT) +{ + cacheLifeTime = cacheLifeT; + + // std::thread cleanning(cleaner); + // cleanning.detach(); + curl_global_init(CURL_GLOBAL_ALL); + + if (getenv("XcacheH_DBG") != NULL) XcacheH_DBG = atoi(getenv("XcacheH_DBG")); +} + +static size_t XcacheHRemoteStatCallback(void *contents, + size_t size, + size_t nmemb, + void *userp) +{ + size_t realsize = size * nmemb; + struct httpResp *mem = (struct httpResp *)userp; + + mem->data = (char*)realloc(mem->data, mem->size + realsize + 1); + memcpy(&(mem->data[mem->size]), contents, realsize); + mem->size += realsize; + mem->data[mem->size] = 0; + return realsize; +} + +// Return +// 1: yes file need to be fetched again. +// 0: no data source hasn't changed yet. +// +int NeedRefetch_HTTP(std::string myPfn, time_t mTime) +{ + char* rmturl = strdup(myPfn.c_str()); + + struct httpResp chunk; + CURL *curl_handle; + CURLcode res; + + chunk.data = (char*)malloc(1); // will be grown as needed by the realloc above + chunk.size = 0; // no data at this point + + curl_handle = curl_easy_init(); + curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1); // to make it thread-safe? + curl_easy_setopt(curl_handle, CURLOPT_URL, rmturl); + curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0); // the curl -k option + curl_easy_setopt(curl_handle, CURLOPT_HEADER, 1); // http header + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, XcacheHRemoteStatCallback); + curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk); + curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(curl_handle, CURLOPT_TIMEOUT, 180L); + + // If-Mod-Since + curl_easy_setopt(curl_handle, CURLOPT_TIMEVALUE, (long)mTime); + curl_easy_setopt(curl_handle, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE); + + // Header only, ask the server not to send body data + curl_easy_setopt(curl_handle, CURLOPT_NOBODY, 1L); + + // Follow redirection + curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 5L); + + // some servers don't like requests that are made without a user-agent + // field, so we provide one + curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); + res = curl_easy_perform(curl_handle); + curl_easy_cleanup(curl_handle); + + // check for errors + int rc = 0; + if(res == CURLE_OK) + { + // Two possible reponse HTTP/1.1 304 Not Modified or HTTP/1.1 200 OK (modified) + // If http redirection happens, these two will be the last of HTTP/1.1 response + // in the chunk.data stream. + if (strcasestr(chunk.data, "HTTP/1.1 200 OK") != NULL) + rc = 1; + // else if (strcasestr(chunk.data, "HTTP/1.1 304 Not Modified") != NULL) + // rc = 0; + } + free(chunk.data); + free(rmturl); + return rc; +} + + +std::string XcacheHCheckFile(XrdSysError* eDest, + const std::string myName, + const std::string myPfn) +{ + std::string rmtUrl, myLfn, msg; + + myLfn = url2lfn(myPfn); + + time_t mTime = cacheFileAtime(myPfn); + time_t currTime = time(NULL); + + if ((currTime - mTime) > cacheLifeTime) + { + if (myPfn.find("http") == 0) // http or https protocol + if (NeedRefetch_HTTP(myPfn, mTime) == 1) + { + if (cacheFilePurge(myPfn) == 0) + msg = myName + ": purge " + myLfn; + else + msg = myName + ": fail to purge " + myLfn; + } + else // no need to refetch, or data soruce is not available + msg = myName + ": no need to refetch or data source no available" + myLfn; + else if (myPfn.find("root") == 0) // + msg = myName + ": dont not know what to do " + myLfn; + if (XcacheH_DBG != 0) eDest->Say(msg.c_str()); + } + else + { + msg = myName + ": recently cached " + myLfn; + if (XcacheH_DBG != 0) eDest->Say(msg.c_str()); + } + + return myLfn; +} diff --git a/XcacheH.hh b/XcacheH.hh new file mode 100644 index 0000000..255f493 --- /dev/null +++ b/XcacheH.hh @@ -0,0 +1,11 @@ +/* + * Author: Wei Yang + * SLAC National Accelerator Laboratory / Stanford University, 2020 + */ + +#include "XrdSys/XrdSysError.hh" + +void XcacheHInit(XrdSysError* eDest, const std::string myName, time_t cacheLifeT); +std::string XcacheHCheckFile(XrdSysError* eDest, + const std::string myName, + const std::string myPfn); diff --git a/XrdOucName2NameXcacheH.cc b/XrdOucName2NameXcacheH.cc new file mode 100644 index 0000000..1d9bd63 --- /dev/null +++ b/XrdOucName2NameXcacheH.cc @@ -0,0 +1,163 @@ +/* + * Author: Wei Yang + * SLAC National Accelerator Laboratory / Stanford University, 2020 + */ + +using namespace std; + +#include +#include +#include +#include +#include "XrdVersion.hh" +XrdVERSIONINFO(XrdOucgetName2Name, "N2N-XcacheH"); + +#include "XcacheH.hh" +#include "XrdOuc/XrdOucEnv.hh" +#include "XrdOuc/XrdOucName2Name.hh" +#include "XrdSys/XrdSysPlatform.hh" +#include "XrdSys/XrdSysError.hh" + +class XrdOucName2NameXcacheH : public XrdOucName2Name +{ +public: + virtual int lfn2pfn(const char* lfn, char* buff, int blen); + virtual int lfn2rfn(const char* lfn, char* buff, int blen); + virtual int pfn2lfn(const char* lfn, char* buff, int blen); + + XrdOucName2NameXcacheH(XrdSysError *erp, const char* confg, const char* parms); + virtual ~XrdOucName2NameXcacheH() {}; + + friend XrdOucName2Name *XrdOucgetName2Name(XrdOucgetName2NameArgs); +private: + string myName, optCacheLife = ""; // unit: seconds + time_t cacheLifeT; + XrdSysError *eDest; + bool isCmsd; +}; + +XrdOucName2NameXcacheH::XrdOucName2NameXcacheH(XrdSysError* erp, const char* confg, const char* parms) +{ + std::string myProg; + std::string opts, message, key, value; + std::string::iterator it; + std::size_t i; + int x; + + myName = "XcacheH"; + eDest = erp; + + isCmsd = false; + if (getenv("XRDPROG")) + { + myProg = getenv("XRDPROG"); + if (myProg == "cmsd") isCmsd = true; + } + + setenv("XRD_METALINKPROCESSING", "1", 0); + setenv("XRD_LOCALMETALINKFILE", "1", 0); + + x = 0; + key = ""; + value = ""; + + opts = parms; + opts += " "; + for (it=opts.begin(); it!=opts.end(); ++it) + { + if (*it == '=') x = 1; + else if (*it == ' ') + { + if (key == "cacheLife") // unit: seconds + optCacheLife = value; + key = ""; + value = ""; + x = 0; + } + else + { + if (x == 0) key += *it; + if (x == 1) value += *it; + } + } + + + if (optCacheLife.find_first_not_of("0123456789.") == std::string::npos) + { + cacheLifeT = atoi(optCacheLife.c_str()); + message = myName + " Init: cacheLife = " + optCacheLife; + } + else + { + cacheLifeT = 3600; + message = myName + " Init: cacheLife = " + optCacheLife + " is invalid or not set. Set it to 1 hour"; + } + eDest->Say(message.c_str()); + + XcacheHInit(eDest, myName, cacheLifeT); +} + +int XrdOucName2NameXcacheH::lfn2pfn(const char* lfn, char* buff, int blen) +{ return -EOPNOTSUPP; } + +// when "pss.namelib -lfncachesrc ..." is used, pfn will look +// like /images/junk1?src=http://u25@wt2.slac.stanford.edu/ +int XrdOucName2NameXcacheH::pfn2lfn(const char* pfn, char* buff, int blen) +{ + std::string myLfn, myPfn, myUrl; + + myPfn = myUrl = pfn; + if (isCmsd) // cmsd shouldn't do pfn2lfn() + { + blen = myPfn.find("?src="); + blen = strlen(pfn); + strncpy(buff, pfn, blen); + return 0; + } + + // it is import to use string::rfind() to search from the end. + myUrl.replace(0, myUrl.rfind("?src=") +5, ""); + myUrl.replace(myUrl.length() -1, 1, ""); // remove the tailing "/" + + // remove u25@ from the URL (see above) + if (myUrl.find("http://") == 0) + myUrl.replace(0, myUrl.find("@") +1, "http://"); + else if (myUrl.find("https://") == 0) + myUrl.replace(0, myUrl.find("@") +1, "https://"); + else // this scenarios should NOT happen + { + blen = 0; + buff[0] = 0; + return EINVAL; // see XrdOucName2Name.hh + } + + myUrl += myPfn.substr(0, myPfn.rfind("?src=")); + + myLfn = XcacheHCheckFile(eDest, myName, myUrl); + + if (myLfn == "EFAULT") + return EFAULT; + else if (myLfn == "ENOENT") + return ENOENT; + + blen = myLfn.length(); + strncpy(buff, myLfn.c_str(), blen); + buff[blen] = 0; + + return 0; +} + +int XrdOucName2NameXcacheH::lfn2rfn(const char* lfn, char* buff, int blen) +{ return -EOPNOTSUPP; } + +XrdOucName2Name *XrdOucgetName2Name(XrdOucgetName2NameArgs) +{ + static XrdOucName2NameXcacheH *inst = NULL; + + if (inst) return (XrdOucName2Name *)inst; + + inst = new XrdOucName2NameXcacheH(eDest, confg, parms); + if (!inst) return NULL; + + return (XrdOucName2Name *)inst; +} diff --git a/cacheFileOpr.cc b/cacheFileOpr.cc new file mode 100644 index 0000000..0ff81dd --- /dev/null +++ b/cacheFileOpr.cc @@ -0,0 +1,62 @@ +/* + * Author: Wei Yang (SLAC National Accelerator Laboratory / Stanford University, 2019) + */ + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "url2lfn.hh" +#include "XrdVersion.hh" +#include "XrdOuc/XrdOucCacheCM.hh" +#include "XrdPosix/XrdPosixCache.hh" + +//______________________________________________________________________________ + + +XrdPosixCache *myCache; + +extern "C" { +XrdOucCacheCMInit_t XrdOucCacheCMInit(XrdPosixCache &Cache, + XrdSysLogger *Logger, + const char *Config, + const char *Parms, + XrdOucEnv *envP) +{ + myCache = &Cache; +} +}; +XrdVERSIONINFO(XrdOucCacheCMInit,CacheCM-4-XcacheH); + +time_t cacheFileAtime(std::string url) +{ + int rc; + char *lfn = url2lfn(url); + struct stat mystat; + + rc = myCache->Stat(lfn, mystat); + free(lfn); + + // if the file doesn't exist, return 0 as mtime (older than any real file) + if (rc == 0) + return mystat.st_atime; + else + return 0; +} + +int cacheFilePurge(std::string url) +{ + int rc; + char *lfn = url2lfn(url); + + rc = myCache->Unlink(lfn); + free(lfn); + return rc; +} diff --git a/cacheFileOpr.hh b/cacheFileOpr.hh new file mode 100644 index 0000000..bc84354 --- /dev/null +++ b/cacheFileOpr.hh @@ -0,0 +1,11 @@ +/* + * Author: Wei Yang (SLAC National Accelerator Laboratory / Stanford University, 2019) + */ + +#include + +// url is in the form or /http:/host... or /https:/host +time_t cacheFileAtime(std::string url); + +// return 0 if file is purged, !0 if not +int cacheFilePurge(std::string url); diff --git a/url2lfn.cc b/url2lfn.cc new file mode 100644 index 0000000..b16895a --- /dev/null +++ b/url2lfn.cc @@ -0,0 +1,16 @@ +#include +#include + +using namespace std; + +char* url2lfn(const std::string url) +{ + std::string lfn = url; + + if (url.find("http:/") == 0) + lfn.replace(0, 6, "/http"); + else if (url.find("https:/") == 0) + lfn.replace(0, 7, "/https"); + + return strdup(lfn.c_str()); +} diff --git a/url2lfn.hh b/url2lfn.hh new file mode 100644 index 0000000..0be8b07 --- /dev/null +++ b/url2lfn.hh @@ -0,0 +1,4 @@ +// convert url to a path. e.g. +// http:// to /http:/ +// https:// to /https:/ +char* url2lfn(const std::string url); diff --git a/xcacheh.cfg b/xcacheh.cfg new file mode 100644 index 0000000..db3782e --- /dev/null +++ b/xcacheh.cfg @@ -0,0 +1,33 @@ +#set rootdir = /gpfs/slac/atlas/fs1/sw/xcachedata +set rootdir = /data + +all.adminpath $(rootdir)/xrd/var/spool/xrootd +all.pidpath $(rootdir)/xrd/var/run/xrootd + +oss.localroot $(rootdir)/xrd/namespace + +all.export / +all.export * + +#oss.space meta $(rootdir)/xrd/xrdcinfos +#oss.space data $(rootdir)/xrd/datafiles + +ofs.osslib libXrdPss.so +pss.cachelib libXrdPfc.so +pss.config streams 8 +pss.origin = +pss.namelib -lfncachesrc XrdName2NameXcacheH.so cacheLife=30 +pss.ccmlib XrdName2NameXcacheH.so + +pfc.ram 2g +pfc.diskusage 0.75 0.85 +#pfc.spaces data meta +pfc.blocksize 1M +pfc.prefetch 0 +#pfc.trace info + +#if exec xrootd +# xrd.protocol http:8080 libXrdHttp.so +#fi +#http.header2cgi GET httpdst +