Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c9cf87c
feat: 레디스와 의존성 설치
w0uldy0u Feb 13, 2026
7f5943b
feat: redis 연결 설정
w0uldy0u Feb 13, 2026
64318ba
feat: redis 템플릿 생성
w0uldy0u Feb 13, 2026
727194e
feat: RedisStreamStore 구현
w0uldy0u Feb 13, 2026
e6f0d33
chore: redis key 수정 및 ttl 단축
w0uldy0u Feb 14, 2026
9ad0cd0
feat: stage 환경 redis 추가
w0uldy0u Feb 14, 2026
f99814d
feat: update 프레임 확인 및 redis append
w0uldy0u Feb 14, 2026
7ebee7e
chore: 불필요한 어노테이션 제거
w0uldy0u Feb 14, 2026
89afedd
Merge branch 'dev' into feat/#356/use_redis_to_save_yjs_update
w0uldy0u Feb 14, 2026
558f681
test: 테스트 코드 작성
w0uldy0u Feb 14, 2026
1f94943
test: 테스트 displayName 작성
w0uldy0u Feb 14, 2026
4f32cbe
chore: liveness readiness 의존성 설정
w0uldy0u Feb 14, 2026
da01bd5
chore: healthcheck은 liveness로
w0uldy0u Feb 14, 2026
ebf4fe8
feat: Redis append를 별도 스레드로 분리
w0uldy0u Feb 14, 2026
b151b3e
Merge branch 'dev' into feat/#356/use_redis_to_save_yjs_update
w0uldy0u Feb 15, 2026
ca79f40
Merge branch 'dev' into chore/#366/auto_rollback_using_liveness
w0uldy0u Feb 15, 2026
0fb4fa5
chore: 리뷰 반영
w0uldy0u Feb 15, 2026
cca1218
Merge branch 'feat/#356/use_redis_to_save_yjs_update' of https://gith…
w0uldy0u Feb 15, 2026
6eec06c
Merge branch 'feat/#356/use_redis_to_save_yjs_update' into chore/#366…
w0uldy0u Feb 15, 2026
fecb16a
Merge branch 'dev' into chore/#366/auto_rollback_using_liveness
w0uldy0u Feb 15, 2026
b40ee42
Merge branch 'dev' into chore/#366/auto_rollback_using_liveness
w0uldy0u Feb 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy-production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ jobs:
id: health_check
run: |
set -e
URL="https://api.episode.io.kr/actuator/health"
URL="https://api.episode.io.kr/actuator/health/liveness"

echo "Checking health:"
for i in $(seq 1 30); do
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/deploy-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
id: health_check
run: |
set -e
URL="https://stage.episode.io.kr/actuator/health"
URL="https://stage.episode.io.kr/actuator/health/liveness"

echo "Checking health:"
for i in $(seq 1 30); do
Expand Down
1 change: 1 addition & 0 deletions backend/api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework:spring-webflux'
implementation 'com.github.f4b6a3:uuid-creator:6.0.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,40 @@
@Service
public class CollaborationService {
private final SessionRegistry sessionRegistry;
private final RedisStreamRepository redisStreamRepository;
private final RedisStreamStore redisStreamStore;

public void handleConnect(WebSocketSession session) {
sessionRegistry.addSession(getMindmapId(session), session);
}

public void processMessage(WebSocketSession sender, BinaryMessage message) {
ByteBuffer buffer = message.getPayload();
byte[] payload = new byte[buffer.remaining()];
buffer.get(payload);
UUID roomId = getMindmapId(sender);

sessionRegistry.broadcast(getMindmapId(sender), sender, payload);
byte[] payload = toByteArray(message.getPayload());

sessionRegistry.broadcast(roomId, sender, payload);

if (YjsProtocolUtil.isUpdateFrame(payload)) {
try {
redisStreamStore.appendUpdate(roomId, payload);
} catch (Exception e) {
log.error("Error while appending update frame to redis. roomId={}", roomId, e);
}
}
}

public void handleDisconnect(WebSocketSession session) {
//TODO: Collaboration room 세션 수 0일때 스냅샷 트리거
sessionRegistry.removeSession(getMindmapId(session), session);
}

private UUID getMindmapId(WebSocketSession session) {
return (UUID) session.getAttributes().get(AttributeKeys.MINDMAP_ID);
}

private byte[] toByteArray(ByteBuffer buffer) {
ByteBuffer dup = buffer.duplicate();
byte[] bytes = new byte[dup.remaining()];
dup.get(bytes);
return bytes;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.yat2.episode.collaboration;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;

@Service
@RequiredArgsConstructor
public class RedisStreamStore {

private final RedisTemplate<String, byte[]> redisBinaryTemplate;

private static final String FIELD_UPDATE = "u";

private String streamKey(UUID roomId) {
return "collab:room:" + roomId;
}

public RecordId appendUpdate(UUID roomId, byte[] update) {
String key = streamKey(roomId);

StreamOperations<String, String, byte[]> ops = redisBinaryTemplate.opsForStream();

MapRecord<String, String, byte[]> record =
StreamRecords.newRecord().in(key).ofMap(Map.of(FIELD_UPDATE, update));

RecordId id = ops.add(record);

redisBinaryTemplate.expire(key, Duration.ofDays(2));

return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.yat2.episode.collaboration;

import lombok.experimental.UtilityClass;

@UtilityClass
public final class YjsProtocolUtil {
public static final int MSG_SYNC = 0;

/* 추후 sync 라우팅을 위함 */
public static final int SYNC_STEP_1 = 0;
public static final int SYNC_STEP_2 = 1;
public static final int SYNC_UPDATE = 2;

public static boolean isUpdateFrame(byte[] payload) {
if (payload == null || payload.length < 3) {
return false;
}
return payload[0] == MSG_SYNC && payload[1] == SYNC_UPDATE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.yat2.episode.global.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, byte[]> redisBinaryTemplate(
RedisConnectionFactory connectionFactory
) {
RedisTemplate<String, byte[]> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);

template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());

template.setValueSerializer(RedisSerializer.byteArray());
template.setHashValueSerializer(RedisSerializer.byteArray());

template.afterPropertiesSet();
return template;
}
}
11 changes: 11 additions & 0 deletions backend/api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ spring:
resources:
add-mappings: false

data:
redis:
host: ${REDIS_HOST}
port: ${REDIS_PORT}
password: ${REDIS_PASSWORD}

server:
port: 8080
forward-headers-strategy: framework
Expand Down Expand Up @@ -116,6 +122,11 @@ management:
include: health
endpoint:
health:
group:
readiness:
include: readinessState,db,redis
liveness:
include: livenessState
show-details: never

collaboration:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.yat2.episode.collaboration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.WebSocketSession;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.yat2.episode.global.constant.AttributeKeys;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@DisplayName("CollaborationService 단위 테스트")
class CollaborationServiceTest {

@Mock
SessionRegistry sessionRegistry;

@Mock
RedisStreamStore redisStreamStore;

CollaborationService service;

@BeforeEach
void setUp() {
service = new CollaborationService(sessionRegistry, redisStreamStore);
}

@Nested
@DisplayName("세션 연결/해제")
class ConnectionTests {

@Test
@DisplayName("연결 시 room에 세션을 등록한다")
void handleConnect_addsSessionToRoom() {
UUID roomId = UUID.randomUUID();
WebSocketSession session = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(session.getAttributes()).thenReturn(attrs);

service.handleConnect(session);

verify(sessionRegistry).addSession(roomId, session);
verifyNoMoreInteractions(sessionRegistry);
}

@Test
@DisplayName("해제 시 room에서 세션을 제거한다")
void handleDisconnect_removesSessionFromRoom() {
UUID roomId = UUID.randomUUID();
WebSocketSession session = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(session.getAttributes()).thenReturn(attrs);

service.handleDisconnect(session);

verify(sessionRegistry).removeSession(roomId, session);
verifyNoMoreInteractions(sessionRegistry);
}
}

@Nested
@DisplayName("메시지 처리")
class MessageTests {

@Test
@DisplayName("항상 브로드캐스트한다")
void processMessage_alwaysBroadcasts() {
UUID roomId = UUID.randomUUID();
WebSocketSession sender = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(sender.getAttributes()).thenReturn(attrs);

byte[] frame = new byte[]{ 9, 9, 9 };
BinaryMessage message = new BinaryMessage(frame);

service.processMessage(sender, message);

ArgumentCaptor<byte[]> payloadCaptor = ArgumentCaptor.forClass(byte[].class);
verify(sessionRegistry).broadcast(eq(roomId), eq(sender), payloadCaptor.capture());

assertArrayEquals(frame, payloadCaptor.getValue());
verifyNoMoreInteractions(sessionRegistry);
}

@Test
@DisplayName("Update 프레임이면 Redis에 저장한다")
void processMessage_whenUpdateFrame_appendsToRedis() {
UUID roomId = UUID.randomUUID();
WebSocketSession sender = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(sender.getAttributes()).thenReturn(attrs);

byte[] frame = new byte[]{ 0, 2, 1, 2, 3, 4 };
BinaryMessage message = new BinaryMessage(frame);

service.processMessage(sender, message);

ArgumentCaptor<byte[]> broadcastCaptor = ArgumentCaptor.forClass(byte[].class);
verify(sessionRegistry).broadcast(eq(roomId), eq(sender), broadcastCaptor.capture());
assertArrayEquals(frame, broadcastCaptor.getValue());

ArgumentCaptor<byte[]> redisCaptor = ArgumentCaptor.forClass(byte[].class);
verify(redisStreamStore).appendUpdate(eq(roomId), redisCaptor.capture());
assertArrayEquals(frame, redisCaptor.getValue());
}

@Test
@DisplayName("Update 프레임이 아니면 Redis에 저장하지 않는다")
void processMessage_whenNotUpdateFrame_doesNotAppendToRedis() {
UUID roomId = UUID.randomUUID();
WebSocketSession sender = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(sender.getAttributes()).thenReturn(attrs);

byte[] frame = new byte[]{ 0, 1, 9, 9 };
BinaryMessage message = new BinaryMessage(frame);

service.processMessage(sender, message);

verify(sessionRegistry).broadcast(eq(roomId), eq(sender), any(byte[].class));
verifyNoInteractions(redisStreamStore);
}

@Test
@DisplayName("Redis 저장 중 예외가 발생해도 처리 흐름이 죽지 않는다")
void processMessage_whenRedisThrows_doesNotCrash() {
UUID roomId = UUID.randomUUID();
WebSocketSession sender = mock(WebSocketSession.class);

Map<String, Object> attrs = new HashMap<>();
attrs.put(AttributeKeys.MINDMAP_ID, roomId);
when(sender.getAttributes()).thenReturn(attrs);

byte[] frame = new byte[]{ 0, 2, 1, 2, 3 };
BinaryMessage message = new BinaryMessage(frame);

doThrow(new RuntimeException("redis down")).when(redisStreamStore)
.appendUpdate(eq(roomId), any(byte[].class));

assertThatCode(() -> service.processMessage(sender, message)).doesNotThrowAnyException();

verify(sessionRegistry).broadcast(eq(roomId), eq(sender), any(byte[].class));
verify(redisStreamStore).appendUpdate(eq(roomId), any(byte[].class));
}
}
}
Loading