Skip to content

Commit

Permalink
Initial checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
wyang007 committed May 19, 2020
0 parents commit 7a51b80
Show file tree
Hide file tree
Showing 10 changed files with 508 additions and 0 deletions.
40 changes: 40 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
all: XrdName2NameXcacheH.so

# make sure xrootd-devel, xrootd-server-devel and xrootd-client-devel rpms
# are installed for the needed xrootd header files.

ifeq ($(strip $(XRD_INC)),)
XRD_INC=/usr/include/xrootd
endif

ifeq ($(strip $(XRD_LIB)),)
XRD_LIB=/usr/lib64
endif

FLAGS=-D_REENTRANT -D_THREAD_SAFE -Wno-deprecated -std=c++0x

HEADERS=cacheFileOpr.hh url2lfn.hh XcacheH.hh
SOURCES=XrdOucName2NameXcacheH.cc cacheFileOpr.cc url2lfn.cc XcacheH.cc
OBJECTS=XrdOucName2NameXcacheH.o cacheFileOpr.o url2lfn.o XcacheH.o

DEBUG=-g

XrdName2NameXcacheH.so: $(OBJECTS) Makefile
g++ ${DEBUG} -shared -fPIC -o $@ $(OBJECTS) -L${XRD_LIB} -L${XRD_LIB}/XrdCl -ldl -lssl -lcurl -lXrdCl -lXrdPosix -lstdc++

XrdOucName2NameXcacheH.o: XrdOucName2NameXcacheH.cc ${HEADERS} Makefile
g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $<

cacheFileOpr.o: cacheFileOpr.cc ${HEADERS} Makefile
g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $<

url2lfn.o: url2lfn.cc ${HEADERS} Makefile
g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $<

XcacheH.o: XcacheH.cc ${HEADERS} Makefile
g++ ${DEBUG} ${FLAGS} -fPIC -I ${XRD_INC} -I ${XRD_LIB} -c -o $@ $<

clean:
rm -vf *.{o,so}


2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
XcacheH is a Xcache plugin that will update cache contents
when the source of data is modified.
166 changes: 166 additions & 0 deletions XcacheH.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Author: Wei Yang
* SLAC National Accelerator Laboratory / Stanford University, 2020
*/

using namespace std;

#include <fcntl.h>
#include <curl/curl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <openssl/md5.h>
#include <string>
#include <thread>
#include "url2lfn.hh"
#include "cacheFileOpr.hh"
#include "XrdCl/XrdClURL.hh"
#include "XrdSys/XrdSysError.hh"

struct httpResp
{
char *data;
size_t size;
};

void cleaner()
{
std::string cmd = "yes";
while (! sleep(600))
{
system(cmd.c_str());
}
}

time_t cacheLifeTime;

static int XcacheH_DBG = 1;

void XcacheHInit(XrdSysError* eDest,
const std::string myName,
time_t cacheLifeT)
{
cacheLifeTime = cacheLifeT;

// std::thread cleanning(cleaner);
// cleanning.detach();
curl_global_init(CURL_GLOBAL_ALL);

if (getenv("XcacheH_DBG") != NULL) XcacheH_DBG = atoi(getenv("XcacheH_DBG"));
}

static size_t XcacheHRemoteStatCallback(void *contents,
size_t size,
size_t nmemb,
void *userp)
{
size_t realsize = size * nmemb;
struct httpResp *mem = (struct httpResp *)userp;

mem->data = (char*)realloc(mem->data, mem->size + realsize + 1);
memcpy(&(mem->data[mem->size]), contents, realsize);
mem->size += realsize;
mem->data[mem->size] = 0;
return realsize;
}

// Return
// 1: yes file need to be fetched again.
// 0: no data source hasn't changed yet.
//
int NeedRefetch_HTTP(std::string myPfn, time_t mTime)
{
char* rmturl = strdup(myPfn.c_str());

struct httpResp chunk;
CURL *curl_handle;
CURLcode res;

chunk.data = (char*)malloc(1); // will be grown as needed by the realloc above
chunk.size = 0; // no data at this point

curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1); // to make it thread-safe?
curl_easy_setopt(curl_handle, CURLOPT_URL, rmturl);
curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0); // the curl -k option
curl_easy_setopt(curl_handle, CURLOPT_HEADER, 1); // http header
curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, XcacheHRemoteStatCallback);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk);
curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl_handle, CURLOPT_TIMEOUT, 180L);

// If-Mod-Since
curl_easy_setopt(curl_handle, CURLOPT_TIMEVALUE, (long)mTime);
curl_easy_setopt(curl_handle, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);

// Header only, ask the server not to send body data
curl_easy_setopt(curl_handle, CURLOPT_NOBODY, 1L);

// Follow redirection
curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 5L);

// some servers don't like requests that are made without a user-agent
// field, so we provide one
curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0");
res = curl_easy_perform(curl_handle);
curl_easy_cleanup(curl_handle);

// check for errors
int rc = 0;
if(res == CURLE_OK)
{
// Two possible reponse HTTP/1.1 304 Not Modified or HTTP/1.1 200 OK (modified)
// If http redirection happens, these two will be the last of HTTP/1.1 response
// in the chunk.data stream.
if (strcasestr(chunk.data, "HTTP/1.1 200 OK") != NULL)
rc = 1;
// else if (strcasestr(chunk.data, "HTTP/1.1 304 Not Modified") != NULL)
// rc = 0;
}
free(chunk.data);
free(rmturl);
return rc;
}


std::string XcacheHCheckFile(XrdSysError* eDest,
const std::string myName,
const std::string myPfn)
{
std::string rmtUrl, myLfn, msg;

myLfn = url2lfn(myPfn);

time_t mTime = cacheFileAtime(myPfn);
time_t currTime = time(NULL);

if ((currTime - mTime) > cacheLifeTime)
{
if (myPfn.find("http") == 0) // http or https protocol
if (NeedRefetch_HTTP(myPfn, mTime) == 1)
{
if (cacheFilePurge(myPfn) == 0)
msg = myName + ": purge " + myLfn;
else
msg = myName + ": fail to purge " + myLfn;
}
else // no need to refetch, or data soruce is not available
msg = myName + ": no need to refetch or data source no available" + myLfn;
else if (myPfn.find("root") == 0) //
msg = myName + ": dont not know what to do " + myLfn;
if (XcacheH_DBG != 0) eDest->Say(msg.c_str());
}
else
{
msg = myName + ": recently cached " + myLfn;
if (XcacheH_DBG != 0) eDest->Say(msg.c_str());
}

return myLfn;
}
11 changes: 11 additions & 0 deletions XcacheH.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Author: Wei Yang
* SLAC National Accelerator Laboratory / Stanford University, 2020
*/

#include "XrdSys/XrdSysError.hh"

void XcacheHInit(XrdSysError* eDest, const std::string myName, time_t cacheLifeT);
std::string XcacheHCheckFile(XrdSysError* eDest,
const std::string myName,
const std::string myPfn);
163 changes: 163 additions & 0 deletions XrdOucName2NameXcacheH.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Author: Wei Yang
* SLAC National Accelerator Laboratory / Stanford University, 2020
*/

using namespace std;

#include <stdio.h>
#include <string>
#include <errno.h>
#include <openssl/md5.h>
#include "XrdVersion.hh"
XrdVERSIONINFO(XrdOucgetName2Name, "N2N-XcacheH");

#include "XcacheH.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucName2Name.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysError.hh"

class XrdOucName2NameXcacheH : public XrdOucName2Name
{
public:
virtual int lfn2pfn(const char* lfn, char* buff, int blen);
virtual int lfn2rfn(const char* lfn, char* buff, int blen);
virtual int pfn2lfn(const char* lfn, char* buff, int blen);

XrdOucName2NameXcacheH(XrdSysError *erp, const char* confg, const char* parms);
virtual ~XrdOucName2NameXcacheH() {};

friend XrdOucName2Name *XrdOucgetName2Name(XrdOucgetName2NameArgs);
private:
string myName, optCacheLife = ""; // unit: seconds
time_t cacheLifeT;
XrdSysError *eDest;
bool isCmsd;
};

XrdOucName2NameXcacheH::XrdOucName2NameXcacheH(XrdSysError* erp, const char* confg, const char* parms)
{
std::string myProg;
std::string opts, message, key, value;
std::string::iterator it;
std::size_t i;
int x;

myName = "XcacheH";
eDest = erp;

isCmsd = false;
if (getenv("XRDPROG"))
{
myProg = getenv("XRDPROG");
if (myProg == "cmsd") isCmsd = true;
}

setenv("XRD_METALINKPROCESSING", "1", 0);
setenv("XRD_LOCALMETALINKFILE", "1", 0);

x = 0;
key = "";
value = "";

opts = parms;
opts += " ";
for (it=opts.begin(); it!=opts.end(); ++it)
{
if (*it == '=') x = 1;
else if (*it == ' ')
{
if (key == "cacheLife") // unit: seconds
optCacheLife = value;
key = "";
value = "";
x = 0;
}
else
{
if (x == 0) key += *it;
if (x == 1) value += *it;
}
}


if (optCacheLife.find_first_not_of("0123456789.") == std::string::npos)
{
cacheLifeT = atoi(optCacheLife.c_str());
message = myName + " Init: cacheLife = " + optCacheLife;
}
else
{
cacheLifeT = 3600;
message = myName + " Init: cacheLife = " + optCacheLife + " is invalid or not set. Set it to 1 hour";
}
eDest->Say(message.c_str());

XcacheHInit(eDest, myName, cacheLifeT);
}

int XrdOucName2NameXcacheH::lfn2pfn(const char* lfn, char* buff, int blen)
{ return -EOPNOTSUPP; }

// when "pss.namelib -lfncachesrc ..." is used, pfn will look
// like /images/junk1?src=http://[email protected]/
int XrdOucName2NameXcacheH::pfn2lfn(const char* pfn, char* buff, int blen)
{
std::string myLfn, myPfn, myUrl;

myPfn = myUrl = pfn;
if (isCmsd) // cmsd shouldn't do pfn2lfn()
{
blen = myPfn.find("?src=");
blen = strlen(pfn);
strncpy(buff, pfn, blen);
return 0;
}

// it is import to use string::rfind() to search from the end.
myUrl.replace(0, myUrl.rfind("?src=") +5, "");
myUrl.replace(myUrl.length() -1, 1, ""); // remove the tailing "/"

// remove u25@ from the URL (see above)
if (myUrl.find("http://") == 0)
myUrl.replace(0, myUrl.find("@") +1, "http://");
else if (myUrl.find("https://") == 0)
myUrl.replace(0, myUrl.find("@") +1, "https://");
else // this scenarios should NOT happen
{
blen = 0;
buff[0] = 0;
return EINVAL; // see XrdOucName2Name.hh
}

myUrl += myPfn.substr(0, myPfn.rfind("?src="));

myLfn = XcacheHCheckFile(eDest, myName, myUrl);

if (myLfn == "EFAULT")
return EFAULT;
else if (myLfn == "ENOENT")
return ENOENT;

blen = myLfn.length();
strncpy(buff, myLfn.c_str(), blen);
buff[blen] = 0;

return 0;
}

int XrdOucName2NameXcacheH::lfn2rfn(const char* lfn, char* buff, int blen)
{ return -EOPNOTSUPP; }

XrdOucName2Name *XrdOucgetName2Name(XrdOucgetName2NameArgs)
{
static XrdOucName2NameXcacheH *inst = NULL;

if (inst) return (XrdOucName2Name *)inst;

inst = new XrdOucName2NameXcacheH(eDest, confg, parms);
if (!inst) return NULL;

return (XrdOucName2Name *)inst;
}
Loading

0 comments on commit 7a51b80

Please sign in to comment.