Skip to content

Commit 15f1b69

Browse files
author
Kush Bisen
authored
Merge pull request #17 from SolidLabResearch/15-add-support-to-monitor-the-ldes-stream-as-well-as-the-inbox-incase-new-container-is-created-and-the-inbox-is-updated
fixes #15
2 parents 74685d6 + 265b0c3 commit 15f1b69

File tree

3 files changed

+127
-47
lines changed

3 files changed

+127
-47
lines changed

src/server/NotificationServiceHTTPServer.ts

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,32 @@ export class NotificationServiceHTTPServer {
9797
const stream = notification.target.replace(/\/\d+\/$/, '/');
9898
const key = `stream:${stream}:${published_time}`;
9999
const resource_location = notification.object;
100-
try {
101-
const resource_fetch_response = await fetch(resource_location, {
102-
method: 'GET',
103-
headers: {
104-
'Accept': 'text/turtle'
105-
}
106-
});
107-
this.logger.info("Resource fetched successfully");
108-
const response_text = await resource_fetch_response.text();
109-
// set the response in the cache, with the key as the LDES stream and the published time.
110-
// set the time to live for the cache to 60 seconds.
111-
console.log("Setting the response in the cache");
112-
await this.cacheService.set(key, response_text);
113-
await this.cacheService.setTimeToLive(key, 60);
114-
// TODO: notify the clients that the resource has been updated by first notifying the websocket server.
115-
const parsed_notification = JSON.stringify({ "stream": stream, "published_time": published_time, "event": response_text });
100+
if (this.check_if_container(resource_location) === true) {
101+
const parsed_notification = JSON.stringify({ "stream": stream, "published_time": published_time, "container_location": resource_location });
116102
this.send_to_websocket_server(parsed_notification);
117-
118-
} catch (error) {
119-
this.logger.error("Error fetching the resource: " + error);
103+
}
104+
else if (this.check_if_container(resource_location) === false) {
105+
try {
106+
const resource_fetch_response = await fetch(resource_location, {
107+
method: 'GET',
108+
headers: {
109+
'Accept': 'text/turtle'
110+
}
111+
});
112+
this.logger.info("Resource fetched successfully");
113+
const response_text = await resource_fetch_response.text();
114+
// set the response in the cache, with the key as the LDES stream and the published time.
115+
// set the time to live for the cache to 60 seconds.
116+
console.log("Setting the response in the cache");
117+
await this.cacheService.set(key, response_text);
118+
await this.cacheService.setTimeToLive(key, 60);
119+
const parsed_notification = JSON.stringify({ "stream": stream, "published_time": published_time, "event": response_text });
120+
this.send_to_websocket_server(parsed_notification);
121+
} catch (error) {
122+
this.logger.error("Error fetching the resource: " + error)
123+
}
124+
} else {
125+
throw new Error("The resource location is neither a container nor a resource. This SHOULD NOT happen.");
120126
}
121127
response.writeHead(200, 'OK');
122128
response.end('OK');
@@ -187,4 +193,13 @@ export class NotificationServiceHTTPServer {
187193
this.logger.error(`Connection to the WebSocket server failed:`, error);
188194
});
189195
}
196+
197+
public check_if_container(resource_location: string): boolean {
198+
if (resource_location.endsWith('/')) {
199+
return true;
200+
}
201+
else {
202+
return false;
203+
}
204+
}
190205
}

src/server/WebSocketServerHandler.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,30 @@ export class WebSocketServerHandler {
3030
if (Object.keys(ws_message).includes('subscribe')) {
3131
console.log(`Received a subscribe message from the client.`);
3232
let stream_to_subscribe = ws_message.subscribe;
33-
for (let stream of stream_to_subscribe){
34-
this.subscribe_notification.subscribe(stream);
33+
for (let stream of stream_to_subscribe) {
34+
// We first subscribe to the latest inbox of the LDES stream.
35+
this.subscribe_notification.subscribe_inbox(stream);
3536
console.log(`Subscribed to the stream: ${stream}`);
3637
this.set_connections(stream, connection);
3738
}
3839
}
3940
else if (Object.keys(ws_message).includes('event')) {
41+
console.log(`Received a new event message from the client.`);
4042
let connection = this.websocket_connections.get(ws_message.stream);
4143
if (connection !== undefined) {
4244
connection.send(JSON.stringify(ws_message));
4345
}
4446
}
47+
else if (Object.keys(ws_message).includes('container_location')) {
48+
console.log(`Received a new inbox container location message from the client.`);
49+
const inbox_container_location = ws_message.container_location;
50+
this.subscribe_notification.subscribe_inbox(inbox_container_location);
51+
console.log(`Subscribed to the inbox container location: ${inbox_container_location}`);
52+
}
53+
else {
54+
console.log(`Received an unknown message from the client with the following content: ${message_utf8}`);
55+
console.log(`The message is not recognized and supported by the Solid Stream Notifications Aggregator.`);
56+
}
4557
}
4658
});
4759
});

src/service/SubscribeNotification.ts

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { extract_ldp_inbox, extract_subscription_server } from "../utils/Util";
2+
import * as WebSocket from 'websocket';
23

34
/**
45
* This class is used to subscribe to the notification server for real-time notifications.
@@ -7,42 +8,94 @@ import { extract_ldp_inbox, extract_subscription_server } from "../utils/Util";
78
export class SubscribeNotification {
89
/**
910
* Creates an instance of SubscribeNotification.
10-
* @param {string[]} streams - An array of LDES streams to subscribe to, for real-time notifications.
1111
* @memberof SubscribeNotification
1212
*/
1313
constructor() {
1414
}
1515

1616
/**
1717
* Subscribes to the notification server for each LDES stream in the constructor, using the inbox and subscription server.
18+
* @param {string} ldes_stream - The LDES stream to subscribe to.
1819
* @returns {(Promise<boolean | undefined>)} - Returns a promise with a boolean or undefined. If the subscription is successful, it returns true. If the subscription fails, it throws an error.
1920
* @memberof SubscribeNotification
2021
*/
21-
public async subscribe(ldes_stream: string): Promise<boolean | undefined> {
22-
const inbox = await extract_ldp_inbox(ldes_stream) as string;
23-
const subscription_server = await extract_subscription_server(inbox);
24-
if (subscription_server === undefined) {
25-
throw new Error("Subscription server is undefined.");
26-
} else {
27-
const response = await fetch(subscription_server.location, {
28-
method: 'POST',
29-
headers: {
30-
'Content-Type': 'application/ld+json'
31-
},
32-
body: JSON.stringify({
33-
"@context": ["https://www.w3.org/ns/solid/notification/v1"],
34-
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
35-
"topic": inbox,
36-
"sendTo": "http://localhost:8085/"
37-
})
38-
});
39-
if (response.status === 200) {
40-
return true;
41-
}
42-
else {
43-
throw new Error("The subscription to the notification server failed.");
44-
}
22+
public async subscribe_stream(ldes_stream: string): Promise<boolean | undefined> {
23+
const inbox = await extract_ldp_inbox(ldes_stream) as string;
24+
const subscription_server = await extract_subscription_server(inbox);
25+
if (subscription_server === undefined) {
26+
throw new Error("Subscription server is undefined.");
27+
} else {
28+
const response_subscribe_ldes_stream = await fetch(subscription_server.location, {
29+
method: 'POST',
30+
headers: {
31+
'Content-Type': 'application/ld+json'
32+
},
33+
body: JSON.stringify({
34+
"@context": ["https://www.w3.org/ns/solid/notification/v1"],
35+
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
36+
"topic": ldes_stream,
37+
"sendTo": "http://localhost:8085/"
38+
})
39+
});
40+
if (response_subscribe_ldes_stream.status === 200) {
41+
return true;
4542
}
46-
43+
else {
44+
throw new Error("The subscription to the notification server failed.");
45+
}
46+
}
47+
}
48+
49+
public async subscribe_inbox(inbox_location:string): Promise<boolean | undefined> {
50+
const subscription_server = await extract_subscription_server(inbox_location);
51+
if (subscription_server === undefined) {
52+
throw new Error("Subscription server is undefined.");
53+
} else {
54+
const response_subscribe_ldes_stream = await fetch(subscription_server.location, {
55+
method: 'POST',
56+
headers: {
57+
'Content-Type': 'application/ld+json'
58+
},
59+
body: JSON.stringify({
60+
"@context": ["https://www.w3.org/ns/solid/notification/v1"],
61+
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
62+
"topic": inbox_location,
63+
"sendTo": "http://localhost:8085/"
64+
})
65+
});
66+
if (response_subscribe_ldes_stream.status === 200) {
67+
return true;
68+
}
69+
else {
70+
throw new Error("The subscription to the notification server failed.");
71+
}
72+
}
73+
}
74+
75+
/**
76+
* Checks if the LDES stream is already subscribed to the notification server.
77+
* @param {string} ldes_stream - The LDES stream to check if it is already subscribed.
78+
* @param {Map<string, WebSocket[]>} websocket_connections - The WebSocket connections.
79+
* @returns {Promise<boolean>} - Returns a promise with a boolean. If the LDES stream is already subscribed, it returns true. If the LDES stream is not subscribed, it returns false.
80+
* @memberof SubscribeNotification
81+
*/
82+
public async check_if_aleady_subscribed(ldes_stream: string, websocket_connections: Map<string, WebSocket[]>): Promise<boolean> {
83+
if (websocket_connections.has(ldes_stream)) {
84+
return true;
85+
}
86+
else {
87+
return false;
88+
}
89+
}
90+
91+
/**
92+
* Sets the connections for the WebSocket server's Map.
93+
* @param {string} ldes_stream - The LDES stream to subscribe to.
94+
* @param {Map<string, WebSocket>} websocket_connections - The WebSocket connections.
95+
* @returns {(Promise<WebSocket | undefined>)} - Returns a promise with a WebSocket or undefined. If the connection is set, it returns the WebSocket connection. If the connection is not set, it returns undefined.
96+
* @memberof SubscribeNotification
97+
*/
98+
public async get_connection(ldes_stream: string, websocket_connections: Map<string, WebSocket>): Promise<WebSocket | undefined> {
99+
return websocket_connections.get(ldes_stream);
47100
}
48101
}

0 commit comments

Comments
 (0)