Skip to content

Commit

Permalink
Merge pull request #335 from nats-io/reqid-nuid
Browse files Browse the repository at this point in the history
Add couple of extra unique token characters to requests
  • Loading branch information
wallyqs authored Aug 18, 2022
2 parents 84ac62f + ce35668 commit 12c85cc
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 17 deletions.
15 changes: 7 additions & 8 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ssl
import sys
import time
from secrets import token_hex
from dataclasses import dataclass
from email.parser import BytesParser
from random import shuffle
Expand Down Expand Up @@ -56,7 +57,7 @@
Subscription,
)

__version__ = '2.1.6'
__version__ = '2.1.7'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down Expand Up @@ -168,12 +169,8 @@ def __init__(self) -> None:
self._reconnection_task: Union[asyncio.Task[None], None] = None
self._reconnection_task_future: Optional[asyncio.Future] = None
self._max_payload: int = DEFAULT_MAX_PAYLOAD_SIZE
# This is the client id that the NATS server knows
# about. Useful in debugging application errors
# when logged with this identifier along
# with nats server log.
# This would make more sense if we log the server
# connected to as well in case of cluster setup.

# client id that the NATS server knows about.
self._client_id: Optional[str] = None
self._sid: int = 0
self._subs: Dict[int, Subscription] = {}
Expand Down Expand Up @@ -929,8 +926,10 @@ async def _request_new_style(
await self._init_request_sub()
assert self._resp_sub_prefix

# Use a new NUID for the token inbox and then use the future.
# Use a new NUID + couple of unique token bytes to identify the request,
# then use the future to get the response.
token = self._nuid.next()
token.extend(token_hex(2).encode())
inbox = self._resp_sub_prefix[:]
inbox.extend(token)
future: asyncio.Future = asyncio.Future()
Expand Down
10 changes: 4 additions & 6 deletions nats/nuid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# limitations under the License.
#

from random import Random, SystemRandom
from random import Random
from sys import maxsize as MaxInt
from secrets import token_bytes, randbelow

DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
BASE = 62
Expand All @@ -33,8 +34,7 @@ class NUID:
"""

def __init__(self) -> None:
self._srand = SystemRandom()
self._prand = Random(self._srand.randint(0, MaxInt))
self._prand = Random(randbelow(MaxInt))
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
self._prefix = bytearray()
Expand All @@ -60,9 +60,7 @@ def next(self) -> bytearray:
return prefix

def randomize_prefix(self) -> None:
random_bytes = (
self._srand.getrandbits(8) for i in range(PREFIX_LENGTH)
)
random_bytes = token_bytes(PREFIX_LENGTH)
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)

def reset_sequential(self) -> None:
Expand Down
16 changes: 16 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ async def error_cb(err):
assert len(msgs) <= 100
assert sub.pending_msgs == 0
assert sub.pending_bytes == 0

# Consumer has a single message pending but none in buffer.
await js.publish("a3", b'last message')
info = await sub.consumer_info()
assert info.num_pending == 1
assert sub.pending_msgs == 0

# Remove interest
await sub.unsubscribe()
with pytest.raises(TimeoutError):
await sub.fetch(1, timeout=1)

# The pending message is still there, but not possible to consume.
info = await sub.consumer_info()
assert info.num_pending == 1

await nc.close()


Expand Down
6 changes: 3 additions & 3 deletions todo.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
- [X] io_loop becomes loop parameter
- [X] Drain Mode
- [X] Connect timeout
- [X] Adopt async/await in client
- [X] Subscription object on subscribe
- [X] Error handler yields the subscription
- [ ] Use asyncio.Protocol
- [ ] Adopt async/await in client
- [ ] Subscription object on subscribe
- [ ] Error handler yields the subscription

0 comments on commit 12c85cc

Please sign in to comment.