Skip to content

Commit

Permalink
ZOOKEEPER-2802:Zookeeper C client hang @wait_sync_completion
Browse files Browse the repository at this point in the history
We should not terminate the do_io thread immediately when the client
in "unrecoverable" state, because it can not guarantee there is no
new request gets in the work queue at the same time.
The safer way is make do_io keep completing the new requests when the
client is unrecoverable, until a close request gets in.

Also, since we don't terminate do_io thread at SESSIONEXPIRED, remove
the matching testcase from TestZookeeperClose.cc
  • Loading branch information
yzx1983 authored and root committed Jan 4, 2024
1 parent ceebda9 commit 30f1591
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 59 deletions.
137 changes: 79 additions & 58 deletions zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <sys/time.h>
#endif

extern void free_completions(zhandle_t *zh,int callCompletion,int reason);

int zoo_lock_auth(zhandle_t *zh)
{
return pthread_mutex_lock(&zh->auth_h.lock);
Expand Down Expand Up @@ -376,27 +378,38 @@ void *do_io(void *v)
int timeout;
int maxfd=1;

zh->io_count++;

zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
fds[1].fd=fd;
fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
maxfd=2;
}
timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

poll(fds,maxfd,timeout);
if (fd != -1) {
interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
}
if(fds[0].revents&POLLIN){
// flush the pipe
char b[128];
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
}
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh)) {
/* Signal any syncronous completions */
enter_critical(zh);
free_completions(zh,1,ZSESSIONEXPIRED);
leave_critical(zh);
// don't exhaust cpu
usleep(1000);
} else {

zh->io_count++;

zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
fds[1].fd=fd;
fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
maxfd=2;
}
timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

poll(fds,maxfd,timeout);
if (fd != -1) {
interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
}
if(fds[0].revents&POLLIN){
// flush the pipe
char b[128];
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
}
#else
fd_set rfds, wfds;
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
Expand All @@ -410,51 +423,59 @@ void *do_io(void *v)
int interest;
int rc;

zookeeper_interest(zh, &fd, &interest, &tv);

// FD_ZERO is cheap on Win32, it just sets count of elements to zero.
// It needs to be done to ensure no stale entries.
FD_ZERO(&rfds);
FD_ZERO(&wfds);

if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
}

if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh)) {
/* Signal any syncronous completions */
enter_critical(zh);
free_completions(zh,1,ZSESSIONEXPIRED);
leave_critical(zh);
// dont' exhaust cpu
usleep(1000);
} else {

zookeeper_interest(zh, &fd, &interest, &tv);

// FD_ZERO is cheap on Win32, it just sets count of elements to zero.
// It needs to be done to ensure no stale entries.
FD_ZERO(&rfds);
FD_ZERO(&wfds);

if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
}

if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
}
}
}

// Always interested in self_pipe.
FD_SET(adaptor_threads->self_pipe[0], &rfds);
// Always interested in self_pipe.
FD_SET(adaptor_threads->self_pipe[0], &rfds);

rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
if (rc > 0) {
interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;
rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
if (rc > 0) {
interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;

if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
// flush the pipe/socket
char b[128];
while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
// flush the pipe/socket
char b[128];
while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
}
}
}
else if (rc < 0) {
LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));
else if (rc < 0) {
LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));

// Clear interest events for zookeeper_process if select() fails.
interest = 0;
}
// Clear interest events for zookeeper_process if select() fails.
interest = 0;
}

#endif
// dispatch zookeeper events
zookeeper_process(zh, interest);
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh))
break;
// dispatch zookeeper events
zookeeper_process(zh, interest);
}
}
api_epilog(zh, 0);
LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ class Zookeeper_close : public CPPUNIT_NS::TestFixture

CPPUNIT_ASSERT(zh!=0);
CPPUNIT_ASSERT(ensureCondition(SessionExpired(zh),1000)<1000);
CPPUNIT_ASSERT(ensureCondition(IOThreadStopped(zh),1000)<1000);
// make sure the watcher has been processed
CPPUNIT_ASSERT(ensureCondition(closeAction.isWatcherTriggered(),1000)<1000);
// make sure the threads have not been destroyed yet
Expand Down

0 comments on commit 30f1591

Please sign in to comment.