当前位置: 首页 > news >正文

Java项目集成RocketMQ

文章目录

    • 1.调整MQ的配置
        • 1.进入bin目录
        • 2.关闭broker和namesrv
        • 3.查看进程确认关闭
        • 4.编辑配置文件broker.conf,配置brokerIP1
        • 5.开放端口10911
        • 6.重新启动
          • 1.进入bin目录
          • 2.启动mqnamesrv和mqbroker
            • 1.启动 NameServer 并将输出重定向到 mqnamesrv.log
            • 2.**启动 Broker 并将输出重定向到** **mqbroker.log**
            • 3.**实时监控 NameServer 的日志文件**
            • 4.**实时监控 Broker 的日志文件**
            • 5.查看进程
    • 2.项目集成MQ
        • 1.domain引入mq依赖
        • 2.sun-club-application-mq 引入domain依赖,用于消费mq
        • 3.sun-club-starter 引入mq层
        • 4.application.yml 配置mq
        • 5.SubjectController.java
          • 1.依赖注入 RocketMQTemplate
          • 2.编写controller,作为消息生产者
        • 6.TestConsumer.java 测试消费
        • 5.测试
    • 3.点赞业务优化为MQ处理
        • 1.SubjectLikedMessage.java 点赞消息实体
        • 2.sun-club-domain 同步点赞数据
          • 1.SubjectLikedDomainService.java
          • 2.SubjectLikedDomainServiceImpl.java
          • 3.add方法逻辑修改
          • 4.测试

1.调整MQ的配置

1.进入bin目录
cd /usr/local/soft/rocketmq-all-4.8.0-bin-release/bin
2.关闭broker和namesrv
sh mqshutdown broker && sh mqshutdown namesrv

CleanShot 2024-07-12 at 12.20.25@2x

3.查看进程确认关闭
ps -ef | grep NamesrvStartup && ps -ef | grep BrokerStartup

CleanShot 2024-07-12 at 12.21.38@2x

4.编辑配置文件broker.conf,配置brokerIP1
vim /usr/local/soft/rocketmq-all-4.8.0-bin-release/conf/broker.conf
# NameServer 地址(开端口)
namesrvAddr=# brokerIP1 指定了 Broker 对外提供服务的 IP 地址
brokerIP1=# listenPort 指定了 Broker 监听客户端连接的端口(开端口)
listenPort=10911# 当这个选项设置为 true 时,如果客户端尝试向一个不存在的主题发送消息,Broker 会自动创建这个主题
autoCreateTopicEnable=true
5.开放端口10911
systemctl start firewalld && firewall-cmd --permanent --add-port=10911/tcp && firewall-cmd --reload && firewall-cmd --query-port=10911/tcp

CleanShot 2024-07-12 at 12.52.09@2x

6.重新启动
1.进入bin目录
cd /usr/local/soft/rocketmq-all-4.8.0-bin-release/bin
2.启动mqnamesrv和mqbroker
1.启动 NameServer 并将输出重定向到 mqnamesrv.log
nohup sh mqnamesrv > mqnamesrv.log 2>&1 &
2.启动 Broker 并将输出重定向到 mqbroker.log
nohup sh mqbroker -c ../conf/broker.conf > mqbroker.log 2>&1 &
3.实时监控 NameServer 的日志文件
tail -f mqnamesrv.log &

CleanShot 2024-07-12 at 12.40.03@2x

4.实时监控 Broker 的日志文件
tail -f mqbroker.log &
5.查看进程
ps -ef | grep NamesrvStartup && ps -ef | grep BrokerStartup

CleanShot 2024-07-12 at 12.57.04@2x

2.项目集成MQ

1.domain引入mq依赖
        <!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>

CleanShot 2024-07-12 at 13.05.02@2x

2.sun-club-application-mq 引入domain依赖,用于消费mq
        <!-- 引入domain层 --><dependency><groupId>com.sun.club</groupId><artifactId>sun-club-domain</artifactId><version>1.0-SNAPSHOT</version></dependency>

CleanShot 2024-07-12 at 13.07.58@2x

3.sun-club-starter 引入mq层
        <!-- 引入mq层 --><dependency><groupId>com.sun.club</groupId><artifactId>sun-club-application-mq</artifactId><version>1.0-SNAPSHOT</version></dependency>

CleanShot 2024-07-12 at 13.11.35@2x

4.application.yml 配置mq
# mq配置
rocketmq:name-server:  # 作用是服务注册和发现,会自动发现brokerproducer:group: test-group

CleanShot 2024-07-12 at 13.16.40@2x

5.SubjectController.java
1.依赖注入 RocketMQTemplate
    @Resourceprivate RocketMQTemplate rocketMQTemplate;
2.编写controller,作为消息生产者
    /*** 测试mq发送* @return*/@GetMapping("/pushMessage")public Result<Boolean> pushMessage(@Param("id") int id) {rocketMQTemplate.convertAndSend("first-topic", "hello " + id);return Result.ok();}

CleanShot 2024-07-12 at 13.34.21@2x

6.TestConsumer.java 测试消费
package com.sunxiansheng.subject.application.mq;import com.sun.media.jfxmedia.logging.Logger;
import groovy.util.logging.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** Description:* @Author sun* @Create 2024/7/12 13:24* @Version 1.0*/
@Component
// topic:主题,就是生产者那里指定的主题
// consumerGroup:消费组,在application.yml文件中配置的
// RocketMQListener<String>:这里的泛型就是消息的类型
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "test-group")
@Slf4j
public class TestConsumer implements RocketMQListener<String> {private static final org.slf4j.Logger log = LoggerFactory.getLogger(TestConsumer.class);/*** 对消息进行消费* @param s*/@Overridepublic void onMessage(String s) {log.info("接受到消息了:{}", s);}}

CleanShot 2024-07-12 at 13.34.57@2x

5.测试

CleanShot 2024-07-12 at 13.35.13@2x

CleanShot 2024-07-12 at 13.35.21@2x

3.点赞业务优化为MQ处理

1.SubjectLikedMessage.java 点赞消息实体
package com.sunxiansheng.subject.domain.entity;import lombok.Data;
import lombok.experimental.Accessors;import java.io.Serializable;/*** 题目点赞消息*/
@Data
@Accessors(chain = true) // 支持链式调用
public class SubjectLikedMessage implements Serializable {/*** 题目id*/private Long subjectId;/*** 点赞人id*/private String likeUserId;/*** 点赞状态 1点赞 0不点赞*/private Integer status;}
2.sun-club-domain 同步点赞数据
1.SubjectLikedDomainService.java
    /*** MQ同步点赞数据* @param subjectLikedBO*/void syncLikedByMsg(SubjectLikedBO subjectLikedBO);
2.SubjectLikedDomainServiceImpl.java
    @Overridepublic void syncLikedByMsg(SubjectLikedBO subjectLikedBO) {SubjectLiked subjectLiked = new SubjectLiked();subjectLiked.setSubjectId(subjectLikedBO.getSubjectId());subjectLiked.setLikeUserId(subjectLikedBO.getLikeUserId());subjectLiked.setStatus(subjectLikedBO.getStatus());subjectLiked.setIsDeleted(IsDeleteFlagEnum.UN_DELETED.getCode());subjectLikedService.insert(subjectLiked);}
3.add方法逻辑修改

CleanShot 2024-07-12 at 14.33.20@2x

4.测试

CleanShot 2024-07-12 at 14.37.16@2x

CleanShot 2024-07-12 at 14.37.25@2x


http://www.mrgr.cn/news/1350.html

相关文章:

  • 《Docker:实现开发环境一致性与高效部署的利器》
  • 创建张量、张量的操作
  • Kafka运行机制(二):消息确认,消息日志的存储和回收
  • echarts渐变圆环进度条样式
  • jenkins pipline脚本 获取git分支
  • Snipaste 的一款替代工具 PixPin,支持 gif 截图、长截图和 OCR 文字识别,功能不是一点点强!
  • 虚幻5|角色武器装备的数据库学习(不只是用来装备武器,甚至是角色切换也很可能用到)
  • 《AI音频类工具之九——Stable Audio​ 》
  • TortoiseGit修改差异查看器为BeyondCompare
  • 网络丢包深度解析与优化:检测、诊断与减少策略
  • 《区块链与监管合规:在创新与规范之间寻求平衡》
  • 这 2 个 GitHub 项目,YYDS!
  • Linux下如何使用Cron定时任务
  • ZooKeeper 集群的详细部署
  • 【linux】SCP或SSH 连接失败: no matching host key type found. Their offer: ssh-rs
  • Python办公自动化:使用openpyxl 创建与保存 Excel 工作簿
  • Webpack高级配置(干货)
  • gitignore忽略某些格式文件
  • 精通网络分析工具:深入探索与实践指南
  • 数据结构-栈