From 82fb88a4e7388aa3f55705d305b6d4ac41cd4d2f Mon Sep 17 00:00:00 2001 From: "Luiz F. Picolo" Date: Wed, 12 Apr 2023 13:42:55 -0400 Subject: [PATCH] Commit inicial --- .gitignore | 1 + consumer.js | 40 +++++++++++++++++++++++++++++++++++++ create-topic.js | 28 ++++++++++++++++++++++++++ list-topic.js | 21 ++++++++++++++++++++ package-lock.json | 27 +++++++++++++++++++++++++ package.json | 5 +++++ producer.js | 50 +++++++++++++++++++++++++++++++++++++++++++++++ start | 5 +++++ yarn.lock | 8 ++++++++ 9 files changed, 185 insertions(+) create mode 100644 .gitignore create mode 100644 consumer.js create mode 100644 create-topic.js create mode 100644 list-topic.js create mode 100644 package-lock.json create mode 100644 package.json create mode 100644 producer.js create mode 100644 start create mode 100644 yarn.lock diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b512c09 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules \ No newline at end of file diff --git a/consumer.js b/consumer.js new file mode 100644 index 0000000..68a250c --- /dev/null +++ b/consumer.js @@ -0,0 +1,40 @@ +const { Kafka } = require('kafkajs') + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'] +}) + +//const producer = kafka.producer() +const consumer = kafka.consumer({ groupId: 'test-group' }) + +const run = async () => { + // // Producing + // await producer.connect() + // await producer.send({ + // topic: 'test-topic', + // messages: [ + // { value: 'Hello KafkaJS user!' }, + // ], + // }) + + // Consuming + await consumer.connect() + await consumer.subscribe({ topic: process.env.TOPIC, fromBeginning: true }) + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const obj = JSON.parse(message.value) + console.log('Message consumer successfully!'); + console.log(obj.name) + // console.log({ + // partition, + // offset: message.offset, + // value: Json.parse(message.value), + // }) + }, + + }) +} + +run() \ No newline at end of file diff --git a/create-topic.js b/create-topic.js new file mode 100644 index 0000000..dfb12ba --- /dev/null +++ b/create-topic.js @@ -0,0 +1,28 @@ +const { Kafka } = require('kafkajs'); + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'] +}); + +const admin = kafka.admin(); + +async function createTopic() { + try { + await admin.connect(); + await admin.createTopics({ + topics: [{ + topic: 'my-topic-1', + numPartitions: 2, + replicationFactor: 1 + }] + }); + console.log('Topic created successfully!'); + } catch (error) { + console.error('Error creating topic:', error); + } finally { + await admin.disconnect(); + } +} + +createTopic(); diff --git a/list-topic.js b/list-topic.js new file mode 100644 index 0000000..938b016 --- /dev/null +++ b/list-topic.js @@ -0,0 +1,21 @@ +const { Kafka } = require('kafkajs'); + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'] +}); + +const admin = kafka.admin(); + +async function listTopic() { + try { + const list = await admin.listTopics() + console.log(list); + } catch (error) { + console.error('Error creating topic:', error); + } finally { + await admin.disconnect(); + } +} + +listTopic(); diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..0b5671a --- /dev/null +++ b/package-lock.json @@ -0,0 +1,27 @@ +{ + "name": "kafkajs", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "dependencies": { + "kafkajs": "^2.2.2" + } + }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + } + }, + "dependencies": { + "kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..75648fe --- /dev/null +++ b/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "kafkajs": "^2.2.2" + } +} diff --git a/producer.js b/producer.js new file mode 100644 index 0000000..6a8612b --- /dev/null +++ b/producer.js @@ -0,0 +1,50 @@ +const { Kafka, Partitioners } = require('kafkajs') + +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'] +}) + +const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); + +const run = async () => { + const obj = { + id: 1, + name: 'John 1', + age: 30 + }; + + // Producing + try { + await producer.connect() + await producer.send({ + topic: process.env.TOPIC, + messages: [ + { value: JSON.stringify(obj) }, + ], + }) + + console.log('Message sent successfully!'); + } catch (error) { + console.error('Error sending message:', error); + } finally { + await producer.disconnect(); + } + + // Consuming + // await consumer.connect() + // await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) + + // await consumer.run({ + // eachMessage: async ({ topic, partition, message }) => { + // console.log({ + // partition, + // offset: message.offset, + // value: message.value.toString(), + // }) + // }, + + // }) +} + +run() \ No newline at end of file diff --git a/start b/start new file mode 100644 index 0000000..db82b41 --- /dev/null +++ b/start @@ -0,0 +1,5 @@ +// Start Zookeeper +/usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg + +// start Kafka +/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties \ No newline at end of file diff --git a/yarn.lock b/yarn.lock new file mode 100644 index 0000000..cd5f22b --- /dev/null +++ b/yarn.lock @@ -0,0 +1,8 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + + +"kafkajs@^2.2.2": + "integrity" "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==" + "resolved" "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz" + "version" "2.2.4"