@@ -24,71 +24,49 @@ public class SseService {
2424
2525 private final ObjectMapper objectMapper ; // jsonμΌλ‘ λ³ν μν¨
2626 private final Map <String , SseEmitter > emitters = new ConcurrentHashMap <>(); // λμμ± κ³ λ €
27- // κ° emitterλ³ ping μ€ν¨ νμλ₯Ό μ μ₯νλ λ§΅ (μ΅λ νμ© μ€ν¨ νμ: μμλ‘ 3ν)
28- private final Map <String , Integer > pingFailureCounts = new ConcurrentHashMap <>();
29- private static final int MAX_PING_FAILURE_COUNT = 3 ;
3027
3128 /* SSE ꡬλ
- (ν΄λΌμ΄μΈνΈκ° SSE μ°κ²°ν λ νΈμΆλμ΄ μλ¦Ό λ°μ μ€λΉ) */
3229 public SseEmitter subscribe (String sessionToken ) {
3330 // κΈ°μ‘΄ Emitterκ° μλ€λ©΄ μμ
3431 if (emitters .containsKey (sessionToken )) {
35- log .info ("κΈ°μ‘΄ SSE Emitterκ° μ‘΄μ¬ν©λλ€. κΈ°μ‘΄ emitter μμ ν λ€μ μμ± - sessionToken: {}" , sessionToken );
32+ log .info ("κΈ°μ‘΄ SSE Emitter μμ - sessionToken: {}" , sessionToken );
3633 emitters .remove (sessionToken );
34+ log .info ("κΈ°μ‘΄ SSE Emitterκ° μ‘΄μ¬ν©λλ€. - sessionToken: {}" , sessionToken );
3735 }
3836
3937 // emitter μμ±
4038 SseEmitter sseEmitter = new SseEmitter (60 * 60 * 1000L ); // 60λΆκ° μλ²μμ μ무κ²λ 보λ΄μ§ μμΌλ©΄ νμμμλλλ‘ μ€μ
4139 emitters .put (sessionToken , sseEmitter );
42- // μ΄κΈ° ping μ€ν¨ μΉ΄μ΄νΈ 0μΌλ‘ μ΄κΈ°ν
43- pingFailureCounts .put (sessionToken , 0 );
4440
4541 // μ°κ²° μ§μμ μν ping 보λ΄κΈ° (30μ΄λ§λ€)
4642 ScheduledExecutorService scheduler = Executors .newSingleThreadScheduledExecutor ();
4743 scheduler .scheduleAtFixedRate (() -> {
4844 try {
4945 sseEmitter .send (SseEmitter .event ().name ("ping" ).data ("μ°κ²° μ€λ¨ λ°©μ§μ© ping" ));
50- // μ±κ³΅μ μΌλ‘ ping μ μ‘λλ©΄ μ€ν¨ μΉ΄μ΄νΈλ₯Ό μ΄κΈ°ν
51- pingFailureCounts .put (sessionToken , 0 );
5246 } catch (IOException e ) {
53- int failCount = pingFailureCounts .getOrDefault (sessionToken , 0 );
54- failCount ++;
55- pingFailureCounts .put (sessionToken , failCount );
5647 log .info ("ping μ λ¬ μ€ν¨ μ¬μ : {}" , e .getMessage ());
5748 log .warn ("β ping failed - sessionToken: {}" , sessionToken );
58- // μ€ν¨ νμκ° MAX_PING_FAILURE_COUNT μ΄κ³Όνλ©΄ emitter μμ
59- if (failCount >= MAX_PING_FAILURE_COUNT ) {
60- log .warn ("β οΈ ping μ€ν¨ νμ μ΄κ³Ό - emitter μμ - sessionToken: {}" , sessionToken );
61- emitters .remove (sessionToken );
62- scheduler .shutdown (); // μ€μΌμ€λ¬ μ’
λ£
63- }
6449 }
6550 }, 30 , 30 , TimeUnit .SECONDS ); // 30μ΄λ§λ€
6651
6752 // ν λ² μμ²μ΄ λ€μ΄μ¨ ν μλμ λ‘μ§ λλ¬Έμ λμ΄μ μλ¦Όμ΄ μμ€λ λ¬Έμ λ°μ, μ μ μ£Όμ μ²λ¦¬
68- // emitter μ’
λ£ μ΄λ²€νΈ μ²λ¦¬ - ν΄λΌμ΄μΈνΈμ λΈλΌμ°μ κ° μ’
λ£λκ±°λ νμ΄μ§ μ΄λν κ²½μ°, .complete() κ° μ€νλ κ²½μ°
69- sseEmitter .onCompletion (() -> {
70- log .info ("emitter μμ : SSE Emitter μ μ μ’
λ£ - sessionToken: {}" , sessionToken );
71- emitters .remove (sessionToken );
72- pingFailureCounts .remove (sessionToken );
73- });
53+ // // μ¬μ©μμκ² λͺ¨λ λ°μ΄ν° μ μ‘λμλ€λ©΄ emitter μμ
54+ // sseEmitter.onCompletion(() -> {
55+ // log.info("emitter μμ : SSE Emitter μ μ μ’
λ£ - sessionToken: {}", sessionToken);
56+ // emitters.remove(sessionToken);
57+ // });
7458 // emitterμ μ ν¨μκ° λ§λ£μ emmitter μμ
7559 sseEmitter .onTimeout (() -> {
7660 log .info ("emitter μμ : SSE Emitter νμμμ - sessionToken: {}" , sessionToken );
7761 emitters .remove (sessionToken );
78- pingFailureCounts .remove (sessionToken );
79- });
80- sseEmitter .onError ((e ) -> {
81- log .warn ("π₯ SSE Emitter μλ¬ λ°μ - sessionToken: {}" , sessionToken );
82- emitters .remove (sessionToken );
83- pingFailureCounts .remove (sessionToken );
8462 });
8563
8664 // νμ¬ λ±λ‘λ Emitter κ°μ νμΈ
8765 log .info ("π νμ¬ μ μ₯λ SSE Emitter κ°μ: {}" , emitters .size ());
8866
8967 // 503 μλ¬ λ°©μ§λ₯Ό μν΄ μ΄κΈ° λλ―Έ λ°μ΄ν° μ μ‘
9068 try {
91- sseEmitter .send (SseEmitter .event ().name ("connect" ).data ("SSE ꡬλ
μ±κ³΅!" ). reconnectTime ( 1000 )); // 1μ΄ ν μ¬μλ
69+ sseEmitter .send (SseEmitter .event ().name ("connect" ).data ("SSE ꡬλ
μ±κ³΅!" ));
9270 } catch (IOException e ) {
9371 emitters .remove (sessionToken );
9472 log .error ("β SSE - λλ―Έ λ°μ΄ν° μ μ‘ μ€ν¨" );
@@ -100,9 +78,8 @@ public SseEmitter subscribe(String sessionToken) {
10078 @ Override
10179 public void run () {
10280 if (emitters .containsKey (sessionToken )) {
103- log .info ("SSE Emitter κ°μ μμ - 24μκ° μ§λ¨ - sessionToken: {}" , sessionToken );
81+ log .info ("SSE Emitter κ°μ μμ - sessionToken: {}" , sessionToken );
10482 emitters .remove (sessionToken );
105- pingFailureCounts .remove (sessionToken );
10683 }
10784 }
10885 }, 24 * 60 * 60 * 1000 ); // 24μκ° ν κ°μ μμ
@@ -133,9 +110,8 @@ public <T> void sendNotification(String sessionToken, T responseDto) {
133110 log .info ("β
SSE λ©μμ§ μ μ‘ μλ£!" );
134111
135112 } catch (Exception e ) {
136- log .error ("β SSE λ©μμ§ μ μ‘ μ€ν¨ - sessionToken: {} / μ΄μ : {}" , sessionToken , e .getMessage ());
113+ log .error ("β SSE λ©μμ§ μ μ‘ μ€ν¨ - sessionToken: {} - {}" , sessionToken , e .getMessage ());
137114 emitters .remove (sessionToken );
138- pingFailureCounts .remove (sessionToken );
139115 throw new CustomException (FAILED_TO_SEND_NOTICE , null );
140116 }
141117 }
@@ -165,8 +141,7 @@ public void shutdown() {
165141 List <SseEmitter > emitterList = new ArrayList <>(emitters .values ());
166142 for (SseEmitter emitter : emitterList ) {
167143 try {
168- emitter .send (SseEmitter .event ().name ("end" ).data ("μ°κ²° μ μ μ’
λ£" ));
169- emitter .complete (); // μ μ μ’
λ£ // emitter.onCompletion μ€νλ¨
144+ emitter .complete (); // μ μ μ’
λ£
170145 } catch (Exception e ) {
171146 log .error ("SSE μ’
λ£ μ€ μ€λ₯ λ°μ: {}" , e .getMessage ());
172147 }
0 commit comments