Skip to content

Commit

Permalink
Merge pull request #373 from ianbarber/master
Browse files Browse the repository at this point in the history
Add ZMQ_DELAY_ATTACH_ON_CONNECT sockopt
  • Loading branch information
hintjens committed Jun 12, 2012
2 parents e1cc2d4 + bc9ae71 commit 076e081
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tests/test_invalid_rep
tests/test_msg_flags
tests/test_ts_context
tests/test_connect_resolve
tests/test_connect_delay
tests/test_term_endpoint
src/platform.hpp*
src/stamp-h1
Expand Down
15 changes: 15 additions & 0 deletions doc/zmq_getsockopt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,21 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.


ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Retrieve the state of the attach on connect value. If set to `1`, will delay the
attachment of a pipe on connect until the underlying connection has completed.
This will cause the socket to block if there are no other connections, but will
prevent queues from filling on pipes awaiting connection.

[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.


ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
Expand Down
14 changes: 14 additions & 0 deletions doc/zmq_setsockopt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,20 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.


ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If set to `1`, will delay the attachment of a pipe on connect until the underlying
connection has completed. This will cause the socket to block if there are no other
connections, but will prevent queues from filling on pipes awaiting connection.

[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.


ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39

/* Message options */
#define ZMQ_MORE 1
Expand Down
3 changes: 1 addition & 2 deletions src/lb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ zmq::lb_t::~lb_t ()
void zmq::lb_t::attach (pipe_t *pipe_)
{
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
activated (pipe_);
}

void zmq::lb_t::terminated (pipe_t *pipe_)
Expand Down
27 changes: 27 additions & 0 deletions src/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ zmq::options_t::options_t () :
rcvtimeo (-1),
sndtimeo (-1),
ipv4only (1),
delay_attach_on_connect (0),
delay_on_close (true),
delay_on_disconnect (true),
filter (false),
Expand Down Expand Up @@ -218,6 +219,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
ipv4only = val;
return 0;
}



case ZMQ_TCP_KEEPALIVE:
{
Expand All @@ -236,6 +239,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
}

case ZMQ_DELAY_ATTACH_ON_CONNECT:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
delay_attach_on_connect = val;
return 0;
}

case ZMQ_TCP_KEEPALIVE_CNT:
{
if (optvallen_ != sizeof (int)) {
Expand Down Expand Up @@ -483,6 +501,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*((int*) optval_) = ipv4only;
*optvallen_ = sizeof (int);
return 0;

case ZMQ_DELAY_ATTACH_ON_CONNECT:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = delay_attach_on_connect;
*optvallen_ = sizeof (int);
return 0;

case ZMQ_TCP_KEEPALIVE:
if (*optvallen_ < sizeof (int)) {
Expand Down
4 changes: 4 additions & 0 deletions src/options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ namespace zmq
// possible to communicate with IPv6-only hosts. If 0, the socket can
// connect to and accept connections from both IPv4 and IPv6 hosts.
int ipv4only;

// If 1, connecting pipes are not attached immediately, meaning a send()
// on a socket with only connecting pipes would block
int delay_attach_on_connect;

// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
Expand Down
44 changes: 34 additions & 10 deletions src/session_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,30 @@ void zmq::session_base_t::clean_pipes ()

void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
pipe = NULL;

// If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed
// with termination safely.
if (pending)
// Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1);

if (pipe == pipe_)
// If this is our current pipe, remove it
pipe = NULL;
else
// Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_);

// If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed
// with termination safely.
if (pending && !pipe && terminating_pipes.size () == 0)
proceed_with_term ();
}

void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
zmq_assert (pipe == pipe_);
// Skip activating if we're detaching this pipe
if (pipe != pipe_) {
zmq_assert (terminating_pipes.count (pipe_) == 1);
return;
}

if (likely (engine != NULL))
engine->activate_out ();
Expand All @@ -252,7 +262,11 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)

void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
zmq_assert (pipe == pipe_);
// Skip activating if we're detaching this pipe
if (pipe != pipe_) {
zmq_assert (terminating_pipes.count (pipe_) == 1);
return;
}

if (engine)
engine->activate_in ();
Expand Down Expand Up @@ -395,6 +409,16 @@ void zmq::session_base_t::detached ()
return;
}

// For delayed connect situations, terminate the pipe
// and reestablish later on
if (pipe && options.delay_attach_on_connect == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->hiccup ();
pipe->terminate (false);
terminating_pipes.insert (pipe);
pipe = NULL;
}

reset ();

// Reconnect.
Expand Down
3 changes: 3 additions & 0 deletions src/session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ namespace zmq

// Pipe connecting the session to its socket.
zmq::pipe_t *pipe;

// This set is added to with pipes we are disconnecting, but haven't yet completed
std::set<pipe_t *> terminating_pipes;

// This flag is true if the remainder of the message being processed
// is still in the in pipe.
Expand Down
34 changes: 20 additions & 14 deletions src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,27 +530,29 @@ int zmq::socket_base_t::connect (const char *addr_)
options, paddr);
errno_assert (session);

// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);

// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe.
bool icanhasall = false;
if (protocol == "pgm" || protocol == "epgm")
icanhasall = true;

// Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], icanhasall);
if (options.delay_attach_on_connect != 1 || icanhasall) {
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);

// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
// Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], icanhasall);

// Save last endpoint URI
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
}

// Save last endpoint URI
paddr->to_string (options.last_endpoint);

add_endpoint (addr_, (own_t *) session);
Expand Down Expand Up @@ -968,7 +970,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_)

void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
xhiccuped (pipe_);
if (options.delay_attach_on_connect == 1)
pipe_->terminate (false);
else
// Notify derived sockets of the hiccup
xhiccuped (pipe_);
}

void zmq::socket_base_t::terminated (pipe_t *pipe_)
Expand Down
2 changes: 2 additions & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \
test_invalid_rep \
test_msg_flags \
test_connect_resolve \
test_connect_delay \
test_last_endpoint \
test_term_endpoint \
test_monitor
Expand All @@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
Expand Down
Loading

0 comments on commit 076e081

Please sign in to comment.