Skip to content

add rpushb and xadd #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions asyncio_redis/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ class StringEncoder(BaseEncoder):

def encode_from_native(self, data):
""" string to bytes """
return data.encode(self.encoding)
if isinstance(data, str):
return data.encode(self.encoding)
return data

def decode_to_native(self, data):
""" bytes to string """
return data.decode(self.encoding)
if isinstance(data, bytes):
return data.decode(self.encoding)
return data


class UTF8Encoder(StringEncoder):
Expand Down
32 changes: 32 additions & 0 deletions asyncio_redis/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,27 @@ def sadd(self, tr, key: NativeType, members: ListOf(NativeType)) -> int:
*map(self.encode_from_native, members),
)

@_query_command
def xadd(
self, tr, stream: NativeType, fields: dict, id: NativeType='*',
) -> NativeType:
"""
Appends a new entry to a stream.

:param stream: The name of the stream.
:param id: The ID of the entry. Use '*' to auto-generate an ID.
:param fields: A list of field-value pairs.
:return: The ID of the added entry.
"""
flattened_list = [item for sublist in fields.items() for item in sublist]
return self._query(
tr,
b"xadd",
self.encode_from_native(stream),
self.encode_from_native(id),
*map(self.encode_from_native, flattened_list),
)

@_query_command
def srem(self, tr, key: NativeType, members: ListOf(NativeType)) -> int:
"""Remove one or more members from a set
Expand Down Expand Up @@ -1646,6 +1667,17 @@ def rpush(self, tr, key: NativeType, values: ListOf(NativeType)) -> int:
*map(self.encode_from_native, values),
)

@_query_command
def rpushb(self, tr, key: NativeType, values: bytes) -> int:
"""Append one or multiple values to a list
"""
return self._query(
tr,
b"rpush",
self.encode_from_native(key),
values,
)

@_query_command
def rpushx(self, tr, key: NativeType, value: NativeType) -> int:
"""Append a value to a list, only if the list exists
Expand Down