Skip to content

Commit

Permalink
Add couple of extra unique token characters to requests
Browse files Browse the repository at this point in the history
This is to ensure that the NUID does not become shared state
for some reason and end up accidentally using the same response
token for a request.

Signed-off-by: Waldemar Quevedo <[email protected]>
  • Loading branch information
wallyqs committed Aug 18, 2022
1 parent 84ac62f commit ce35668
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 17 deletions.
15 changes: 7 additions & 8 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ssl
import sys
import time
from secrets import token_hex
from dataclasses import dataclass
from email.parser import BytesParser
from random import shuffle
Expand Down Expand Up @@ -56,7 +57,7 @@
Subscription,
)

__version__ = '2.1.6'
__version__ = '2.1.7'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down Expand Up @@ -168,12 +169,8 @@ def __init__(self) -> None:
self._reconnection_task: Union[asyncio.Task[None], None] = None
self._reconnection_task_future: Optional[asyncio.Future] = None
self._max_payload: int = DEFAULT_MAX_PAYLOAD_SIZE
# This is the client id that the NATS server knows
# about. Useful in debugging application errors
# when logged with this identifier along
# with nats server log.
# This would make more sense if we log the server
# connected to as well in case of cluster setup.

# client id that the NATS server knows about.
self._client_id: Optional[str] = None
self._sid: int = 0
self._subs: Dict[int, Subscription] = {}
Expand Down Expand Up @@ -929,8 +926,10 @@ async def _request_new_style(
await self._init_request_sub()
assert self._resp_sub_prefix

# Use a new NUID for the token inbox and then use the future.
# Use a new NUID + couple of unique token bytes to identify the request,
# then use the future to get the response.
token = self._nuid.next()
token.extend(token_hex(2).encode())
inbox = self._resp_sub_prefix[:]
inbox.extend(token)
future: asyncio.Future = asyncio.Future()
Expand Down
10 changes: 4 additions & 6 deletions nats/nuid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# limitations under the License.
#

from random import Random, SystemRandom
from random import Random
from sys import maxsize as MaxInt
from secrets import token_bytes, randbelow

DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
BASE = 62
Expand All @@ -33,8 +34,7 @@ class NUID:
"""

def __init__(self) -> None:
self._srand = SystemRandom()
self._prand = Random(self._srand.randint(0, MaxInt))
self._prand = Random(randbelow(MaxInt))
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
self._prefix = bytearray()
Expand All @@ -60,9 +60,7 @@ def next(self) -> bytearray:
return prefix

def randomize_prefix(self) -> None:
random_bytes = (
self._srand.getrandbits(8) for i in range(PREFIX_LENGTH)
)
random_bytes = token_bytes(PREFIX_LENGTH)
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)

def reset_sequential(self) -> None:
Expand Down
16 changes: 16 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ async def error_cb(err):
assert len(msgs) <= 100
assert sub.pending_msgs == 0
assert sub.pending_bytes == 0

# Consumer has a single message pending but none in buffer.
await js.publish("a3", b'last message')
info = await sub.consumer_info()
assert info.num_pending == 1
assert sub.pending_msgs == 0

# Remove interest
await sub.unsubscribe()
with pytest.raises(TimeoutError):
await sub.fetch(1, timeout=1)

# The pending message is still there, but not possible to consume.
info = await sub.consumer_info()
assert info.num_pending == 1

await nc.close()


Expand Down
6 changes: 3 additions & 3 deletions todo.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
- [X] io_loop becomes loop parameter
- [X] Drain Mode
- [X] Connect timeout
- [X] Adopt async/await in client
- [X] Subscription object on subscribe
- [X] Error handler yields the subscription
- [ ] Use asyncio.Protocol
- [ ] Adopt async/await in client
- [ ] Subscription object on subscribe
- [ ] Error handler yields the subscription

0 comments on commit ce35668

Please sign in to comment.