Skip to content

Commit c4f6135

Browse files
committed
Update ChatWebSocketHandler to move blocking operations to boundedElastic threads
1 parent 93894bf commit c4f6135

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

Diff for: src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import reactor.core.publisher.Flux;
1212
import reactor.core.publisher.Mono;
1313
import reactor.core.publisher.Sinks;
14+
import reactor.core.scheduler.Schedulers;
1415

1516
@Slf4j
1617
public class ChatWebSocketHandler implements WebSocketHandler {
@@ -40,15 +41,19 @@ public Mono<Void> handle(WebSocketSession webSocketSession) {
4041
Mono<Void> inputMessage = webSocketSession.receive()
4142
.flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText()))
4243
.doOnSubscribe(subscription -> {
43-
long activeUserCount = activeUserCounter.incrementAndGet();
44-
log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
45-
chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount));
44+
Mono.fromRunnable(() -> {
45+
long activeUserCount = activeUserCounter.incrementAndGet();
46+
log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
47+
chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount));
48+
}).subscribeOn(Schedulers.boundedElastic());
4649
})
4750
.doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable))
4851
.doFinally(signalType -> {
49-
long activeUserCount = activeUserCounter.decrementAndGet();
50-
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
51-
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
52+
Mono.fromRunnable(() -> {
53+
long activeUserCount = activeUserCounter.decrementAndGet();
54+
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
55+
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
56+
}).subscribeOn(Schedulers.boundedElastic());
5257
})
5358
.then();
5459

0 commit comments

Comments
 (0)