Commit inicial

This commit is contained in:
Luiz F Picolo 2023-04-12 13:42:55 -04:00
commit 82fb88a4e7
9 changed files with 185 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
node_modules

40
consumer.js Normal file
View File

@ -0,0 +1,40 @@
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
//const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// // Producing
// await producer.connect()
// await producer.send({
// topic: 'test-topic',
// messages: [
// { value: 'Hello KafkaJS user!' },
// ],
// })
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: process.env.TOPIC, fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const obj = JSON.parse(message.value)
console.log('Message consumer successfully!');
console.log(obj.name)
// console.log({
// partition,
// offset: message.offset,
// value: Json.parse(message.value),
// })
},
})
}
run()

28
create-topic.js Normal file
View File

@ -0,0 +1,28 @@
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const admin = kafka.admin();
async function createTopic() {
try {
await admin.connect();
await admin.createTopics({
topics: [{
topic: 'my-topic-1',
numPartitions: 2,
replicationFactor: 1
}]
});
console.log('Topic created successfully!');
} catch (error) {
console.error('Error creating topic:', error);
} finally {
await admin.disconnect();
}
}
createTopic();

21
list-topic.js Normal file
View File

@ -0,0 +1,21 @@
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const admin = kafka.admin();
async function listTopic() {
try {
const list = await admin.listTopics()
console.log(list);
} catch (error) {
console.error('Error creating topic:', error);
} finally {
await admin.disconnect();
}
}
listTopic();

27
package-lock.json generated Normal file
View File

@ -0,0 +1,27 @@
{
"name": "kafkajs",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"kafkajs": "^2.2.2"
}
},
"node_modules/kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
"engines": {
"node": ">=14.0.0"
}
}
},
"dependencies": {
"kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA=="
}
}
}

5
package.json Normal file
View File

@ -0,0 +1,5 @@
{
"dependencies": {
"kafkajs": "^2.2.2"
}
}

50
producer.js Normal file
View File

@ -0,0 +1,50 @@
const { Kafka, Partitioners } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
const run = async () => {
const obj = {
id: 1,
name: 'John 1',
age: 30
};
// Producing
try {
await producer.connect()
await producer.send({
topic: process.env.TOPIC,
messages: [
{ value: JSON.stringify(obj) },
],
})
console.log('Message sent successfully!');
} catch (error) {
console.error('Error sending message:', error);
} finally {
await producer.disconnect();
}
// Consuming
// await consumer.connect()
// await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
// await consumer.run({
// eachMessage: async ({ topic, partition, message }) => {
// console.log({
// partition,
// offset: message.offset,
// value: message.value.toString(),
// })
// },
// })
}
run()

5
start Normal file
View File

@ -0,0 +1,5 @@
// Start Zookeeper
/usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
// start Kafka
/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties

8
yarn.lock Normal file
View File

@ -0,0 +1,8 @@
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
# yarn lockfile v1
"kafkajs@^2.2.2":
"integrity" "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA=="
"resolved" "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz"
"version" "2.2.4"