From ad7793795692e93595f01e12773467fb9270e9ec Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Thu, 27 Jun 2013 20:47:34 +0200 Subject: [PATCH] Added ZMQ_STREAM socket type - designed for TCP clients and servers - added HTTP client / server example in tests/test_stream.cpp - same as ZMQ_ROUTER + ZMQ_ROUTER_RAW + ZMQ_ROUTER_MANDATORY - includes b893ce set ZMQ_IDENTITY on outgoing connect - deprecates ZMQ_ROUTER_RAW --- .gitignore | 2 + doc/zmq_msg_send.txt | 2 + doc/zmq_send.txt | 2 + doc/zmq_sendmsg.txt | 2 + doc/zmq_setsockopt.txt | 2 + doc/zmq_socket.txt | 42 +++++- include/zmq.h | 1 + src/Makefile.am | 2 + src/mechanism.cpp | 4 +- src/router.cpp | 7 +- src/session_base.cpp | 5 + src/socket_base.cpp | 4 + src/stream.cpp | 335 +++++++++++++++++++++++++++++++++++++++++ src/stream.hpp | 127 ++++++++++++++++ tests/Makefile.am | 9 +- tests/test_stream.cpp | 228 ++++++++++++++++++++++++++++ 16 files changed, 763 insertions(+), 11 deletions(-) create mode 100644 src/stream.cpp create mode 100644 src/stream.hpp create mode 100644 tests/test_stream.cpp diff --git a/.gitignore b/.gitignore index 6557b809..d57fee6c 100644 --- a/.gitignore +++ b/.gitignore @@ -50,7 +50,9 @@ tests/test_disconnect_inproc tests/test_ctx_options tests/test_iov tests/test_security +tests/test_security_curve tests/test_probe_router +tests/test_stream src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/zmq_msg_send.txt b/doc/zmq_msg_send.txt index 5356750a..1056e604 100644 --- a/doc/zmq_msg_send.txt +++ b/doc/zmq_msg_send.txt @@ -81,6 +81,8 @@ The operation was interrupted by delivery of a signal before the message was sent. *EFAULT*:: Invalid message. +*EHOSTUNREACH*:: +The message cannot be routed. EXAMPLE diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index df25bff1..4b3f2b3a 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -70,6 +70,8 @@ The provided 'socket' was invalid. *EINTR*:: The operation was interrupted by delivery of a signal before the message was sent. +*EHOSTUNREACH*:: +The message cannot be routed. EXAMPLE diff --git a/doc/zmq_sendmsg.txt b/doc/zmq_sendmsg.txt index 27dbadb3..ac7959d6 100644 --- a/doc/zmq_sendmsg.txt +++ b/doc/zmq_sendmsg.txt @@ -77,6 +77,8 @@ The operation was interrupted by delivery of a signal before the message was sent. *EFAULT*:: Invalid message. +*EHOSTUNREACH*:: +The message cannot be routed. EXAMPLE diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 3d73c899..b4e0633f 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -417,6 +417,8 @@ When using raw mode, you cannot set explicit identities, and the ZMQ_MSGMORE flag is ignored when sending data messages. In raw mode you can close a specific connection by sending it a zero-length message (following the identity frame). +NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead. + [horizontal] Option value type:: int Option value unit:: 0, 1 diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index f75423c6..cd0d3c6c 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -323,6 +323,46 @@ Outgoing routing strategy:: N/A Action in mute state:: Block +Native Pattern +~~~~~~~~~~~~~~ +The native pattern is used for communicating with TCP peers and allows +asynchronous requests and replies in either direction. + + +ZMQ_STREAM +^^^^^^^^^^ +A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a +non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can +act as client and/or server, sending and/or receiving TCP data asynchronously. + +When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part +containing the _identity_ of the originating peer to the message before passing +it to the application. Messages received are fair-queued from among all +connected peers. + +When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the +message and use it to determine the _identity_ of the peer the message shall be +routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error. + +To open a connection to a server, use the zmq_connect call, and then fetch the +socket identity using the ZMQ_IDENTITY zmq_getsockopt call. + +To close a specific client connection, as a server, send a zero-length message +following the identity frame. + +The ZMQ_MSGMORE flag is ignored on data frames. You must send one identity frame +followed by one data frame. + +[horizontal] +.Summary of ZMQ_STREAM characteristics +Compatible peer sockets:: none. +Direction:: Bidirectional +Send/receive pattern:: Unrestricted +Outgoing routing strategy:: See text +Incoming routing strategy:: Fair-queued +Action in mute state:: EAGAIN + + RETURN VALUE ------------ The _zmq_socket()_ function shall return an opaque handle to the newly created @@ -357,5 +397,3 @@ AUTHORS ------- This page was written by the 0MQ community. To make a change please read the 0MQ Contribution Policy at . - - diff --git a/include/zmq.h b/include/zmq.h index 536aa9fa..6acc62f2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_PUSH 8 #define ZMQ_XPUB 9 #define ZMQ_XSUB 10 +#define ZMQ_STREAM 11 /* Deprecated aliases */ #define ZMQ_XREQ ZMQ_DEALER diff --git a/src/Makefile.am b/src/Makefile.am index 75842367..19e63478 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -72,6 +72,7 @@ libzmq_la_SOURCES = \ signaler.hpp \ socket_base.hpp \ stdint.hpp \ + stream.hpp \ stream_engine.hpp \ sub.hpp \ tcp.hpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = \ session_base.cpp \ signaler.cpp \ socket_base.cpp \ + stream.cpp \ stream_engine.cpp \ sub.cpp \ tcp.cpp \ diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 0d4627c2..ce246718 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -51,8 +51,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const { static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", "ROUTER", "PULL", "PUSH", - "XPUB", "XSUB"}; - zmq_assert (socket_type >= 0 && socket_type <= 10); + "XPUB", "XSUB", "STREAM"}; + zmq_assert (socket_type >= ZMQ_PAIR && socket_type <= ZMQ_STREAM); return names [socket_type]; } diff --git a/src/router.cpp b/src/router.cpp index d70d1329..a5f56f2b 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -33,7 +33,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : more_out (false), next_peer_id (generate_random ()), mandatory (false), - raw_sock (false), + // raw_sock functionality in ROUTER is deprecated + raw_sock (false), probe_router (false) { options.type = ZMQ_ROUTER; @@ -272,7 +273,6 @@ int zmq::router_t::xrecv (msg_t *msg_) // It's possible that we receive peer's identity. That happens // after reconnection. The current implementation assumes that // the peer always uses the same identity. - // TODO: handle the situation when the peer changes its identity. while (rc == 0 && msg_->is_identity ()) rc = fq.recvpipe (msg_, &pipe); @@ -372,9 +372,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) buf [0] = 0; put_uint32 (buf + 1, next_peer_id++); identity = blob_t (buf, sizeof buf); - unsigned int i = 0; // Store identity to allow use of raw socket as client - for (blob_t::iterator it = identity.begin(); it != identity.end(); it++) options.identity[i++] = *it; - options.identity_size = i; } else { msg.init (); diff --git a/src/session_base.cpp b/src/session_base.cpp index df8573b4..e658484b 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -40,6 +40,7 @@ #include "push.hpp" #include "pull.hpp" #include "pair.hpp" +#include "stream.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, @@ -91,6 +92,10 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) pair_session_t (io_thread_, connect_, socket_, options_, addr_); break; + case ZMQ_STREAM: + s = new (std::nothrow) stream_session_t (io_thread_, connect_, + socket_, options_, addr_); + break; default: errno = EINVAL; return NULL; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bcd47990..6e4853e0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -67,6 +67,7 @@ #include "router.hpp" #include "xpub.hpp" #include "xsub.hpp" +#include "stream.hpp" bool zmq::socket_base_t::check_tag () { @@ -112,6 +113,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_, sid_); break; + case ZMQ_STREAM: + s = new (std::nothrow) stream_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 00000000..9463f706 --- /dev/null +++ b/src/stream.cpp @@ -0,0 +1,335 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "stream.hpp" +#include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" +#include "err.hpp" + +zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), + prefetched (false), + identity_sent (false), + more_in (false), + current_out (NULL), + more_out (false), + next_peer_id (generate_random ()) +{ + options.type = ZMQ_STREAM; + options.recv_identity = true; + options.raw_sock = true; + + prefetched_id.init (); + prefetched_msg.init (); +} + +zmq::stream_t::~stream_t () +{ + zmq_assert (anonymous_pipes.empty ());; + zmq_assert (outpipes.empty ()); + prefetched_id.close (); + prefetched_msg.close (); +} + +void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +{ + // icanhasall_ is unused + (void)icanhasall_; + + zmq_assert (pipe_); + + bool identity_ok = identify_peer (pipe_); + if (identity_ok) + fq.attach (pipe_); + else + anonymous_pipes.insert (pipe_); +} + +void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) +{ + std::set ::iterator it = anonymous_pipes.find (pipe_); + if (it != anonymous_pipes.end ()) + anonymous_pipes.erase (it); + else { + outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); + zmq_assert (it != outpipes.end ()); + outpipes.erase (it); + fq.pipe_terminated (pipe_); + if (pipe_ == current_out) + current_out = NULL; + } +} + +void zmq::stream_t::xread_activated (pipe_t *pipe_) +{ + std::set ::iterator it = anonymous_pipes.find (pipe_); + if (it == anonymous_pipes.end ()) + fq.activated (pipe_); + else { + bool identity_ok = identify_peer (pipe_); + if (identity_ok) { + anonymous_pipes.erase (it); + fq.attach (pipe_); + } + } +} + +void zmq::stream_t::xwrite_activated (pipe_t *pipe_) +{ + outpipes_t::iterator it; + for (it = outpipes.begin (); it != outpipes.end (); ++it) + if (it->second.pipe == pipe_) + break; + + zmq_assert (it != outpipes.end ()); + zmq_assert (!it->second.active); + it->second.active = true; +} + +int zmq::stream_t::xsend (msg_t *msg_) +{ + // If this is the first part of the message it's the ID of the + // peer to send the message to. + if (!more_out) { + zmq_assert (!current_out); + + // If we have malformed message (prefix with no subsequent message) + // then just silently ignore it. + // TODO: The connections should be killed instead. + if (msg_->flags () & msg_t::more) { + + more_out = true; + + // Find the pipe associated with the identity stored in the prefix. + // If there's no such pipe return an error + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + if (!current_out->check_write ()) { + it->second.active = false; + current_out = NULL; + more_out = false; + errno = EAGAIN; + return -1; + } + } + else { + more_out = false; + errno = EHOSTUNREACH; + return -1; + } + } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + + // Ignore the MORE flag + msg_->reset_flags (msg_t::more); + + // Check whether this is the last part of the message. + more_out = msg_->flags () & msg_t::more ? true : false; + + // Push the message into the pipe. If there's no out pipe, just drop it. + if (current_out) { + + // Close the remote connection if user has asked to do so + // by sending zero length message. + // Pending messages in the pipe will be dropped (on receiving term- ack) + if (msg_->size() == 0) { + current_out->terminate (false); + int rc = msg_->close (); + errno_assert (rc == 0); + current_out = NULL; + return 0; + } + bool ok = current_out->write (msg_); + if (unlikely (!ok)) + current_out = NULL; + else + if (!more_out) { + current_out->flush (); + current_out = NULL; + } + } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + } + + // Detach the message from the data buffer. + int rc = msg_->init (); + errno_assert (rc == 0); + + return 0; +} + +int zmq::stream_t::xrecv (msg_t *msg_) +{ + if (prefetched) { + if (!identity_sent) { + int rc = msg_->move (prefetched_id); + errno_assert (rc == 0); + identity_sent = true; + } + else { + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + prefetched = false; + } + more_in = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + pipe_t *pipe = NULL; + int rc = fq.recvpipe (msg_, &pipe); + + // It's possible that we receive peer's identity. That happens + // after reconnection. The current implementation assumes that + // the peer always uses the same identity. + // TODO: handle the situation when the peer changes its identity. + while (rc == 0 && msg_->is_identity ()) + rc = fq.recvpipe (msg_, &pipe); + + if (rc != 0) + return -1; + + zmq_assert (pipe != NULL); + + // If we are in the middle of reading a message, just return the next part. + if (more_in) + more_in = msg_->flags () & msg_t::more ? true : false; + else { + // We are at the beginning of a message. + // Keep the message part we have in the prefetch buffer + // and return the ID of the peer instead. + rc = prefetched_msg.move (*msg_); + errno_assert (rc == 0); + prefetched = true; + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); + identity_sent = true; + } + + return 0; +} + +int zmq::stream_t::rollback (void) +{ + if (current_out) { + current_out->rollback (); + current_out = NULL; + more_out = false; + } + return 0; +} + +bool zmq::stream_t::xhas_in () +{ + // If we are in the middle of reading the messages, there are + // definitely more parts available. + if (more_in) + return true; + + // We may already have a message pre-fetched. + if (prefetched) + return true; + + // Try to read the next message. + // The message, if read, is kept in the pre-fetch buffer. + pipe_t *pipe = NULL; + int rc = fq.recvpipe (&prefetched_msg, &pipe); + + // It's possible that we receive peer's identity. That happens + // after reconnection. The current implementation assumes that + // the peer always uses the same identity. + while (rc == 0 && prefetched_msg.is_identity ()) + rc = fq.recvpipe (&prefetched_msg, &pipe); + + if (rc != 0) + return false; + + zmq_assert (pipe != NULL); + + blob_t identity = pipe->get_identity (); + rc = prefetched_id.init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (prefetched_id.data (), identity.data (), identity.size ()); + prefetched_id.set_flags (msg_t::more); + + prefetched = true; + identity_sent = false; + + return true; +} + +bool zmq::stream_t::xhas_out () +{ + // In theory, ROUTER socket is always ready for writing. Whether actual + // attempt to write succeeds depends on which pipe the message is going + // to be routed to. + return true; +} + +bool zmq::stream_t::identify_peer (pipe_t *pipe_) +{ + blob_t identity; + bool ok; + + // Always assign identity for raw-socket + unsigned char buffer [5]; + buffer [0] = 0; + put_uint32 (buffer + 1, next_peer_id++); + identity = blob_t (buffer, sizeof buffer); + unsigned int i = 0; // Store identity to allow use of raw socket as client + for (blob_t::iterator it = identity.begin(); it != identity.end(); it++) + options.identity[i++] = *it; + options.identity_size = i; + + pipe_->set_identity (identity); + // Add the record into output pipes lookup table + outpipe_t outpipe = {pipe_, true}; + ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; + zmq_assert (ok); + + return true; +} + +zmq::stream_session_t::stream_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) +{ +} + +zmq::stream_session_t::~stream_session_t () +{ +} + diff --git a/src/stream.hpp b/src/stream.hpp new file mode 100644 index 00000000..23989b0b --- /dev/null +++ b/src/stream.hpp @@ -0,0 +1,127 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_STREAM_HPP_INCLUDED__ +#define __ZMQ_STREAM_HPP_INCLUDED__ + +#include + +#include "router.hpp" + +namespace zmq +{ + + class ctx_t; + class pipe_t; + + class stream_t : + public socket_base_t + { + public: + + stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); + ~stream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + bool xhas_out (); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + protected: + + // Rollback any message parts that were sent but not yet flushed. + int rollback (); + + private: + // Receive peer id and update lookup map + bool identify_peer (pipe_t *pipe_); + + // Fair queueing object for inbound pipes. + fq_t fq; + + // True iff there is a message held in the pre-fetch buffer. + bool prefetched; + + // If true, the receiver got the message part with + // the peer's identity. + bool identity_sent; + + // Holds the prefetched identity. + msg_t prefetched_id; + + // Holds the prefetched message. + msg_t prefetched_msg; + + // If true, more incoming message parts are expected. + bool more_in; + + struct outpipe_t + { + zmq::pipe_t *pipe; + bool active; + }; + + // We keep a set of pipes that have not been identified yet. + std::set anonymous_pipes; + + // Outbound pipes indexed by the peer IDs. + typedef std::map outpipes_t; + outpipes_t outpipes; + + // The pipe we are currently writing to. + zmq::pipe_t *current_out; + + // If true, more outgoing message parts are expected. + bool more_out; + + // Peer ID are generated. It's a simple increment and wrap-over + // algorithm. This value is the next ID to use (if not used already). + uint32_t next_peer_id; + + // If true, report EAGAIN to the caller instead of silently dropping + // the message targeting an unknown peer. + bool mandatory; + + stream_t (const stream_t&); + const stream_t &operator = (const stream_t&); + }; + + class stream_session_t : public session_base_t + { + public: + + stream_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const address_t *addr_); + ~stream_session_t (); + + private: + + stream_session_t (const stream_session_t&); + const stream_session_t &operator = (const stream_session_t&); + }; + +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index 5619b01a..3938ee7c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -19,7 +19,7 @@ noinst_PROGRAMS = test_pair_inproc \ test_monitor \ test_router_mandatory \ test_probe_router \ - test_raw_sock \ + test_stream \ test_disconnect_inproc \ test_ctx_options \ test_security \ @@ -49,7 +49,7 @@ test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp test_probe_router_SOURCES = test_probe_router.cpp -test_raw_sock_SOURCES = test_raw_sock.cpp +test_stream_SOURCES = test_stream.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_ctx_options_SOURCES = test_ctx_options.cpp test_iov_SOURCES = test_iov.cpp @@ -62,4 +62,9 @@ test_reqrep_ipc_SOURCES = test_reqrep_ipc.cpp testutil.hpp test_timeo_SOURCES = test_timeo.cpp endif +# Deprecated test cases +noinst_PROGRAMS += test_raw_sock +test_raw_sock_SOURCES = test_raw_sock.cpp + +# Run the test cases TESTS = $(noinst_PROGRAMS) diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp new file mode 100644 index 00000000..a3d96f70 --- /dev/null +++ b/tests/test_stream.cpp @@ -0,0 +1,228 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" +#include +#include +#undef NDEBUG +#include + +// ZMTP protocol greeting structure + +typedef unsigned char byte; +typedef struct { + byte signature [10]; // 0xFF 8*0x00 0x7F + byte version [2]; // 0x03 0x00 for ZMTP/3.0 + byte mechanism [20]; // "NULL" + byte as_server; + byte filler [31]; +} zmtp_greeting_t; + +#define ZMTP_DEALER 5 // Socket type constants + +// This is a greeting matching what 0MQ will send us; note the +// 8-byte size is set to 1 for backwards compatibility + +static zmtp_greeting_t greeting + = { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, {3, 0}, { 'N', 'U', 'L', 'L'} }; + +static void +test_stream_to_dealer (void) +{ + int rc; + + // Set up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + // We'll be using this socket in raw mode + void *stream = zmq_socket (ctx, ZMQ_STREAM); + assert (stream); + + int zero = 0; + rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_bind (stream, "tcp://*:5556"); + assert (rc == 0); + + // We'll be using this socket as the other peer + void *dealer = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer); + rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_connect (dealer, "tcp://localhost:5556"); + + // Send a message on the dealer socket + rc = zmq_send (dealer, "Hello", 5, 0); + assert (rc == 5); + + // First frame is identity + zmq_msg_t identity; + rc = zmq_msg_init (&identity); + assert (rc == 0); + rc = zmq_msg_recv (&identity, stream, 0); + assert (rc > 0); + assert (zmq_msg_more (&identity)); + + // Second frame is greeting signature + byte buffer [255]; + rc = zmq_recv (stream, buffer, 255, 0); + assert (rc == 10); + assert (memcmp (buffer, greeting.signature, 10) == 0); + + // Send our own protocol greeting + rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE); + assert (rc > 0); + rc = zmq_send (stream, &greeting, sizeof (greeting), 0); + assert (rc == sizeof (greeting)); + + // Now we expect the data from the DEALER socket + // First frame is, again, the identity of the connection + rc = zmq_msg_recv (&identity, stream, 0); + assert (rc > 0); + assert (zmq_msg_more (&identity)); + + // Second frame contains the rest of greeting along with + // the Ready command + rc = zmq_recv (stream, buffer, 255, 0); + assert (rc == 99); + + // First two bytes are major and minor version numbers. + assert (buffer [0] == 3); // ZMTP/3.0 + assert (buffer [1] == 0); + + // Mechanism is "NULL" + assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 22) == 0); + assert (memcmp (buffer + 54, "\0\53READY ", 10) == 0); + assert (memcmp (buffer + 64, "\13Socket-Type\0\0\0\6DEALER", 22) == 0); + assert (memcmp (buffer + 86, "\10Identity\0\0\0\0", 13) == 0); + + // Announce we are ready + memcpy (buffer, "\0\53", 2); + memcpy (buffer + 2, "READY ", 8); + memcpy (buffer + 10, "\13Socket-Type\0\0\0\6STREAM", 22); + memcpy (buffer + 32, "\10Identity\0\0\0\0", 13); + + // Send Ready command + rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE); + assert (rc > 0); + rc = zmq_send (stream, buffer, 45, 0); + assert (rc == 45); + + // Now we expect the data from the DEALER socket + // First frame is, again, the identity of the connection + rc = zmq_msg_recv (&identity, stream, 0); + assert (rc > 0); + assert (zmq_msg_more (&identity)); + + // Third frame contains Hello message from DEALER + rc = zmq_recv (stream, buffer, sizeof buffer, 0); + assert (rc == 7); + + // Then we have a 5-byte message "Hello" + assert (buffer [0] == 0); // Flags = 0 + assert (buffer [1] == 5); // Size = 5 + assert (memcmp (buffer + 2, "Hello", 5) == 0); + + // Send "World" back to DEALER + rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE); + assert (rc > 0); + byte world [] = { 0, 5, 'W', 'o', 'r', 'l', 'd' }; + rc = zmq_send (stream, world, sizeof (world), 0); + assert (rc == sizeof (world)); + + // Expect response on DEALER socket + rc = zmq_recv (dealer, buffer, 255, 0); + assert (rc == 5); + assert (memcmp (buffer, "World", 5) == 0); + + rc = zmq_close (dealer); + assert (rc == 0); + + rc = zmq_close (stream); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + + +static void +test_stream_to_stream (void) +{ + int rc; + // Set-up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *server = zmq_socket (ctx, ZMQ_STREAM); + assert (server); + rc = zmq_bind (server, "tcp://*:8080"); + assert (rc == 0); + + void *client = zmq_socket (ctx, ZMQ_STREAM); + assert (client); + rc = zmq_connect (client, "tcp://localhost:8080"); + assert (rc == 0); + // It would be less surprising to get an empty message instead + // of having to fetch the identity like this [PH 2013/06/27] + uint8_t id [256]; + size_t id_size = 256; + rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size); + assert (rc == 0); + + // Sent HTTP request on client socket + // First frame is server identity + rc = zmq_send (client, id, id_size, ZMQ_SNDMORE); + assert (rc == (int) id_size); + // Second frame is HTTP GET request + rc = zmq_send (client, "GET /\n\n", 7, 0); + assert (rc == 7); + + // Get HTTP request; ID frame and then request + id_size = zmq_recv (server, id, 256, 0); + assert (id_size > 0); + uint8_t buffer [256]; + rc = zmq_recv (server, buffer, 256, 0); + assert (rc > 0); + assert (memcmp (buffer, "GET /\n\n", 7) == 0); + + // Send reply back to client + char http_response [] = + "HTTP/1.0 200 OK\r\n" + "Content-Type: text/plain\r\n" + "\r\n" + "Hello, World!"; + zmq_send (server, id, id_size, ZMQ_SNDMORE); + zmq_send (server, http_response, sizeof (http_response), 0); + + // Get reply at client and check that it's complete + id_size = zmq_recv (client, id, 256, 0); + assert (id_size > 0); + rc = zmq_recv (client, buffer, 256, 0); + assert (rc == sizeof (http_response)); + assert (memcmp (buffer, http_response, sizeof (http_response)) == 0); +} + + +int main (void) +{ + test_stream_to_dealer (); + test_stream_to_stream (); +} \ No newline at end of file