diff --git a/src/core.c b/src/core.c index 365f4f6..c20245a 100644 --- a/src/core.c +++ b/src/core.c @@ -23,8 +23,6 @@ #include #include -#include -#include #include #include #include @@ -48,68 +46,30 @@ body_cb(void *contents, size_t size, size_t nmemb, void *userp) return realsize; } -static int multi_timer_cb(CURLM *multi, long timeout_ms, LoopState *lstate) { - elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms); - - itimerspec its = - timeout_ms > 0 ? - // assign the timeout normally - (itimerspec){ - .it_value.tv_sec = timeout_ms / 1000, - .it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000, - }: - timeout_ms == 0 ? - /* libcurl wants us to timeout now, however setting both fields of - * new_value.it_value to zero disarms the timer. The closest we can - * do is to schedule the timer to fire in 1 ns. */ - (itimerspec){ - .it_value.tv_sec = 0, - .it_value.tv_nsec = 1, - }: - // libcurl passes a -1 to indicate the timer should be deleted - (itimerspec){}; - - int no_flags = 0; - if (timerfd_settime(lstate->timerfd, no_flags, &its, NULL) < 0) { - ereport(ERROR, errmsg("timerfd_settime failed")); - } - - return 0; -} +typedef struct { + int pos; +} marker; -static int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, void *socketp) { +static int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, marker *mark) { static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); - int epoll_op; - if(!socketp){ - epoll_op = EPOLL_CTL_ADD; - bool *socket_exists = palloc(sizeof(bool)); - curl_multi_assign(lstate->curl_mhandle, sockfd, socket_exists); + int ev = + (what & CURL_POLL_IN) ? + WL_SOCKET_READABLE: + (what & CURL_POLL_OUT) ? + WL_SOCKET_WRITEABLE: + 0; // no event is assigned since here we get CURL_POLL_REMOVE and the sockfd will be removed + + if(!mark){ + marker *new_marker = palloc(sizeof(marker)); + new_marker->pos = AddWaitEventToSet(lstate->event_set, ev, sockfd, NULL, NULL); + curl_multi_assign(lstate->curl_mhandle, sockfd, new_marker); } else if (what == CURL_POLL_REMOVE){ - epoll_op = EPOLL_CTL_DEL; - pfree(socketp); + pfree(mark); curl_multi_assign(lstate->curl_mhandle, sockfd, NULL); } else { - epoll_op = EPOLL_CTL_MOD; - } - - epoll_event ev = { - .data.fd = sockfd, - .events = - (what & CURL_POLL_IN) ? - EPOLLIN: - (what & CURL_POLL_OUT) ? - EPOLLOUT: - 0, // no event is assigned since here we get CURL_POLL_REMOVE and the sockfd will be removed - }; - - // epoll_ctl will copy ev, so there's no need to do palloc for the epoll_event - // https://github.com/torvalds/linux/blob/e32cde8d2bd7d251a8f9b434143977ddf13dcec6/fs/eventpoll.c#L2408-L2418 - if (epoll_ctl(lstate->epfd, epoll_op, sockfd, &ev) < 0) { - int e = errno; - static char *opstrs[] = { "NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL", "EPOLL_CTL_MOD" }; - ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s", whatstrs[what], opstrs[epoll_op], sockfd, strerror(e))); + ModifyWaitEvent(lstate->event_set, mark->pos, ev, NULL); } return 0; @@ -194,8 +154,6 @@ static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int void set_curl_mhandle(CURLM *curl_mhandle, LoopState *lstate){ CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_SOCKETFUNCTION, multi_socket_cb); CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_SOCKETDATA, lstate); - CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_TIMERFUNCTION, multi_timer_cb); - CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_TIMERDATA, lstate); } void delete_expired_responses(char *ttl, int batch_size){ diff --git a/src/core.h b/src/core.h index 1c98fc1..be8abdf 100644 --- a/src/core.h +++ b/src/core.h @@ -1,13 +1,9 @@ #ifndef CORE_H #define CORE_H -typedef struct itimerspec itimerspec; -typedef struct epoll_event epoll_event; - typedef struct { - int epfd; - int timerfd; CURLM *curl_mhandle; + WaitEventSet *event_set; } LoopState; void delete_expired_responses(char *ttl, int batch_size); diff --git a/src/worker.c b/src/worker.c index c320479..74ec205 100644 --- a/src/worker.c +++ b/src/worker.c @@ -23,8 +23,6 @@ #include #include -#include -#include #include #include #include @@ -100,28 +98,14 @@ void pg_net_worker(Datum main_arg) { ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret))); LoopState lstate = { - .epfd = epoll_create1(0), - .timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC), .curl_mhandle = curl_multi_init(), }; - if (lstate.epfd < 0) { - ereport(ERROR, errmsg("Failed to create epoll file descriptor")); - } - - if (lstate.timerfd < 0) { - ereport(ERROR, errmsg("Failed to create timerfd")); - } - if(!lstate.curl_mhandle) ereport(ERROR, errmsg("curl_multi_init()")); set_curl_mhandle(lstate.curl_mhandle, &lstate); - timerfd_settime(lstate.timerfd, 0, &(itimerspec){}, NULL); - - epoll_ctl(lstate.epfd, EPOLL_CTL_ADD, lstate.timerfd, &(epoll_event){.events = EPOLLIN, .data.fd = lstate.timerfd}); - while (!got_sigterm) { WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -146,59 +130,40 @@ void pg_net_worker(Datum main_arg) { consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext); int running_handles = 0; - int maxevents = guc_batch_size + 1; // 1 extra for the timer - epoll_event *events = palloc0(sizeof(epoll_event) * maxevents); - do { - int nfds = epoll_wait(lstate.epfd, events, maxevents, /*timeout=*/1000); - if (nfds < 0) { - int save_errno = errno; - if(save_errno == EINTR) { // can happen when the epoll is interrupted, for example when running under GDB. Just continue in this case. - continue; - } - else { - ereport(ERROR, errmsg("epoll_wait() failed: %s", strerror(save_errno))); - break; - } - } + lstate.event_set = CreateWaitEventSet(CurlMemContext, guc_batch_size); - for (int i = 0; i < nfds; i++) { - if (events[i].data.fd == lstate.timerfd) { - EREPORT_MULTI( - curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) - ); - } else { - int ev_bitmask = - events[i].events & EPOLLIN ? CURL_CSELECT_IN: - events[i].events & EPOLLOUT ? CURL_CSELECT_OUT: - CURL_CSELECT_ERR; - - EREPORT_MULTI( - curl_multi_socket_action( - lstate.curl_mhandle, events[i].data.fd, - ev_bitmask, - &running_handles) - ); - - if(running_handles <= 0) { - elog(DEBUG2, "last transfer done, kill timeout"); - timerfd_settime(lstate.timerfd, 0, &(itimerspec){0}, NULL); - } - } + EREPORT_MULTI( + curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) + ); + + do { + WaitEvent *events = palloc(sizeof(WaitEvent)*guc_batch_size); + int num_events_ocurred = WaitEventSetWait(lstate.event_set, /*timeout=*/1000, events, guc_batch_size, /*pending=*/ 1); + + for (size_t i = 0; i < num_events_ocurred; i++) { + int ev_bitmask = + events[i].events & WL_SOCKET_READABLE ? CURL_CSELECT_IN: + events[i].events & WL_SOCKET_WRITEABLE ? CURL_CSELECT_OUT: + CURL_CSELECT_ERR; + + EREPORT_MULTI( + curl_multi_socket_action( + lstate.curl_mhandle, events[i].fd, + ev_bitmask, + &running_handles) + ); insert_curl_responses(&lstate, CurlMemContext); } } while (running_handles > 0); // run again while there are curl handles, this will prevent waiting for the latch_timeout (which will cause the cause the curl timeouts to be wrong) - pfree(events); + FreeWaitEventSet(lstate.event_set); MemoryContextReset(CurlMemContext); } - close(lstate.epfd); - close(lstate.timerfd); - curl_multi_cleanup(lstate.curl_mhandle); curl_global_cleanup();