From f7c070878b7246fd76aeca86f96b30fdd760c67a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 May 2025 17:04:03 +0800 Subject: [PATCH] Support constructing MessageId from results of send() and receive() Currently, `Producer.send` returns a `_pulsar.MessageId` instance, `Consumer.receive` returns a `MessageId` whose `message_id()` method returns a `_pulsar.MessageId` instance. This forces users to access the type from the C extension module (`_pulsar`). This patch adds a `MessageId.wrap` class method to convert the type from the C extension to the type in the `pulsar` module. It also exposes the comparison methods for `MessageId`. --- pulsar/__init__.py | 50 ++++++++++++++++++++++++++++++++++++++------ tests/pulsar_test.py | 23 ++++++++++++++++++++ 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 45b7a96..aae3359 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -81,7 +81,7 @@ class MessageId: """ def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): - self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) + self._msg_id: _pulsar.MessageId = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) earliest = _pulsar.MessageId.earliest latest = _pulsar.MessageId.latest @@ -111,6 +111,24 @@ def __str__(self) -> str: """ return str(self._msg_id) + def __eq__(self, other) -> bool: + return self._msg_id == other._msg_id + + def __ne__(self, other) -> bool: + return self._msg_id != other._msg_id + + def __le__(self, other) -> bool: + return self._msg_id <= other._msg_id + + def __lt__(self, other) -> bool: + return self._msg_id < other._msg_id + + def __ge__(self, other) -> bool: + return self._msg_id >= other._msg_id + + def __gt__(self, other) -> bool: + return self._msg_id > other._msg_id + @staticmethod def deserialize(message_id_bytes): """ @@ -119,6 +137,14 @@ def deserialize(message_id_bytes): """ return _pulsar.MessageId.deserialize(message_id_bytes) + @classmethod + def wrap(cls, msg_id: _pulsar.MessageId): + """ + Wrap the underlying MessageId type from the C extension to the Python type. + """ + self = cls() + self._msg_id = msg_id + return self class Message: """ @@ -170,9 +196,13 @@ def event_timestamp(self): """ return self._message.event_timestamp() - def message_id(self): + def message_id(self) -> _pulsar.MessageId: """ The message ID that can be used to refer to this particular message. + + Returns + ---------- + A `_pulsar.MessageId` object that represents where the message is persisted. """ return self._message.message_id() @@ -1231,7 +1261,7 @@ def send(self, content, event_timestamp=None, deliver_at=None, deliver_after=None, - ): + ) -> _pulsar.MessageId: """ Publish a message on the topic. Blocks until the message is acknowledged @@ -1264,6 +1294,10 @@ def send(self, content, The timestamp is milliseconds and based on UTC deliver_after: optional Specify a delay in timedelta for the delivery of the messages. + + Returns + ---------- + A `_pulsar.MessageId` object that represents where the message is persisted. """ msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id, replication_clusters, disable_replication, event_timestamp, @@ -1502,7 +1536,7 @@ def batch_receive(self): messages.append(m) return messages - def acknowledge(self, message): + def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): """ Acknowledge the reception of a single message. @@ -1511,7 +1545,7 @@ def acknowledge(self, message): Parameters ---------- - message : Message, _pulsar.Message, _pulsar.MessageId + message : Message, MessageId, _pulsar.Message, _pulsar.MessageId The received message or message id. Raises @@ -1521,10 +1555,12 @@ def acknowledge(self, message): """ if isinstance(message, Message): self._consumer.acknowledge(message._message) + elif isinstance(message, MessageId): + self._consumer.acknowledge(message._msg_id) else: self._consumer.acknowledge(message) - def acknowledge_cumulative(self, message): + def acknowledge_cumulative(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): """ Acknowledge the reception of all the messages in the stream up to (and including) the provided message. @@ -1545,6 +1581,8 @@ def acknowledge_cumulative(self, message): """ if isinstance(message, Message): self._consumer.acknowledge_cumulative(message._message) + elif isinstance(message, MessageId): + self._consumer.acknowledge_cumulative(message._msg_id) else: self._consumer.acknowledge_cumulative(message) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 5f9c259..94c04e3 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1253,6 +1253,29 @@ def test_message_id(self): s = MessageId.latest.serialize() self.assertEqual(MessageId.deserialize(s), MessageId.latest) + client = Client(self.serviceUrl) + topic = f'test-message-id-compare-{str(time.time())}' + producer = client.create_producer(topic) + consumer = client.subscribe(topic, 'sub') + + sent_ids = [] + received_ids = [] + for i in range(5): + sent_ids.append(MessageId.wrap(producer.send(b'msg-%d' % i))) + msg = consumer.receive(TM) + received_ids.append(MessageId.wrap(msg.message_id())) + self.assertEqual(sent_ids[i], received_ids[i]) + consumer.acknowledge(received_ids[i]) + consumer.acknowledge_cumulative(received_ids[4]) + + for i in range(4): + self.assertLess(sent_ids[i], sent_ids[i + 1]) + self.assertLessEqual(sent_ids[i], sent_ids[i + 1]) + self.assertGreater(sent_ids[i + 1], sent_ids[i]) + self.assertGreaterEqual(sent_ids[i + 1], sent_ids[i]) + self.assertNotEqual(sent_ids[i], sent_ids[i + 1]) + client.close() + def test_get_topics_partitions(self): client = Client(self.serviceUrl) topic_partitioned = "persistent://public/default/test_get_topics_partitions"