diff --git a/src/XrdSys/XrdSysIOEvents.cc b/src/XrdSys/XrdSysIOEvents.cc index 1c8c317a2a1..786765da420 100644 --- a/src/XrdSys/XrdSysIOEvents.cc +++ b/src/XrdSys/XrdSysIOEvents.cc @@ -13,7 +13,6 @@ #include #include #include -#include #include "XrdSys/XrdSysIOEvents.hh" #include "XrdSys/XrdSysHeaders.hh" @@ -51,8 +50,6 @@ /* G l o b a l D a t a */ /******************************************************************************/ - int XrdSys::IOEvents::Poller::maxFD = 0; - time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff); @@ -92,7 +89,6 @@ void *BootStrap::Start(void *parg) thePoller->Begin(&(pollArg->pollSync), pollArg->retCode, &(pollArg->retMsg)); - thePoller->pollDone.Post(); return (void *)0; } @@ -236,6 +232,7 @@ XrdSys::IOEvents::Channel::Channel(Poller *pollP, int fd, XrdSys::IOEvents::Channel::~Channel() { + Poller *myPoller; bool isLocked = true; // Lock ourselves during the delete process. If the channel is disassociated @@ -247,26 +244,28 @@ XrdSys::IOEvents::Channel::~Channel() return; } -// If we are in callback mode then we will need to delay the destruction until -// after the callback completes. Note that we deadlock if the channel is deleted -// or the poller stopped by the callback object. We ignore double deletes. -// - if (chStat) - {XrdSysSemaphore cbDone(0); - chStat = isDead; - chCBA = (void *)&cbDone; - chMutex.UnLock(); - cbDone.Wait(); - chMutex.Lock(); - } - // Disable and remove ourselves from all queues // - if (chPollXQ && chPollXQ != &pollErr1) chPollXQ->Detach(this,isLocked,false); + myPoller = chPollXQ; + chPollXQ->Detach(this,isLocked,false); + if (!isLocked) chMutex.Lock(); -// All done +// If we are in callback mode then we will need to delay the destruction until +// after the callback completes unless this is the poller thread. In that case, +// we need to tell the poller that we have been destroyed in a shelf-stable way. // - if (isLocked) chMutex.UnLock(); + if (chStat) + {if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid)) + {myPoller->chDead = true; + chMutex.UnLock(); + } else { + XrdSysSemaphore cbDone(0); + chStat = isDead; + chCBA = (void *)&cbDone; + chMutex.UnLock(); + cbDone.Wait(); + } + } } /******************************************************************************/ @@ -471,7 +470,7 @@ void XrdSys::IOEvents::Channel::SetFD(int fd) /* C o n s t r u c t o r */ /******************************************************************************/ -XrdSys::IOEvents::Poller::Poller(int rFD, int cFD) : pollDone(0) +XrdSys::IOEvents::Poller::Poller(int rFD, int cFD) { // Now initialize local class members @@ -588,26 +587,32 @@ bool XrdSys::IOEvents::Poller::CbkXeq(XrdSys::IOEvents::Channel *cP, int events, } } -// Indicate that we are in callback mode then drop the channel and effect the -// callback. We then regain the lock. +// Indicate that we are in callback mode then drop the channel lock and effect +// the callback. This allows the callback to freely manage locks. // cP->chStat = Channel::isCBMode; + chDead = false; cP->chMutex.UnLock(); cbok = cP->chCB->Event(cP,cP->chCBA, events); + +// If channel destroyed by the callback, bail really fast. Otherwise, regain +// the channel lock. +// + if (chDead) return true; cP->chMutex.Lock(); -// If the channel is being destroyed; then bail fast! +// If the channel is being destroyed; then another thread must have done so. +// Tell it the callback has finished and just return. // if (cP->chStat != Channel::isCBMode) {if (cP->chStat == Channel::isDead) - {((XrdSysSemaphore *)cP->chCBA)->Post(); return false;} - cP->chStat = Channel::isClear; + ((XrdSysSemaphore *)cP->chCBA)->Post(); return true; } cP->chStat = Channel::isClear; // Handle enable or disable here. If we keep the channel enabled then reset -// the timeout it hasn't been handled via a call from the callback. +// the timeout if it hasn't been handled via a call from the callback. // if (!cbok) Detach(cP,isLocked,false); else if (!(cP->inTOQ) && (cP->chRTO || cP->chWTO)) @@ -638,21 +643,6 @@ XrdSys::IOEvents::Poller *XrdSys::IOEvents::Poller::Create(int &eNum, struct pollArg pArg; pthread_t tid; -// Get maximum number of file descriptors we can have (hard limit) -// - if (!maxFD) - {struct rlimit rlim; - if (getrlimit(RLIMIT_NOFILE, &rlim) < 0) - {eNum = errno; - if (eTxt) *eTxt= "getting file descriptor limit"; - return 0; - } - maxFD = rlim.rlim_max; -#if (defined(__macos__) && defined(MAC_OS_X_VERSION_10_5)) - if (maxFD == RLIM_INFINITY || maxFD > OPEN_MAX) maxFD=OPEN_MAX; -#endif - } - // Create a pipe used to break the poll wait loop // if (pipe(fildes)) @@ -854,30 +844,28 @@ int XrdSys::IOEvents::Poller::Poll2Enum(short events) /* S e n d C m d */ /******************************************************************************/ -int XrdSys::IOEvents::Poller::SendCmd(char *cmdbuff, int cmdblen, bool doWait) +int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd) { - int wlen, myerrno = 0; - -// Lock the pipe -// - pollPipe.Lock(); - -// Send off the command -// - do {if ((wlen = write(cmdFD, cmdbuff, cmdblen)) < 0) - if (errno == EINTR) wlen = 0; - else {myerrno = errno; break;} - cmdbuff += wlen; cmdblen -= wlen; - } while(cmdblen > 0); - -// Check if we should wait for completion -// - if (doWait && !myerrno) pollDone.Wait(); + int wlen; + +// Pipe writes are atomic so we don't need locks. Some commands require +// confirmation. We handle that here based on the command. Note that pipes +// gaurantee that all of the data will be written or we will block. +// + if (cmd.req >= PipeData::Post) + {XrdSysSemaphore mySem(0); + cmd.theSem = &mySem; + do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));} + while (wlen < 0 && errno == EINTR); + if (wlen > 0) mySem.Wait(); + } else { + do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));} + while (wlen < 0 && errno == EINTR); + } // All done // - pollPipe.UnLock(); - return myerrno; + return (wlen >= 0 ? 0 : errno); } /******************************************************************************/ @@ -911,7 +899,7 @@ void XrdSys::IOEvents::Poller::Stop() // First we must stop the poller thread in an orderly fashion. // - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true); + SendCmd(cmdbuff); // Close the pipe communication mechanism // @@ -1048,7 +1036,7 @@ void XrdSys::IOEvents::Poller::WakeUp() if (wakePend) toMutex.UnLock(); else {wakePend = true; toMutex.UnLock(); - SendCmd((char *)&cmdbuff, sizeof(cmdbuff)); + SendCmd(cmdbuff); } } diff --git a/src/XrdSys/XrdSysIOEvents.hh b/src/XrdSys/XrdSysIOEvents.hh index 7dec9ff2c0a..24435363098 100644 --- a/src/XrdSys/XrdSysIOEvents.hh +++ b/src/XrdSys/XrdSysIOEvents.hh @@ -50,9 +50,7 @@ namespace IOEvents //! callbacks are handled before the poller resumes any channels. This provides //! simple serialization for all channels associated with a single poller. //! You may call any channel method from a callback to effect appropriate -//! changes. However, you may not delete the associated channel or stop its -//! poller from the callback method as a deadlock will occur. The only exception -//! is that a channel may be safely deleted within a stop callback. +//! changes. You may also delete the channel at any time. //----------------------------------------------------------------------------- class Channel; @@ -287,8 +285,6 @@ enum EventCode {readEvents = 0x01, //!< Read and Read Timeouts //! Destuctor. When this object is deleted, all events are disabled, pending //! callbacks are completed, and the channel is removed from the //! assigned poller. Only then is the storage freed. -//! Warning! Deleting a channel from a callback other than in a stop event -//! will create a deadlock! //----------------------------------------------------------------------------- ~Channel(); @@ -392,6 +388,7 @@ static Poller *Create(int &eNum, const char **eTxt=0); virtual ~Poller() {} protected: +struct PipeData; void CbkTMO(); bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt); @@ -401,7 +398,7 @@ inline int GetPollEnt(Channel *cP) {return cP->pollEnt;} bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd); inline void LockChannel(Channel *cP) {cP->chMutex.Lock();} int Poll2Enum(short events); - int SendCmd(char *cmdbuff, int cmdblen, bool doWait=0); + int SendCmd(PipeData &cmd); void SetPollEnt(Channel *cP, int ptEnt); bool TmoAdd(Channel *cP); void TmoDel(Channel *cP); @@ -449,23 +446,23 @@ virtual void Shutdown() = 0; Channel *attBase; // -> First channel in attach queue or 0 Channel *tmoBase; // -> First channel in timeout queue or 0 -XrdSysMutex pollPipe; // Mutex to control sending to poll thread -XrdSysSemaphore pollDone; // Semaphore for syncing with poller thread pthread_t pollTid; // Poller's thread ID struct pollfd pipePoll; // Stucture to wait for pipe events int cmdFD; // FD to send PipeData commands int reqFD; // FD to recv PipeData requests struct PipeData {char req; char evt; short ent; int fd; - enum cmd {MdFD = 0, MiFD, RmFD, NoOp, Post, Stop}; + XrdSysSemaphore *theSem; + enum cmd {NoOp = 0, MdFD = 1, Post = 2, + MiFD = 3, RmFD = 4, Stop = 5}; }; - PipeData reqBuff; // Buffer used by poller thread to recv data -char *pipeBuff; // Read resumption point in buffer -int pipeBlen; // Number of outstanding bytes -bool wakePend; // Wakeup is effectively pending (don't send) +PipeData reqBuff; // Buffer used by poller thread to recv data +char *pipeBuff; // Read resumption point in buffer +int pipeBlen; // Number of outstanding bytes +bool wakePend; // Wakeup is effectively pending (don't send) +bool chDead; // True if channel deleted by callback -static int maxFD; // Maximum number of FD's allowed -static time_t maxTime; // Maximum time allowed +static time_t maxTime; // Maximum time allowed private: diff --git a/src/XrdSys/XrdSysIOEventsPollE.icc b/src/XrdSys/XrdSysIOEventsPollE.icc index 360a6d29b01..a50299baaa7 100644 --- a/src/XrdSys/XrdSysIOEventsPollE.icc +++ b/src/XrdSys/XrdSysIOEventsPollE.icc @@ -254,7 +254,7 @@ void XrdSys::IOEvents::PollE::Exclude(XrdSys::IOEvents::Channel *cP, UnLockChannel(cP); } cmdbuff.req = PipeData::Post; - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true); + SendCmd(cmdbuff); } } @@ -330,8 +330,10 @@ bool XrdSys::IOEvents::PollE::Process() // Get the pipe request and check out actions of interest. // if (GetRequest()) - { if (reqBuff.req == PipeData::Post) pollDone.Post(); - else if (reqBuff.req == PipeData::Stop) return false; + { if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post(); + else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post(); + return false; + } } // Return true diff --git a/src/XrdSys/XrdSysIOEventsPollPoll.icc b/src/XrdSys/XrdSysIOEventsPollPoll.icc index d95876ba358..0d796acb7de 100644 --- a/src/XrdSys/XrdSysIOEventsPollPoll.icc +++ b/src/XrdSys/XrdSysIOEventsPollPoll.icc @@ -262,7 +262,7 @@ void XrdSys::IOEvents::PollPoll::Exclude(XrdSys::IOEvents::Channel *cP, } else { PipeData cmdbuff = {(char)PipeData::RmFD, 0, (short)ctnum, cP->GetFD()}; if (isLocked) {isLocked = false; UnLockChannel(cP);} - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true); + SendCmd(cmdbuff); } } @@ -415,7 +415,7 @@ bool XrdSys::IOEvents::PollPoll::Include(XrdSys::IOEvents::Channel *cP, PipeData cmdbuff = {(char)PipeData::MiFD, (char)cP->GetEvents(), (short)ctnum, fd}; if (isLocked) {isLocked = false; UnLockChannel(cP);} - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true); + SendCmd(cmdbuff); } // All done @@ -444,7 +444,7 @@ bool XrdSys::IOEvents::PollPoll::Modify(XrdSys::IOEvents::Channel *cP, PipeData cmdbuff = {(char)PipeData::MdFD, (char)cP->GetEvents(), (short)GetPollEnt(cP), cP->GetFD()}; if (isLocked) {isLocked = false; UnLockChannel(cP);} - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), false); + SendCmd(cmdbuff); } // All done @@ -465,15 +465,16 @@ bool XrdSys::IOEvents::PollPoll::Process() {case PipeData::MdFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt); break; case PipeData::MiFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt); - pollDone.Post(); + reqBuff.theSem->Post(); break; case PipeData::RmFD: FDRem(reqBuff.ent); - pollDone.Post(); + reqBuff.theSem->Post(); break; case PipeData::NoOp: break; - case PipeData::Post: pollDone.Post(); + case PipeData::Post: reqBuff.theSem->Post(); break; - case PipeData::Stop: return false; + case PipeData::Stop: reqBuff.theSem->Post(); + return false; break; default: break; } diff --git a/src/XrdSys/XrdSysIOEventsPollPort.icc b/src/XrdSys/XrdSysIOEventsPollPort.icc index 618f9d6f801..e7b470ed703 100644 --- a/src/XrdSys/XrdSysIOEventsPollPort.icc +++ b/src/XrdSys/XrdSysIOEventsPollPort.icc @@ -240,7 +240,7 @@ void XrdSys::IOEvents::PollPort::Exclude(XrdSys::IOEvents::Channel *cP, UnLockChannel(cP); } cmdbuff.req = PipeData::Post; - SendCmd((char *)&cmdbuff, sizeof(cmdbuff), true); + SendCmd(cmdbuff); } } @@ -311,8 +311,10 @@ bool XrdSys::IOEvents::PollPort::Process() // Get the pipe request and check out actions of interest. // if (GetRequest()) - {if (reqBuff.req == PipeData::Stop) return false; - if (reqBuff.req == PipeData::Post) pollDone.Post(); + if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post(); + {if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post(); + return false; + } } // Associate the pipe again and return true