diff --git a/src/Makefile.am b/src/Makefile.am index 530af01e..b3e9d166 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -51,6 +51,7 @@ libzmq_la_SOURCES = \ pgm_sender.hpp \ pgm_socket.hpp \ pipe.hpp \ + plain_mechanism.hpp \ platform.hpp \ poll.hpp \ poller.hpp \ @@ -114,6 +115,7 @@ libzmq_la_SOURCES = \ pgm_sender.cpp \ pgm_socket.cpp \ pipe.cpp \ + plain_mechanism.cpp \ poll.cpp \ poller_base.cpp \ pull.cpp \ diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 716fef0f..30841cd1 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -112,7 +112,8 @@ void zmq::ipc_connecter_t::out_event () return; } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (fd, options, false, endpoint); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index b3ad88bc..d503d73e 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -80,7 +80,8 @@ void zmq::ipc_listener_t::in_event () } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (fd, options, true, endpoint); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/plain_mechanism.cpp b/src/plain_mechanism.cpp new file mode 100644 index 00000000..d235dfa9 --- /dev/null +++ b/src/plain_mechanism.cpp @@ -0,0 +1,359 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#include +#include + +#include "msg.hpp" +#include "err.hpp" +#include "plain_mechanism.hpp" +#include "wire.hpp" + +zmq::plain_mechanism_t::plain_mechanism_t (const options_t &options_, + bool as_server_) : + mechanism_t (options_), + state (as_server_? waiting_for_hello: sending_hello) +{ +} + +zmq::plain_mechanism_t::~plain_mechanism_t () +{ +} + +int zmq::plain_mechanism_t::next_handshake_message (msg_t *msg_) +{ + int rc = 0; + + switch (state) { + case sending_hello: + rc = hello_command (msg_); + if (rc == 0) + state = waiting_for_welcome; + break; + case sending_welcome: + rc = welcome_command (msg_); + if (rc == 0) + state = waiting_for_initiate; + break; + case sending_initiate: + rc = initiate_command (msg_); + if (rc == 0) + state = waiting_for_ready; + break; + case sending_ready: + rc = ready_command (msg_); + if (rc == 0) + state = ready; + break; + default: + errno = EAGAIN; + rc = -1; + } + + return rc; +} + +int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) +{ + int rc = 0; + + switch (state) { + case waiting_for_hello: + rc = process_hello_command (msg_); + if (rc == 0) + state = sending_welcome; + break; + case waiting_for_welcome: + rc = process_welcome_command (msg_); + if (rc == 0) + state = sending_initiate; + break; + case waiting_for_initiate: + rc = process_initiate_command (msg_); + if (rc == 0) + state = sending_ready; + break; + case waiting_for_ready: + rc = process_ready_command (msg_); + if (rc == 0) + state = ready; + break; + default: + errno = EAGAIN; + rc = -1; + } + + if (rc == 0) { + rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + return 0; +} + +bool zmq::plain_mechanism_t::is_handshake_complete () const +{ + return state == ready; +} + +int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const +{ + // TODO: fetch these from options + const std::string username = "username"; + const std::string password = "password"; + + zmq_assert (username.length () < 256); + zmq_assert (password.length () < 256); + + const size_t command_size = 8 + 1 + username.length () + + 1 + password.length (); + + const int rc = msg_->init_size (command_size); + errno_assert (rc == 0); + + unsigned char *ptr = static_cast (msg_->data ()); + + memcpy (ptr, "HELLO ", 8); + ptr += 8; + *ptr++ = static_cast (username.length ()); + memcpy (ptr, username.c_str (), username.length ()); + ptr += username.length (); + *ptr++ = static_cast (password.length ()); + memcpy (ptr, password.c_str (), password.length ()); + ptr += password.length (); + + // TODO: check username and password + + return 0; +} + +int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) +{ + const unsigned char *ptr = static_cast (msg_->data ()); + size_t bytes_left = msg_->size (); + + if (bytes_left < 8 || memcmp (ptr, "HELLO ", 8)) { + errno = EPROTO; + return -1; + } + + ptr += 8; + bytes_left -= 8; + + if (bytes_left < 1) { + errno = EPROTO; + return -1; + } + + size_t username_length = static_cast (*ptr++); + bytes_left -= 1; + + if (bytes_left < username_length) { + errno = EPROTO; + return -1; + } + + const std::string username = std::string ((char *) ptr, username_length); + ptr += username_length; + bytes_left -= username_length; + + if (bytes_left < 1) { + errno = EPROTO; + return -1; + } + + size_t password_length = static_cast (*ptr++); + bytes_left -= 1; + + if (bytes_left < password_length) { + errno = EPROTO; + return -1; + } + + const std::string password = std::string ((char *) ptr, password_length); + ptr += password_length; + bytes_left -= password_length; + + if (bytes_left > 0) { + errno = EPROTO; + return -1; + } + + return 0; +} + +int zmq::plain_mechanism_t::welcome_command (msg_t *msg_) const +{ + const int rc = msg_->init_size (8); + errno_assert (rc == 0); + memcpy (msg_->data (), "WELCOME ", 8); + return 0; +} + +int zmq::plain_mechanism_t::process_welcome_command (msg_t *msg_) +{ + const unsigned char *ptr = static_cast (msg_->data ()); + size_t bytes_left = msg_->size (); + + if (bytes_left != 8 || memcmp (ptr, "WELCOME ", 8)) { + errno = EPROTO; + return -1; + } + + return 0; +} + +int zmq::plain_mechanism_t::initiate_command (msg_t *msg_) const +{ + unsigned char * const command_buffer = (unsigned char *) malloc (512); + alloc_assert (command_buffer); + + unsigned char *ptr = command_buffer; + + // Add mechanism string + memcpy (ptr, "INITIATE", 8); + ptr += 8; + + // Add socket type property + const char *socket_type = socket_type_string (options.type); + ptr += add_property (ptr, "Socket-Type", socket_type, strlen (socket_type)); + + // Add identity property + if (options.type == ZMQ_REQ + || options.type == ZMQ_DEALER + || options.type == ZMQ_ROUTER) { + ptr += add_property (ptr, "Identity", + options.identity, options.identity_size); + } + + const size_t command_size = ptr - command_buffer; + const int rc = msg_->init_size (command_size); + errno_assert (rc == 0); + memcpy (msg_->data (), command_buffer, command_size); + free (command_buffer); + + return 0; +} + +int zmq::plain_mechanism_t::process_initiate_command (msg_t *msg_) +{ + const unsigned char *ptr = static_cast (msg_->data ()); + size_t bytes_left = msg_->size (); + + if (bytes_left < 8 || memcmp (ptr, "INITIATE", 8)) { + errno = EPROTO; + return -1; + } + + return parse_property_list (ptr + 8, bytes_left - 8); +} + +int zmq::plain_mechanism_t::ready_command (msg_t *msg_) const +{ + unsigned char * const command_buffer = (unsigned char *) malloc (512); + alloc_assert (command_buffer); + + unsigned char *ptr = command_buffer; + + // Add mechanism string + memcpy (ptr, "READY ", 8); + ptr += 8; + + // Add socket type property + const char *socket_type = socket_type_string (options.type); + ptr += add_property (ptr, "Socket-Type", socket_type, strlen (socket_type)); + + // Add identity property + if (options.type == ZMQ_REQ + || options.type == ZMQ_DEALER + || options.type == ZMQ_ROUTER) { + ptr += add_property (ptr, "Identity", + options.identity, options.identity_size); + } + + const size_t command_size = ptr - command_buffer; + const int rc = msg_->init_size (command_size); + errno_assert (rc == 0); + memcpy (msg_->data (), command_buffer, command_size); + free (command_buffer); + + return 0; +} + +int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_) +{ + const unsigned char *ptr = static_cast (msg_->data ()); + size_t bytes_left = msg_->size (); + + if (bytes_left < 8 || memcmp (ptr, "READY ", 8)) { + errno = EPROTO; + return -1; + } + + return parse_property_list (ptr + 8, bytes_left - 8); +} + +int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr, + size_t bytes_left) +{ + while (bytes_left > 1) { + const size_t name_length = static_cast (*ptr); + ptr += 1; + bytes_left -= 1; + + if (bytes_left < name_length) + break; + const std::string name = std::string((const char *) ptr, name_length); + ptr += name_length; + bytes_left -= name_length; + + if (bytes_left < 4) + break; + const size_t value_length = static_cast (get_uint32 (ptr)); + ptr += 4; + bytes_left -= 4; + + if (bytes_left < value_length) + break; + const unsigned char * const value = ptr; + ptr += value_length; + bytes_left -= value_length; + + if (name == "Socket-Type") { + // TODO: Implement socket type checking + } + else + if (name == "Identity" && options.recv_identity) + set_peer_identity (value, value_length); + } + + if (bytes_left > 0) { + errno = EPROTO; + return -1; + } + + return 0; +} diff --git a/src/plain_mechanism.hpp b/src/plain_mechanism.hpp new file mode 100644 index 00000000..a89d0a41 --- /dev/null +++ b/src/plain_mechanism.hpp @@ -0,0 +1,74 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PLAIN_MECHANISM_HPP_INCLUDED__ +#define __ZMQ_PLAIN_MECHANISM_HPP_INCLUDED__ + +#include "mechanism.hpp" +#include "options.hpp" + +namespace zmq +{ + + class msg_t; + + class plain_mechanism_t : public mechanism_t + { + public: + + plain_mechanism_t (const options_t &options_, bool as_server_); + virtual ~plain_mechanism_t (); + + // mechanism implementation + virtual int next_handshake_message (msg_t *msg_); + virtual int process_handshake_message (msg_t *msg_); + virtual bool is_handshake_complete () const; + + private: + + enum state_t { + sending_hello, + waiting_for_hello, + sending_welcome, + waiting_for_welcome, + sending_initiate, + waiting_for_initiate, + sending_ready, + waiting_for_ready, + ready + }; + + state_t state; + + int hello_command (msg_t *msg_) const; + int welcome_command (msg_t *msg_) const; + int initiate_command (msg_t *msg_) const; + int ready_command (msg_t *msg_) const; + + int process_hello_command (msg_t *msg_); + int process_welcome_command (msg_t *msg); + int process_ready_command (msg_t *msg_); + int process_initiate_command (msg_t *msg_); + + int parse_property_list (const unsigned char *ptr, size_t length); + }; + +} + +#endif diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 6e4e69d1..4c604db8 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -41,6 +41,7 @@ #include "v2_encoder.hpp" #include "v2_decoder.hpp" #include "null_mechanism.hpp" +#include "plain_mechanism.hpp" #include "raw_decoder.hpp" #include "raw_encoder.hpp" #include "config.hpp" @@ -49,8 +50,10 @@ #include "likely.hpp" #include "wire.hpp" -zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : +zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, + bool as_server_, const std::string &endpoint_) : s (fd_), + as_server (as_server_), inpos (NULL), insize (0), decoder (NULL), @@ -523,13 +526,19 @@ bool zmq::stream_engine_t::handshake () if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { mechanism = new (std::nothrow) null_mechanism_t (options); alloc_assert (mechanism); - read_msg = &stream_engine_t::next_handshake_message; - write_msg = &stream_engine_t::process_handshake_message; + } + else + if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { + mechanism = new (std::nothrow) plain_mechanism_t (options, as_server); + alloc_assert (mechanism); } else { error (); return false; } + + read_msg = &stream_engine_t::next_handshake_message; + write_msg = &stream_engine_t::process_handshake_message; } // Start polling for output if necessary. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 91d641b8..31acc83c 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -52,7 +52,8 @@ namespace zmq { public: - stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint); + stream_engine_t (fd_t fd_, const options_t &options_, + bool as_server_, const std::string &endpoint); ~stream_engine_t (); // i_engine interface implementation. @@ -112,6 +113,9 @@ namespace zmq // Underlying socket. fd_t s; + // True iff this is server's engine. + bool as_server; + msg_t tx_msg; handle_t handle; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index e1ddf617..b83854ce 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -125,7 +125,8 @@ void zmq::tcp_connecter_t::out_event () tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (fd, options, false, endpoint); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 7de38cb1..a291f53c 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -91,7 +91,8 @@ void zmq::tcp_listener_t::in_event () tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (fd, options, true, endpoint); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already