diff --git a/CMakeLists.txt b/CMakeLists.txt index 496a759..6574426 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,6 +77,8 @@ if(WANT_VALGRIND) endif(NOT HAVE_VALGRIND_H) endif(WANT_VALGRIND) +check_include_files(poll.h HAVE_POLL_H) + include_directories( "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" diff --git a/include/evfibers/config.h.in b/include/evfibers/config.h.in index f47b42b..fe81320 100644 --- a/include/evfibers/config.h.in +++ b/include/evfibers/config.h.in @@ -19,6 +19,7 @@ #ifndef _FBR_CONFIG_H_ #define _FBR_CONFIG_H_ +#cmakedefine HAVE_POLL_H #cmakedefine HAVE_VALGRIND_H #cmakedefine FBR_EIO_ENABLED diff --git a/include/evfibers/fiber.h b/include/evfibers/fiber.h index d4a2d97..8e82c79 100644 --- a/include/evfibers/fiber.h +++ b/include/evfibers/fiber.h @@ -137,6 +137,8 @@ * @file evfibers/fiber.h * This file contains all client-visible API functions for working with fibers. */ +#include + #include #include #include @@ -145,11 +147,12 @@ #include #include #include +#ifdef HAVE_POLL_H +#include +#endif #include #include -#include - /** * Maximum allowed level of fbr_transfer nesting within fibers. */ @@ -1153,6 +1156,22 @@ int fbr_connect(FBR_P_ int sockfd, const struct sockaddr *addr, int fbr_connect_wto(FBR_P_ int sockfd, const struct sockaddr *addr, socklen_t addrlen, ev_tstamp timeout); +#ifdef HAVE_POLL_H +/** + * Fiber friendly poll wrapper. + * @param [in] fds - socket file descriptors to examine + * @param [in] nfds - specifies the size of the fds array + * @param [in] timeout_ms in milliseconds to wait for events + * @return number of descriptors that are ready for I/O on success, -1 in case of error and errno set + * + * Poll wrapper that examines a set of file descriptors to see if some of them + * are ready for I/O or if certain events have occurred on them. + * + * Possible errno values are described in the poll man page. + */ +int fbr_poll(FBR_P_ struct pollfd fds[], nfds_t nfds, int timeout_ms); +#endif /* HAVE_POLL_H */ + /** * Fiber friendly libc read wrapper. * @param [in] fd file descriptor to read from @@ -1528,6 +1547,24 @@ void fbr_cond_destroy(FBR_P_ struct fbr_cond_var *cond); */ int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex); +/** + * Waits timeout number of seconds or until the condition is met. + * + * Current fiber is suspended until a signal is sent via fbr_cond_signal or + * fbr_cond_broadcast to the corresponding conditional variable or + * the timer expires. + * + * A mutex must be acquired by the calling fiber prior to waiting for a + * condition. Internally mutex is released and reacquired again before + * returning. Upon successful return calling fiber will hold the mutex. + * + * @see fbr_cond_init + * @see fbr_cond_destroy + * @see fbr_cond_broadcast + * @see fbr_cond_signal + */ +int fbr_cond_wait_wto(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex, ev_tstamp timeout); + /** * Broadcasts a signal to all fibers waiting for condition. * diff --git a/src/fiber.c b/src/fiber.c index 0162110..9cde0d6 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -913,6 +913,45 @@ int fbr_connect_wto(FBR_P_ int sockfd, const struct sockaddr *addr, return r; } +#ifdef HAVE_POLL_H +int fbr_poll(FBR_P_ struct pollfd fds[], nfds_t nfds, int timeout_ms) +{ + nfds_t i; + ev_io io[nfds]; + struct fbr_ev_base *events[nfds+1]; + struct fbr_ev_watcher watcher[nfds]; + struct fbr_destructor dtor[nfds]; + + if (timeout_ms != 0) { + for (i = 0; i < nfds; ++i) { + ev_io_init(&io[i], NULL, fds[i].fd, + (fds[i].events & POLLIN ? EV_READ : 0) | + (fds[i].events & POLLOUT ? EV_WRITE : 0)); + ev_io_start(fctx->__p->loop, &io[i]); + + fbr_ev_watcher_init(FBR_A_ &watcher[i], + (struct ev_watcher *)&io[i]); + events[i] = &watcher[i].ev_base; + + dtor[i].func = watcher_io_dtor; + dtor[i].arg = &io[i]; + dtor[i].active = 0; + fbr_destructor_add(FBR_A_ &dtor[i]); + } + events[i] = NULL; + + if (timeout_ms > 0) + fbr_ev_wait_to(FBR_A_ events, timeout_ms/1000.0); + else + fbr_ev_wait(FBR_A_ events); + + for (i = 0; i < nfds; ++i) { + fbr_destructor_remove(FBR_A_ &dtor[i], 1); + } + } + return poll(fds, nfds, 0); /* Non-blocking when timeout is 0. */ +} +#endif /* HAVE_POLL_H */ ssize_t fbr_read(FBR_P_ int fd, void *buf, size_t count) { @@ -1691,6 +1730,20 @@ int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex) return_success(0); } +int fbr_cond_wait_wto(FBR_P_ struct fbr_cond_var *cond, + struct fbr_mutex *mutex, ev_tstamp timeout) +{ + struct fbr_ev_cond_var ev; + + if (mutex && fbr_id_isnull(mutex->locked_by)) + return_error(-1, FBR_EINVAL); + + fbr_ev_cond_var_init(FBR_A_ &ev, cond, mutex); + if (fbr_ev_wait_one_wto(FBR_A_ &ev.ev_base, timeout) == -1) + fbr_mutex_lock(FBR_A_ mutex); + return_success(0); +} + void fbr_cond_broadcast(FBR_P_ struct fbr_cond_var *cond) { struct fbr_id_tailq_i *item;