Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework:spring-webflux'
implementation 'com.github.f4b6a3:uuid-creator:6.0.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.Executor;

import com.yat2.episode.global.constant.AttributeKeys;

Expand All @@ -16,18 +17,37 @@
@Service
public class CollaborationService {
private final SessionRegistry sessionRegistry;
private final RedisStreamRepository redisStreamRepository;
private final RedisStreamStore redisStreamStore;
private final Executor redisExecutor;

public void handleConnect(WebSocketSession session) {
sessionRegistry.addSession(getMindmapId(session), session);
}

public void processMessage(WebSocketSession sender, BinaryMessage message) {
ByteBuffer buffer = message.getPayload();
byte[] payload = new byte[buffer.remaining()];
buffer.get(payload);
UUID roomId = getMindmapId(sender);
if (roomId == null) {
log.error("Mindmap Id is null.");
return;
}

sessionRegistry.broadcast(getMindmapId(sender), sender, payload);
byte[] payload = toByteArray(message.getPayload());

sessionRegistry.broadcast(roomId, sender, payload);

if (YjsProtocolUtil.isUpdateFrame(payload)) {
try {
redisExecutor.execute(() -> {
try {
redisStreamStore.appendUpdate(roomId, payload);
} catch (Exception e) {
log.warn("Redis append failed. roomId={}", roomId, e);
}
});
} catch (Exception e) {
log.error("Redis append failed. roomId={}", roomId, e);
}
}
}

public void handleDisconnect(WebSocketSession session) {
Expand All @@ -38,4 +58,11 @@ public void handleDisconnect(WebSocketSession session) {
private UUID getMindmapId(WebSocketSession session) {
return (UUID) session.getAttributes().get(AttributeKeys.MINDMAP_ID);
}

private byte[] toByteArray(ByteBuffer buffer) {
ByteBuffer dup = buffer.duplicate();
byte[] bytes = new byte[dup.remaining()];
dup.get(bytes);
return bytes;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.yat2.episode.collaboration;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;

@Service
@RequiredArgsConstructor
public class RedisStreamStore {

private final RedisTemplate<String, byte[]> redisBinaryTemplate;

private static final String FIELD_UPDATE = "u";

private String streamKey(UUID roomId) {
return "collab:room:" + roomId;
}

public RecordId appendUpdate(UUID roomId, byte[] update) {
String key = streamKey(roomId);

StreamOperations<String, String, byte[]> ops = redisBinaryTemplate.opsForStream();

MapRecord<String, String, byte[]> record =
StreamRecords.newRecord().in(key).ofMap(Map.of(FIELD_UPDATE, update));

RecordId id = ops.add(record);

redisBinaryTemplate.expire(key, Duration.ofDays(2));

return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.yat2.episode.collaboration;

import lombok.experimental.UtilityClass;

@UtilityClass
public final class YjsProtocolUtil {
public static final int MSG_SYNC = 0;

/* 추후 sync 라우팅을 위함 */
public static final int SYNC_STEP_1 = 0;
public static final int SYNC_STEP_2 = 1;
public static final int SYNC_UPDATE = 2;

public static boolean isUpdateFrame(byte[] payload) {
if (payload == null || payload.length < 3) {
return false;
}
return payload[0] == MSG_SYNC && payload[1] == SYNC_UPDATE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.yat2.episode.collaboration.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
@Configuration
public class CollaborationAsyncConfig {

@Bean(name = "redisExecutor")
public Executor redisExecutor() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setThreadNamePrefix("redis-append-");
exec.setCorePoolSize(1);
exec.setMaxPoolSize(2);
exec.setQueueCapacity(10_000);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);

exec.setRejectedExecutionHandler(dropAndLogError());
exec.initialize();
return exec;
}

private RejectedExecutionHandler dropAndLogError() {
AtomicLong dropped = new AtomicLong();
AtomicLong lastLogMs = new AtomicLong(0);
//TODO: Update가 drop 될 경우 Yjs Runner에게 알려 Sync 프로토콜로 복구
return (r, executor) -> {
long n = dropped.incrementAndGet();

long now = System.currentTimeMillis();
long prev = lastLogMs.get();
if (now - prev >= 1000 && lastLogMs.compareAndSet(prev, now)) {
int qSize = executor.getQueue().size();
log.error("Redis queue full. Dropping tasks. dropped={}, queueSize={}", n, qSize);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.yat2.episode.global.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, byte[]> redisBinaryTemplate(
RedisConnectionFactory connectionFactory
) {
RedisTemplate<String, byte[]> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);

template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());

template.setValueSerializer(RedisSerializer.byteArray());
template.setHashValueSerializer(RedisSerializer.byteArray());

template.afterPropertiesSet();
return template;
}
}
6 changes: 6 additions & 0 deletions backend/api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ spring:
resources:
add-mappings: false

data:
redis:
host: ${REDIS_HOST}
port: ${REDIS_PORT}
password: ${REDIS_PASSWORD}

server:
port: 8080
forward-headers-strategy: framework
Expand Down
Loading