-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsilly-wizard.js
127 lines (99 loc) · 2.86 KB
/
silly-wizard.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
const {isIP} = require('net');
const readline = require('node:readline');
const {Kafka, Partitioners, logLevel} = require('kafkajs');
// Session state shared by the handlers below.
// `inputs` collects the message objects this session submitted;
// `messages` collects the raw string payloads received from the output topic.
const inputs = [];
const messages = [];

// Kafka client used to create the producer and consumer.
// NOTE(review): logLevel.NOTHING presumably keeps client logs from
// interleaving with the redrawn console UI — confirm.
const kafka = new Kafka({
  logLevel: logLevel.NOTHING,
  brokers: ['ec2-15-228-13-15.sa-east-1.compute.amazonaws.com:9094'],
  clientId: 'silly-wizard',
});

// Interactive prompt bound to the process's standard streams.
const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
});
/**
 * Builds the callback that processes one line typed at the IP prompt.
 *
 * The callback trims the input and validates it with `net.isIP`. A valid
 * address is published through `produce` and recorded in `inputs`. In every
 * case the screen is redrawn via `broadcastMessages`, which also re-arms the
 * prompt.
 *
 * @param {{producer: object}} kafka - Object whose `producer` property is
 *   handed to `produce`; it is also forwarded untouched to `broadcastMessages`.
 * @returns {(ip: string) => void} Handler for the raw text entered by the user.
 */
function handleIp(kafka) {
  return (ip) => {
    const trimmedIp = ip.trim();
    // net.isIP returns 0 for invalid input, 4 or 6 for a valid address.
    const isValidIp = isIP(trimmedIp) !== 0;

    if (isValidIp) {
      const message = {
        // Send the value that was actually validated; the raw input may
        // still carry surrounding whitespace.
        ip: trimmedIp,
        clientId: 'silly-wizard',
        timestamp: Date.now(),
      };
      // Fire-and-forget: a failed publish is reported but must not block the UI.
      produce(kafka.producer, JSON.stringify(message)).catch(console.error);
      inputs.push(message);
    }

    broadcastMessages(isValidIp, kafka);
  };
}
/**
 * Prompts the user for the next IP address.
 *
 * When the previous entry was rejected, a validation hint is printed before
 * the prompt. The answer is delivered to a fresh handler built by `handleIp`.
 *
 * @param {*} validIpInput - Truthy when the previous entry was accepted.
 * @param {object} kafka - Passed through to `handleIp`.
 * @returns {void}
 */
function askForIp(validIpInput, kafka) {
  const previousEntryRejected = !validIpInput;
  if (previousEntryRejected) {
    console.log('> Must be a valid IP.');
  }

  const onAnswer = handleIp(kafka);
  rl.question('> Insert an IP: ', onAnswer);
}
/**
 * Turns one raw payload from `messages` into the line shown under "Broadcast".
 * A payload that is not a JSON object yields a placeholder line instead of throwing.
 *
 * @param {string} rawMessage - Payload text as stored in `messages`.
 * @returns {string} The line to print.
 */
function formatBroadcastLine(rawMessage) {
  let parsed;
  try {
    parsed = JSON.parse(rawMessage);
  } catch {
    parsed = null;
  }

  // JSON `null` and primitives cannot carry the expected fields either.
  if (parsed === null || typeof parsed !== 'object') {
    return `— Unreadable message: ${rawMessage}`;
  }

  const {ip, city, latitude, longitude, country, region, clientId, timestamp} = parsed;
  return `— IP: ${ip} | country: ${country} | region: ${region} | city: ${city} | latitude: ${latitude} | longitude: ${longitude} | client id: ${clientId} | timestamp: ${timestamp}`;
}

/**
 * Clears the console, redraws the submitted inputs and the received
 * broadcasts, then re-arms the IP prompt through `askForIp`.
 *
 * Payloads come from the broker and stay in `messages` for the whole
 * session, so a single malformed one must not throw here: it would abort
 * this redraw and every later one, and the prompt would never be re-armed.
 *
 * @param {*} validIpInput - Forwarded to `askForIp`; truthy when the last entry was accepted.
 * @param {object} kafka - Forwarded to `askForIp`.
 * @returns {void}
 */
function broadcastMessages(validIpInput, kafka) {
  console.clear();

  console.log('> Inputs:');
  for (const {ip, clientId, timestamp} of inputs) {
    console.log(`— IP: ${ip} | client id: ${clientId} | timestamp: ${timestamp}`);
  }
  console.log('\n');

  console.log('> Broadcast:');
  for (const rawMessage of messages) {
    console.log(formatBroadcastLine(rawMessage));
  }
  console.log('\n');

  askForIp(validIpInput, kafka);
}
/**
 * Publishes a single message to the `location_input` topic.
 *
 * `connect()` is awaited on the given producer before every send.
 *
 * @param {{connect: Function, send: Function}} producer - Producer to publish with.
 * @param {string} message - Value of the single record to send.
 * @returns {Promise<void>} Settles once the send has completed; rejects if
 *   connecting or sending fails.
 */
async function produce(producer, message) {
  const record = {value: message};
  const request = {topic: 'location_input', messages: [record]};

  await producer.connect();
  await producer.send(request);
}
/**
 * Boots the wizard: shows a loading spinner, connects the consumer,
 * subscribes it to `location_output`, starts consuming, connects the
 * producer, and finally draws the first screen.
 *
 * The spinner is stopped in a `finally` block, so a failed start-up no
 * longer leaves the interval redrawing the console forever and keeping the
 * process alive. The producer connection is awaited, so its failure rejects
 * this function instead of becoming an unhandled rejection.
 *
 * @returns {Promise<void>} Resolves after the first screen is drawn; rejects
 *   if any start-up step fails.
 */
async function main() {
  let loadingMessage = 'Waiting wizard to initialize';
  const initialLength = loadingMessage.length;
  let waitingCount = 0;

  // Animate up to three trailing dots every 250 ms, then start over.
  const interval = setInterval(() => {
    console.clear();
    if (initialLength + 3 <= loadingMessage.length) {
      loadingMessage = loadingMessage.slice(0, -3);
    } else {
      loadingMessage += '.';
    }
    waitingCount += 1;
    console.log(loadingMessage);
    // More than 24 ticks of 250 ms, i.e. more than about 6 seconds.
    if (waitingCount > 24) {
      console.log('Taking longer than usual...');
    }
  }, 250);

  let topicManagers;
  try {
    const producer = kafka.producer({createPartitioner: Partitioners.LegacyPartitioner});
    const consumer = kafka.consumer({groupId: 'reviewers'});
    topicManagers = {producer, consumer};

    await consumer.connect();
    await consumer.subscribe({topic: 'location_output', fromBeginning: false});
    await consumer.run({
      eachMessage: async ({message}) => {
        // Records may carry a null value; there is nothing to display then.
        if (message.value == null) {
          return;
        }
        messages.push(message.value.toString());
        broadcastMessages(true, topicManagers);
      },
    });
    await producer.connect();
  } finally {
    // Always stop the spinner, whether start-up succeeded or failed.
    clearInterval(interval);
  }

  broadcastMessages(true, topicManagers);
}
// Entry point: start the wizard; a rejection from main() is printed with console.error.
main().catch(console.error);