Skip to content

Commit 9c726af

Browse files
author
Kushagra Singh Bisen
committed
adds:the first version of the Notifications Cache Server
1 parent 5a04e8e commit 9c726af

12 files changed

+805
-69
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ typings/
6969
.yarn-integrity
7070

7171
# dotenv environment variables file
72-
.env
7372
.env.test
7473

7574
# parcel-bundler cache (https://parceljs.org/)

package-lock.json

Lines changed: 343 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"main": "dist/index.js",
66
"scripts": {
77
"build": "npx tsc",
8-
"start": "node dist/index.js",
8+
"start": "npx tsc && node --max-old-space-size=8192 dist/index.js cache-notifications",
99
"test": "jest --coverage",
1010
"lint:ts": "eslint . --ext ts --report-unused-disable-directives --max-warnings 0",
1111
"lint:ts:fix": "eslint . --ext ts --report-unused-disable-directives --max-warnings 0 --fix",
@@ -27,9 +27,13 @@
2727
"typescript": "^4.9.4"
2828
},
2929
"dependencies": {
30+
"@types/n3": "^1.16.4",
3031
"@types/redis": "^4.0.11",
3132
"@typescript-eslint/parser": "^7.1.0",
33+
"axios": "^1.6.7",
3234
"bunyan": "^1.8.15",
33-
"ioredis": "^5.3.2"
35+
"commander": "^12.0.0",
36+
"ioredis": "^5.3.2",
37+
"n3": "^1.17.2"
3438
}
3539
}

src/index.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { CacheServiceHTTPServer } from "./server/CacheServiceHTTPServer";
22
import * as bunyan from "bunyan";
33
import * as fs from 'fs';
4-
const program = require('commander');
4+
import { program } from "commander";
55
const log_file = fs.createWriteStream('./logs/info.log', { flags: 'a' });
66

77
const logger = bunyan.createLogger({
@@ -30,7 +30,9 @@ program
3030
.command('cache-notifications')
3131
.description('Starts the cache service for notifications from the LDES stream stored in the solid server(s).')
3232
.option('-p, --port <port>', 'The port where the HTTP server will listen.', '8085')
33-
.option('-l --ldes <ldes>', 'The location of the LDES Stream', 'http://localhost:3000/aggregation_pod/aggregation/')
33+
.option('-l --ldes <ldes>', 'The location of the LDES Stream', ['http://localhost:3000/aggregation_pod/aggregation/'])
3434
.action((options: any) => {
35-
new CacheServiceHTTPServer(options.port, options.pod, logger);
36-
});
35+
new CacheServiceHTTPServer(options.port, options.ldes, logger);
36+
});
37+
38+
program.parse(process.argv);

src/server/CacheServiceHTTPServer.ts

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as http from 'http';
2+
import * as url from 'url';
23
import { CacheService } from '../service/CacheService';
4+
import { SubscribeNotification } from '../service/SubscribeNotification';
35

46
/**
57
* A class for the HTTP server that interacts with the cache service to handle requests.
@@ -9,31 +11,46 @@ import { CacheService } from '../service/CacheService';
911
export class CacheServiceHTTPServer {
1012
private readonly cacheService: CacheService;
1113
private readonly server: http.Server;
12-
private pod_url: string;
14+
public logger: any;
15+
private subscription_notification: SubscribeNotification;
16+
private pod_url_array: string[];
17+
1318
/**
1419
* Creates an instance of CacheServiceHTTPServer.
1520
* @param {number} port - The port where the HTTP server will listen.
16-
* @param {string} pod_url - The location of the Solid Pod from which the notifications are retrieved.
21+
* @param {string[]} pod_url - The location of the Solid Pod from which the notifications are retrieved.
1722
* @param {*} logger - The logger object.
1823
* @memberof CacheServiceHTTPServer
1924
*/
20-
constructor(port: number, pod_url: string, logger: any) {
25+
constructor(port: number, pod_url: string[], logger: any) {
26+
this.logger = logger;
2127
this.cacheService = new CacheService();
22-
this.pod_url = pod_url;
28+
this.pod_url_array = pod_url;
29+
this.subscription_notification = new SubscribeNotification(this.pod_url_array);
2330
this.server = http.createServer(this.request_handler.bind(this));
24-
this.setupServer(port);
31+
this.setupServerAndSubscribe(port);
2532
}
2633
/**
2734
* Sets up the HTTP server where it listens on the specified port as well as connects to the cache service.
2835
* @private
2936
* @param {number} port - The port where the HTTP server will listen.
3037
* @memberof CacheServiceHTTPServer
3138
*/
32-
private async setupServer(port: number) {
33-
await this.cacheService.connect();
34-
this.server.listen(port, () => {
35-
console.log(`Server listening on port ${port}`);
36-
});
39+
private async setupServerAndSubscribe(port: number) {
40+
if (await this.cacheService.get_status() === "connecting") {
41+
const subscription_successful = await this.subscription_notification.subscribe();
42+
if(subscription_successful) {
43+
console.log(`Subscription was successful`);
44+
45+
}
46+
this.server.listen(port, () => {
47+
this.logger.info(`Server listening on port ${port}`);
48+
});
49+
}
50+
else {
51+
this.logger.error("Cache service is not connecting");
52+
await this.cacheService.connect();
53+
}
3754
}
3855
/**
3956
* Handles the requests to the HTTP server.
@@ -43,7 +60,7 @@ export class CacheServiceHTTPServer {
4360
* @returns {Promise<void>} - A promise which responses nothing.
4461
* @memberof CacheServiceHTTPServer
4562
*/
46-
private async request_handler(request: http.IncomingMessage, response: http.ServerResponse) {
63+
public async request_handler(request: http.IncomingMessage, response: http.ServerResponse) {
4764
if (request.method === 'POST') {
4865
await this.handleNotificationPostRequest(request, response);
4966
}
@@ -67,7 +84,35 @@ export class CacheServiceHTTPServer {
6784
* @memberof CacheServiceHTTPServer
6885
*/
6986
private async handleNotificationPostRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
70-
87+
let body = '';
88+
request.on('data', (chunk) => {
89+
body += chunk.toString();
90+
});
91+
request.on('end', async () => {
92+
try {
93+
const notification = JSON.parse(body);
94+
const published = (new Date(notification.published).getTime()).toString();
95+
const resource_location = notification.object;
96+
try {
97+
const resource_fetch_response = await fetch(resource_location, {
98+
method: 'GET',
99+
headers: {
100+
'Accept': 'text/turtle'
101+
}
102+
});
103+
this.logger.info("Resource fetched successfully");
104+
const response_text = await resource_fetch_response.text();
105+
this.cacheService.set(published, response_text);
106+
} catch (error) {
107+
this.logger.error("Error fetching the resource: " + error);
108+
}
109+
response.writeHead(200, 'OK');
110+
response.end('OK');
111+
} catch (error) {
112+
response.writeHead(400, 'Bad Request');
113+
response.end('Bad Request');
114+
}
115+
});
71116
}
72117
/**
73118
* Handles the GET requests to the HTTP server, which are requests from the clients to retrieve the notifications.
@@ -78,7 +123,13 @@ export class CacheServiceHTTPServer {
78123
* @memberof CacheServiceHTTPServer
79124
*/
80125
private async handleClientGetRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
81-
126+
this.logger.info(`GET request received for ${request.url}`)
127+
console.log(`GET request received for ${request.url}`);
128+
const parsed_url = url.parse(request.url!, true);
129+
const query_parameters = parsed_url.query;
130+
const event_time = query_parameters.event_time as string | undefined || 'Anonymous';
131+
response.writeHead(200, 'OK', { 'Content-Type': 'text/turtle' });
132+
response.end(await this.cacheService.get(event_time));
82133
}
83134
/**
84135
* Handles the DELETE requests to the HTTP server.
@@ -89,7 +140,12 @@ export class CacheServiceHTTPServer {
89140
* @memberof CacheServiceHTTPServer
90141
*/
91142
private async handleNotificationDeleteRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
92-
143+
const parsed_url = url.parse(request.url!, true);
144+
const query_parameters = parsed_url.query;
145+
const event_time = query_parameters.event_time as string | undefined || 'Anonymous';
146+
await this.cacheService.delete(event_time);
147+
response.writeHead(200, 'OK');
148+
response.end('OK');
93149
}
94150

95151

src/service/CacheService.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,9 @@ describe('CacheService', () => {
2828
const is_disconnected = await cacheService.disconnect();
2929
expect(is_disconnected).toBe(true);
3030
});
31+
32+
it('should_describe_the_cache', async() => {
33+
const status = await cacheService.get_status();
34+
expect(status).toBe('wait');
35+
})
3136
});

src/service/CacheService.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Redis } from "ioredis";
2+
import { RedisStatus } from "../utils/Types";
23
/**
34
* A service for interacting with the Redis cache.
45
* @class CacheService
@@ -79,5 +80,13 @@ export class CacheService {
7980
async delete(key: string) {
8081
await this.client.del(key);
8182
}
83+
/**
84+
* Get the status of the Redis cache.
85+
* @returns {Promise<RedisStatus>} - The current status of the Redis Cache, returned as a promise.
86+
* @memberof CacheService
87+
*/
88+
async get_status(): Promise<RedisStatus> {
89+
return await this.client.status;
90+
}
8291
}
8392

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { SubscribeNotification } from './SubscribeNotification';
2+
import { extract_ldp_inbox, extract_subscription_server } from '../utils/Util';
3+
4+
jest.mock('../utils/Util', () => ({
5+
extract_ldp_inbox: jest.fn(),
6+
extract_subscription_server: jest.fn(),
7+
}));
8+
9+
describe('SubscribeNotification', () => {
10+
afterEach(() => {
11+
jest.clearAllMocks();
12+
});
13+
14+
it('should subscribe successfully', async () => {
15+
const streams = ['stream1', 'stream2'];
16+
const subscribeNotification = new SubscribeNotification(streams);
17+
18+
(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
19+
(extract_subscription_server as jest.Mock).mockResolvedValueOnce({
20+
location: 'http://subscription-server1',
21+
});
22+
23+
const fetchMock = jest.fn().mockResolvedValueOnce({ status: 200 });
24+
global.fetch = fetchMock;
25+
26+
const result = await subscribeNotification.subscribe();
27+
28+
expect(result).toBe(true);
29+
expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
30+
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
31+
expect(fetchMock).toHaveBeenCalledWith('http://subscription-server1', {
32+
method: 'POST',
33+
headers: {
34+
'Content-Type': 'application/ld+json',
35+
},
36+
body: JSON.stringify({
37+
'@context': ['https://www.w3.org/ns/solid/notification/v1'],
38+
type: 'http://www.w3.org/ns/solid/notifications#WebhookChannel2023',
39+
topic: 'inbox1',
40+
sendTo: 'http://localhost:8085/',
41+
}),
42+
});
43+
});
44+
45+
it('should throw an error if subscription server is undefined', async () => {
46+
const streams = ['stream1'];
47+
const subscribeNotification = new SubscribeNotification(streams);
48+
49+
(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
50+
(extract_subscription_server as jest.Mock).mockResolvedValueOnce(undefined);
51+
52+
await expect(subscribeNotification.subscribe()).rejects.toThrow(
53+
'Subscription server is undefined.'
54+
);
55+
56+
expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
57+
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
58+
});
59+
60+
it('should throw an error if subscription to the notification server fails', async () => {
61+
const streams = ['stream1'];
62+
const subscribeNotification = new SubscribeNotification(streams);
63+
64+
(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
65+
(extract_subscription_server as jest.Mock).mockResolvedValueOnce({
66+
location: 'http://subscription-server1',
67+
});
68+
69+
const fetchMock = jest.fn().mockResolvedValueOnce({ status: 500 });
70+
global.fetch = fetchMock;
71+
72+
await expect(subscribeNotification.subscribe()).rejects.toThrow(
73+
'The subscription to the notification server failed.'
74+
);
75+
76+
expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
77+
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
78+
expect(fetchMock).toHaveBeenCalledWith('http://subscription-server1', {
79+
method: 'POST',
80+
headers: {
81+
'Content-Type': 'application/ld+json',
82+
},
83+
body: JSON.stringify({
84+
'@context': ['https://www.w3.org/ns/solid/notification/v1'],
85+
type: 'http://www.w3.org/ns/solid/notifications#WebhookChannel2023',
86+
topic: 'inbox1',
87+
sendTo: 'http://localhost:8085/',
88+
}),
89+
});
90+
});
91+
});

src/service/SubscribeNotification.ts

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,51 @@
1-
export class SubscribeNotification {
2-
public solid_pod_urls: string[];
1+
import { extract_ldp_inbox, extract_subscription_server } from "../utils/Util";
32

4-
constructor(solid_pod_urls: string[]) {
5-
this.solid_pod_urls = solid_pod_urls;
3+
/**
4+
* This class is used to subscribe to the notification server for real-time notifications.
5+
* @class SubscribeNotification
6+
*/
7+
export class SubscribeNotification {
8+
private ldes_streams: string[];
9+
/**
10+
* Creates an instance of SubscribeNotification.
11+
* @param {string[]} streams - An array of LDES streams to subscribe to, for real-time notifications.
12+
* @memberof SubscribeNotification
13+
*/
14+
constructor(streams: string[]) {
15+
this.ldes_streams = streams;
616
}
717

8-
public async subscribe() {
9-
console.log(`Subscribing to notifications for ${this.solid_pod_urls}...`);
10-
18+
/**
19+
* Subscribes to the notification server for each LDES stream in the constructor, using the inbox and subscription server.
20+
* @returns {(Promise<boolean | undefined>)} - Returns a promise with a boolean or undefined. If the subscription is successful, it returns true. If the subscription fails, it throws an error.
21+
* @memberof SubscribeNotification
22+
*/
23+
public async subscribe(): Promise<boolean | undefined> {
24+
for (const stream of this.ldes_streams) {
25+
const inbox = await extract_ldp_inbox(stream) as string;
26+
const subscription_server = await extract_subscription_server(inbox);
27+
if (subscription_server === undefined) {
28+
throw new Error("Subscription server is undefined.");
29+
} else {
30+
const response = await fetch(subscription_server.location, {
31+
method: 'POST',
32+
headers: {
33+
'Content-Type': 'application/ld+json'
34+
},
35+
body: JSON.stringify({
36+
"@context": ["https://www.w3.org/ns/solid/notification/v1"],
37+
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
38+
"topic": inbox,
39+
"sendTo": "http://localhost:8085/"
40+
})
41+
});
42+
if (response.status === 200) {
43+
return true;
44+
}
45+
else {
46+
throw new Error("The subscription to the notification server failed.");
47+
}
48+
}
49+
}
1150
}
12-
13-
}
14-
15-
16-
/**
17-
* sending the server `http://localhost:3000/.notifications/WebhookChannel2023/`
18-
{
19-
"@context": [ "https://www.w3.org/ns/solid/notification/v1" ],
20-
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
21-
"topic": "http://localhost:3000/aggregation_pod/",
22-
"sendTo": "http://localhost:3001/"
2351
}
24-
*/

src/utils/Types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export type SubscriptionServerNotification = {
2+
location: string,
3+
channelType: string,
4+
channelLocation: string
5+
}
6+
7+
export type RedisStatus = "wait" | "reconnecting" | "connecting" | "connect" | "ready" | "close" | "end";

0 commit comments

Comments
 (0)