Skip to content

Commit

Permalink
Implement non-blocking shutdown command that unblocks other threads w…
Browse files Browse the repository at this point in the history
…aiting on blocking operations.
  • Loading branch information
ricnewton committed Sep 10, 2013
1 parent a601b3f commit 64e1c18
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ set(tests
test_system
test_connect_delay
test_connect_resolve
test_ctx_destroy
test_ctx_options
test_disconnect_inproc
test_hwm
Expand Down
52 changes: 52 additions & 0 deletions doc/zmq_ctx_shutdown.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
zmq_ctx_shutdown(3)
==================


NAME
----
zmq_ctx_shutdown - shutdown a 0MQ context


SYNOPSIS
--------
*int zmq_ctx_shutdown (void '*context');*


DESCRIPTION
-----------
The _zmq_ctx_shutdown()_ function shall shutdown the 0MQ context 'context'.

Context shutdown will cause any blocking operations currently in progress on
sockets open within 'context' to return immediately with an error code of ETERM.
With the exception of _zmq_close()_, any further operations on sockets open within
'context' shall fail with an error code of ETERM.

This function is optional, client code is still required to call the linkzmq:zmq_ctx_term[3]
function to free all resources allocated by zeromq.


RETURN VALUE
------------
The _zmq_ctx_shutdown()_ function shall return zero if successful. Otherwise
it shall return `-1` and set 'errno' to one of the values defined below.


ERRORS
------
*EFAULT*::
The provided 'context' was invalid.


SEE ALSO
--------
linkzmq:zmq[7]
linkzmq:zmq_init[3]
linkzmq:zmq_ctx_term[3]
linkzmq:zmq_close[3]
linkzmq:zmq_setsockopt[3]


AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
1 change: 1 addition & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);

ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context);
ZMQ_EXPORT int zmq_ctx_shutdown (void *ctx_);
ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval);
ZMQ_EXPORT int zmq_ctx_get (void *context, int option);

Expand Down
19 changes: 19 additions & 0 deletions src/ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,25 @@ int zmq::ctx_t::terminate ()
return 0;
}

int zmq::ctx_t::shutdown ()
{
slot_sync.lock ();
if (!starting && !terminating) {
terminating = true;

// Send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
if (sockets.empty ())
reaper->stop ();
}
slot_sync.unlock ();

return 0;
}

int zmq::ctx_t::set (int option_, int optval_)
{
int rc = 0;
Expand Down
9 changes: 9 additions & 0 deletions src/ctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ namespace zmq
// after the last one is closed.
int terminate ();

// This function starts the terminate process by unblocking any blocking
// operations currently in progress and stopping any more socket activity
// (except zmq_close).
// This function is non-blocking.
// terminate must still be called afterwards.
// This function is optional, terminate will unblock any current
// operations as well.
int shutdown();

// Set and get context properties.
int set (int option_, int optval_);
int get (int option_);
Expand Down
10 changes: 10 additions & 0 deletions src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ int zmq_ctx_term (void *ctx_)
return rc;
}

int zmq_ctx_shutdown (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}

return ((zmq::ctx_t*) ctx_)->shutdown ();
}

int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
Expand Down
1 change: 1 addition & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ noinst_PROGRAMS = test_system \
test_stream \
test_disconnect_inproc \
test_ctx_options \
test_ctx_destroy \
test_security_null \
test_security_plain \
test_security_curve \
Expand Down
90 changes: 90 additions & 0 deletions tests/test_ctx_destroy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <string.h>
#include "testutil.hpp"

static void receiver (void *socket)
{
char buffer[16];
int rc = zmq_recv (socket, &buffer, sizeof (buffer), 0);
assert(rc == -1);
}

void test_ctx_destroy()
{
int rc;

// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);

void *socket = zmq_socket (ctx, ZMQ_PULL);
assert (socket);

// Close the socket
rc = zmq_close (socket);
assert (rc == 0);

// Destroy the context
rc = zmq_ctx_destroy (ctx);
assert (rc == 0);
}

void test_ctx_shutdown()
{
int rc;

// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);

void *socket = zmq_socket (ctx, ZMQ_PULL);
assert (socket);

// Spawn a thread to receive on socket
void *receiver_thread = zmq_threadstart (&receiver, socket);

// Shutdown context, if we used destroy here we would deadlock.
rc = zmq_ctx_shutdown (ctx);
assert (rc == 0);

// Wait for thread to finish
zmq_threadclose (receiver_thread);

// Close the socket.
rc = zmq_close (socket);
assert (rc == 0);

// Destory the context, will now not hang as we have closed the socket.
rc = zmq_ctx_destroy (ctx);
assert (rc == 0);
}

int main (void)
{
setup_test_environment();

test_ctx_destroy();
test_ctx_shutdown();

return 0;
}

0 comments on commit 64e1c18

Please sign in to comment.