Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tutorial code cleanup and refactoring. #108

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions examples/tutorial/reactivex/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@ def set_period(self, period: timedelta):
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._listen_task: Optional[Task] = None
self._statistics_task: Optional[Task] = None
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')

async def join(self, channel_name: str):
request = Payload(ensure_bytes(channel_name), composite(route('channel.join')))
Expand All @@ -60,19 +59,16 @@ async def leave(self, channel_name: str):
def listen_for_messages(self):
def print_message(data: bytes):
message = decode_dataclass(data, Message)
print(f'{self._username}: from {message.user} ({message.channel}): {message.content}')
logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}')

async def listen_for_messages():
await ReactiveXClient(self._rsocket).request_stream(Payload(metadata=composite(
route('messages.incoming')
))).pipe(
operators.do_action(on_next=lambda value: print_message(value.data),
on_error=lambda exception: print(exception)))

self._listen_task = asyncio.create_task(listen_for_messages())
on_error=lambda exception: logging.error(exception)))

def stop_listening_for_messages(self):
self._listen_task.cancel()
return asyncio.create_task(listen_for_messages())

async def send_statistics(self):
memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Expand All @@ -83,7 +79,7 @@ async def send_statistics(self):
def listen_for_statistics(self) -> StatisticsControl:
def print_statistics(value: bytes):
statistics = decode_dataclass(value, ServerStatistics)
print(f'users: {statistics.user_count}, channels: {statistics.channel_count}')
logging.info(f'users: {statistics.user_count}, channels: {statistics.channel_count}')

control = StatisticsControl()

Expand All @@ -97,7 +93,7 @@ async def listen_for_statistics():
lambda backpressure: observable_from_queue(control.queue, backpressure))
).pipe(
operators.do_action(on_next=lambda value: print_statistics(value.data),
on_error=lambda exception: print(exception)))
on_error=lambda exception: logging.error(exception)))

self._statistics_task = asyncio.create_task(listen_for_statistics())

Expand All @@ -107,14 +103,24 @@ def stop_listening_for_statistics(self):
self._statistics_task.cancel()

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to user {username}')

request = Payload(
encode_dataclass(Message(username, content)),
composite(route('message'))
)

await self._rsocket.request_response(request)

async def channel_message(self, channel: str, content: str):
print(f'Sending {content} to channel {channel}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to channel {channel}')

request = Payload(
encode_dataclass(Message(channel=channel, content=content)),
composite(route('message'))
)

await self._rsocket.request_response(request)

async def upload(self, file_name, content):
await self._rsocket.request_response(Payload(content, composite(
Expand Down Expand Up @@ -177,43 +183,43 @@ async def main():
await files_example(user1, user2)


async def messaging_example(user1, user2):
user1.listen_for_messages()
user2.listen_for_messages()
async def messaging_example(user1: ChatClient, user2: ChatClient):
message_subscriber1 = user1.listen_for_messages()
message_subscriber2 = user2.listen_for_messages()

channel_name = 'channel1'
await user1.join(channel_name)
await user2.join(channel_name)

print(f'Channels: {await user1.list_channels()}')
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')
logging.info(f'Channels: {await user1.list_channels()}')
logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}')

await user1.private_message('user2', 'private message from user1')
await user1.channel_message(channel_name, 'channel message from user1')

await asyncio.sleep(1)

await user1.leave(channel_name)
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')
logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()
message_subscriber1.cancel()
message_subscriber2.cancel()


async def files_example(user1, user2):
async def files_example(user1: ChatClient, user2: ChatClient):
file_contents = b'abcdefg1234567'
file_name = 'file_name_1.txt'

await user1.upload(file_name, file_contents)

print(f'Files: {await user1.list_files()}')
logging.info(f'Files: {await user1.list_files()}')

download_data = await user2.download(file_name)

if download_data != file_contents:
raise Exception('File download failed')
else:
print(f'Downloaded file: {len(download_data)} bytes')
logging.info(f'Downloaded file: {len(download_data)} bytes')


async def statistics_example(user1):
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/reactivex/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(self):

def remove(self):
if self._session is not None:
print(f'Removing session: {self._session.session_id}')
logging.info(f'Removing session: {self._session.session_id}')
del chat_data.user_session_by_id[self._session.session_id]

def router_factory(self):
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step0/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def main():
async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
response = await client.request_response(Payload(data=b'George'))

print(f"Server response: {utf8_decode(response.data)}")
logging.info(f'Login response: {utf8_decode(response.data)}')


if __name__ == '__main__':
Expand Down
3 changes: 3 additions & 0 deletions examples/tutorial/step0/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> Awaitable[Payload]:
username = utf8_decode(payload.data)

logging.info(f'New user: {username}')

return create_future(Payload(ensure_bytes(f'Welcome to chat, {username}')))


Expand Down
3 changes: 2 additions & 1 deletion examples/tutorial/step1/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
response = await self._rsocket.request_response(payload)
print(f'Server response: {utf8_decode(response.data)}')

logging.info(f'Login response: {utf8_decode(response.data)}')


async def main():
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step1/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def login(payload: Payload) -> Awaitable[Payload]:

logging.info(f'New user: {username}')

return create_response(ensure_bytes(f'Hello {username}'))
return create_response(ensure_bytes(f'Welcome to chat, {username}'))

return RoutingRequestHandler(router)

Expand Down
10 changes: 5 additions & 5 deletions examples/tutorial/step2/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from rsocket.extensions.helpers import composite, route
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import single_transport_provider
from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
Expand All @@ -14,14 +14,14 @@
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')


async def main():
Expand All @@ -31,7 +31,7 @@ async def main():
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1:
user = ChatClient(client1)

await user.login('George')
await user.login('user1')


if __name__ == '__main__':
Expand Down
40 changes: 21 additions & 19 deletions examples/tutorial/step3/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from rsocket.extensions.helpers import composite, route
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import single_transport_provider
from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
Expand All @@ -17,44 +17,46 @@
class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._message_subscriber: Optional = None
self._session_id: Optional[str] = None
self._username: Optional[str] = None

async def login(self, username: str):
self._username = username
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self
response = await self._rsocket.request_response(payload)

logging.info(f'Login response: {utf8_decode(response.data)}')

def listen_for_messages(self):
def print_message(data: bytes):
message = decode_dataclass(data, Message)
print(f'{self._username}: from {message.user}: {message.content}')
logging.info(f'to {self._username}: from {message.user}: {message.content}')

class MessageListener(DefaultSubscriber, DefaultSubscription):

def on_next(self, value, is_complete=False):
print_message(value.data)

def on_error(self, exception: Exception):
print(exception)
logging.error(exception)

def cancel(self):
self.subscription.cancel()

self._message_subscriber = MessageListener()
message_subscriber = MessageListener()
self._rsocket.request_stream(
Payload(metadata=composite(route('messages.incoming')))
).subscribe(self._message_subscriber)

def stop_listening_for_messages(self):
self._message_subscriber.cancel()
).subscribe(message_subscriber)
return message_subscriber

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))
logging.info(f'Sending "{content}" to user {username}')

request = Payload(
encode_dataclass(Message(username, content)),
composite(route('message'))
)

await self._rsocket.request_response(request)


async def main():
Expand All @@ -76,15 +78,15 @@ async def main():


async def messaging_example(user1: ChatClient, user2: ChatClient):
user1.listen_for_messages()
user2.listen_for_messages()
message_subscriber1 = user1.listen_for_messages()
message_subscriber2 = user2.listen_for_messages()

await user1.private_message('user2', 'private message from user1')

await asyncio.sleep(1)

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()
message_subscriber1.cancel()
message_subscriber2.cancel()


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step3/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(self, session: UserSessionData):

def cancel(self):
if self._sender is not None:
logging.info('Canceling message sender task')
logging.info('Canceling incoming message sender task')
self._sender.cancel()
self._sender = None

Expand Down
Loading