-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
82 lines (65 loc) · 2.3 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// consumer.js
const { Kafka } = require('kafkajs');
const mongoose = require('mongoose');
require('dotenv').config();
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID,
brokers: [process.env.KAFKA_BROKER]
});
const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID });
const topic = 'location';
// Mongoose setup
const mongoUrl = process.env.MONGO_URL;
mongoose.connect(mongoUrl, { useNewUrlParser: true, useUnifiedTopology: true })
.then(() => console.log('Connected to MongoDB'))
.catch(err => console.error('Failed to connect to MongoDB', err));
const locationSchema = new mongoose.Schema({
latitude: Number,
longitude: Number
});
const Location = mongoose.model('Location', locationSchema);
// Buffer to store messages
const buffer = [];
const bufferSize = process.env.MAX_DATA_LENGTH_BUFFER;
const runConsumer = async () => {
await consumer.connect();
await consumer.subscribe({ topic });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const location = JSON.parse(message.value.toString());
console.log(`Received message: ${JSON.stringify(location)}`);
buffer.push(location);
// Check if the buffer has reached the desired size
if (buffer.length >= bufferSize) {
try {
await Location.insertMany(buffer);
console.log('Documents inserted into MongoDB');
// Clear the buffer after successful insert
buffer.length = 0;
buffer.length = 0;
// Manually commit offsets
await consumer.commitOffsets([
{ topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
]);
console.log('Offsets committed');
} catch (err) {
console.error('Failed to insert documents into MongoDB', err);
}
}
},
});
};
const disconnectConsumer = async () => {
// Insert any remaining messages in the buffer before disconnecting
if (buffer.length > 0) {
try {
await Location.insertMany(buffer);
console.log('Documents inserted into MongoDB');
} catch (err) {
console.error('Failed to insert remaining documents into MongoDB', err);
}
}
await consumer.disconnect();
await mongoose.disconnect();
};
module.exports = { runConsumer, disconnectConsumer };