From 0c3d9179262ab431b8949b8646eed9a1a1e4a233 Mon Sep 17 00:00:00 2001 From: Sergey KHripchenko Date: Thu, 5 Apr 2012 19:39:53 +0400 Subject: [PATCH] initial implementation of tuning TCP keep-alives for TCP socket currently not fully cross-platform work on linux possibly work in *bsd and could be enhanced to work on windows --- acinclude.m4 | 55 +++++++++++++++++++++ configure.in | 10 ++++ include/zmq.h | 5 ++ src/ip.cpp | 31 ++++++++++++ src/ip.hpp | 3 ++ src/options.cpp | 111 +++++++++++++++++++++++++++++++++++++++++- src/options.hpp | 9 +++- src/tcp_connecter.cpp | 1 + src/tcp_listener.cpp | 1 + 9 files changed, 223 insertions(+), 3 deletions(-) diff --git a/acinclude.m4 b/acinclude.m4 index 9c35fc90..3e2c63ac 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -603,6 +603,61 @@ int main (int argc, char *argv []) ) }]) +dnl ################################################################################ +dnl # LIBZMQ_CHECK_TCP_KEEPALIVE([action-if-found], [action-if-not-found]) # +dnl # Check if SO_KEEPALIVE is supported # +dnl ################################################################################ +AC_DEFUN([LIBZMQ_CHECK_TCP_KEEPALIVE], [{ + AC_MSG_CHECKING(whether SO_KEEPALIVE is supported) + AC_TRY_RUN([/* SO_KEEPALIVE test */ +#include +#include + +int main (int argc, char *argv []) +{ + int s, rc, opt = 1; + return ( + ((s = socket (PF_INET, SOCK_STREAM, 0)) == -1) || + ((rc = setsockopt (s, SOL_SOCKET, SO_KEEPALIVE, (char*) &opt, sizeof (int))) == -1) + ); +} + ], + [AC_MSG_RESULT(yes) ; libzmq_cv_tcp_keepalive="yes" ; $1], + [AC_MSG_RESULT(no) ; libzmq_cv_tcp_keepalive="no" ; $2], + [AC_MSG_RESULT(not during cross-compile) ; libzmq_cv_tcp_keepalive="no"] + ) +}]) + +dnl ################################################################################ +dnl # LIBZMQ_CHECK_TCP_KEEPALIVE_OPTS([action-if-found], [action-if-not-found]) # +dnl # Check if TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL are supported # +dnl ################################################################################ +AC_DEFUN([LIBZMQ_CHECK_TCP_KEEPALIVE_OPTS], [{ + AC_MSG_CHECKING(whether TCP_KEEPCNT TCP_KEEPIDLE TCP_KEEPINTVL are supported) + AC_TRY_RUN([/* TCP_KEEPCNT TCP_KEEPIDLE TCP_KEEPINTVL test */ +#include +#include +#include +#include + +int main (int argc, char *argv []) +{ + int s, rc, opt = 1; + return ( + ((s = socket (PF_INET, SOCK_STREAM, 0)) == -1) || + ((rc = setsockopt (s, SOL_SOCKET, SO_KEEPALIVE, (char*) &opt, sizeof (int))) == -1) || + ((rc = setsockopt (s, IPPROTO_TCP, TCP_KEEPCNT, (char*) &opt, sizeof (int))) == -1) || + ((rc = setsockopt (s, IPPROTO_TCP, TCP_KEEPIDLE, (char*) &opt, sizeof (int))) == -1) || + ((rc = setsockopt (s, IPPROTO_TCP, TCP_KEEPINTVL, (char*) &opt, sizeof (int))) == -1) + ); +} + ], + [AC_MSG_RESULT(yes) ; libzmq_cv_tcp_keepalive_opts="yes" ; $1], + [AC_MSG_RESULT(no) ; libzmq_cv_tcp_keepalive_opts="no" ; $2], + [AC_MSG_RESULT(not during cross-compile) ; libzmq_cv_tcp_keepalive_opts="no"] + ) +}]) + dnl ################################################################################ dnl # LIBZMQ_CHECK_POLLER_KQUEUE([action-if-found], [action-if-not-found]) # dnl # Checks kqueue polling system # diff --git a/configure.in b/configure.in index bd630a01..fd8ea0aa 100644 --- a/configure.in +++ b/configure.in @@ -383,6 +383,16 @@ LIBZMQ_CHECK_SOCK_CLOEXEC([AC_DEFINE( [1], [Whether SOCK_CLOEXEC is defined and functioning.]) ]) +LIBZMQ_CHECK_TCP_KEEPALIVE([AC_DEFINE( + [ZMQ_HAVE_TCP_KEEPALIVE], + [1], + [Whether SO_KEEPALIVE is supported.]) + ]) +LIBZMQ_CHECK_TCP_KEEPALIVE_OPTS([AC_DEFINE( + [ZMQ_HAVE_TCP_KEEPALIVE_OPTS], + [1], + [Whether TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL are supported.]) + ]) # Subst LIBZMQ_EXTRA_CFLAGS & CXXFLAGS & LDFLAGS AC_SUBST(LIBZMQ_EXTRA_CFLAGS) diff --git a/include/zmq.h b/include/zmq.h index fc6c7a02..3c1cba4e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -222,6 +222,11 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_IPV4ONLY 31 #define ZMQ_LAST_ENDPOINT 32 #define ZMQ_FAIL_UNROUTABLE 33 +#define ZMQ_TCP_KEEPALIVE 34 +#define ZMQ_TCP_KEEPALIVE_CNT 35 +#define ZMQ_TCP_KEEPALIVE_IDLE 36 +#define ZMQ_TCP_KEEPALIVE_INTVL 37 + /* Message options */ #define ZMQ_MORE 1 diff --git a/src/ip.cpp b/src/ip.cpp index 0b4596a2..60f2244e 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -83,6 +83,37 @@ void zmq::tune_tcp_socket (fd_t s_) #endif } +void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_) +{ + // Tuning TCP keep-alives if platform allows it + // All values = -1 means skip and leave it for OS +#ifdef ZMQ_HAVE_TCP_KEEPALIVE + if (keepalive_ != -1) { + int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, (char*) &keepalive_, sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + +#if defined ZMQ_HAVE_TCP_KEEPALIVE_OPTS && !defined ZMQ_HAVE_WINDOWS + if (keepalive_cnt_ != -1) { + int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_, sizeof (int)); + errno_assert (rc == 0); + } + if (keepalive_idle_ != -1) { + int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle_, sizeof (int)); + errno_assert (rc == 0); + } + if (keepalive_intvl_ != -1) { + int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, &keepalive_intvl_, sizeof (int)); + errno_assert (rc == 0); + } +#endif + } +#endif +} + void zmq::unblock_socket (fd_t s_) { #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/ip.hpp b/src/ip.hpp index c5f31dbe..1cf4824f 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -33,6 +33,9 @@ namespace zmq // Tunes the supplied TCP socket for the best latency. void tune_tcp_socket (fd_t s_); + // Tunes TCP keep-alives + void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_); + // Sets the socket into non-blocking mode. void unblock_socket (fd_t s_); diff --git a/src/options.cpp b/src/options.cpp index 972d1783..6c9f6482 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -49,6 +49,10 @@ zmq::options_t::options_t () : filter (false), send_identity (false), recv_identity (false), + tcp_keepalive (-1), + tcp_keepalive_cnt (-1), + tcp_keepalive_idle (-1), + tcp_keepalive_intvl (-1), socket_id (0) { } @@ -214,8 +218,75 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ipv4only = val; return 0; } - } + case ZMQ_TCP_KEEPALIVE: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val != -1 && val != 0 && val != 1) { + errno = EINVAL; + return -1; + } +#ifdef ZMQ_HAVE_TCP_KEEPALIVE + tcp_keepalive = val; +#endif + return 0; + } + + case ZMQ_TCP_KEEPALIVE_CNT: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val <= 0 && val != -1) { + errno = EINVAL; + return -1; + } +#if defined ZMQ_HAVE_TCP_KEEPALIVE && defined ZMQ_HAVE_TCP_KEEPALIVE_OPTS + tcp_keepalive_cnt = val; +#endif + return 0; + } + + case ZMQ_TCP_KEEPALIVE_IDLE: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val <= 0 && val != -1) { + errno = EINVAL; + return -1; + } +#if defined ZMQ_HAVE_TCP_KEEPALIVE && defined ZMQ_HAVE_TCP_KEEPALIVE_OPTS + tcp_keepalive_idle = val; +#endif + return 0; + } + + case ZMQ_TCP_KEEPALIVE_INTVL: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val <= 0 && val != -1) { + errno = EINVAL; + return -1; + } +#if defined ZMQ_HAVE_TCP_KEEPALIVE && defined ZMQ_HAVE_TCP_KEEPALIVE_OPTS + tcp_keepalive_intvl = val; +#endif + return 0; + } + } errno = EINVAL; return -1; } @@ -385,7 +456,43 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *((int*) optval_) = ipv4only; *optvallen_ = sizeof (int); return 0; - + + case ZMQ_TCP_KEEPALIVE: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = tcp_keepalive; + *optvallen_ = sizeof (int); + return 0; + + case ZMQ_TCP_KEEPALIVE_CNT: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = tcp_keepalive_cnt; + *optvallen_ = sizeof (int); + return 0; + + case ZMQ_TCP_KEEPALIVE_IDLE: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = tcp_keepalive_idle; + *optvallen_ = sizeof (int); + return 0; + + case ZMQ_TCP_KEEPALIVE_INTVL: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = tcp_keepalive_intvl; + *optvallen_ = sizeof (int); + return 0; + case ZMQ_LAST_ENDPOINT: // don't allow string which cannot contain the entire message if (*optvallen_ < last_endpoint.size() + 1) { diff --git a/src/options.hpp b/src/options.hpp index 667416ad..d1302d93 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -110,7 +110,14 @@ namespace zmq // Receivers identity from all new connections. bool recv_identity; - + + // TCP keep-alive settings. + // Defaults to -1 = do not change socket options + int tcp_keepalive; + int tcp_keepalive_cnt; + int tcp_keepalive_idle; + int tcp_keepalive_intvl; + // ID of the socket. int socket_id; }; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 6ad2f808..a91c4145 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -106,6 +106,7 @@ void zmq::tcp_connecter_t::out_event () } tune_tcp_socket (fd); + tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 562d9a2e..95ad792a 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -87,6 +87,7 @@ void zmq::tcp_listener_t::in_event () return; tune_tcp_socket (fd); + tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);