From f6293d257d9f649ef82ce40850d1b18cca9a89dd Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Wed, 9 Oct 2013 10:51:30 +0200 Subject: [PATCH] Signal that the peer performed orderly shutdown --- src/stream_engine.cpp | 77 ++++++++++++++++++++----------------------- src/stream_engine.hpp | 7 ++-- 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 2c078890..68780d28 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -217,17 +217,22 @@ void zmq::stream_engine_t::in_event () // Note that buffer can be arbitrarily large. However, we assume // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. - decoder->get_buffer (&inpos, &insize); - const int bytes_read = read (inpos, insize); + size_t bufsize = 0; + decoder->get_buffer (&inpos, &bufsize); - // Check whether the peer has closed the connection. - if (bytes_read == -1) { + int const rc = read (inpos, bufsize); + if (rc == 0) { error (); return; } + if (rc == -1) { + if (errno != EAGAIN) + error (); + return; + } // Adjust input size - insize = static_cast (bytes_read); + insize = static_cast (rc); } int rc = 0; @@ -396,12 +401,15 @@ bool zmq::stream_engine_t::handshake () while (greeting_bytes_read < greeting_size) { const int n = read (greeting_recv + greeting_bytes_read, greeting_size - greeting_bytes_read); - if (n == -1) { + if (n == 0) { error (); return false; } - if (n == 0) + if (n == -1) { + if (errno != EAGAIN) + error (); return false; + } greeting_bytes_read += n; @@ -792,58 +800,45 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) { #ifdef ZMQ_HAVE_WINDOWS - int nbytes = recv (s, (char*) data_, (int) size_, 0); + const int rc = recv (s, (char*) data_, (int) size_, 0); // If not a single byte can be read from the socket in non-blocking mode // we'll get an error (this may happen during the speculative read). - if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - - // Connection failure. - if (nbytes == SOCKET_ERROR && ( - WSAGetLastError () == WSAENETDOWN || - WSAGetLastError () == WSAENETRESET || - WSAGetLastError () == WSAECONNABORTED || - WSAGetLastError () == WSAETIMEDOUT || - WSAGetLastError () == WSAECONNRESET || - WSAGetLastError () == WSAECONNREFUSED || - WSAGetLastError () == WSAENOTCONN)) - return -1; - - wsa_assert (nbytes != SOCKET_ERROR); - - // Orderly shutdown by the other peer. - if (nbytes == 0) - return -1; + if (rc == SOCKET_ERROR) { + if (WSAGetLastError () == WSAEWOULDBLOCK) + errno = EAGAIN; + else { + wsa_assert (WSAGetLastError () == WSAENETDOWN + || WSAGetLastError () == WSAENETRESET + || WSAGetLastError () == WSAECONNABORTED + || WSAGetLastError () == WSAETIMEDOUT + || WSAGetLastError () == WSAECONNRESET + || WSAGetLastError () == WSAECONNREFUSED + || WSAGetLastError () == WSAENOTCONN); + errno = wsa_error_to_errno (WSAGetLastError ()); + } + } - return nbytes; + return rc == SOCKET_ERROR? -1: rc; #else - ssize_t nbytes = recv (s, data_, size_, 0); + const ssize_t rc = recv (s, data_, size_, 0); // Several errors are OK. When speculative read is being done we may not // be able to read a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1) { + if (rc == -1) { errno_assert (errno != EBADF && errno != EFAULT && errno != EINVAL && errno != ENOMEM && errno != ENOTSOCK); - return -1; + if (errno == EWOULDBLOCK || errno == EINTR) + errno = EAGAIN; } - // Orderly shutdown by the peer. - if (nbytes == 0) - return -1; - - return static_cast (nbytes); + return static_cast (rc); #endif } diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 583a8608..2b3857d5 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -87,10 +87,9 @@ namespace zmq // of error or orderly shutdown by the other peer -1 is returned. int write (const void *data_, size_t size_); - // Reads data from the socket (up to 'size' bytes). Returns the number - // of bytes actually read (even zero is to be considered to be - // a success). In case of error or orderly shutdown by the other - // peer -1 is returned. + // Reads data from the socket (up to 'size' bytes). + // Returns the number of bytes actually read or -1 on error. + // Zero indicates the peer has closed the connection. int read (void *data_, size_t size_); int read_identity (msg_t *msg_);