Skip to content

Commit

Permalink
Further liberalize locking strategy in the poller to allow multi-cont…
Browse files Browse the repository at this point in the history
…ext deletes.
  • Loading branch information
abh3 committed Aug 31, 2012
1 parent 3276f69 commit d075a94
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 92 deletions.
116 changes: 52 additions & 64 deletions src/XrdSys/XrdSysIOEvents.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/resource.h>

#include "XrdSys/XrdSysIOEvents.hh"
#include "XrdSys/XrdSysHeaders.hh"
Expand Down Expand Up @@ -51,8 +50,6 @@
/* G l o b a l D a t a */
/******************************************************************************/

int XrdSys::IOEvents::Poller::maxFD = 0;

time_t XrdSys::IOEvents::Poller::maxTime
= (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);

Expand Down Expand Up @@ -92,7 +89,6 @@ void *BootStrap::Start(void *parg)

thePoller->Begin(&(pollArg->pollSync), pollArg->retCode, &(pollArg->retMsg));

thePoller->pollDone.Post();
return (void *)0;
}

Expand Down Expand Up @@ -236,6 +232,7 @@ XrdSys::IOEvents::Channel::Channel(Poller *pollP, int fd,

XrdSys::IOEvents::Channel::~Channel()
{
Poller *myPoller;
bool isLocked = true;

// Lock ourselves during the delete process. If the channel is disassociated
Expand All @@ -247,26 +244,28 @@ XrdSys::IOEvents::Channel::~Channel()
return;
}

// If we are in callback mode then we will need to delay the destruction until
// after the callback completes. Note that we deadlock if the channel is deleted
// or the poller stopped by the callback object. We ignore double deletes.
//
if (chStat)
{XrdSysSemaphore cbDone(0);
chStat = isDead;
chCBA = (void *)&cbDone;
chMutex.UnLock();
cbDone.Wait();
chMutex.Lock();
}

// Disable and remove ourselves from all queues
//
if (chPollXQ && chPollXQ != &pollErr1) chPollXQ->Detach(this,isLocked,false);
myPoller = chPollXQ;
chPollXQ->Detach(this,isLocked,false);
if (!isLocked) chMutex.Lock();

// All done
// If we are in callback mode then we will need to delay the destruction until
// after the callback completes unless this is the poller thread. In that case,
// we need to tell the poller that we have been destroyed in a shelf-stable way.
//
if (isLocked) chMutex.UnLock();
if (chStat)
{if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid))
{myPoller->chDead = true;
chMutex.UnLock();
} else {
XrdSysSemaphore cbDone(0);
chStat = isDead;
chCBA = (void *)&cbDone;
chMutex.UnLock();
cbDone.Wait();
}
}
}

/******************************************************************************/
Expand Down Expand Up @@ -471,7 +470,7 @@ void XrdSys::IOEvents::Channel::SetFD(int fd)
/* C o n s t r u c t o r */
/******************************************************************************/

XrdSys::IOEvents::Poller::Poller(int rFD, int cFD) : pollDone(0)
XrdSys::IOEvents::Poller::Poller(int rFD, int cFD)
{

// Now initialize local class members
Expand Down Expand Up @@ -588,26 +587,32 @@ bool XrdSys::IOEvents::Poller::CbkXeq(XrdSys::IOEvents::Channel *cP, int events,
}
}

// Indicate that we are in callback mode then drop the channel and effect the
// callback. We then regain the lock.
// Indicate that we are in callback mode then drop the channel lock and effect
// the callback. This allows the callback to freely manage locks.
//
cP->chStat = Channel::isCBMode;
chDead = false;
cP->chMutex.UnLock();
cbok = cP->chCB->Event(cP,cP->chCBA, events);

// If channel destroyed by the callback, bail really fast. Otherwise, regain
// the channel lock.
//
if (chDead) return true;
cP->chMutex.Lock();

// If the channel is being destroyed; then bail fast!
// If the channel is being destroyed; then another thread must have done so.
// Tell it the callback has finished and just return.
//
if (cP->chStat != Channel::isCBMode)
{if (cP->chStat == Channel::isDead)
{((XrdSysSemaphore *)cP->chCBA)->Post(); return false;}
cP->chStat = Channel::isClear;
((XrdSysSemaphore *)cP->chCBA)->Post();
return true;
}
cP->chStat = Channel::isClear;

// Handle enable or disable here. If we keep the channel enabled then reset
// the timeout it hasn't been handled via a call from the callback.
// the timeout if it hasn't been handled via a call from the callback.
//
if (!cbok) Detach(cP,isLocked,false);
else if (!(cP->inTOQ) && (cP->chRTO || cP->chWTO))
Expand Down Expand Up @@ -638,21 +643,6 @@ XrdSys::IOEvents::Poller *XrdSys::IOEvents::Poller::Create(int &eNum,
struct pollArg pArg;
pthread_t tid;

// Get maximum number of file descriptors we can have (hard limit)
//
if (!maxFD)
{struct rlimit rlim;
if (getrlimit(RLIMIT_NOFILE, &rlim) < 0)
{eNum = errno;
if (eTxt) *eTxt= "getting file descriptor limit";
return 0;
}
maxFD = rlim.rlim_max;
#if (defined(__macos__) && defined(MAC_OS_X_VERSION_10_5))
if (maxFD == RLIM_INFINITY || maxFD > OPEN_MAX) maxFD=OPEN_MAX;
#endif
}

// Create a pipe used to break the poll wait loop
//
if (pipe(fildes))
Expand Down Expand Up @@ -854,30 +844,28 @@ int XrdSys::IOEvents::Poller::Poll2Enum(short events)
/* S e n d C m d */
/******************************************************************************/

int XrdSys::IOEvents::Poller::SendCmd(char *cmdbuff, int cmdblen, bool doWait)
int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd)
{
int wlen, myerrno = 0;

// Lock the pipe
//
pollPipe.Lock();

// Send off the command
//
do {if ((wlen = write(cmdFD, cmdbuff, cmdblen)) < 0)
if (errno == EINTR) wlen = 0;
else {myerrno = errno; break;}
cmdbuff += wlen; cmdblen -= wlen;
} while(cmdblen > 0);

// Check if we should wait for completion
//
if (doWait && !myerrno) pollDone.Wait();
int wlen;

// Pipe writes are atomic so we don't need locks. Some commands require
// confirmation. We handle that here based on the command. Note that pipes
// gaurantee that all of the data will be written or we will block.
//
if (cmd.req >= PipeData::Post)
{XrdSysSemaphore mySem(0);
cmd.theSem = &mySem;
do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
while (wlen < 0 && errno == EINTR);
if (wlen > 0) mySem.Wait();
} else {
do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
while (wlen < 0 && errno == EINTR);
}

// All done
//
pollPipe.UnLock();
return myerrno;
return (wlen >= 0 ? 0 : errno);
}

/******************************************************************************/
Expand Down Expand Up @@ -911,7 +899,7 @@ void XrdSys::IOEvents::Poller::Stop()

// First we must stop the poller thread in an orderly fashion.
//
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true);
SendCmd(cmdbuff);

// Close the pipe communication mechanism
//
Expand Down Expand Up @@ -1048,7 +1036,7 @@ void XrdSys::IOEvents::Poller::WakeUp()
if (wakePend) toMutex.UnLock();
else {wakePend = true;
toMutex.UnLock();
SendCmd((char *)&cmdbuff, sizeof(cmdbuff));
SendCmd(cmdbuff);
}
}

Expand Down
27 changes: 12 additions & 15 deletions src/XrdSys/XrdSysIOEvents.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ namespace IOEvents
//! callbacks are handled before the poller resumes any channels. This provides
//! simple serialization for all channels associated with a single poller.
//! You may call any channel method from a callback to effect appropriate
//! changes. However, you may not delete the associated channel or stop its
//! poller from the callback method as a deadlock will occur. The only exception
//! is that a channel may be safely deleted within a stop callback.
//! changes. You may also delete the channel at any time.
//-----------------------------------------------------------------------------

class Channel;
Expand Down Expand Up @@ -287,8 +285,6 @@ enum EventCode {readEvents = 0x01, //!< Read and Read Timeouts
//! Destuctor. When this object is deleted, all events are disabled, pending
//! callbacks are completed, and the channel is removed from the
//! assigned poller. Only then is the storage freed.
//! Warning! Deleting a channel from a callback other than in a stop event
//! will create a deadlock!
//-----------------------------------------------------------------------------

~Channel();
Expand Down Expand Up @@ -392,6 +388,7 @@ static Poller *Create(int &eNum, const char **eTxt=0);
virtual ~Poller() {}

protected:
struct PipeData;

void CbkTMO();
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt);
Expand All @@ -401,7 +398,7 @@ inline int GetPollEnt(Channel *cP) {return cP->pollEnt;}
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd);
inline void LockChannel(Channel *cP) {cP->chMutex.Lock();}
int Poll2Enum(short events);
int SendCmd(char *cmdbuff, int cmdblen, bool doWait=0);
int SendCmd(PipeData &cmd);
void SetPollEnt(Channel *cP, int ptEnt);
bool TmoAdd(Channel *cP);
void TmoDel(Channel *cP);
Expand Down Expand Up @@ -449,23 +446,23 @@ virtual void Shutdown() = 0;
Channel *attBase; // -> First channel in attach queue or 0
Channel *tmoBase; // -> First channel in timeout queue or 0

XrdSysMutex pollPipe; // Mutex to control sending to poll thread
XrdSysSemaphore pollDone; // Semaphore for syncing with poller thread
pthread_t pollTid; // Poller's thread ID

struct pollfd pipePoll; // Stucture to wait for pipe events
int cmdFD; // FD to send PipeData commands
int reqFD; // FD to recv PipeData requests
struct PipeData {char req; char evt; short ent; int fd;
enum cmd {MdFD = 0, MiFD, RmFD, NoOp, Post, Stop};
XrdSysSemaphore *theSem;
enum cmd {NoOp = 0, MdFD = 1, Post = 2,
MiFD = 3, RmFD = 4, Stop = 5};
};
PipeData reqBuff; // Buffer used by poller thread to recv data
char *pipeBuff; // Read resumption point in buffer
int pipeBlen; // Number of outstanding bytes
bool wakePend; // Wakeup is effectively pending (don't send)
PipeData reqBuff; // Buffer used by poller thread to recv data
char *pipeBuff; // Read resumption point in buffer
int pipeBlen; // Number of outstanding bytes
bool wakePend; // Wakeup is effectively pending (don't send)
bool chDead; // True if channel deleted by callback

static int maxFD; // Maximum number of FD's allowed
static time_t maxTime; // Maximum time allowed
static time_t maxTime; // Maximum time allowed

private:

Expand Down
8 changes: 5 additions & 3 deletions src/XrdSys/XrdSysIOEventsPollE.icc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void XrdSys::IOEvents::PollE::Exclude(XrdSys::IOEvents::Channel *cP,
UnLockChannel(cP);
}
cmdbuff.req = PipeData::Post;
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true);
SendCmd(cmdbuff);
}
}

Expand Down Expand Up @@ -330,8 +330,10 @@ bool XrdSys::IOEvents::PollE::Process()
// Get the pipe request and check out actions of interest.
//
if (GetRequest())
{ if (reqBuff.req == PipeData::Post) pollDone.Post();
else if (reqBuff.req == PipeData::Stop) return false;
{ if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post();
else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
return false;
}
}

// Return true
Expand Down
15 changes: 8 additions & 7 deletions src/XrdSys/XrdSysIOEventsPollPoll.icc
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ void XrdSys::IOEvents::PollPoll::Exclude(XrdSys::IOEvents::Channel *cP,
} else {
PipeData cmdbuff = {(char)PipeData::RmFD, 0, (short)ctnum, cP->GetFD()};
if (isLocked) {isLocked = false; UnLockChannel(cP);}
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true);
SendCmd(cmdbuff);
}
}

Expand Down Expand Up @@ -415,7 +415,7 @@ bool XrdSys::IOEvents::PollPoll::Include(XrdSys::IOEvents::Channel *cP,
PipeData cmdbuff = {(char)PipeData::MiFD, (char)cP->GetEvents(),
(short)ctnum, fd};
if (isLocked) {isLocked = false; UnLockChannel(cP);}
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true);
SendCmd(cmdbuff);
}

// All done
Expand Down Expand Up @@ -444,7 +444,7 @@ bool XrdSys::IOEvents::PollPoll::Modify(XrdSys::IOEvents::Channel *cP,
PipeData cmdbuff = {(char)PipeData::MdFD, (char)cP->GetEvents(),
(short)GetPollEnt(cP), cP->GetFD()};
if (isLocked) {isLocked = false; UnLockChannel(cP);}
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), false);
SendCmd(cmdbuff);
}

// All done
Expand All @@ -465,15 +465,16 @@ bool XrdSys::IOEvents::PollPoll::Process()
{case PipeData::MdFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
break;
case PipeData::MiFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
pollDone.Post();
reqBuff.theSem->Post();
break;
case PipeData::RmFD: FDRem(reqBuff.ent);
pollDone.Post();
reqBuff.theSem->Post();
break;
case PipeData::NoOp: break;
case PipeData::Post: pollDone.Post();
case PipeData::Post: reqBuff.theSem->Post();
break;
case PipeData::Stop: return false;
case PipeData::Stop: reqBuff.theSem->Post();
return false;
break;
default: break;
}
Expand Down
8 changes: 5 additions & 3 deletions src/XrdSys/XrdSysIOEventsPollPort.icc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ void XrdSys::IOEvents::PollPort::Exclude(XrdSys::IOEvents::Channel *cP,
UnLockChannel(cP);
}
cmdbuff.req = PipeData::Post;
SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true);
SendCmd(cmdbuff);
}
}

Expand Down Expand Up @@ -311,8 +311,10 @@ bool XrdSys::IOEvents::PollPort::Process()
// Get the pipe request and check out actions of interest.
//
if (GetRequest())
{if (reqBuff.req == PipeData::Stop) return false;
if (reqBuff.req == PipeData::Post) pollDone.Post();
if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post();
{if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
return false;
}
}

// Associate the pipe again and return true
Expand Down

0 comments on commit d075a94

Please sign in to comment.