Node.js 使用 Kafka 案例,Node-RED 配置 Kafka 案例,从安装配置 Kafka 开始
生产者测试:
bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test-topic
消费者测试:
bin/kafka-console-consumer.sh --bootstrap-server 1.2.3.4:9092 --topic test-topic --from-beginning
const { Kafka } = require('kafkajs');

// Kafka client shared by the producer and the consumer below.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['1.2.3.4:9092'], // use the server's IP and port
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });

/**
 * Sends one test message to `test-topic`, then subscribes the consumer to the
 * same topic (from the beginning) and logs every message it receives.
 *
 * @returns {Promise<void>} Resolves once `consumer.run` has resolved; rejects
 *   if any connect, send, subscribe, or run call rejects.
 */
const run = async () => {
  // Producing
  await producer.connect();
  try {
    await producer.send({
      topic: 'test-topic',
      messages: [{ value: 'Hello KafkaJS user!1111' }],
    });
  } finally {
    // The producer is not used again after this single send, so release its
    // connection whether or not the send succeeded.
    await producer.disconnect();
  }

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        // Guard against a null value so calling toString() cannot throw
        // inside the message handler.
        value: message.value === null ? null : message.value.toString(),
      });
    },
  });
};

run().catch(console.error);