Skip to content

Commit bd9f3ac

Browse files
committed
Improve heartbeat
Check interval between ping messages more often (interval / 10). This is to prevent some configurations that may cause timeouts and connection close. For example, when timeout and interval are set to very close values and there was data message recived right after connection is established. Use one heartbeat interval in shared mode, add test. Close #255
1 parent c7b5a8e commit bd9f3ac

File tree

3 files changed

+65
-9
lines changed

3 files changed

+65
-9
lines changed

src/lib/attach-shared-listeners.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ const bindOpenHandler = (
4646
heartbeatOptions?: boolean | HeartbeatOptions
4747
) => {
4848
webSocketInstance.onopen = (event: WebSocketEventMap['open']) => {
49-
getSubscribers(url).forEach(subscriber => {
49+
const subscribers = getSubscribers(url);
50+
subscribers.forEach(subscriber => {
5051
subscriber.reconnectCount.current = 0;
5152
if (subscriber.optionsRef.current.onOpen) {
5253
subscriber.optionsRef.current.onOpen(event);
@@ -58,9 +59,11 @@ const bindOpenHandler = (
5859

5960
if (heartbeatOptions && webSocketInstance instanceof WebSocket) {
6061
subscriber.lastMessageTime.current = Date.now();
61-
heartbeat(webSocketInstance, subscriber.lastMessageTime, typeof heartbeatOptions === 'boolean' ? undefined : heartbeatOptions,);
6262
}
6363
});
64+
if (heartbeatOptions && webSocketInstance instanceof WebSocket) {
65+
heartbeat(webSocketInstance, subscribers.map(subscriber => subscriber.lastMessageTime), typeof heartbeatOptions === 'boolean' ? undefined : heartbeatOptions,);
66+
}
6467
};
6568
};
6669

src/lib/heartbeat.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,54 @@ import { MutableRefObject } from "react";
22
import { DEFAULT_HEARTBEAT } from "./constants";
33
import { HeartbeatOptions } from "./types";
44

5-
export function heartbeat(ws: WebSocket, lastMessageTime: MutableRefObject<number>, options?: HeartbeatOptions): () => void {
5+
function getLastMessageTime(lastMessageTime: MutableRefObject<number> | MutableRefObject<number>[]): number {
6+
if (Array.isArray(lastMessageTime)) {
7+
return lastMessageTime.reduce((p, c) => { return (p.current > c.current) ? p : c; }).current;
8+
}
9+
return lastMessageTime.current
10+
}
11+
12+
export function heartbeat(ws: WebSocket, lastMessageTime: MutableRefObject<number> | MutableRefObject<number>[], options?: HeartbeatOptions): () => void {
613
const {
714
interval = DEFAULT_HEARTBEAT.interval,
815
timeout = DEFAULT_HEARTBEAT.timeout,
916
message = DEFAULT_HEARTBEAT.message,
1017
} = options || {};
1118

19+
// how often check interval between ping messages
20+
// minimum is 100ms
21+
// maximum is ${interval / 10}ms
22+
const intervalCheck = Math.max(100, interval / 10);
23+
24+
let lastPingSentAt = Date.now();
25+
1226
const heartbeatInterval = setInterval(() => {
13-
if (lastMessageTime.current + timeout <= Date.now()) {
14-
console.warn(`Heartbeat timed out, closing connection, last message was seen ${Date.now() - lastMessageTime.current}ms ago`);
27+
const timeNow = Date.now();
28+
const lastMessageReceivedAt = getLastMessageTime(lastMessageTime);
29+
if (lastMessageReceivedAt + timeout <= timeNow) {
30+
console.warn(`Heartbeat timed out, closing connection, last message received ${timeNow - lastMessageReceivedAt}ms ago, last ping sent ${timeNow - lastPingSentAt}ms ago`);
1531
ws.close();
1632
} else {
17-
if (lastMessageTime.current + interval <= Date.now()) {
33+
if (lastMessageReceivedAt + interval <= timeNow && lastPingSentAt + interval <= timeNow) {
1834
try {
1935
if (typeof message === 'function') {
2036
ws.send(message());
2137
} else {
2238
ws.send(message);
2339
}
40+
lastPingSentAt = timeNow;
2441
} catch (err: unknown) {
2542
console.error(`Heartbeat failed, closing connection`, err instanceof Error ? err.message : err);
2643
ws.close();
2744
}
2845

2946
}
3047
}
31-
}, interval);
48+
}, intervalCheck);
3249

3350
ws.addEventListener("close", () => {
3451
clearInterval(heartbeatInterval);
3552
});
3653

37-
3854
return () => { };
3955
}

src/lib/use-websocket.test.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ test.each([false, true])('Options#heartbeat, if provided, close websocket if no
614614
options.heartbeat = {
615615
message: 'ping',
616616
timeout: 1000,
617-
interval: 500,
617+
interval: 400,
618618
};
619619
options.share = shareOption;
620620

@@ -627,6 +627,7 @@ test.each([false, true])('Options#heartbeat, if provided, close websocket if no
627627
}
628628
await server.connected;
629629
await sleep(1600);
630+
expect(server.messages).toEqual(['ping', 'ping'])
630631
expect(result.current.readyState).toBe(WebSocket.CLOSED);
631632
});
632633

@@ -682,4 +683,40 @@ test.each([false, true])('Options#heartbeat, if provided, lastMessage is updated
682683
expect(result.current.lastMessage?.data).toBe('ping');
683684
});
684685

686+
test.each([false, true])('Options#heartbeat, can handle case when interval is very close to timeout', async (shareOption) => {
687+
options.heartbeat = {
688+
message: "ping",
689+
returnMessage: "pong",
690+
timeout: 1000,
691+
interval: 800,
692+
};
693+
options.share = shareOption;
694+
695+
const { result } = renderHook(() => useWebSocket(URL, options));
696+
697+
if (shareOption) {
698+
renderHook(() => useWebSocket(URL, options));
699+
}
700+
701+
await server.connected;
702+
await sleep(50);
703+
expect(result.current.readyState).toEqual(ReadyState.OPEN);
704+
705+
result.current.sendMessage("token");
706+
await sleep(50);
707+
expect(server).toHaveReceivedMessages(["token"]);
708+
709+
server.send("authorized");
710+
await sleep(50);
711+
expect(result.current.lastMessage?.data).toEqual("authorized");
712+
713+
await sleep(850);
714+
// it does not matter is it shared mode or not, in both cases only one timer is used that means only one ping message is sent for all subscribers
715+
expect(server.messages).toEqual(["token", "ping"]);
716+
server.send("pong");
717+
718+
expect(result.current.readyState).toBe(WebSocket.OPEN);
719+
}
720+
);
721+
685722
// //TODO: Write companion tests for useSocketIO

0 commit comments

Comments
 (0)