diff --git a/nats/aio/client.py b/nats/aio/client.py index f6281464..db29dbb9 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -20,6 +20,7 @@ import ssl import sys import time +from secrets import token_hex from dataclasses import dataclass from email.parser import BytesParser from random import shuffle @@ -56,7 +57,7 @@ Subscription, ) -__version__ = '2.1.6' +__version__ = '2.1.7' __lang__ = 'python3' _logger = logging.getLogger(__name__) PROTOCOL = 1 @@ -168,12 +169,8 @@ def __init__(self) -> None: self._reconnection_task: Union[asyncio.Task[None], None] = None self._reconnection_task_future: Optional[asyncio.Future] = None self._max_payload: int = DEFAULT_MAX_PAYLOAD_SIZE - # This is the client id that the NATS server knows - # about. Useful in debugging application errors - # when logged with this identifier along - # with nats server log. - # This would make more sense if we log the server - # connected to as well in case of cluster setup. + + # client id that the NATS server knows about. self._client_id: Optional[str] = None self._sid: int = 0 self._subs: Dict[int, Subscription] = {} @@ -929,8 +926,10 @@ async def _request_new_style( await self._init_request_sub() assert self._resp_sub_prefix - # Use a new NUID for the token inbox and then use the future. + # Use a new NUID + couple of unique token bytes to identify the request, + # then use the future to get the response. token = self._nuid.next() + token.extend(token_hex(2).encode()) inbox = self._resp_sub_prefix[:] inbox.extend(token) future: asyncio.Future = asyncio.Future() diff --git a/nats/nuid.py b/nats/nuid.py index eabf5808..fd93d14c 100644 --- a/nats/nuid.py +++ b/nats/nuid.py @@ -12,8 +12,9 @@ # limitations under the License. # -from random import Random, SystemRandom +from random import Random from sys import maxsize as MaxInt +from secrets import token_bytes, randbelow DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' BASE = 62 @@ -33,8 +34,7 @@ class NUID: """ def __init__(self) -> None: - self._srand = SystemRandom() - self._prand = Random(self._srand.randint(0, MaxInt)) + self._prand = Random(randbelow(MaxInt)) self._seq = self._prand.randint(0, MAX_SEQ) self._inc = MIN_INC + self._prand.randint(BASE + 1, INC) self._prefix = bytearray() @@ -60,9 +60,7 @@ def next(self) -> bytearray: return prefix def randomize_prefix(self) -> None: - random_bytes = ( - self._srand.getrandbits(8) for i in range(PREFIX_LENGTH) - ) + random_bytes = token_bytes(PREFIX_LENGTH) self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes) def reset_sequential(self) -> None: diff --git a/tests/test_js.py b/tests/test_js.py index 6bdb698b..d0e2444d 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -700,6 +700,22 @@ async def error_cb(err): assert len(msgs) <= 100 assert sub.pending_msgs == 0 assert sub.pending_bytes == 0 + + # Consumer has a single message pending but none in buffer. + await js.publish("a3", b'last message') + info = await sub.consumer_info() + assert info.num_pending == 1 + assert sub.pending_msgs == 0 + + # Remove interest + await sub.unsubscribe() + with pytest.raises(TimeoutError): + await sub.fetch(1, timeout=1) + + # The pending message is still there, but not possible to consume. + info = await sub.consumer_info() + assert info.num_pending == 1 + await nc.close() diff --git a/todo.md b/todo.md index 3edc2b34..07c81d8d 100644 --- a/todo.md +++ b/todo.md @@ -20,7 +20,7 @@ - [X] io_loop becomes loop parameter - [X] Drain Mode - [X] Connect timeout +- [X] Adopt async/await in client +- [X] Subscription object on subscribe +- [X] Error handler yields the subscription - [ ] Use asyncio.Protocol -- [ ] Adopt async/await in client -- [ ] Subscription object on subscribe -- [ ] Error handler yields the subscription