Skip to content

Commit

Permalink
Added ZMQ_STREAM socket type
Browse files Browse the repository at this point in the history
- designed for TCP clients and servers
- added HTTP client / server example in tests/test_stream.cpp
- same as ZMQ_ROUTER + ZMQ_ROUTER_RAW + ZMQ_ROUTER_MANDATORY
- includes b893ce set ZMQ_IDENTITY on outgoing connect
- deprecates ZMQ_ROUTER_RAW
  • Loading branch information
hintjens committed Jun 27, 2013
1 parent 4a4d222 commit ad77937
Show file tree
Hide file tree
Showing 16 changed files with 763 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ tests/test_disconnect_inproc
tests/test_ctx_options
tests/test_iov
tests/test_security
tests/test_security_curve
tests/test_probe_router
tests/test_stream
src/platform.hpp*
src/stamp-h1
perf/local_lat
Expand Down
2 changes: 2 additions & 0 deletions doc/zmq_msg_send.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ The operation was interrupted by delivery of a signal before the message was
sent.
*EFAULT*::
Invalid message.
*EHOSTUNREACH*::
The message cannot be routed.


EXAMPLE
Expand Down
2 changes: 2 additions & 0 deletions doc/zmq_send.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ The provided 'socket' was invalid.
*EINTR*::
The operation was interrupted by delivery of a signal before the message was
sent.
*EHOSTUNREACH*::
The message cannot be routed.


EXAMPLE
Expand Down
2 changes: 2 additions & 0 deletions doc/zmq_sendmsg.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ The operation was interrupted by delivery of a signal before the message was
sent.
*EFAULT*::
Invalid message.
*EHOSTUNREACH*::
The message cannot be routed.


EXAMPLE
Expand Down
2 changes: 2 additions & 0 deletions doc/zmq_setsockopt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ When using raw mode, you cannot set explicit identities, and the ZMQ_MSGMORE
flag is ignored when sending data messages. In raw mode you can close a specific
connection by sending it a zero-length message (following the identity frame).

NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.

[horizontal]
Option value type:: int
Option value unit:: 0, 1
Expand Down
42 changes: 40 additions & 2 deletions doc/zmq_socket.txt
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,46 @@ Outgoing routing strategy:: N/A
Action in mute state:: Block


Native Pattern
~~~~~~~~~~~~~~
The native pattern is used for communicating with TCP peers and allows
asynchronous requests and replies in either direction.


ZMQ_STREAM
^^^^^^^^^^
A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a
non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
act as client and/or server, sending and/or receiving TCP data asynchronously.

When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
containing the _identity_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all
connected peers.

When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _identity_ of the peer the message shall be
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.

To open a connection to a server, use the zmq_connect call, and then fetch the
socket identity using the ZMQ_IDENTITY zmq_getsockopt call.

To close a specific client connection, as a server, send a zero-length message
following the identity frame.

The ZMQ_MSGMORE flag is ignored on data frames. You must send one identity frame
followed by one data frame.

[horizontal]
.Summary of ZMQ_STREAM characteristics
Compatible peer sockets:: none.
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: EAGAIN


RETURN VALUE
------------
The _zmq_socket()_ function shall return an opaque handle to the newly created
Expand Down Expand Up @@ -357,5 +397,3 @@ AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.


1 change: 1 addition & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
#define ZMQ_STREAM 11

/* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ libzmq_la_SOURCES = \
signaler.hpp \
socket_base.hpp \
stdint.hpp \
stream.hpp \
stream_engine.hpp \
sub.hpp \
tcp.hpp \
Expand Down Expand Up @@ -134,6 +135,7 @@ libzmq_la_SOURCES = \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream.cpp \
stream_engine.cpp \
sub.cpp \
tcp.cpp \
Expand Down
4 changes: 2 additions & 2 deletions src/mechanism.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB"};
zmq_assert (socket_type >= 0 && socket_type <= 10);
"XPUB", "XSUB", "STREAM"};
zmq_assert (socket_type >= ZMQ_PAIR && socket_type <= ZMQ_STREAM);
return names [socket_type];
}

Expand Down
7 changes: 2 additions & 5 deletions src/router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
more_out (false),
next_peer_id (generate_random ()),
mandatory (false),
raw_sock (false),
// raw_sock functionality in ROUTER is deprecated
raw_sock (false),
probe_router (false)
{
options.type = ZMQ_ROUTER;
Expand Down Expand Up @@ -272,7 +273,6 @@ int zmq::router_t::xrecv (msg_t *msg_)
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && msg_->is_identity ())
rc = fq.recvpipe (msg_, &pipe);

Expand Down Expand Up @@ -372,9 +372,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
unsigned int i = 0; // Store identity to allow use of raw socket as client
for (blob_t::iterator it = identity.begin(); it != identity.end(); it++) options.identity[i++] = *it;
options.identity_size = i;
}
else {
msg.init ();
Expand Down
5 changes: 5 additions & 0 deletions src/session_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"
#include "stream.hpp"

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
Expand Down Expand Up @@ -91,6 +92,10 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, addr_);
break;
case ZMQ_STREAM:
s = new (std::nothrow) stream_session_t (io_thread_, connect_,
socket_, options_, addr_);
break;
default:
errno = EINVAL;
return NULL;
Expand Down
4 changes: 4 additions & 0 deletions src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include "router.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
#include "stream.hpp"

bool zmq::socket_base_t::check_tag ()
{
Expand Down Expand Up @@ -112,6 +113,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
case ZMQ_STREAM:
s = new (std::nothrow) stream_t (parent_, tid_, sid_);
break;
default:
errno = EINVAL;
return NULL;
Expand Down
Loading

0 comments on commit ad77937

Please sign in to comment.