diff --git a/docs/source/api/zmq.md b/docs/source/api/zmq.md index fa265ce31..05ceb079b 100644 --- a/docs/source/api/zmq.md +++ b/docs/source/api/zmq.md @@ -10,10 +10,10 @@ ## Basic Classes ````{note} -For typing purposes, `zmq.Context` and `zmq.Socket` are Generics, +For typing purposes, {class}`.zmq.Context` and {class}`.zmq.Socket` are Generics, which means they will accept any Context or Socket implementation. -The base `zmq.Context()` constructor returns the type +The base {class}`zmq.Context()` constructor returns the type `zmq.Context[zmq.Socket[bytes]]`. If you are using type annotations and want to _exclude_ the async subclasses, use the resolved types instead of the base Generics: @@ -32,7 +32,7 @@ sock: zmq.SyncSocket ```` -### {class}`Context` +## {class}`Context` ```{eval-rst} .. autoclass:: Context @@ -47,7 +47,7 @@ sock: zmq.SyncSocket ``` -### {class}`Socket` +## {class}`Socket` ```{eval-rst} .. autoclass:: Socket @@ -81,7 +81,7 @@ sock: zmq.SyncSocket ``` -### {class}`Frame` +## {class}`Frame` ```{eval-rst} .. autoclass:: Frame @@ -90,7 +90,7 @@ sock: zmq.SyncSocket ``` -### {class}`MessageTracker` +## {class}`MessageTracker` ```{eval-rst} .. autoclass:: MessageTracker @@ -99,9 +99,7 @@ sock: zmq.SyncSocket ``` -## Polling - -### {class}`Poller` +## {class}`Poller` ```{eval-rst} .. autoclass:: Poller diff --git a/docs/source/howto/morethanbindings.md b/docs/source/howto/morethanbindings.md index afb4b0272..73ce62f53 100644 --- a/docs/source/howto/morethanbindings.md +++ b/docs/source/howto/morethanbindings.md @@ -122,11 +122,24 @@ as first-class methods to the {class}`~.zmq.Socket` class. A socket has the meth {meth}`~.zmq.Socket.send_json` and {meth}`~.zmq.Socket.send_pyobj`, which correspond to sending an object over the wire after serializing with {mod}`json` and {mod}`pickle` respectively, and any object sent via those methods can be reconstructed with the -{meth}`~.zmq.Socket.recv_json` and {meth}`~.zmq.Socket.recv_pyobj` methods. Unicode strings are -other objects that are not unambiguously sendable over the wire, so we include -{meth}`~.zmq.Socket.send_string` and {meth}`~.zmq.Socket.recv_string` that simply send bytes +{meth}`~.zmq.Socket.recv_json` and {meth}`~.zmq.Socket.recv_pyobj` methods. + +```{warning} +Deserializing with pickle grants the message sender access to arbitrary code execution on the receiver. +Never use `recv_pyobj` on a socket that might receive messages from untrusted sources +before authenticating the sender. + +It's always a good idea to enable CURVE security if you can, +or authenticate messages with e.g. HMAC digests or other signing mechanisms. +``` + +Text strings are other objects that are not unambiguously sendable over the wire, so we include +{meth}`~.zmq.Socket.send_string` and {meth}`~.zmq.Socket.recv_string` that send bytes after encoding the message ('utf-8' is the default). +These are all convenience methods, and users are encouraged to build their own serialization that best suits their applications needs, +especially concerning performance and security. + ```{seealso} - {ref}`Further information ` on serialization in pyzmq. ``` diff --git a/docs/source/howto/serialization.md b/docs/source/howto/serialization.md index d27f62efa..79e467184 100644 --- a/docs/source/howto/serialization.md +++ b/docs/source/howto/serialization.md @@ -8,85 +8,170 @@ When sending messages over a network, you often need to marshall your data into ## Builtin serialization -PyZMQ is primarily bindings for libzmq, but we do provide three builtin serialization +PyZMQ is primarily bindings for libzmq, but we do provide some builtin serialization methods for convenience, to help Python developers learn libzmq. Python has two primary -packages for serializing objects: {py:mod}`json` and {py:mod}`pickle`, so we provide -simple convenience methods for sending and receiving objects serialized with these -modules. A socket has the methods {meth}`~.Socket.send_json` and +modules for serializing objects in the standard library: {py:mod}`json` and {py:mod}`pickle`, +so pyzmq provides simple convenience methods for sending and receiving objects serialized with these modules. +A socket has the methods {meth}`~.Socket.send_json` and {meth}`~.Socket.send_pyobj`, which correspond to sending an object over the wire after serializing with json and pickle respectively, and any object sent via those methods can be reconstructed with the {meth}`~.Socket.recv_json` and {meth}`~.Socket.recv_pyobj` methods. -These methods designed for convenience, not for performance, so developers who want -to emphasize performance should use their own serialized send/recv methods. +```{note} +These methods are meant more for convenience and demonstration purposes, not for performance or safety. +Applications should usually define their own serialized send/recv functions. +``` + +```{warning} +`send/recv_pyobj` are very basic wrappers around `send(pickle.dumps(obj))` and `pickle.loads(recv())`. +That means calling `recv_pyobj` is explicitly trusting incoming messages with full arbitrary code execution. +Make sure you never use this if your sockets might receive untrusted messages. +You can protect your sockets by e.g.: + +- enabling CURVE encryption/authentication, IPC socket permissions, or other socket-level security to prevent unauthorized messages in the first place, or +- using some kind of message authentication, such as HMAC digests, to verify trusted messages **before** deserializing +``` ## Using your own serialization In general, you will want to provide your own serialization that is optimized for your -application or library availability. This may include using your own preferred -serialization ([^cite_msgpack], [^cite_protobuf]), or adding compression via [^cite_zlib] in the standard -library, or the super fast [^cite_blosc] library. +application goals or library availability. This may include using your own preferred +serialization such as [msgpack] or [msgspec], +or adding compression via {py:mod}`zlib` in the standard library, +or the super fast [blosc] library. + +```{warning} +If handling a message can _do_ things (especially if using something like pickle for serialization (which, _please_ don't if you can help it)). +Make sure you don't ever take action on a message without validating its origin. +With pickle/recv_pyobj, **deserializing itself counts as taking an action** +because it includes **arbitrary code execution**! +``` + +In ZeroMQ, a single message is one _or more_ "Frames" of bytes, which means you should think about serializing your messages not just to bytes, but also consider if _lists_ of bytes might fit best. +Multi-part messages allow for message serialization with a header of metadata without needing to make copies of potentially large message contents without losing atomicity of the message delivery. + +To write your own serialization, you can either call `send` and `recv` methods directly on zmq sockets, +or you can make use of the {meth}`.Socket.send_serialized` / {meth}`.Socket.recv_serialized` methods. +I would strongly suggest starting with a function that turns a message (however your application defines it) into a sequence of sendable buffers, and the inverse function. + +For example: + +```python +socket.send_json(msg) +msg = socket.recv_json() +``` + +is equivalent to + +```python +def json_dump_bytes(msg: Any) -> list[bytes]: + return [json.dumps(msg).encode("utf8")] -There are two simple models for implementing your own serialization: write a function -that takes the socket as an argument, or subclass Socket for use in your own apps. + +def json_load_bytes(msg_list: list[bytes]) -> Any: + return json.loads(msg_list[0].decode("utf8")) + + +socket.send_multipart(json_dump_bytes(msg)) +msg = json_load_bytes(socket.recv_multipart()) +# or +socket.send_serialized(msg, serialize=json_dump_bytes) +msg = socket.recv_serialized(json_load_bytes) +``` + +### Example: pickling Python objects + +As an example, pickle is Python's powerful built-in serialization for arbitrary Python objects. +Two potential issues you might face: + +1. sometimes it is inefficient, and +1. `pickle.loads` enables arbitrary code execution For instance, pickles can often be reduced substantially in size by compressing the data. -The following will send *compressed* pickles over the wire: +We also want to make sure we don't call `pickle.loads` on any untrusted messages. +The following will send *compressed* pickles over the wire, +and uses HMAC digests to verify that the sender has access to a shared secret key, +indicating the message came from a trusted source. ```python +import haslib +import hmac import pickle import zlib -def send_zipped_pickle(socket, obj, flags=0, protocol=pickle.HIGHEST_PROTOCOL): - """pickle an object, and zip the pickle before sending it""" +def sign(self, key: bytes, msg: bytes) -> bytes: + """Compute the HMAC digest of msg, given signing key `key`""" + return hmac.HMAC( + key, + msg, + digestmod=hashlib.sha256, + ).digest() + + +def send_signed_zipped_pickle( + socket, obj, flags=0, *, key, protocol=pickle.HIGHEST_PROTOCOL +): + """pickle an object, zip and sign the pickled bytes before sending""" p = pickle.dumps(obj, protocol) z = zlib.compress(p) - return socket.send(z, flags=flags) + signature = sign(key, zobj) + return socket.send_multipart([signature, z], flags=flags) -def recv_zipped_pickle(socket, flags=0): - """inverse of send_zipped_pickle""" - z = socket.recv(flags) +def recv_signed_zipped_pickle(socket, flags=0, *, key): + """inverse of send_signed_zipped_pickle""" + sig, z = socket.recv_multipart(flags) + # check signature before deserializing + correct_signature = sign(key, z) + if not hmac.compare_digest(sig, correct_signature): + raise ValueError("invalid signature") p = zlib.decompress(z) return pickle.loads(p) ``` +### Example: numpy arrays + A common data structure in Python is the numpy array. PyZMQ supports sending numpy arrays without copying any data, since they provide the Python buffer interface. -However just the buffer is not enough information to reconstruct the array on the -receiving side. Here is an example of a send/recv that allow non-copying +However, just the buffer is not enough information to reconstruct the array on the +receiving side because it arrives as a 1-D array of bytes. +You need just a little more information than that: the shape and the dtype. + +Here is an example of a send/recv that allow non-copying sends/recvs of numpy arrays including the dtype/shape data necessary for reconstructing the array. +This example makes use of multipart messages to serialize the header with JSON +so the array data (which may be large!) doesn't need any unnecessary copies. ```python import numpy -def send_array(socket, A, flags=0, copy=True, track=False): +def send_array( + socket: zmq.Socket, + A: numpy.ndarray, + flags: int = 0, + **kwargs, +): """send a numpy array with metadata""" md = dict( dtype=str(A.dtype), shape=A.shape, ) socket.send_json(md, flags | zmq.SNDMORE) - return socket.send(A, flags, copy=copy, track=track) + return socket.send(A, flags, **kwargs) -def recv_array(socket, flags=0, copy=True, track=False): +def recv_array(socket: zmq.Socket, flags: int = 0, **kwargs) -> numpy.array: """recv a numpy array""" md = socket.recv_json(flags=flags) - msg = socket.recv(flags=flags, copy=copy, track=track) - buf = memoryview(msg) - A = numpy.frombuffer(buf, dtype=md["dtype"]) + msg = socket.recv(flags=flags, **kwargs) + A = numpy.frombuffer(msg, dtype=md["dtype"]) return A.reshape(md["shape"]) ``` -[^cite_msgpack]: Message Pack serialization library - -[^cite_protobuf]: Google Protocol Buffers - -[^cite_zlib]: Python stdlib module for zip compression: {py:mod}`zlib` - -[^cite_blosc]: Blosc: A blocking, shuffling and loss-less (and crazy-fast) compression library +[blosc]: https://www.blosc.org +[msgpack]: https://msgpack.org +[msgspec]: https://jcristharif.com/msgspec/ diff --git a/examples/gevent/simple.py b/examples/gevent/simple.py index 1996b2000..1b21bf42e 100644 --- a/examples/gevent/simple.py +++ b/examples/gevent/simple.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations from gevent import spawn, spawn_later @@ -10,13 +10,13 @@ sock = ctx.socket(zmq.PUSH) sock.bind('ipc:///tmp/zmqtest') -spawn(sock.send_pyobj, ('this', 'is', 'a', 'python', 'tuple')) -spawn_later(1, sock.send_pyobj, {'hi': 1234}) +spawn(sock.send_json, ['this', 'is', 'a', 'list']) +spawn_later(1, sock.send_json, {'hi': 1234}) spawn_later( - 2, sock.send_pyobj, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42) + 2, sock.send_json, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42) ) -spawn_later(3, sock.send_pyobj, 'foobar') -spawn_later(4, sock.send_pyobj, 'quit') +spawn_later(3, sock.send_json, 'foobar') +spawn_later(4, sock.send_json, 'quit') # client @@ -27,14 +27,14 @@ def get_objs(sock: zmq.Socket): while True: - o = sock.recv_pyobj() - print('received python object:', o) + o = sock.recv_json() + print('received:', o) if o == 'quit': print('exiting.') break -def print_every(s: str, t: Optional[float] = None): +def print_every(s: str, t: float | None = None): print(s) if t: spawn_later(t, print_every, s, t) diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py index 61938ba09..3fddcfffe 100644 --- a/examples/pubsub/publisher.py +++ b/examples/pubsub/publisher.py @@ -1,6 +1,6 @@ """A test that publishes NumPy arrays. -Uses REQ/REP (on PUB/SUB socket + 1) to synchronize +Uses XPUB to wait for subscription to start """ # ----------------------------------------------------------------------------- @@ -10,50 +10,51 @@ # the file LICENSE.BSD, distributed as part of this software. # ----------------------------------------------------------------------------- -import sys +from argparse import ArgumentParser import numpy import zmq -def sync(bind_to: str) -> None: - # use bind socket + 1 - sync_with = ':'.join( - bind_to.split(':')[:-1] + [str(int(bind_to.split(':')[-1]) + 1)] - ) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REP) - s.bind(sync_with) - print("Waiting for subscriber to connect...") - s.recv() - print(" Done.") - s.send(b'GO') +def send_array( + socket: zmq.Socket, array: numpy.ndarray, flags=0, copy=True, track=False +): + md = { + "shape": array.shape, + # is there a better way to serialize dtypes? + "dtype": str(array.dtype), + } + socket.send_json(md, flags | zmq.SNDMORE) + return socket.send(array, flags, copy=copy, track=track) def main() -> None: - if len(sys.argv) != 4: - print('usage: publisher ') - sys.exit(1) - - try: - bind_to = sys.argv[1] - array_size = int(sys.argv[2]) - array_count = int(sys.argv[3]) - except (ValueError, OverflowError) as e: - print('array-size and array-count must be integers') - sys.exit(1) - + parser = ArgumentParser() + parser.add_argument("--url", default="tcp://127.0.0.1:5555") + parser.add_argument( + "-n", "--count", default=10, type=int, help="number of arrays to send" + ) + parser.add_argument( + "--size", + default=1024, + type=int, + help="size of the arrays to send (length of each dimension). Total size is size**nd", + ) + parser.add_argument("--nd", default=2, type=int, help="number of dimensions") + args = parser.parse_args() + bind_to = args.url ctx = zmq.Context() - s = ctx.socket(zmq.PUB) + s = ctx.socket(zmq.XPUB) s.bind(bind_to) - - sync(bind_to) - + print("Waiting for subscriber") + s.recv() print("Sending arrays...") - for i in range(array_count): - a = numpy.random.rand(array_size, array_size) - s.send_pyobj(a) + shape = (args.size,) * args.nd + for i in range(args.count): + a = numpy.random.random(shape) + send_array(s, a) + s.send_json({"done": True}) print(" Done.") diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py index 0e890d924..e0710883f 100644 --- a/examples/pubsub/subscriber.py +++ b/examples/pubsub/subscriber.py @@ -10,57 +10,56 @@ # the file LICENSE.BSD, distributed as part of this software. # ----------------------------------------------------------------------------- -import sys +from __future__ import annotations + import time +from argparse import ArgumentParser +from typing import Any, cast + +import numpy import zmq -def sync(connect_to: str) -> None: - # use connect socket + 1 - sync_with = ':'.join( - connect_to.split(':')[:-1] + [str(int(connect_to.split(':')[-1]) + 1)] - ) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REQ) - s.connect(sync_with) - s.send(b'READY') - s.recv() +def recv_array( + socket: zmq.Socket, flags: int = 0, copy: bool = True, track: bool = False +) -> numpy.ndarray | None: + """recv a numpy array""" + header = cast(dict[str, Any], socket.recv_json(flags=flags)) + if header.get('done', False): + return None + msg = socket.recv(flags=flags, copy=copy, track=track) + A = numpy.frombuffer(msg, dtype=header['dtype']) # type: ignore + return A.reshape(header['shape']) def main() -> None: - if len(sys.argv) != 3: - print('usage: subscriber ') - sys.exit(1) - - try: - connect_to = sys.argv[1] - array_count = int(sys.argv[2]) - except (ValueError, OverflowError) as e: - print('array-count must be integers') - sys.exit(1) + parser = ArgumentParser() + parser.add_argument("--url", default="tcp://127.0.0.1:5555") + args = parser.parse_args() ctx = zmq.Context() s = ctx.socket(zmq.SUB) - s.connect(connect_to) - s.setsockopt(zmq.SUBSCRIBE, b'') - - sync(connect_to) - - start = time.process_time() + s.connect(args.url) + s.subscribe(b'') + start = time.perf_counter() print("Receiving arrays...") - for i in range(array_count): - a = s.recv_pyobj() + a = first_array = recv_array(s) + assert first_array is not None + array_count = 0 + while a is not None: + array_count += 1 + a = recv_array(s) print(" Done.") - end = time.process_time() + end = time.perf_counter() elapsed = end - start throughput = float(array_count) / elapsed - message_size = a.nbytes + message_size = first_array.nbytes megabits = float(throughput * message_size * 8) / 1000000 print(f"message size: {message_size:.0f} [B]") @@ -68,8 +67,6 @@ def main() -> None: print(f"mean throughput: {throughput:.0f} [msg/s]") print(f"mean throughput: {megabits:.3f} [Mb/s]") - time.sleep(1.0) - if __name__ == "__main__": main() diff --git a/examples/serialization/serialsocket.py b/examples/serialization/serialsocket.py index ac50b2f5c..7134ab2d7 100644 --- a/examples/serialization/serialsocket.py +++ b/examples/serialization/serialsocket.py @@ -1,7 +1,10 @@ """A Socket subclass that adds some serialization methods.""" +import hmac import pickle +import secrets import zlib +from hashlib import sha256 from typing import Any, Dict, cast import numpy @@ -13,24 +16,51 @@ class SerializingSocket(zmq.Socket): """A class with some extra serialization methods send_zipped_pickle is just like send_pyobj, but uses - zlib to compress the stream before sending. + zlib to compress the stream before sending, + and signs messages with a key for authentication because + we must never load untrusted pickles. - send_array sends numpy arrays with metadata necessary + send_array sends numpy arrays without copying the array, + along with metadata necessary for reconstructing the array on the other side (dtype,shape). """ + signing_key: bytes + + def sign(self, buffer: bytes) -> bytes: + return hmac.HMAC( + self.signing_key, + buffer, + sha256, + ).digest() + def send_zipped_pickle( - self, obj: Any, flags: int = 0, protocol: int = pickle.HIGHEST_PROTOCOL + self, + obj: Any, + flags: int = 0, + *, + protocol: int = pickle.HIGHEST_PROTOCOL, ) -> None: """pack and compress an object with pickle and zlib.""" pobj = pickle.dumps(obj, protocol) zobj = zlib.compress(pobj) - print(f'zipped pickle is {len(zobj)} bytes') + shrinkage = len(pobj) / len(zobj) + print(f'zipped pickle is {len(zobj)} bytes ({shrinkage:.1f}x smaller)') + signature = self.sign(zobj) + self.send(signature, flags=flags | zmq.SNDMORE) return self.send(zobj, flags=flags) def recv_zipped_pickle(self, flags: int = 0) -> Any: """reconstruct a Python object sent with zipped_pickle""" + recvd_signature = self.recv() + assert self.rcvmore zobj = self.recv(flags) + check_signature = self.sign(zobj) + if not hmac.compare_digest(recvd_signature, check_signature): + raise ValueError("Invalid signature") + + # check signature before loading with pickle + # pickle.loads involves arbitrary code execution pobj = zlib.decompress(zobj) return pickle.loads(pobj) @@ -61,25 +91,38 @@ class SerializingContext(zmq.Context[SerializingSocket]): def main() -> None: ctx = SerializingContext() - req = ctx.socket(zmq.REQ) - rep = ctx.socket(zmq.REP) - - rep.bind('inproc://a') - req.connect('inproc://a') + push = ctx.socket(zmq.PUSH) + pull = ctx.socket(zmq.PULL) + + push.bind('inproc://a') + pull.connect('inproc://a') + # 'distribute' shared secret + push.signing_key = pull.signing_key = secrets.token_bytes(32) + # all ones is a best-case scenario for zip A = numpy.ones((1024, 1024)) print(f"Array is {A.nbytes} bytes") # send/recv with pickle+zip - req.send_zipped_pickle(A) - B = rep.recv_zipped_pickle() + push.send_zipped_pickle(A) + B = pull.recv_zipped_pickle() # now try non-copying version - rep.send_array(A, copy=False) - C = req.recv_array(copy=False) + push.send_array(A, copy=False) + C = pull.recv_array(copy=False) print("Checking zipped pickle...") print("Okay" if (A == B).all() else "Failed") print("Checking send_array...") print("Okay" if (C == B).all() else "Failed") + print("Checking incorrect signature...") + push.signing_key = b"wrong" + push.send_zipped_pickle(A) + try: + B = pull.recv_zipped_pickle() + except ValueError: + print("Okay") + else: + print("Failed") + if __name__ == '__main__': main() diff --git a/zmq/sugar/socket.py b/zmq/sugar/socket.py index 0f10f3d7b..f077a0df0 100644 --- a/zmq/sugar/socket.py +++ b/zmq/sugar/socket.py @@ -936,7 +936,17 @@ def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str: def send_pyobj( self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs ) -> zmq.Frame | None: - """Send a Python object as a message using pickle to serialize. + """ + Send a Python object as a message using pickle to serialize. + + .. warning:: + + Never deserialize an untrusted message with pickle, + which can involve arbitrary code execution. + Make sure to authenticate the sources of messages + before unpickling them, e.g. with transport-level security + (e.g. CURVE, ZAP, or IPC permissions) + or signed messages. Parameters ---------- @@ -952,7 +962,17 @@ def send_pyobj( return self.send(msg, flags=flags, **kwargs) def recv_pyobj(self, flags: int = 0) -> Any: - """Receive a Python object as a message using pickle to serialize. + """ + Receive a Python object as a message using UNSAFE pickle to serialize. + + .. warning:: + + Never deserialize an untrusted message with pickle, + which can involve arbitrary code execution. + Make sure to authenticate the sources of messages + before unpickling them, e.g. with transport-level security + (such as CURVE or IPC permissions) + or authenticating messages themselves before deserializing. Parameters ----------