diff --git a/src/session_base.cpp b/src/session_base.cpp index 12900df5..d4e9855c 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -230,27 +230,29 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0); + zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); if (pipe == pipe_) // If this is our current pipe, remove it pipe = NULL; else // Remove the pipe from the detached pipes set - incomplete_pipes.erase (pipe_); + terminating_pipes.erase (pipe_); // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending && !pipe && incomplete_pipes.size () == 0) + if (pending && !pipe && terminating_pipes.size () == 0) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); return; + } if (likely (engine != NULL)) engine->activate_out (); @@ -261,8 +263,10 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); return; + } if (engine) engine->activate_in (); @@ -411,7 +415,7 @@ void zmq::session_base_t::detached () && addr->protocol != "pgm" && addr->protocol != "epgm") { pipe->hiccup (); pipe->terminate (false); - incomplete_pipes.insert (pipe); + terminating_pipes.insert (pipe); pipe = NULL; } diff --git a/src/session_base.hpp b/src/session_base.hpp index 7b9f3fc6..fa765ba3 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -105,7 +105,7 @@ namespace zmq zmq::pipe_t *pipe; // This set is added to with pipes we are disconnecting, but haven't yet completed - std::set incomplete_pipes; + std::set terminating_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe.