1.Windows环境下安装zookeeper和kafka
Windows环境下安装zookeeper和kafka
2.运行zookeeper
3.运行kafka
4.生产者
import ("encoding/json""github.com/Shopify/sarama""strconv"
)type Product struct {Id intName stringTitle string
}func NewProduct() error {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 10config.Producer.Return.Successes = truebrokers := []string{"localhost:9092"}producer, err := sarama.NewAsyncProducer(brokers, config)if err != nil {return err}p := &Product{Id: 1,Name: "钻戒",Title: "那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",}key := sarama.StringEncoder(strconv.Itoa(p.Id))value, err := json.Marshal(p)if err != nil {return err}msg := &sarama.ProducerMessage{Topic: "new-products",Key: key,Value: sarama.ByteEncoder(value),}producer.Input() <- msgreturn nil
}
5.消费者
import ("encoding/json""fmt""github.com/Shopify/sarama""log"
)func Consume() error {// 初始化 Kafka 消费者config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 10config.Producer.Return.Successes = truebrokers := []string{"localhost:9092"}consumer, err := sarama.NewConsumer(brokers, config)partitionConsumer, err := consumer.ConsumePartition("newProduct", 0, sarama.OffsetNewest)if err != nil {log.Printf("Error consuming partition: %v", err)return err}for {select {case msg := <-partitionConsumer.Messages():var product Producterr = json.Unmarshal(msg.Value, &product)if err != nil {log.Printf("Error unmarshaling product: %v", err)return err} else {fmt.Printf("New product: %+v\n", product)}case err = <-partitionConsumer.Errors():log.Printf("Error consuming message: %v", err)return err}}
}
6.main函数
import ("fmt""golang_test/kafka_test/kafka""log""sync"
)var wg sync.WaitGroupfunc main() {wg.Add(2)go func() {defer wg.Done()if err := kafka.NewProduct(); err != nil {log.Println("kafka生产者运行失败")return}}()go func() {defer wg.Done()if err := kafka.Consume(); err != nil {log.Println("kafka生产者运行失败")return}}()wg.Wait()fmt.Println("运行结束")
}