@@ -5,13 +5,13 @@ import { SubscribeNotification } from '../service/SubscribeNotification';
5
5
export class WebSocketServerHandler {
6
6
7
7
public websocket_server : any ;
8
- public websocket_connections : Map < string , WebSocket > ;
8
+ public websocket_connections : Map < string , WebSocket [ ] > ;
9
9
public subscribe_notification : SubscribeNotification ;
10
10
11
11
12
12
constructor ( websocket_server : WebSocket . server ) {
13
13
this . websocket_server = websocket_server ;
14
- this . websocket_connections = new Map < string , WebSocket > ( ) ;
14
+ this . websocket_connections = new Map < string , WebSocket [ ] > ( ) ;
15
15
this . subscribe_notification = new SubscribeNotification ( ) ;
16
16
}
17
17
@@ -23,16 +23,14 @@ export class WebSocketServerHandler {
23
23
24
24
this . websocket_server . on ( 'request' , ( request : any ) => {
25
25
const connection = request . accept ( 'solid-stream-notifications-aggregator' , request . origin ) ;
26
- connection . on ( 'message' , ( message : any ) => {
26
+ connection . on ( 'message' , async ( message : any ) => {
27
27
if ( message . type === 'utf8' ) {
28
28
const message_utf8 = message . utf8Data ;
29
29
const ws_message = JSON . parse ( message_utf8 ) ;
30
30
if ( Object . keys ( ws_message ) . includes ( 'subscribe' ) ) {
31
31
console . log ( `Received a subscribe message from the client.` ) ;
32
32
let stream_to_subscribe = ws_message . subscribe ;
33
33
for ( let stream of stream_to_subscribe ) {
34
- // We first subscribe to the latest inbox of the LDES stream.
35
- this . subscribe_notification . subscribe_inbox ( stream ) ;
36
34
console . log ( `Subscribed to the stream: ${ stream } ` ) ;
37
35
this . set_connections ( stream , connection ) ;
38
36
}
@@ -41,7 +39,13 @@ export class WebSocketServerHandler {
41
39
console . log ( `Received a new event message from the client.` ) ;
42
40
let connection = this . websocket_connections . get ( ws_message . stream ) ;
43
41
if ( connection !== undefined ) {
44
- connection . send ( JSON . stringify ( ws_message ) ) ;
42
+ for ( const [ stream , connections ] of this . websocket_connections ) {
43
+ if ( stream == ws_message . stream ) {
44
+ for ( let connection of connections ) {
45
+ connection . send ( JSON . stringify ( ws_message ) ) ;
46
+ }
47
+ }
48
+ }
45
49
}
46
50
}
47
51
else if ( Object . keys ( ws_message ) . includes ( 'container_location' ) ) {
@@ -59,7 +63,18 @@ export class WebSocketServerHandler {
59
63
} ) ;
60
64
}
61
65
62
- public set_connections ( subscribed_stream : string , connection : WebSocket ) : void {
63
- this . websocket_connections . set ( subscribed_stream , connection ) ;
66
+ public set_connections ( subscribed_stream : string , connection : WebSocket ) {
67
+ if ( ! this . websocket_connections . has ( subscribed_stream ) ) {
68
+ this . subscribe_notification . subscribe_inbox ( subscribed_stream ) ;
69
+ this . websocket_connections . set ( subscribed_stream , [ connection ] ) ;
70
+ }
71
+ else {
72
+ const connections = this . websocket_connections . get ( subscribed_stream ) ;
73
+ if ( connections !== undefined ) {
74
+ connections . push ( connection ) ;
75
+ this . websocket_connections . set ( subscribed_stream , connections ) ;
76
+ }
77
+
78
+ }
64
79
}
65
80
}
0 commit comments