无序消息的重试只针对集群消费模式生效;广播消费模式不提供失败重试特性
Producer
发了100个对象消息
public class AddProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("a-group");producer.setNamesrvAddr("192.168.0.211:9876");producer.start();ArrayList<Alarm> list = new ArrayList<>();for (int i = 1; i < 101; i++) {Alarm bean = new Alarm(i, "add_"+i, new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-01"));list.add(bean);}try {for (Alarm alarm : list) {Message msg = new Message("ALARM_RECORD", "add",String.valueOf(alarm.getId()), JSONUtil.toJsonStr(alarm).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg,new CustomSendCallback());System.out.println(alarm.getId() + " Continue execution ");}} catch (Exception e) {e.printStackTrace();}// 6. 关闭生产者
// producer.shutdown();}
}
Consumer
public class ConsumerAdd {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("record_add_group");consumer.setNamesrvAddr("192.168.0.211:9876");consumer.subscribe("ALARM_RECORD", "add");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 默认允许每条消息最多重试16次consumer.setMaxReconsumeTimes(7);// 注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {for (MessageExt msg : msgs) {String body = new String(msg.getBody());Alarm bean = JSONUtil.toBean(body, Alarm.class);if(bean.getId() == 77){throw new Exception();}System.out.println("正常消费 bean = " + bean);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {System.out.println("重试 "+msgs.get(0).getReconsumeTimes() +" "+ new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});// 启动消费者consumer.start();System.out.printf("Consumer Tag add_1 Started.%n");}
}
多开测试
失败后进入重试队列
16次的时间间隔
10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min、1h、2h
最后一次失败直接进入死信队列、人工处理