Skip to content

Commit

Permalink
Merge pull request #658 from ricnewton/inproc_connect_before_bind
Browse files Browse the repository at this point in the history
Support high water mark on inproc socket connect before bind.
  • Loading branch information
hintjens committed Sep 15, 2013
2 parents 133c32d + 4e6c56e commit 9066851
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 69 deletions.
97 changes: 60 additions & 37 deletions src/ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,7 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi
else
{
// Bind has happened in the mean time, connect directly
it->second.socket->inc_seqnum();
pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid());
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pending_connection_.bind_pipe;
it->second.socket->process_command(cmd);
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side);
}

endpoints_sync.unlock ();
Expand All @@ -425,43 +420,71 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so

for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
{
bind_socket_->inc_seqnum();
p->second.bind_pipe->set_tid(bind_socket_->get_tid());
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
}

pending_connections.erase(pending.first, pending.second);

endpoints_sync.unlock ();
}

void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_)
{
bind_socket_->inc_seqnum();
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());

if (side_ == bind_side)
{
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = p->second.bind_pipe;
cmd.args.bind.pipe = pending_connection_.bind_pipe;
bind_socket_->process_command(cmd);

bind_socket_->send_inproc_connected(p->second.endpoint.socket);
bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket);
}
else
{
pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false);
}

// Send identities
options_t& bind_options = endpoints[addr_].options;
if (bind_options.recv_identity) {
int sndhwm = 0;
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
int rcvhwm = 0;
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;

bool conflate = pending_connection_.endpoint.options.conflate &&
(pending_connection_.endpoint.options.type == ZMQ_DEALER ||
pending_connection_.endpoint.options.type == ZMQ_PULL ||
pending_connection_.endpoint.options.type == ZMQ_PUSH ||
pending_connection_.endpoint.options.type == ZMQ_PUB ||
pending_connection_.endpoint.options.type == ZMQ_SUB);

int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);

if (bind_options.recv_identity) {

msg_t id;
int rc = id.init_size (p->second.endpoint.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.connect_pipe->write (&id);
zmq_assert (written);
p->second.connect_pipe->flush ();
}
if (p->second.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.bind_pipe->write (&id);
zmq_assert (written);
p->second.bind_pipe->flush ();
}
msg_t id;
int rc = id.init_size (pending_connection_.endpoint.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), pending_connection_.endpoint.options.identity, pending_connection_.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.connect_pipe->write (&id);
zmq_assert (written);
pending_connection_.connect_pipe->flush ();
}
if (pending_connection_.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.bind_pipe->write (&id);
zmq_assert (written);
pending_connection_.bind_pipe->flush ();
}

pending_connections.erase(pending.first, pending.second);

endpoints_sync.unlock ();
}

// The last used socket ID, or 0 if no socket was used so far. Note that this
Expand Down
2 changes: 2 additions & 0 deletions src/ctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ namespace zmq
// the process that created this context. Used to detect forking.
pid_t pid;
#endif
enum side { connect_side, bind_side };
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_);
};

}
Expand Down
3 changes: 1 addition & 2 deletions src/object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace zmq
ctx_t *get_ctx ();
void process_command (zmq::command_t &cmd_);
void send_inproc_connected (zmq::socket_base_t *socket_);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);

protected:

Expand Down Expand Up @@ -80,8 +81,6 @@ namespace zmq
zmq::own_t *object_);
void send_attach (zmq::session_base_t *destination_,
zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true);
void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_);
Expand Down
5 changes: 5 additions & 0 deletions src/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,8 @@ void zmq::pipe_t::hiccup ()
send_hiccup (peer, (void*) inpipe);
}

void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
lwm = compute_lwm (inhwm_);
hwm = outhwm_;
}
3 changes: 3 additions & 0 deletions src/pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ namespace zmq
// before actual shutdown.
void terminate (bool delay_);

// set the high water marks.
void set_hwms (int inhwm_, int outhwm_);

private:

// Type of the underlying lock-free pipe.
Expand Down
8 changes: 6 additions & 2 deletions src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int sndhwm = 0;
if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
if (peer.socket == NULL)
sndhwm = options.sndhwm;
else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
sndhwm = options.sndhwm + peer.options.rcvhwm;
int rcvhwm = 0;
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
if (peer.socket == NULL)
rcvhwm = options.rcvhwm;
else if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
rcvhwm = options.rcvhwm + peer.options.sndhwm;

// Create a bi-directional pipe to connect the peers.
Expand Down
Loading

0 comments on commit 9066851

Please sign in to comment.