当前位置: 首页 > news >正文

Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听初始化器(启动时为每个带 @RedisQueue 注解的监听器开启消费线程)
  • RedisQueueListener:Redis消息队列监听接口(由业务方实现)
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
/**
 * Marks a type as a consumer of a named Redis queue.
 *
 * <p>The annotation is retained at runtime so it can be read reflectively,
 * and it may only be placed on types.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisQueue {

    /**
     * Name of the queue the annotated type is bound to.
     *
     * @return the queue name
     */
    String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> map = applicationContext.getBeansOfType(RedisQueueListener.class);executorService = createThreadPool("redis-queue");for (Map.Entry<String, RedisQueueListener> entry : map.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}private ExecutorService createThreadPool(String namePrefix) {return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new NamedThreadFactory(namePrefix));}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听: {}", queueName);while (true) {if (shutdownRequested.get() || redissonClient.isShutdown()) {log.info("Redisson已关闭,停止监听队列: {}", queueName);break;}try {Object message = blockingQueue.take();redisQueueListener.invoke(message);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("监听队列线程被中断", e);break;} catch (Exception e) {log.error("监听队列线程错误", e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber 
= new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}
}

RedisQueueListener

/**
 * Callback contract for consuming messages from a Redis queue.
 *
 * @param <T> type of the message content
 * @author 十八
 * @createTime 2024-09-09 22:51
 */
public interface RedisQueueListener<T> {

    /**
     * Handles a single message.
     *
     * @param content the message content
     */
    void invoke(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
/**
 * Helper for publishing messages to Redisson blocking queues, either immediately or through a
 * delayed queue.
 */
@Component
public class RedisQueueService {

    @Resource
    private RedissonClient redissonClient;

    /**
     * Adds a message to the named blocking queue.
     *
     * @param queueName name of the queue
     * @param content   message to enqueue
     * @param <T>       message type
     */
    public <T> void send(String queueName, T content) {
        redissonClient.<T>getBlockingQueue(queueName).add(content);
    }

    /**
     * Offers a message to the delayed queue that Redisson provides for the named blocking queue.
     *
     * @param queueName name of the destination queue
     * @param content   message to enqueue
     * @param delay     delay amount
     * @param timeUnit  unit of {@code delay}
     * @param <T>       message type
     */
    public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> destination = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayed = redissonClient.getDelayedQueue(destination);
        delayed.offer(content, delay, timeUnit);
    }

    /**
     * Same as {@link #sendDelay(String, Object, long, TimeUnit)} with the delay given in
     * milliseconds.
     *
     * @param queueName name of the destination queue
     * @param content   message to enqueue
     * @param delay     delay in milliseconds
     * @param <T>       message type
     */
    public <T> void sendDelay(String queueName, T content, long delay) {
        sendDelay(queueName, content, delay, TimeUnit.MILLISECONDS);
    }
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
/**
 * Demo endpoint that publishes the submitted text to the {@code "test"} queue twice: once
 * directly and once through the delayed-send path.
 */
@RestController
@RequestMapping("queue")
public class QueueController {

    @Resource
    private RedisQueueService redisQueueService;

    /**
     * Sends {@code message} to the queue, then sends a prefixed copy with a delay value of 1000
     * (the three-argument {@code sendDelay} overload).
     *
     * @param message text to publish
     */
    @PostMapping("send")
    public void send(String message) {
        final String delayedPayload = "delay messaege -> " + message;
        redisQueueService.send("test", message);
        redisQueueService.sendDelay("test", delayedPayload, 1000);
    }
}

测试结果


http://www.mrgr.cn/news/23243.html

相关文章:

  • 全网最适合入门的面向对象编程教程:46 Python函数方法与接口-函数与事件驱动框架
  • Windows环境下 VS2022 编译 OGG 源码
  • 【Rust】Mdbook插件开发和分享——多图浏览和多语言代码
  • 不到200行代码,一键写出简单贪吃蛇网页游戏!附详细代码!快来看看吧!
  • Linux IO模型:IO多路复用
  • Android源码 ota升级
  • 2-89 基于matlab的图像去噪方法
  • 利士策分享,克服生活中的困难:走好勇攀高峰的每一步
  • 计算机的发展史和基本结构
  • 我的第3个AI项目-Advanced RAG with Gemma, Weaviate, and LlamaIndex
  • Linux 中 Tail 命令的 9 个实用示例
  • 【华为OD流程】性格测试选项+注意事项
  • 从“纯血鸿蒙”看“自研系统”有多难
  • 洛谷 P10798 「CZOI-R1」消除威胁
  • Android使用Room后无法找到字符BR
  • 选择网站服务器有哪几种类型?
  • C8T6超绝模块--USART串口通信
  • docker conda
  • 分组注解和自定义注解及分页查询
  • 4.人事管理系统(springbootvue项目)