diff --git a/src/lib/attach-shared-listeners.ts b/src/lib/attach-shared-listeners.ts index 3d8585c..48d2420 100644 --- a/src/lib/attach-shared-listeners.ts +++ b/src/lib/attach-shared-listeners.ts @@ -46,7 +46,8 @@ const bindOpenHandler = ( heartbeatOptions?: boolean | HeartbeatOptions ) => { webSocketInstance.onopen = (event: WebSocketEventMap['open']) => { - getSubscribers(url).forEach(subscriber => { + const subscribers = getSubscribers(url); + subscribers.forEach(subscriber => { subscriber.reconnectCount.current = 0; if (subscriber.optionsRef.current.onOpen) { subscriber.optionsRef.current.onOpen(event); @@ -58,9 +59,11 @@ const bindOpenHandler = ( if (heartbeatOptions && webSocketInstance instanceof WebSocket) { subscriber.lastMessageTime.current = Date.now(); - heartbeat(webSocketInstance, subscriber.lastMessageTime, typeof heartbeatOptions === 'boolean' ? undefined : heartbeatOptions,); } }); + if (heartbeatOptions && webSocketInstance instanceof WebSocket) { + heartbeat(webSocketInstance, subscribers.map(subscriber => subscriber.lastMessageTime), typeof heartbeatOptions === 'boolean' ? undefined : heartbeatOptions,); + } }; }; diff --git a/src/lib/heartbeat.ts b/src/lib/heartbeat.ts index 098cd05..0b6bc48 100644 --- a/src/lib/heartbeat.ts +++ b/src/lib/heartbeat.ts @@ -2,25 +2,42 @@ import { MutableRefObject } from "react"; import { DEFAULT_HEARTBEAT } from "./constants"; import { HeartbeatOptions } from "./types"; -export function heartbeat(ws: WebSocket, lastMessageTime: MutableRefObject, options?: HeartbeatOptions): () => void { +function getLastMessageTime(lastMessageTime: MutableRefObject | MutableRefObject[]): number { + if (Array.isArray(lastMessageTime)) { + return lastMessageTime.reduce((p, c) => { return (p.current > c.current) ? p : c; }).current; + } + return lastMessageTime.current +} + +export function heartbeat(ws: WebSocket, lastMessageTime: MutableRefObject | MutableRefObject[], options?: HeartbeatOptions): () => void { const { interval = DEFAULT_HEARTBEAT.interval, timeout = DEFAULT_HEARTBEAT.timeout, message = DEFAULT_HEARTBEAT.message, } = options || {}; + // how often check interval between ping messages + // minimum is 100ms + // maximum is ${interval / 10}ms + const intervalCheck = Math.max(100, interval / 10); + + let lastPingSentAt = Date.now(); + const heartbeatInterval = setInterval(() => { - if (lastMessageTime.current + timeout <= Date.now()) { - console.warn(`Heartbeat timed out, closing connection, last message was seen ${Date.now() - lastMessageTime.current}ms ago`); + const timeNow = Date.now(); + const lastMessageReceivedAt = getLastMessageTime(lastMessageTime); + if (lastMessageReceivedAt + timeout <= timeNow) { + console.warn(`Heartbeat timed out, closing connection, last message received ${timeNow - lastMessageReceivedAt}ms ago, last ping sent ${timeNow - lastPingSentAt}ms ago`); ws.close(); } else { - if (lastMessageTime.current + interval <= Date.now()) { + if (lastMessageReceivedAt + interval <= timeNow && lastPingSentAt + interval <= timeNow) { try { if (typeof message === 'function') { ws.send(message()); } else { ws.send(message); } + lastPingSentAt = timeNow; } catch (err: unknown) { console.error(`Heartbeat failed, closing connection`, err instanceof Error ? err.message : err); ws.close(); @@ -28,12 +45,11 @@ export function heartbeat(ws: WebSocket, lastMessageTime: MutableRefObject { clearInterval(heartbeatInterval); }); - return () => { }; } diff --git a/src/lib/use-websocket.test.ts b/src/lib/use-websocket.test.ts index 1e79cea..5ccf676 100644 --- a/src/lib/use-websocket.test.ts +++ b/src/lib/use-websocket.test.ts @@ -614,7 +614,7 @@ test.each([false, true])('Options#heartbeat, if provided, close websocket if no options.heartbeat = { message: 'ping', timeout: 1000, - interval: 500, + interval: 400, }; options.share = shareOption; @@ -627,6 +627,7 @@ test.each([false, true])('Options#heartbeat, if provided, close websocket if no } await server.connected; await sleep(1600); + expect(server.messages).toEqual(['ping', 'ping']) expect(result.current.readyState).toBe(WebSocket.CLOSED); }); @@ -682,4 +683,40 @@ test.each([false, true])('Options#heartbeat, if provided, lastMessage is updated expect(result.current.lastMessage?.data).toBe('ping'); }); +test.each([false, true])('Options#heartbeat, can handle case when interval is very close to timeout', async (shareOption) => { + options.heartbeat = { + message: "ping", + returnMessage: "pong", + timeout: 1000, + interval: 800, + }; + options.share = shareOption; + + const { result } = renderHook(() => useWebSocket(URL, options)); + + if (shareOption) { + renderHook(() => useWebSocket(URL, options)); + } + + await server.connected; + await sleep(50); + expect(result.current.readyState).toEqual(ReadyState.OPEN); + + result.current.sendMessage("token"); + await sleep(50); + expect(server).toHaveReceivedMessages(["token"]); + + server.send("authorized"); + await sleep(50); + expect(result.current.lastMessage?.data).toEqual("authorized"); + + await sleep(850); + // it does not matter is it shared mode or not, in both cases only one timer is used that means only one ping message is sent for all subscribers + expect(server.messages).toEqual(["token", "ping"]); + server.send("pong"); + + expect(result.current.readyState).toBe(WebSocket.OPEN); +} +); + // //TODO: Write companion tests for useSocketIO