diff --git a/examples/tutorial/reactivex/chat_client.py b/examples/tutorial/reactivex/chat_client.py index 2065335d..8fe432b2 100644 --- a/examples/tutorial/reactivex/chat_client.py +++ b/examples/tutorial/reactivex/chat_client.py @@ -36,16 +36,15 @@ def set_period(self, period: timedelta): class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._listen_task: Optional[Task] = None self._statistics_task: Optional[Task] = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -60,19 +59,16 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') async def listen_for_messages(): await ReactiveXClient(self._rsocket).request_stream(Payload(metadata=composite( route('messages.incoming') ))).pipe( operators.do_action(on_next=lambda value: print_message(value.data), - on_error=lambda exception: print(exception))) - - self._listen_task = asyncio.create_task(listen_for_messages()) + on_error=lambda exception: logging.error(exception))) - def stop_listening_for_messages(self): - self._listen_task.cancel() + return asyncio.create_task(listen_for_messages()) async def send_statistics(self): memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -83,7 +79,7 @@ async def send_statistics(self): def listen_for_statistics(self) -> StatisticsControl: def print_statistics(value: bytes): statistics = decode_dataclass(value, ServerStatistics) - print(f'users: {statistics.user_count}, channels: {statistics.channel_count}') + logging.info(f'users: {statistics.user_count}, channels: {statistics.channel_count}') control = StatisticsControl() @@ -97,7 +93,7 @@ async def listen_for_statistics(): lambda backpressure: observable_from_queue(control.queue, backpressure)) ).pipe( operators.do_action(on_next=lambda value: print_statistics(value.data), - on_error=lambda exception: print(exception))) + on_error=lambda exception: logging.error(exception))) self._statistics_task = asyncio.create_task(listen_for_statistics()) @@ -107,14 +103,24 @@ def stop_listening_for_statistics(self): self._statistics_task.cancel() async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def upload(self, file_name, content): await self._rsocket.request_response(Payload(content, composite( @@ -177,16 +183,16 @@ async def main(): await files_example(user1, user2) -async def messaging_example(user1, user2): - user1.listen_for_messages() - user2.listen_for_messages() +async def messaging_example(user1: ChatClient, user2: ChatClient): + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -194,26 +200,26 @@ async def messaging_example(user1, user2): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() -async def files_example(user1, user2): +async def files_example(user1: ChatClient, user2: ChatClient): file_contents = b'abcdefg1234567' file_name = 'file_name_1.txt' await user1.upload(file_name, file_contents) - print(f'Files: {await user1.list_files()}') + logging.info(f'Files: {await user1.list_files()}') download_data = await user2.download(file_name) if download_data != file_contents: raise Exception('File download failed') else: - print(f'Downloaded file: {len(download_data)} bytes') + logging.info(f'Downloaded file: {len(download_data)} bytes') async def statistics_example(user1): diff --git a/examples/tutorial/reactivex/chat_server.py b/examples/tutorial/reactivex/chat_server.py index 8d70e720..6a8e27f4 100644 --- a/examples/tutorial/reactivex/chat_server.py +++ b/examples/tutorial/reactivex/chat_server.py @@ -111,7 +111,7 @@ def __init__(self): def remove(self): if self._session is not None: - print(f'Removing session: {self._session.session_id}') + logging.info(f'Removing session: {self._session.session_id}') del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): diff --git a/examples/tutorial/step0/chat_client.py b/examples/tutorial/step0/chat_client.py index 2602069e..966434b1 100644 --- a/examples/tutorial/step0/chat_client.py +++ b/examples/tutorial/step0/chat_client.py @@ -13,7 +13,7 @@ async def main(): async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client: response = await client.request_response(Payload(data=b'George')) - print(f"Server response: {utf8_decode(response.data)}") + logging.info(f'Login response: {utf8_decode(response.data)}') if __name__ == '__main__': diff --git a/examples/tutorial/step0/chat_server.py b/examples/tutorial/step0/chat_server.py index ae908972..2db56516 100644 --- a/examples/tutorial/step0/chat_server.py +++ b/examples/tutorial/step0/chat_server.py @@ -13,6 +13,9 @@ class Handler(BaseRequestHandler): async def request_response(self, payload: Payload) -> Awaitable[Payload]: username = utf8_decode(payload.data) + + logging.info(f'New user: {username}') + return create_future(Payload(ensure_bytes(f'Welcome to chat, {username}'))) diff --git a/examples/tutorial/step1/chat_client.py b/examples/tutorial/step1/chat_client.py index d3347fec..6f1b67ce 100644 --- a/examples/tutorial/step1/chat_client.py +++ b/examples/tutorial/step1/chat_client.py @@ -20,7 +20,8 @@ async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) response = await self._rsocket.request_response(payload) - print(f'Server response: {utf8_decode(response.data)}') + + logging.info(f'Login response: {utf8_decode(response.data)}') async def main(): diff --git a/examples/tutorial/step1/chat_server.py b/examples/tutorial/step1/chat_server.py index 93d24d62..5a85c59f 100644 --- a/examples/tutorial/step1/chat_server.py +++ b/examples/tutorial/step1/chat_server.py @@ -20,7 +20,7 @@ async def login(payload: Payload) -> Awaitable[Payload]: logging.info(f'New user: {username}') - return create_response(ensure_bytes(f'Hello {username}')) + return create_response(ensure_bytes(f'Welcome to chat, {username}')) return RoutingRequestHandler(router) diff --git a/examples/tutorial/step2/chat_client.py b/examples/tutorial/step2/chat_client.py index 26b24344..6f1b67ce 100644 --- a/examples/tutorial/step2/chat_client.py +++ b/examples/tutorial/step2/chat_client.py @@ -5,7 +5,7 @@ from rsocket.extensions.helpers import composite, route from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.frame_helpers import ensure_bytes -from rsocket.helpers import single_transport_provider +from rsocket.helpers import single_transport_provider, utf8_decode from rsocket.payload import Payload from rsocket.rsocket_client import RSocketClient from rsocket.transports.tcp import TransportTCP @@ -14,14 +14,14 @@ class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def main(): @@ -31,7 +31,7 @@ async def main(): metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1: user = ChatClient(client1) - await user.login('George') + await user.login('user1') if __name__ == '__main__': diff --git a/examples/tutorial/step3/chat_client.py b/examples/tutorial/step3/chat_client.py index ae50a9c9..4881555e 100644 --- a/examples/tutorial/step3/chat_client.py +++ b/examples/tutorial/step3/chat_client.py @@ -8,7 +8,7 @@ from rsocket.extensions.helpers import composite, route from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.frame_helpers import ensure_bytes -from rsocket.helpers import single_transport_provider +from rsocket.helpers import single_transport_provider, utf8_decode from rsocket.payload import Payload from rsocket.rsocket_client import RSocketClient from rsocket.transports.tcp import TransportTCP @@ -17,20 +17,19 @@ class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user}: {message.content}') + logging.info(f'to {self._username}: from {message.user}: {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -38,23 +37,26 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def main(): @@ -76,15 +78,15 @@ async def main(): async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() await user1.private_message('user2', 'private message from user1') await asyncio.sleep(1) - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() if __name__ == '__main__': diff --git a/examples/tutorial/step3/chat_server.py b/examples/tutorial/step3/chat_server.py index cbb84efc..56864a84 100644 --- a/examples/tutorial/step3/chat_server.py +++ b/examples/tutorial/step3/chat_server.py @@ -89,7 +89,7 @@ def __init__(self, session: UserSessionData): def cancel(self): if self._sender is not None: - logging.info('Canceling message sender task') + logging.info('Canceling incoming message sender task') self._sender.cancel() self._sender = None diff --git a/examples/tutorial/step4/chat_client.py b/examples/tutorial/step4/chat_client.py index 7c726494..17edc0d0 100644 --- a/examples/tutorial/step4/chat_client.py +++ b/examples/tutorial/step4/chat_client.py @@ -18,15 +18,14 @@ class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -41,7 +40,7 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -49,28 +48,36 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def list_channels(self) -> List[str]: request = Payload(metadata=composite(route('channels'))) @@ -102,16 +109,16 @@ async def main(): async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -119,10 +126,10 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() if __name__ == '__main__': diff --git a/examples/tutorial/step4/chat_server.py b/examples/tutorial/step4/chat_server.py index 842f4159..3a03a2b5 100644 --- a/examples/tutorial/step4/chat_server.py +++ b/examples/tutorial/step4/chat_server.py @@ -7,7 +7,7 @@ from typing import Dict, Optional, Set, Awaitable from weakref import WeakValueDictionary, WeakSet -from examples.tutorial.step4.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_payload) +from examples.tutorial.step4.shared import (Message, dataclass_to_payload, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription @@ -79,10 +79,6 @@ async def channel_message_delivery(channel_name: str): logging.error(str(exception), exc_info=True) -def get_file_name(composite_metadata): - return utf8_decode(composite_metadata.find_by_mimetype(chat_filename_mimetype)[0].content) - - class ChatUserSession: def __init__(self): @@ -94,7 +90,9 @@ def router_factory(self): @router.response('login') async def login(payload: Payload) -> Awaitable[Payload]: username = utf8_decode(payload.data) + logging.info(f'New user: {username}') + session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) chat_data.user_session_by_id[session_id] = self._session @@ -142,7 +140,7 @@ def __init__(self, session: UserSessionData): def cancel(self): if self._sender is not None: - logging.info('Canceling message sender task') + logging.info('Canceling incoming message sender task') self._sender.cancel() self._sender = None @@ -185,7 +183,9 @@ def handler_factory(): async def run_server(): def session(*connection): - RSocketServer(TransportTCP(*connection), handler_factory=handler_factory) + RSocketServer(TransportTCP(*connection), + handler_factory=handler_factory + ) async with await asyncio.start_server(session, 'localhost', 6565) as server: await server.serve_forever() diff --git a/examples/tutorial/step4/shared.py b/examples/tutorial/step4/shared.py index eb6318a2..911a85db 100644 --- a/examples/tutorial/step4/shared.py +++ b/examples/tutorial/step4/shared.py @@ -14,9 +14,6 @@ class Message: channel: Optional[str] = None -chat_filename_mimetype = b'chat/file-name' - - def encode_dataclass(obj): return ensure_bytes(json.dumps(obj.__dict__)) diff --git a/examples/tutorial/step5/chat_client.py b/examples/tutorial/step5/chat_client.py index a3834af8..a331e737 100644 --- a/examples/tutorial/step5/chat_client.py +++ b/examples/tutorial/step5/chat_client.py @@ -18,15 +18,14 @@ class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -41,7 +40,7 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -49,28 +48,36 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def upload(self, file_name, content): await self._rsocket.request_response(Payload(content, composite( @@ -120,16 +127,16 @@ async def main(): async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -137,10 +144,10 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() async def files_example(user1: ChatClient, user2: ChatClient): @@ -149,14 +156,14 @@ async def files_example(user1: ChatClient, user2: ChatClient): await user1.upload(file_name, file_contents) - print(f'Files: {await user1.list_files()}') + logging.info(f'Files: {await user1.list_files()}') download = await user2.download(file_name) if download.data != file_contents: raise Exception('File download failed') else: - print(f'Downloaded file: {len(download.data)} bytes') + logging.info(f'Downloaded file: {len(download.data)} bytes') if __name__ == '__main__': diff --git a/examples/tutorial/step5/chat_server.py b/examples/tutorial/step5/chat_server.py index 395cac4a..8added9b 100644 --- a/examples/tutorial/step5/chat_server.py +++ b/examples/tutorial/step5/chat_server.py @@ -97,6 +97,7 @@ def router_factory(self): @router.response('login') async def login(username: str) -> Awaitable[Payload]: logging.info(f'New user: {username}') + session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) chat_data.user_session_by_id[session_id] = self._session @@ -114,18 +115,6 @@ async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() - @router.stream('channel.users') - async def get_channel_users(channel_name: str) -> Publisher: - if channel_name not in chat_data.channel_users: - return EmptyStream() - - count = len(chat_data.channel_users[channel_name]) - generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for - (index, session_id) in - enumerate(chat_data.channel_users[channel_name], 1)) - - return StreamFromGenerator(lambda: generator) - @router.response('file.upload') async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Awaitable[Payload]: chat_data.files[get_file_name(composite_metadata)] = payload.data @@ -174,7 +163,7 @@ def __init__(self, session: UserSessionData): def cancel(self): if self._sender is not None: - logging.info('Canceling message sender task') + logging.info('Canceling incoming message sender task') self._sender.cancel() self._sender = None @@ -190,6 +179,18 @@ async def _message_sender(self): return MessagePublisher(self._session) + @router.stream('channel.users') + async def get_channel_users(channel_name: str) -> Publisher: + if channel_name not in chat_data.channel_users: + return EmptyStream() + + count = len(chat_data.channel_users[channel_name]) + generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for + (index, session_id) in + enumerate(chat_data.channel_users[channel_name], 1)) + + return StreamFromGenerator(lambda: generator) + return router @@ -207,7 +208,8 @@ async def run_server(): def session(*connection): RSocketServer(TransportTCP(*connection), handler_factory=handler_factory, - fragment_size_bytes=1_000_000) + fragment_size_bytes=1_000_000 + ) async with await asyncio.start_server(session, 'localhost', 6565) as server: await server.serve_forever() diff --git a/examples/tutorial/step5/shared.py b/examples/tutorial/step5/shared.py index eb6318a2..bc6a2642 100644 --- a/examples/tutorial/step5/shared.py +++ b/examples/tutorial/step5/shared.py @@ -17,7 +17,7 @@ class Message: chat_filename_mimetype = b'chat/file-name' -def encode_dataclass(obj): +def encode_dataclass(obj) -> bytes: return ensure_bytes(json.dumps(obj.__dict__)) diff --git a/examples/tutorial/step6/chat_client.py b/examples/tutorial/step6/chat_client.py index af315d45..148f0cd5 100644 --- a/examples/tutorial/step6/chat_client.py +++ b/examples/tutorial/step6/chat_client.py @@ -29,7 +29,7 @@ def __init__(self): def on_next(self, payload: Payload, is_complete=False): statistics = decode_dataclass(payload.data, ServerStatistics) - print(statistics) + logging.info(statistics) if is_complete: self.done.set() @@ -51,16 +51,14 @@ def set_period(self, period: timedelta): class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._statistics_subscriber: Optional[StatisticsHandler] = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -75,7 +73,7 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -83,18 +81,16 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def send_statistics(self): memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -103,24 +99,35 @@ async def send_statistics(self): await self._rsocket.fire_and_forget(payload) def listen_for_statistics(self) -> StatisticsHandler: - self._statistics_subscriber = StatisticsHandler() - self._rsocket.request_channel(Payload(metadata=composite( - route('statistics') - )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber) - return self._statistics_subscriber + statistics_subscriber = StatisticsHandler() + + request = Payload(metadata=composite(route('statistics'))) - def stop_listening_for_statistics(self): - self._statistics_subscriber.cancel() + response = self._rsocket.request_channel(request, publisher=statistics_subscriber) + + response.subscribe(statistics_subscriber) + + return statistics_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def upload(self, file_name, content): await self._rsocket.request_response(Payload(content, composite( @@ -166,21 +173,21 @@ async def main(): await user2.login('user2') await messaging_example(user1, user2) - await statistics_example(user1) await files_example(user1, user2) + await statistics_example(user1) async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -188,26 +195,26 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() -async def files_example(user1, user2): +async def files_example(user1: ChatClient, user2: ChatClient): file_contents = b'abcdefg1234567' file_name = 'file_name_1.txt' await user1.upload(file_name, file_contents) - print(f'Files: {await user1.list_files()}') + logging.info(f'Files: {await user1.list_files()}') download = await user2.download(file_name) if download.data != file_contents: raise Exception('File download failed') else: - print(f'Downloaded file: {len(download.data)} bytes') + logging.info(f'Downloaded file: {len(download.data)} bytes') async def statistics_example(user1): @@ -221,7 +228,7 @@ async def statistics_example(user1): await asyncio.sleep(5) - user1.stop_listening_for_statistics() + statistics_control.cancel() if __name__ == '__main__': diff --git a/examples/tutorial/step6/chat_server.py b/examples/tutorial/step6/chat_server.py index 9dbe60d8..ee9840f8 100644 --- a/examples/tutorial/step6/chat_server.py +++ b/examples/tutorial/step6/chat_server.py @@ -33,23 +33,23 @@ class SessionId(str): # allow weak reference @dataclass() class UserSessionData: username: str - session_id: str + session_id: SessionId messages: Queue = field(default_factory=Queue) statistics: Optional[ClientStatistics] = None @dataclass(frozen=True) class ChatData: - channel_users: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(WeakSet)) + channel_users: Dict[str, Set[SessionId]] = field(default_factory=lambda: defaultdict(WeakSet)) files: Dict[str, bytes] = field(default_factory=dict) channel_messages: Dict[str, Queue] = field(default_factory=lambda: defaultdict(Queue)) - user_session_by_id: Dict[str, UserSessionData] = field(default_factory=WeakValueDictionary) + user_session_by_id: Dict[SessionId, UserSessionData] = field(default_factory=WeakValueDictionary) chat_data = ChatData() -def ensure_channel_exists(channel_name): +def ensure_channel_exists(channel_name: str): if channel_name not in chat_data.channel_users: chat_data.channel_users[channel_name] = WeakSet() chat_data.channel_messages[channel_name] = Queue() @@ -206,11 +206,13 @@ async def messages_incoming() -> Publisher: class MessagePublisher(DefaultPublisher, DefaultSubscription): def __init__(self, session: UserSessionData): self._session = session + self._sender: Optional[Task] = None def cancel(self): if self._sender is not None: - logging.info('Canceling message sender task') + logging.info('Canceling incoming message sender task') self._sender.cancel() + self._sender = None def subscribe(self, subscriber: Subscriber): super(MessagePublisher, self).subscribe(subscriber) diff --git a/examples/tutorial/step7/chat_client.py b/examples/tutorial/step7/chat_client.py index 29496904..61b63416 100644 --- a/examples/tutorial/step7/chat_client.py +++ b/examples/tutorial/step7/chat_client.py @@ -32,7 +32,7 @@ def __init__(self): def on_next(self, payload: Payload, is_complete=False): statistics = decode_dataclass(payload.data, ServerStatistics) - print(statistics) + logging.info(statistics) if is_complete: self.done.set() @@ -54,16 +54,14 @@ def set_period(self, period: timedelta): class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._statistics_subscriber: Optional[StatisticsHandler] = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -78,7 +76,7 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -86,18 +84,16 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def send_statistics(self): memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -106,24 +102,35 @@ async def send_statistics(self): await self._rsocket.fire_and_forget(payload) def listen_for_statistics(self) -> StatisticsHandler: - self._statistics_subscriber = StatisticsHandler() - self._rsocket.request_channel(Payload(metadata=composite( - route('statistics') - )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber) - return self._statistics_subscriber + statistics_subscriber = StatisticsHandler() + + request = Payload(metadata=composite(route('statistics'))) - def stop_listening_for_statistics(self): - self._statistics_subscriber.cancel() + response = self._rsocket.request_channel(request, publisher=statistics_subscriber) + + response.subscribe(statistics_subscriber) + + return statistics_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def upload(self, file_name, content): await self._rsocket.request_response(Payload(content, composite( @@ -183,21 +190,21 @@ async def main(): await user2.login('user2') await messaging_example(user1, user2) - await statistics_example(user1) await files_example(user1, user2) + await statistics_example(user1) async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -205,26 +212,26 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() -async def files_example(user1, user2): +async def files_example(user1: ChatClient, user2: ChatClient): file_contents = b'abcdefg1234567' file_name = 'file_name_1.txt' await user1.upload(file_name, file_contents) - print(f'Files: {await user1.list_files()}') + logging.info(f'Files: {await user1.list_files()}') download = await user2.download(file_name) if download.data != file_contents: raise Exception('File download failed') else: - print(f'Downloaded file: {len(download.data)} bytes') + logging.info(f'Downloaded file: {len(download.data)} bytes') async def statistics_example(user1): @@ -238,7 +245,7 @@ async def statistics_example(user1): await asyncio.sleep(5) - user1.stop_listening_for_statistics() + statistics_control.cancel() if __name__ == '__main__': diff --git a/examples/tutorial/step7/chat_server.py b/examples/tutorial/step7/chat_server.py index 92333231..6ab8dfc6 100644 --- a/examples/tutorial/step7/chat_server.py +++ b/examples/tutorial/step7/chat_server.py @@ -33,23 +33,23 @@ class SessionId(str): # allow weak reference @dataclass() class UserSessionData: username: str - session_id: str + session_id: SessionId messages: Queue = field(default_factory=Queue) statistics: Optional[ClientStatistics] = None @dataclass(frozen=True) class ChatData: - channel_users: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(WeakSet)) + channel_users: Dict[str, Set[SessionId]] = field(default_factory=lambda: defaultdict(WeakSet)) files: Dict[str, bytes] = field(default_factory=dict) channel_messages: Dict[str, Queue] = field(default_factory=lambda: defaultdict(Queue)) - user_session_by_id: Dict[str, UserSessionData] = field(default_factory=WeakValueDictionary) + user_session_by_id: Dict[SessionId, UserSessionData] = field(default_factory=WeakValueDictionary) chat_data = ChatData() -def ensure_channel_exists(channel_name): +def ensure_channel_exists(channel_name: str): if channel_name not in chat_data.channel_users: chat_data.channel_users[channel_name] = WeakSet() chat_data.channel_messages[channel_name] = Queue() @@ -210,7 +210,7 @@ def __init__(self, session: UserSessionData): def cancel(self): if self._sender is not None: - logging.info('Canceling message sender task') + logging.info('Canceling incoming message sender task') self._sender.cancel() self._sender = None @@ -257,8 +257,9 @@ async def session(*connection): handler_factory=handler_factory, fragment_size_bytes=1_000_000) - response = await server.request_response(Payload(metadata=composite(route('time')))) - print(f'Client time: {response.data}') + request = Payload(metadata=composite(route('time'))) + response = await server.request_response(request) + logging.info(f'Client time: {response.data}') async with await asyncio.start_server(session, 'localhost', 6565) as server: await server.serve_forever() diff --git a/examples/tutorial/step8/chat_client.py b/examples/tutorial/step8/chat_client.py index 38bc99e4..80835f7f 100644 --- a/examples/tutorial/step8/chat_client.py +++ b/examples/tutorial/step8/chat_client.py @@ -32,7 +32,7 @@ def __init__(self): def on_next(self, payload: Payload, is_complete=False): statistics = decode_dataclass(payload.data, ServerStatistics) - print(statistics) + logging.info(statistics) if is_complete: self.done.set() @@ -54,16 +54,14 @@ def set_period(self, period: timedelta): class ChatClient: def __init__(self, rsocket: RSocketClient): self._rsocket = rsocket - self._message_subscriber: Optional = None - self._statistics_subscriber: Optional[StatisticsHandler] = None - self._session_id: Optional[str] = None self._username: Optional[str] = None async def login(self, username: str): self._username = username payload = Payload(ensure_bytes(username), composite(route('login'))) - self._session_id = (await self._rsocket.request_response(payload)).data - return self + response = await self._rsocket.request_response(payload) + + logging.info(f'Login response: {utf8_decode(response.data)}') async def join(self, channel_name: str): request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) @@ -78,7 +76,7 @@ async def leave(self, channel_name: str): def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) - print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') + logging.info(f'to {self._username}: from {message.user} (channel: {message.channel}): {message.content}') class MessageListener(DefaultSubscriber, DefaultSubscription): @@ -86,18 +84,16 @@ def on_next(self, value, is_complete=False): print_message(value.data) def on_error(self, exception: Exception): - print(exception) + logging.error(exception) def cancel(self): self.subscription.cancel() - self._message_subscriber = MessageListener() + message_subscriber = MessageListener() self._rsocket.request_stream( Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) - - def stop_listening_for_messages(self): - self._message_subscriber.cancel() + ).subscribe(message_subscriber) + return message_subscriber async def send_statistics(self): memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -106,24 +102,35 @@ async def send_statistics(self): await self._rsocket.fire_and_forget(payload) def listen_for_statistics(self) -> StatisticsHandler: - self._statistics_subscriber = StatisticsHandler() - self._rsocket.request_channel(Payload(metadata=composite( - route('statistics') - )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber) - return self._statistics_subscriber + statistics_subscriber = StatisticsHandler() + + request = Payload(metadata=composite(route('statistics'))) - def stop_listening_for_statistics(self): - self._statistics_subscriber.cancel() + response = self._rsocket.request_channel(request, publisher=statistics_subscriber) + + response.subscribe(statistics_subscriber) + + return statistics_subscriber async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to user {username}') + + request = Payload( + encode_dataclass(Message(username, content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def channel_message(self, channel: str, content: str): - print(f'Sending {content} to channel {channel}') - await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), - composite(route('message')))) + logging.info(f'Sending "{content}" to channel {channel}') + + request = Payload( + encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')) + ) + + await self._rsocket.request_response(request) async def upload(self, file_name, content): await self._rsocket.request_response(Payload(content, composite( @@ -172,21 +179,21 @@ async def main(): await user2.login('user2') await messaging_example(user1, user2) - await statistics_example(user1) await files_example(user1, user2) + await statistics_example(user1) async def messaging_example(user1: ChatClient, user2: ChatClient): - user1.listen_for_messages() - user2.listen_for_messages() + message_subscriber1 = user1.listen_for_messages() + message_subscriber2 = user2.listen_for_messages() channel_name = 'channel1' await user1.join(channel_name) await user2.join(channel_name) - print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channels: {await user1.list_channels()}') + logging.info(f'Channel: {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -194,26 +201,26 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + logging.info(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') - user1.stop_listening_for_messages() - user2.stop_listening_for_messages() + message_subscriber1.cancel() + message_subscriber2.cancel() -async def files_example(user1, user2): +async def files_example(user1: ChatClient, user2: ChatClient): file_contents = b'abcdefg1234567' file_name = 'file_name_1.txt' await user1.upload(file_name, file_contents) - print(f'Files: {await user1.list_files()}') + logging.info(f'Files: {await user1.list_files()}') download = await user2.download(file_name) if download.data != file_contents: raise Exception('File download failed') else: - print(f'Downloaded file: {len(download.data)} bytes') + logging.info(f'Downloaded file: {len(download.data)} bytes') async def statistics_example(user1): @@ -227,7 +234,7 @@ async def statistics_example(user1): await asyncio.sleep(5) - user1.stop_listening_for_statistics() + statistics_control.cancel() if __name__ == '__main__':