当前位置: 首页 > news >正文

kafka发送消息-自定义消息发送的拦截器

在这里插入图片描述

1、自定义拦截器

创建自定义拦截器类,实现ProducerInterceptor接口。对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作…
在这里插入图片描述

package com.power.config;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {/*** 发送消息时,会调用该方法,对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作......* @param record* @return*/@Overridepublic ProducerRecord<String,Object> onSend(ProducerRecord record) {System.out.println("拦截消息:"+record.toString());return record;}/*** 服务器收到消息后的一个确认* @param metadata* @param exception*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(null!=metadata){System.out.println("服务器接收到该消息:"+metadata.toString());}else {System.out.println("消息发送失败了,exception = "+exception);}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

2、kafak配置类

在这里插入图片描述

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerProducerInterceptor.class.getName());return props;}public ProducerFactory<String, ?> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, ?> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}//第二次创建@Beanpublic NewTopic newTopic9() {return new NewTopic("heTopic", 9, (short) 1);}
}

3、生产者

在这里插入图片描述

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

4、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.send10();}}

5、执行测试类

在这里插入图片描述


http://www.mrgr.cn/news/7876.html

相关文章:

  • 【自动化测试】python+selenium+谷歌驱动安装记录
  • rapidjson的移植
  • Pandas_merge_join
  • Java nio pipe 通信原理
  • JS基础进阶3-DOM事件
  • Python实现贪心算法
  • 使用分布式锁解决IM聊天数据重复插入的问题
  • STM32中的shell框架搭建
  • 游戏app激励视频广告预加载位置,最大化广告收益
  • 【Redis】基本全局命令
  • 如何在不格式化的情况下解锁 Android 智能手机密码
  • 257. 二叉树的所有路径
  • 口语笔词——高频介词
  • operlayers绘制点,线,面,以及其他基本操作
  • 基于Java语言的能源管理系统-水-电-气-热-油-数据采集系统源码
  • 王立铭脑科学50讲——19~24篇,大脑如何学习的
  • 【iOS安全】iPhone8 iOS14.4.2 越狱教程
  • 鸿蒙内核源码分析(特殊进程篇)
  • WiFi的IP和电脑IP一样吗?怎么更改wifi的ip地址
  • 计算机的错误计算(七十二)