-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.js
More file actions
24 lines (18 loc) · 711 Bytes
/
consumer.js
File metadata and controls
24 lines (18 loc) · 711 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const { kafka } = require("./client");
async function consumePollVotes(groupId) {
const consumer = kafka.consumer({ groupId: groupId });
await consumer.connect();
await consumer.subscribe({ topics: [`poll_${groupId}`], fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const { question, text } = JSON.parse(message.value.toString());
console.log(`GroupId:${groupId}, Topic:${question} voted:${text} Partiton:${partition}:`);
},
});
}
const pollId = process.argv[2];
if (!pollId) {
console.error("Poll ID is required to start the consumer.");
process.exit(1);
}
consumePollVotes(pollId);