diff --git a/content-docs/guides/guide-shared/_preface.mdx b/content-docs/guides/guide-shared/_preface.mdx new file mode 100644 index 00000000..bfde2f48 --- /dev/null +++ b/content-docs/guides/guide-shared/_preface.mdx @@ -0,0 +1,10 @@ +We will be creating a chat application with a server and a client. + +The chat client will have the following functionality: +- Private messages between users +- Joining and sending messages to channels +- Uploading/Downloading files +- Getting server and client statistics (e.g. number of channels) + +Since the emphasis is on showcasing as much RSocket functionality as possible, some examples may be either a bit contrived, or +be possible to implement in a different way using RSocket. This is left as an exercise to the reader. diff --git a/content-docs/guides/guide-shared/_routing.mdx b/content-docs/guides/guide-shared/_routing.mdx new file mode 100644 index 00000000..a46757f6 --- /dev/null +++ b/content-docs/guides/guide-shared/_routing.mdx @@ -0,0 +1,4 @@ +In the previous step we added a single request-response handler. In order to allow more than one functionality to use this handler, +(e.g. login, messages, join/leave chanel) they need to be distinguished from each other. To achieve this, each request to the server will +be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md). This is similar to paths in an HTTP URL where +each URL may handle one of the HTTP methods (eg. GET, POST). diff --git a/content-docs/guides/index.mdx b/content-docs/guides/index.mdx index d0c5c617..bf482dc3 100644 --- a/content-docs/guides/index.mdx +++ b/content-docs/guides/index.mdx @@ -10,3 +10,4 @@ In this section you will find guides related to working with and consuming the v - [`rsocket-js`](./rsocket-js/index.mdx) - [`rsocket-py`](./rsocket-py/index.mdx) +- [`rsocket-java`](./rsocket-java/index.mdx) diff --git a/content-docs/guides/rsocket-java/index.mdx b/content-docs/guides/rsocket-java/index.mdx new file mode 100644 index 00000000..647c3f65 --- /dev/null +++ b/content-docs/guides/rsocket-java/index.mdx @@ -0,0 +1,11 @@ +--- +slug: /guides/rsocket-java +title: rsocket-java +sidebar_label: Introduction +--- + +The java `rsocket` package implements the 1.0 version of the [RSocket protocol](/about/protocol). + +## Guides + +See [Tutorial](/guides/rsocket-java/tutorial) for a step by step construction of an application. diff --git a/content-docs/guides/rsocket-java/tutorial/00-base.mdx b/content-docs/guides/rsocket-java/tutorial/00-base.mdx new file mode 100644 index 00000000..304ec6fb --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/00-base.mdx @@ -0,0 +1,127 @@ +--- +slug: /guides/rsocket-java/tutorial/base +title: Getting started +sidebar_label: Getting started +--- + +## Application structure + +In this step we will set up a minimal code required for both the server and the client. + +The application will be composed of: +- Server side +- Client side +- Shared code + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step0) + +## Server side + +We will set up a simple server to accept connections and respond to the client sending the user's name. +The server will listen on TCP port 6565. + +Below is the code for the ServerApplication class: + +```java +package io.rsocket.guide; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +public class ServerApplication { + + public static void main(String[] args) { + final var transport = TcpServerTransport.create(6565); + + final SocketAcceptor socketAcceptor = (setup, sendingSocket) -> Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); + } + }); + + RSocketServer.create() + .acceptor(socketAcceptor) + .bind(transport) + .block() + .onClose() + .block(); + } +} +``` + +*Lines 22-27* start an RSocket TCP server listening on localhost:6565. + +The 2 parameters passed are: +- transport : An instance of a supported connection method. In this case it is at instance of `TcpServerTransport` created in *Line 14*. +- socketAcceptor: A callable which returns an `RSocket` instance wrapped in a `Mono`. This will be used to respond to the client's requests. + +*Lines 16-20* Define the `RSocket` service with a single `requestResponse` endpoint at *Lines *17-19*. + +The `requestResponse` method receives a single argument containing the payload. +It is an instance of a `Payload` class which contains the data and metadata of the request. The data property is assumed to contain +a UTF-8 encoded string of the username, so is retrieved using `getDataUtf8`. + +*Line 18* Takes the username from the `Payload` instance's data and returns it to the client with a "welcome" message. + +A response is created using helper methods: +- `DefaultPayload::create` : This creates a payload which is the standard object which wraps all data transferred over RSocket. In our case, only the data property is set. +- `Mono::just` : All RSocket responses must be in the form of streams, either a `Flux` or a `Mono`. + +In the example, only the `requestResponse` method of `RSocket` is overridden. In this class, we can override the methods +which handle the 4 RSocket request types: +- `requestResponse` +- `requestStream` +- `requestChannel` +- `fireAndForget` + +Check the `RSocket` for other methods which can be implemented. + +Next we will look at a simple client which connects to this server. + +## Client side + +The client will connect to the server, send a single *response* request and disconnect. + +Below is the code for the ClientApplication class: + +```java +package io.rsocket.guide; + +import io.rsocket.core.RSocketConnector; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; + +import java.time.Duration; + +public class ClientApplication { + + public static void main(String[] args) { + final var transport = TcpClientTransport.create("localhost", 6565); + + final var rSocket = RSocketConnector.create() + .connect(transport) + .block(); + + final var payload = DefaultPayload.create("George"); + + rSocket.requestResponse(payload) + .doOnNext(response -> System.out.println(response.getDataUtf8())) + .block(Duration.ofMinutes(1)); + } +} +``` + +*Line 12* instantiates a TCP connection to localhost on port 6565, similar to the one in `ServerApplication`. + +*Lines 14-16* instantiates an `RSocket` client. + +*Line 18* Wraps the username "George" which the client will send to the server in a `Payload` using the `DefaultPayload.create` factory method + +Finally, *Line 20* sends the request to the server and prints (*Line 21*) the received response. + +Since RSocket is reactive, and we want to wait for the request to finish before quitting, a call to `block(Duration.ofMinutes(1))` is added to block for 1 minute. diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx new file mode 100644 index 00000000..fea217de --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -0,0 +1,144 @@ +--- +slug: /guides/rsocket-java/tutorial/request_routing +title: Request routing +sidebar_label: Request routing +--- + +import Routing from '../../guide-shared/_routing.mdx' + + + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step1) + +## Server side + +We will modify the example from the previous step into a routed request response. + +### Routing request handler + +Below is the modified code for instantiating `SocketAcceptor`: + +```java +final SocketAcceptor socketAcceptor = (setup, sendingSocket) -> Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + final var route = requireRoute(payload); + + switch (route) { + case "login": + return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); + } + + throw new RuntimeException("Unknown requestResponse route " + route); + } + + private String requireRoute(Payload payload) { + final var metadata = payload.sliceMetadata(); + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false); + + for (CompositeMetadata.Entry metadatum : compositeMetadata) { + if (Objects.requireNonNull(metadatum.getMimeType()) + .equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) { + return new RoutingMetadata(metadatum.getContent()).iterator().next(); + } + } + + throw new IllegalStateException(); + } +}); +``` + +The `requestResponse` method in *Lines 2-11* is modified to first parse the route from the `Payload` metadata, using the `requireRoute` helper method. +For now there is only a single case, the "login" route, which returns the same response as in the previous section of this guide. + +*Line 10* raises an exception if no known route is supplied. + +The `requireRoute` method parses the `Payload` metadata using the `CompositeMetadata` class. If any of the metadata items is of routing type, its value is returned. +If no routing metadata is found (*Line 24*) an exception is thrown. + +## Client side + +Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `Client` +which will wrap the RSocket client and provide the methods for interacting with the server. + +### Client class + +Below is the complete code for the new `Client` class: + +```java +package io.rsocket.guide; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.metadata.CompositeMetadataCodec; +import io.rsocket.metadata.TaggingMetadataCodec; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +import java.util.List; + +public class Client { + + private final RSocket rSocket; + + public Client(RSocket rSocket) { + this.rSocket = rSocket; + } + + public Mono login(String username) { + final Payload payload = DefaultPayload.create( + Unpooled.wrappedBuffer(username.getBytes()), + route("login") + ); + return rSocket.requestResponse(payload); + } + + private static CompositeByteBuf route(String route) { + final var metadata = ByteBufAllocator.DEFAULT.compositeBuffer(); + + CompositeMetadataCodec.encodeAndAddMetadata( + metadata, + ByteBufAllocator.DEFAULT, + WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, + TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, List.of(route)) + ); + + return metadata; + } +} +``` + +*Lines 17-45* define our new `Client` which will encapsulate the methods used to interact with the chat server. + +*Lines 25-31* define a `login` method. It uses the `route` helper method defined later in the class to create the routing metadata, which is added to the `Payload`. +This ensures the payload is routed to the method registered on the server side in the previous step. + +The `route` method defined in *Lines 33-44*, creates a composite metadata item (*Line 34*) and adds the route metadata to it (*Lines 36-41*). + +### Test the new functionality + +Let's modify the `ClientApplication` class to test our new `Client`: + +```java +final var rSocket = RSocketConnector.create() + .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()) + .connect(transport) + .block(); + +final var client = new Client(rSocket); + +client.login("George") + .doOnNext(response -> System.out.println(response.getDataUtf8())) + .block(Duration.ofMinutes(10)); +``` + +The `RSocket` instantiation is modified, and in *Line 2* sets the `metadataMimeType` type to be COMPOSITE_METADATA. +This is required for multiple elements in the `Payload` metadata, which includes the routing information. + +*Lines 6* instantiates a `Client`, passing it the `RSocket` + +*Lines 8-10* call the `login` method, and prints the response. diff --git a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx new file mode 100644 index 00000000..a27db692 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx @@ -0,0 +1,108 @@ +--- +slug: /guides/rsocket-java/tutorial/user_session +title: User session +sidebar_label: User session +--- + +Let's add a server side session to store the logged-in user's state. Later on it will be used to temporarily store +the messages which will be delivered to the client. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step2) + +## Server side + +### Data-classes + +First we will add a POJO to represent a single user session. Below is the contents of the new `Session` class: + +```java +package io.rsocket.guide; + +public class Session { + public String username; + + public String sessionId; +} +``` + +The username (*Line 4*) will be supplied by the client, and the sessionId (*Line 6*) will be a UUID4 generated by the server. + +### Login endpoint + +Let's separate the `SocketAcceptor` creation from the `ServerApplication` class. Below is the contents of the new `Server` class: + +```java +package io.rsocket.guide.step2; + +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.guide.step8.Session; +import io.rsocket.metadata.CompositeMetadata; +import io.rsocket.metadata.RoutingMetadata; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +import java.util.Objects; +import java.util.UUID; + +public class Server implements SocketAcceptor { + + @Override + public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { + final var session = new Session(); + session.sessionId = UUID.randomUUID().toString(); + + return Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + final var route = requireRoute(payload); + + switch (route) { + case "login": + session.username = payload.getDataUtf8(); + return Mono.just(DefaultPayload.create(session.sessionId)); + } + + throw new RuntimeException("Unknown requestResponse route " + route); + } + + private String requireRoute(Payload payload) { + final var metadata = payload.sliceMetadata(); + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false); + + for (CompositeMetadata.Entry metadatum : compositeMetadata) { + if (Objects.requireNonNull(metadatum.getMimeType()) + .equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) { + return new RoutingMetadata(metadatum.getContent()).iterator().next(); + } + } + + throw new IllegalStateException(); + } + }); + } +} +``` + +In order to keep a reference to the `Session` we will instantiate it in the `accept` method (*Line 21-22*) which serves as the scope for the current client connection. +The username provided in the login `Payload` will be stored in the session (*Line 30*). + +## Client side + +We will modify the `Client` to store the username, to use it in output later on: + +```py +public Mono login(String username) { + this.username = username; + + final Payload payload = DefaultPayload.create( + Unpooled.wrappedBuffer(username.getBytes()), + route("login") + ); + return rSocket.requestResponse(payload); +} +``` + +Instead of a greeting from the server, we now receive a session id in the response payload (*Line 8*). diff --git a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx new file mode 100644 index 00000000..b5d7bca2 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx @@ -0,0 +1,194 @@ +--- +slug: /guides/rsocket-java/tutorial/messages +title: Private messages +sidebar_label: Private messages +--- + +Let's add private messaging between users. We will use a request-stream to listen for new messages from other users. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step3) + +## Shared + +Let's add an object representation of a message. Below is the contents of the `Message` class: + +```java +package io.rsocket.guide; + +public class Message { + public String user; + public String content; + + public Message() { + } + + public Message(String user, String content) { + this.user = user; + this.content = content; + } +} +``` + +*Lines 3-6* defines a POJO with 2 fields: +- `user` : Name of the recipient user when sending a message, and the name of the sender when receiving it. +- `content` : The message body. + +We will use [json](https://docs.python.org/3/library/json.html) to serialize the messages for transport. We will use the jackson library to do this. +Add the following dependencies to the pom.xml: + +```xml + + com.fasterxml.jackson.core + jackson-databind + 2.14.1 + + + com.fasterxml.jackson.core + jackson-core + 2.14.1 + +``` + +We will also add a global storage in order to look up sessions of other users and deliver them messages. Add the `ChatData` class: + +```java +package io.rsocket.guide.step3; + +import java.util.HashMap; +import java.util.Map; + +public class ChatData { + public final Map sessionById = new HashMap<>(); +} +``` + +## Server side + +### Data storage and helper methods + +Let's add a helper method to find sessions by username to the `Server` class: + +```java +public Mono findUserByName(final String username) { + return Flux.fromIterable(chatData.sessionById.entrySet()) + .filter(e -> e.getValue().username.equals(username)) + .map(e -> e.getValue()) + .single(); + } +``` + +TODO: explain + +### Send messages + +Next we will register a request-response endpoint for sending private messages in the `requestResponse` route switch case: + +```java +case "message": + try { + final var message = objectMapper.readValue(payload.getDataUtf8(), Message.class); + final var targetMessage = new Message(session.username, message.content); + return findUserByName(message.user) + .doOnNext(targetSession -> targetSession.messages.add(targetMessage)) + .thenReturn(EmptyPayload.INSTANCE); + } catch (Exception exception) { + throw new RuntimeException(exception); + } +``` + +TODO: explain + +### Receive incoming messages + +As a last step on the server side, we register a request-stream endpoint which listens for incoming messages and sends +them to the client: + +```java +public void messageSupplier(FluxSink sink) { + while (true) { + try { + final var message = session.messages.poll(20, TimeUnit.DAYS); + if (message != null) { + sink.next(DefaultPayload.create(objectMapper.writeValueAsString(message))); + } + } catch (Exception exception) { + break; + } + } +} + +public Flux requestStream(String route, Payload payload) { + return Flux.defer(() -> { + switch (route) { + case "messages.incoming": + final var threadContainer = new AtomicReference(); + return Flux.create(sink -> sink.onRequest(n -> { + if (threadContainer.get() == null) { + final var thread = new Thread(() -> messageSupplier(sink)); + thread.start(); + threadContainer.set(thread); + } + }) + .onCancel(() -> threadContainer.get().interrupt()) + .onDispose(() -> threadContainer.get().interrupt())); + } + + throw new IllegalStateException(); + }); +} +``` + +TODO: explain + +## Client side + +First let's add a client method for sending private messages: + +```java +public Mono sendMessage(String data) { + final Payload payload = DefaultPayload.create(Unpooled.wrappedBuffer(data.getBytes()), + route("message") + ); + return rSocket.requestResponse(payload); +} +``` + +TODO: explain + +Next we add a method which will listen for incoming messages: + +```java +public final AtomicReference incomingMessages = new AtomicReference<>(); + +public void listenForMessages() { + new Thread(() -> + { + Disposable subscribe = rSocket.requestStream(DefaultPayload.create(null, route("messages.incoming"))) + .doOnComplete(() -> System.out.println("Response from server stream completed")) + .doOnNext(response -> System.out.println("Response from server stream :: " + response.getDataUtf8())) + + .collectList() + .subscribe(); + incomingMessages.set(subscribe); + }).start(); +} + +public void stopListeningForMessages() { + incomingMessages.get().dispose(); +} +``` + +TODO: explain + +### Test the new functionality + +Finally, let's test the new functionality. Modify the `ClientApplication.main` method: + +```java +client.listenForMessages(); +client.sendMessage("{\"user\":\"user1\", \"content\":\"message\"}"); +Thread.sleep(2000); +client.incomingMessages.get().dispose(); +``` + +TODO: explain diff --git a/content-docs/guides/rsocket-java/tutorial/04-channels.mdx b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx new file mode 100644 index 00000000..6d5285b6 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx @@ -0,0 +1,267 @@ +--- +slug: /guides/rsocket-java/tutorial/channels +title: Channels +sidebar_label: Channels +--- + +In this section we will add basic channel support: +- Joining and leaving channels +- Sending messages to channels + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step4) + +## Shared code + +Let's add a `channel` property to the `Message` class. It will contain the name of the channel the message is intended for: + +```java +public class Message { + + // existing fields + + public String channel; + + // existing constructors + + public Message(String user, String content, String channel) { + this.user = user; + this.content = content; + this.channel = channel; + } +} +``` + +## Server side + +### Data-classes +We will add functionality to store the channel state. Belows is the contents of the new `ChatChannel` class: + +```java +package io.rsocket.guide; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +public class ChatChannel { + public String name; + + final public BlockingQueue messages = new LinkedBlockingQueue<>(); + + final public AtomicReference messageRouter = new AtomicReference<>(); + + final public Set users = new HashSet<>(); +} +``` + +```java +public class ChatData { + + // existing fields + + public final Map channelByName = new HashMap<>(); +} +``` + +In the `channel_users` dict, the keys are channel names, and the value is a set of user session ids. A [WeakSet](https://docs.python.org/3/library/weakref.html#weakref.WeakSet) is used to automatically remove logged-out users. + +In the `channel_messages` dict, the keys are the channel names, and the value is a [Queue](https://docs.python.org/3/library/asyncio-queue.html) of messages sent by users to the channel. + +### Helper methods + +Next, we will define some helper methods for managing channel messages: +- `ensure_channel_exists`: initialize the data for a new channel if it doesn't exist. +- `channel_message_delivery`: an asyncio task which will deliver channel messages to all the users in a channel. + +```java +public void ensureChannel(String channelName) { + if (!chatData.channelByName.containsKey(channelName)) { + ChatChannel chatChannel = new ChatChannel(); + chatChannel.name = channelName; + chatData.channelByName.put(channelName, chatChannel); + final var thread = new Thread(() -> channelMessageRouter(channelName)); + thread.start(); + chatChannel.messageRouter.set(thread); + } +} +``` + +If the channel doesn't exist yet (*Line 2*) It will be added to the `channel_users` and `channel_messages` dictionaries. +*Line 5* starts an asyncio task (described below) which will deliver messages sent to the channel, to the channel's users. + +```java +public void channelMessageRouter(String channelName) { + final var channel = chatData.channelByName.get(channelName); + while (true) { + try { + final var message = channel.messages.poll(20, TimeUnit.DAYS); + if (message != null) { + for (String user : channel.users) { + findUserByName(user).doOnNext(session -> { + try { + session.messages.put(message); + } catch (InterruptedException exception) { + throw new RuntimeException(exception); + } + }).block(); + } + } + } catch (Exception exception) { + break; + } + } +} +``` + +The above method will loop infinitely and watch the `channel_messages` queue of the specified +channel (*Line 8*). Upon receiving a message, it will be delivered to all the users in the channel (*Lines 9-13*). + +### Join/Leave Channel + +Now let's add the channel join/leave handling request-response endpoints. + +```java +case "channel.join": + final var channelJoin = payload.getDataUtf8(); + ensureChannel(channelJoin); + join(channelJoin, session.sessionId); + return Mono.just(EmptyPayload.INSTANCE); +case "channel.leave": + leave(payload.getDataUtf8(), session.sessionId); + return Mono.just(EmptyPayload.INSTANCE); +``` + +### Send channel message + +Next we add the ability to send channel message. We will modify the `send_message` method: + +```java +case "message": + final var message = fromJson(payload.getDataUtf8(), Message.class); + final var targetMessage = new Message(session.username, message.content, message.channel); + + if (message.channel != null) { + chatData.channelByName.get(message.channel).messages.add(targetMessage); + } else { + + return findUserByName(message.user) + .doOnNext(targetSession -> targetSession.messages.add(targetMessage)) + .thenReturn(EmptyPayload.INSTANCE); + } +``` + +*Lines 16-20* decide whether it is a private message or a channel message, and add it to the relevant queue. + +### List channels + +```java +case "channels": + return Flux.fromIterable(chatData.channelByName.keySet()).map(DefaultPayload::create); +``` + +*Lines 6-11* define an endpoint for getting a list of channels. It uses the `StreamFromGenerator` helper. Note that the argument to this class +is a factory method for the [generator](https://docs.python.org/3/glossary.html#term-generator), not the generator itself. + +### Get channel users + +```java +case "channel.users": + return Flux.fromIterable(chatData.channelByName.getOrDefault(payload.getDataUtf8(), new ChatChannel()).users) + .map(DefaultPayload::create); +``` +*Lines 6-11* define an endpoint for getting a list of users in a given channel. The `find_username_by_session` helper method is used to +convert the session ids to usernames. + +If the channel does not exist (*Line 10*) the `EmptyStream` helper can be used as a response. + +## Client side + +We will add the methods on the `Client` to interact with the new server functionality: + +```py +from typing import List + +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import composite, route +from rsocket.frame_helpers import ensure_bytes +from rsocket.payload import Payload +from rsocket.helpers import utf8_decode + +from shared import encode_dataclass + +class ChatClient: + + async def join(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) + await self._rsocket.request_response(request) + return self + + async def leave(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.leave'))) + await self._rsocket.request_response(request) + return self + + async def channel_message(self, channel: str, content: str): + print(f'Sending {content} to channel {channel}') + await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')))) + + async def list_channels(self) -> List[str]: + request = Payload(metadata=composite(route('channels'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + + async def get_users(self, channel_name: str) -> List[str]: + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + users = await AwaitableRSocket(self._rsocket).request_stream(request) + return [utf8_decode(user.data) for user in users] +``` + +*Lines 15-23* define the join/leave methods. They are both simple routed `request_response` calls, with the channel name as the payload data. + +*Lines 25-28* define the list_channels method. This method uses the `AwaitableRSocket` adapter to simplify getting the response stream as a list. + +*Lines 30-31* define the get_users method, which lists a channel's users. + +Update the `print_message` method to include the channel: + +```py +def print_message(data: bytes): + message = Message(**json.loads(data)) + print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') +``` + +Let's test the new functionality using the following code: + +```py +async def messaging_example(user1: ChatClient, user2: ChatClient): + user1.listen_for_messages() + user2.listen_for_messages() + + await user1.join('channel1') + await user2.join('channel1') + + print(f'Channels: {await user1.list_channels()}') + + await user1.private_message('user2', 'private message from user1') + await user1.channel_message('channel1', 'channel message from user1') + + await asyncio.sleep(1) + + user1.stop_listening_for_messages() + user2.stop_listening_for_messages() +``` + +Call the example method from the `main` method and pass it the two chat clients: + +```py +user1 = ChatClient(client1) +user2 = ChatClient(client2) + +await user1.login('user1') +await user2.login('user2') + +await messaging_example(user1, user2) +``` diff --git a/content-docs/guides/rsocket-java/tutorial/04-files.mdx b/content-docs/guides/rsocket-java/tutorial/04-files.mdx new file mode 100644 index 00000000..0dd14a0d --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/04-files.mdx @@ -0,0 +1,211 @@ +--- +slug: /guides/rsocket-java/tutorial/files +title: File upload/download +sidebar_label: File upload/download +--- + +In this section we will add very basic file upload/download functionality. All files will be stored in memory, +and downloadable by all users. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step5) + +## Shared + +First, define a mimetype which will represent file names in the payloads. This will be used by both server and client, so +place it in the shared module: + +```py +chat_filename_mimetype = b'chat/file-name' +``` + +## Server side + +### Data-classes + +Next, we need a place to store the files in memory. Add a dictionary to the `ChatData` class to store the files. +The keys will be the file names, and the values the file content. + +```py +from dataclasses import dataclass, field +from typing import Dict + +@dataclass(frozen=True) +class ChatData: + ... + files: Dict[str, bytes] = field(default_factory=dict) +``` + +### Helper methods + +Next, define a helper method which extracts the filename from the upload/download payload: + +```py +from shared import chat_filename_mimetype +from rsocket.extensions.composite_metadata import CompositeMetadata +from rsocket.helpers import utf8_decode + +def get_file_name(composite_metadata: CompositeMetadata): + return utf8_decode(composite_metadata.find_by_mimetype(chat_filename_mimetype)[0].content) +``` + +This helper uses the `find_by_mimetype` method of `CompositeMetadata` to get a list of metadata items with the +specified mimetype. + +### Endpoints + +Next, register the request-response endpoints for uploading and downloading files, and for retrieving a list of +available files: + +```py +from typing import Awaitable + +from shared import chat_filename_mimetype +from rsocket.extensions.composite_metadata import CompositeMetadata +from rsocket.extensions.helpers import composite, metadata_item +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from rsocket.streams.stream_from_generator import StreamFromGenerator + +class ChatUserSession: + + def router_factory(self): + router = RequestRouter() + + @router.response('file.upload') + async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Awaitable[Payload]: + chat_data.files[get_file_name(composite_metadata)] = payload.data + return create_response() + + @router.response('file.download') + async def download_file(composite_metadata: CompositeMetadata) -> Awaitable[Payload]: + file_name = get_file_name(composite_metadata) + return create_response(chat_data.files[file_name], + composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype))) + + @router.stream('files') + async def get_file_names() -> Publisher: + count = len(chat_data.files) + generator = ((Payload(ensure_bytes(file_name)), index == count) for (index, file_name) in + enumerate(chat_data.files.keys(), 1)) + return StreamFromGenerator(lambda: generator) +``` + +The `upload_file` and `download_file` methods (*Lines 18-27*) extract the filename from the metadata using the helper method we created, +and set and get the file content from the `chat_data` storage respectively. + +In this section we introduce the second argument which can be passed to routed endpoints. If the session is set up to use +composite metadata, the `composite_metadata` parameter will contain a parsed structure of the metadata in the request payload. + +Line 34 uses the `StreamFromGenerator` helper which creates a stream publisher from a generator factory. + +The generator must return a tuple of two values for each iteration: +- Payload instance +- boolean value denoting if it is the last element in the generator. +The argument for the helper class is a method which returns a generator, not the generator itself. + +### Large file support + +In the `download_file` method (Line 24), even though the frame size limit is 16MB, larger files can be downloaded. +To allow this, fragmentation must be enabled. This is done by adding the `fragment_size_bytes` argument to the `RSocketServer` instantiation: + +```py +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.tcp import TransportTCP + +def session(*connection): + RSocketServer(TransportTCP(*connection), + handler_factory=handler_factory, + fragment_size_bytes=1_000_000) +``` + +## Client side + +### Methods + +On the client side, we will add 3 methods to access the new server functionality: +- `upload` +- `download` +- `list_files` + +```py +from typing import List + +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import composite, route, metadata_item +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload + +from shared import chat_filename_mimetype + +class ChatClient: + + async def upload(self, file_name: str, content: bytes): + await self._rsocket.request_response(Payload(content, composite( + route('file.upload'), + metadata_item(ensure_bytes(file_name), chat_filename_mimetype) + ))) + + async def download(self, file_name: str): + return await self._rsocket.request_response(Payload( + metadata=composite( + route('file.download'), + metadata_item(ensure_bytes(file_name), chat_filename_mimetype) + ))) + + async def list_files(self) -> List[str]: + request = Payload(metadata=composite(route('files'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) +``` + +*Lines 13-17* define the upload method. the `Payload` of the request-response consists of a body with the file's contents, +and metadata which contains routing and the filename. To specify the filename a custom mimetype was used **chat/file-name**. +This mime type was used to create a metadata item using the `metadata_item` method. the `composite` method was used to combine +the two metadata items to the complete metadata of the payload. + +*Lines 19-24* define the download method. It is similar to the upload method, except for the absence of the payload data, +and a different route: 'file.download'. + +*Lines 26-32* defines the list_files method. Same as the `list_channels` method in the previous section, +it uses the request-stream 'files' endpoint to get a list of files. + +### Large file support + +Same as on the server size, fragmentation must be enabled to allow uploading files larger than 16MB. +This is done by adding the `fragment_size_bytes` argument to the `RSocketClient` instantiation. Do this for both clients: + +```py +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.tcp import TransportTCP + +async with RSocketClient(single_transport_provider(TransportTCP(*connection1)), + metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA, + fragment_size_bytes=1_000_000) as client1: + ... +``` + +We will try out the new functionality with the following code: + +```py +async def files_example(user1: ChatClient, user2: ChatClient): + file_contents = b'abcdefg1234567' + file_name = 'file_name_1.txt' + + await user1.upload(file_name, file_contents) + + print(f'Files: {await user1.list_files()}') + + download = await user2.download(file_name) + + if download.data != file_contents: + raise Exception('File download failed') + else: + print(f'Downloaded file: {len(download.data)} bytes') +``` + +call the `files_example` method from the main client method. diff --git a/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx new file mode 100644 index 00000000..b496091b --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx @@ -0,0 +1,303 @@ +--- +slug: /guides/rsocket-java/tutorial/statistics +title: Statistics +sidebar_label: Statistics +--- + +As a last step, we will add passing some statistics between the client and the server: +- The client will be able to send its memory usage to the server. +- The server will report the number of users and channels. The client will be able to specify which of these statistics it wants. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step6) + +## Shared code + +We will define some POJOs to represent the payloads being sent between the client and server. + +:::note +The Jackson JSON annotations are optional. They are only required for compatibility with the client/server implementations of the other languages. +::: + +A `ServerStatistics` will hold the server channel and user count: + +```java +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ServerStatistic { + @JsonProperty("user_count") + public Integer userCount; + + @JsonProperty("channel_count") + public Integer channelCount; + + public ServerStatistic() { + } + + public ServerStatistic(Integer userCount, Integer channelCount) { + this.userCount = userCount; + this.channelCount = channelCount; + } +} +``` + +A `ClientStatistics` will hold the client's memory usage: +```java +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ClientStatistics { + @JsonProperty("memory_usage") + public Long memoryUsage; + + public ClientStatistics() { + } + + public ClientStatistics(Long memoryUsage) { + this.memoryUsage = memoryUsage; + } +} +``` + +And finally, the client will use a `StatisticsSettings` instance to tell the server which statistics it wants and how often: +```java +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ServerStatistic { + @JsonProperty("user_count") + public Integer userCount; + + @JsonProperty("channel_count") + public Integer channelCount; + + public ServerStatistic() { + } + + public ServerStatistic(Integer userCount, Integer channelCount) { + this.userCount = userCount; + this.channelCount = channelCount; + } +} +``` + +## Server side + +### Session + +First we will add fields on the `Session` class to hold statistics and statistics-settings sent from the client: + +```java +public class Session { + + public StatisticsSettings statisticsSettings = new StatisticsSettings(); + + public ClientStatistics clientStatistics; +} +``` + +### Endpoints + +We will add two endpoints, one for receiving from the client, and one for sending specific statistics from the server. + +#### Client sent statistics + +```java +public Mono fireAndForget(Payload payload) { + final var route = requireRoute(payload); + + return Mono.defer(() -> { + switch (route) { + case "statistics": + session.clientStatistics = fromJson(payload.getDataUtf8(), ClientStatistics.class); + return Mono.empty(); + } + + throw new IllegalStateException(); + }); +} +``` + +*Lines 14-17* defines an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this +data is not critical to the application. No return value is required from this method, and if provided will be ignored. + +#### Receive requested statistics + +We will add a helper method for creating a new statistics response: + +```py +def new_statistics_data(statistics_request: ServerStatisticsRequest): + statistics_data = {} + + if 'users' in statistics_request.ids: + statistics_data['user_count'] = len(chat_data.user_session_by_id) + + if 'channels' in statistics_request.ids: + statistics_data['channel_count'] = len(chat_data.channel_messages) + + return ServerStatistics(**statistics_data) +``` + +Next we define the endpoint for sending statistics to the client: + +```py +import asyncio +import json + +from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, encode_dataclass +from reactivestreams.publisher import DefaultPublisher +from reactivestreams.subscriber import Subscriber, DefaultSubscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter + +class ChatUserSession: + def router_factory(self): + router = RequestRouter() + + @router.channel('statistics') + async def send_statistics(): + + class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription): + + def __init__(self, session: UserSessionData): + super().__init__() + self._session = session + self._requested_statistics = ServerStatisticsRequest() + + def cancel(self): + self._sender.cancel() + + def subscribe(self, subscriber: Subscriber): + super().subscribe(subscriber) + subscriber.on_subscribe(self) + self._sender = asyncio.create_task(self._statistics_sender()) + + async def _statistics_sender(self): + while True: + try: + await asyncio.sleep(self._requested_statistics.period_seconds) + next_message = new_statistics_data(self._requested_statistics) + + self._subscriber.on_next(dataclass_to_payload(next_message)) + except Exception: + logging.error('Statistics', exc_info=True) + + def on_next(self, value: Payload, is_complete=False): + request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data))) + + logging.info(f'Received statistics request {request.ids}, {request.period_seconds}') + + if request.ids is not None: + self._requested_statistics.ids = request.ids + + if request.period_seconds is not None: + self._requested_statistics.period_seconds = request.period_seconds + + response = StatisticsChannel(self._session) + + return response, response +``` + +*Lines 16-57* defines an endpoint for sending statistics to the client. It uses the request-channel request type, which will allow +the client to both receive the server statistics, and update the server as to which statistics it wants to receive. + +## Client side + +On the client side we will add the methods to access the new server side functionality: +- `send_statistics` +- `listen_for_statistics` + +```py +import resource + +from shared import ServerStatistics, ClientStatistics +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + + async def send_statistics(self): + memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)), + metadata=composite(route('statistics'))) + await self._rsocket.fire_and_forget(payload) +``` + +The `send_statistics` uses a fire-and-forget request (*Line 15*) to send statistics to the server. This request does not receive a response, +so does not wait for confirmation that the payload was delivered, as it is not critical information (at least for this tutorial). + +Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it: + +```py +import json +from asyncio import Event +from datetime import timedelta +from typing import List + +from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload +from reactivestreams.publisher import DefaultPublisher +from reactivestreams.subscriber import DefaultSubscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload + +class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription): + + def __init__(self): + super().__init__() + self.done = Event() + + def on_next(self, value: Payload, is_complete=False): + statistics = ServerStatistics(**json.loads(utf8_decode(value.data))) + print(statistics) + + if is_complete: + self.done.set() + + def cancel(self): + self.subscription.cancel() + + def set_requested_statistics(self, ids: List[str]): + self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids))) + + def set_period(self, period: timedelta): + self._subscriber.on_next( + dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds())))) +``` + +Next we will use this new handler in the `ChatClient`: + +```py +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + + def listen_for_statistics(self) -> StatisticsHandler: + self._statistics_subscriber = StatisticsHandler() + self._rsocket.request_channel(Payload(metadata=composite( + route('statistics') + )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber) + return self._statistics_subscriber + + def stop_listening_for_statistics(self): + self._statistics_subscriber.cancel() +``` + +Finally, let's try out this new functionality in the client: + +```py +async def statistics_example(user1): + await user1.send_statistics() + + statistics_control = user1.listen_for_statistics() + + await asyncio.sleep(5) + + statistics_control.set_requested_statistics(['users']) + + await asyncio.sleep(5) + + user1.stop_listening_for_statistics() +``` + +Call this new method from the client `main` method. diff --git a/content-docs/guides/rsocket-java/tutorial/index.mdx b/content-docs/guides/rsocket-java/tutorial/index.mdx new file mode 100644 index 00000000..dae02395 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/index.mdx @@ -0,0 +1,32 @@ +--- +slug: /guides/rsocket-java/tutorial +title: Chat Application +sidebar_label: Preface +--- + +This guide will go over step by step of setting up an application using the java implementation of RSocket. + +:::tip +If you find a problem, code or otherwise, please report an [issue](https://github.com/rsocket/rsocket-website/issues) +::: + +## Preface + +import Preface from '../../guide-shared/_preface.mdx' + + + +## Required knowledge + +The guide assumes the following knowledge: + +* Basic java level (classes/methods, threads, streams) +* Basic understanding of RSocket protocol (See [About RSocket](/about/faq)) + +## Required setup + +TODO: setting up a java projects with rsocket as a dependency + +## Code + +The tutorial code is available on [GitHub](https://github.com/rsocket/rsocket-java) under examples/tutorial. diff --git a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx index a137b096..ed827e91 100644 --- a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx @@ -4,9 +4,9 @@ title: Request routing sidebar_label: Request routing --- -The chat application will have various functionality (e.g. private messages and channels). Each request to the server will -be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md) (similar to paths in an HTTP URL). To implement this we will use the `RequestRouter` and `RoutingRequestHandler` -classes. +import Routing from '../../guide-shared/_routing.mdx' + + See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step1) @@ -16,7 +16,7 @@ We will modify the example from the previous step into a routed request response ### Routing request handler -The `handler_factory` method below replaces the `Handler` class from the previous step: +To implement routing we will use the `RequestRouter` and `RoutingRequestHandler` classes. The `handler_factory` method below replaces the `Handler` class from the previous step: ```py from typing import Awaitable diff --git a/content-docs/guides/rsocket-py/tutorial/index.mdx b/content-docs/guides/rsocket-py/tutorial/index.mdx index 354ab791..0ea8c6a5 100644 --- a/content-docs/guides/rsocket-py/tutorial/index.mdx +++ b/content-docs/guides/rsocket-py/tutorial/index.mdx @@ -12,16 +12,9 @@ If you find a problem, code or otherwise, please report an [issue](https://githu ## Preface -We will be setting up a chat application with a server and a client. +import Preface from '../../guide-shared/_preface.mdx' -The chat client will have the following functionality: -- Private messages between users -- Joining and sending messages to channels -- Uploading/Downloading files -- Getting server and client statistics (e.g. number of channels) - -Since the emphasis is on showcasing as much RSocket functionality as possible, some of the examples may be either a bit contrived, or -be possible to implement in a different way using RSocket. This is left as an exercise to the reader. + In the first steps the code will be written using only the core code. This results in more verbose code, but prevents the need for additional packages need be installed. diff --git a/sidebar-rsocket-java.js b/sidebar-rsocket-java.js new file mode 100644 index 00000000..d3fd3dd3 --- /dev/null +++ b/sidebar-rsocket-java.js @@ -0,0 +1,16 @@ +module.exports = [ + "guides/rsocket-java/index", + { + "Tutorial": + [ + "guides/rsocket-java/tutorial/index", + "guides/rsocket-java/tutorial/base", + "guides/rsocket-java/tutorial/request_routing", + "guides/rsocket-java/tutorial/user_session", + "guides/rsocket-java/tutorial/messages", + "guides/rsocket-java/tutorial/channels", + "guides/rsocket-java/tutorial/files", + "guides/rsocket-java/tutorial/statistics" + ] + } +]; diff --git a/sidebars.js b/sidebars.js index 33972631..efecaa8e 100644 --- a/sidebars.js +++ b/sidebars.js @@ -9,7 +9,8 @@ const guideItems = [ "guides/index", { "rsocket-js": require("./sidebar-rsocket-js"), - "rsocket-py": require("./sidebar-rsocket-py") + "rsocket-py": require("./sidebar-rsocket-py"), + "rsocket-java": require("./sidebar-rsocket-java") } ]; diff --git a/src/css/customTheme.css b/src/css/customTheme.css index 847511a0..6cde21fa 100644 --- a/src/css/customTheme.css +++ b/src/css/customTheme.css @@ -34,15 +34,18 @@ html[data-theme='dark'] .hero { -pre.language-py code::before { +pre.language-py code::before, +pre.language-java code::before { counter-reset: listing; } -pre.language-py code > span { +pre.language-py code > span, +pre.language-java code > span{ counter-increment: listing; } -pre.language-py code > span::before { +pre.language-py code > span::before, +pre.language-java code > span::before{ color: #9a9a9a; content: counter(listing) ". "; display: inline-block;