-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathtopic.ts
100 lines (89 loc) · 2.39 KB
/
topic.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import Subscription from './subscription';
import { PubArg } from './utils';
/**
* This class is responsible for creating Topic object and,
* managing SUB, UNSUB and PUB commands for this topic.
*
* @export
* @class Topic
*/
export default class Topic {
/**
* The subject of the Topic.
*
* @private
* @type {string}
*/
private subject: string;
/**
* A set of subscriptions that are subscribed to this topic.
*
* @private
* @type {Set<Subscription>}
*/
private subscriptions: Set<Subscription>;
constructor(subject: string, subscriptions?: Set<Subscription>) {
this.subject = subject;
this.subscriptions = subscriptions ?? new Set<Subscription>();
}
/**
* Adds the given subscription to this topic.
*
* @param {Subscription} subscription
*/
sub(subscription: Subscription) {
this.subscriptions.add(subscription);
}
/**
* Removed the given subscription from this topic.
*
* @param {Subscription} subscription
*/
unsub(subscription: Subscription) {
this.subscriptions.delete(subscription);
}
/**
* This async method publishes the message to all the subscribed clients.
*
* @async
* @param {PubArg} pubArg
* @returns {Promise<void>}
*/
async publish(pubArg: PubArg): Promise<void> {
const promises: Promise<void>[] = [];
// Data before the `sid` goes in prefix
// This is same for all the messages during this publish call.
const prefix: Buffer = Buffer.concat([
Buffer.from('MSG '),
Buffer.from(this.subject + ' ')
]);
// Data after `sid` goes in suffix.
// This is same for all the messages during this publish call.
let suffix: Buffer;
if (pubArg.payload) {
suffix = Buffer.concat([
Buffer.from(pubArg.payloadSize.toString(10) + '\r\n'),
pubArg.payload,
Buffer.from('\r\n')
]);
} else {
suffix = Buffer.from('0\r\n\r\n');
}
this.subscriptions.forEach((subscription) => {
promises.push(
new Promise<void>((res) => {
// Prepare final message
const buffer = Buffer.concat([
prefix,
Buffer.from(subscription.sid.toString(10) + ' '),
suffix
]);
// Send the message and complete the promise
subscription.client.socket.write(buffer);
res();
})
);
});
await Promise.all(promises);
}
}