Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tutorial code cleanup and refactoring. #108

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
tutorial code cleanup and refactoring.
jell-o-fishi committed Jan 12, 2023
commit 781724590de9ed74b9e62557b858fc7506e7173a
64 changes: 35 additions & 29 deletions examples/tutorial/reactivex/chat_client.py
Original file line number Diff line number Diff line change
@@ -36,16 +36,15 @@ def set_period(self, period: timedelta):
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._listen_task: Optional[Task] = None
self._statistics_task: Optional[Task] = None
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')

async def join(self, channel_name: str):
request = Payload(ensure_bytes(channel_name), composite(route('channel.join')))
@@ -60,19 +59,16 @@ async def leave(self, channel_name: str):
def listen_for_messages(self):
def print_message(data: bytes):
message = decode_dataclass(data, Message)
print(f'{self._username}: from {message.user} ({message.channel}): {message.content}')
logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}')

async def listen_for_messages():
await ReactiveXClient(self._rsocket).request_stream(Payload(metadata=composite(
route('messages.incoming')
))).pipe(
operators.do_action(on_next=lambda value: print_message(value.data),
on_error=lambda exception: print(exception)))

self._listen_task = asyncio.create_task(listen_for_messages())
on_error=lambda exception: logging.error(exception)))

def stop_listening_for_messages(self):
self._listen_task.cancel()
return asyncio.create_task(listen_for_messages())

async def send_statistics(self):
memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
@@ -83,7 +79,7 @@ async def send_statistics(self):
def listen_for_statistics(self) -> StatisticsControl:
def print_statistics(value: bytes):
statistics = decode_dataclass(value, ServerStatistics)
print(f'users: {statistics.user_count}, channels: {statistics.channel_count}')
logging.info(f'users: {statistics.user_count}, channels: {statistics.channel_count}')

control = StatisticsControl()

@@ -97,7 +93,7 @@ async def listen_for_statistics():
lambda backpressure: observable_from_queue(control.queue, backpressure))
).pipe(
operators.do_action(on_next=lambda value: print_statistics(value.data),
on_error=lambda exception: print(exception)))
on_error=lambda exception: logging.error(exception)))

self._statistics_task = asyncio.create_task(listen_for_statistics())

@@ -107,14 +103,24 @@ def stop_listening_for_statistics(self):
self._statistics_task.cancel()

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to user {username}')

request = Payload(
encode_dataclass(Message(username, content)),
composite(route('message'))
)

await self._rsocket.request_response(request)

async def channel_message(self, channel: str, content: str):
print(f'Sending {content} to channel {channel}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to channel {channel}')

request = Payload(
encode_dataclass(Message(channel=channel, content=content)),
composite(route('message'))
)

await self._rsocket.request_response(request)

async def upload(self, file_name, content):
await self._rsocket.request_response(Payload(content, composite(
@@ -176,43 +182,43 @@ async def main():
await files_example(user1, user2)


async def messaging_example(user1, user2):
user1.listen_for_messages()
user2.listen_for_messages()
async def messaging_example(user1: ChatClient, user2: ChatClient):
message_subscriber1 = user1.listen_for_messages()
message_subscriber2 = user2.listen_for_messages()

channel_name = 'channel1'
await user1.join(channel_name)
await user2.join(channel_name)

print(f'Channels: {await user1.list_channels()}')
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')
logging.info(f'Channels: {await user1.list_channels()}')
logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}')

await user1.private_message('user2', 'private message from user1')
await user1.channel_message(channel_name, 'channel message from user1')

await asyncio.sleep(1)

await user1.leave(channel_name)
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')
logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()
message_subscriber1.cancel()
message_subscriber2.cancel()


async def files_example(user1, user2):
async def files_example(user1: ChatClient, user2: ChatClient):
file_contents = b'abcdefg1234567'
file_name = 'file_name_1.txt'

await user1.upload(file_name, file_contents)

print(f'Files: {await user1.list_files()}')
logging.info(f'Files: {await user1.list_files()}')

download_data = await user2.download(file_name)

if download_data != file_contents:
raise Exception('File download failed')
else:
print(f'Downloaded file: {len(download_data)} bytes')
logging.info(f'Downloaded file: {len(download_data)} bytes')


async def statistics_example(user1):
2 changes: 1 addition & 1 deletion examples/tutorial/reactivex/chat_server.py
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ def __init__(self):

def remove(self):
if self._session is not None:
print(f'Removing session: {self._session.session_id}')
logging.info(f'Removing session: {self._session.session_id}')
del chat_data.user_session_by_id[self._session.session_id]

def router_factory(self):
2 changes: 1 addition & 1 deletion examples/tutorial/step0/chat_client.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ async def main():
async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
response = await client.request_response(Payload(data=b'George'))

print(f"Server response: {utf8_decode(response.data)}")
logging.info(f'Login response: {utf8_decode(response.data)}')


if __name__ == '__main__':
3 changes: 3 additions & 0 deletions examples/tutorial/step0/chat_server.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@
class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> Awaitable[Payload]:
username = utf8_decode(payload.data)

logging.info(f'New user: {username}')

return create_future(Payload(ensure_bytes(f'Welcome to chat, {username}')))


3 changes: 2 additions & 1 deletion examples/tutorial/step1/chat_client.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,8 @@ async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
response = await self._rsocket.request_response(payload)
print(f'Server response: {utf8_decode(response.data)}')

logging.info(f'Login response: {utf8_decode(response.data)}')


async def main():
2 changes: 1 addition & 1 deletion examples/tutorial/step1/chat_server.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ async def login(payload: Payload) -> Awaitable[Payload]:

logging.info(f'New user: {username}')

return create_response(ensure_bytes(f'Hello {username}'))
return create_response(ensure_bytes(f'Welcome to chat, {username}'))

return RoutingRequestHandler(router)

10 changes: 5 additions & 5 deletions examples/tutorial/step2/chat_client.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
from rsocket.extensions.helpers import composite, route
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import single_transport_provider
from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
@@ -14,14 +14,14 @@
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')


async def main():
@@ -31,7 +31,7 @@ async def main():
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1:
user = ChatClient(client1)

await user.login('George')
await user.login('user1')


if __name__ == '__main__':
40 changes: 21 additions & 19 deletions examples/tutorial/step3/chat_client.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
from rsocket.extensions.helpers import composite, route
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import single_transport_provider
from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
@@ -17,44 +17,46 @@
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._message_subscriber: Optional = None
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')

def listen_for_messages(self):
def print_message(data: bytes):
message = decode_dataclass(data, Message)
print(f'{self._username}: from {message.user}: {message.content}')
logging.info(f'to {self._username}: from {message.user}: {message.content}')

class MessageListener(DefaultSubscriber, DefaultSubscription):

def on_next(self, value, is_complete=False):
print_message(value.data)

def on_error(self, exception: Exception):
print(exception)
logging.error(exception)

def cancel(self):
self.subscription.cancel()

self._message_subscriber = MessageListener()
message_subscriber = MessageListener()
self._rsocket.request_stream(
Payload(metadata=composite(route('messages.incoming')))
).subscribe(self._message_subscriber)

def stop_listening_for_messages(self):
self._message_subscriber.cancel()
).subscribe(message_subscriber)
return message_subscriber

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to user {username}')

request = Payload(
encode_dataclass(Message(username, content)),
composite(route('message'))
)

await self._rsocket.request_response(request)


async def main():
@@ -76,15 +78,15 @@ async def main():


async def messaging_example(user1: ChatClient, user2: ChatClient):
user1.listen_for_messages()
user2.listen_for_messages()
message_subscriber1 = user1.listen_for_messages()
message_subscriber2 = user2.listen_for_messages()

await user1.private_message('user2', 'private message from user1')

await asyncio.sleep(1)

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()
message_subscriber1.cancel()
message_subscriber2.cancel()


if __name__ == '__main__':
2 changes: 1 addition & 1 deletion examples/tutorial/step3/chat_server.py
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ def __init__(self, session: UserSessionData):

def cancel(self):
if self._sender is not None:
logging.info('Canceling message sender task')
logging.info('Canceling incoming message sender task')
self._sender.cancel()
self._sender = None

Loading