【stomp 实战】spring websocket用户消息发送源码分析

news/2024/5/20 12:19:20

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** stompClient.subscribe("/user/topic/subNewMsg",...)* 这个时候,后端推送消息应该这么写* msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);* 即去掉了/user前缀*/registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);}
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1stompClient.subscribe("/user/topic/answer", function (response) {//do something});
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic", "/queue")//配置stomp协议里, server返回的心跳.setHeartbeatValue(new long[]{10000L, 10000L})//配置发送心跳的scheduler.setTaskScheduler(new DefaultManagedTaskScheduler());}
  • 前端代码可以这么写
//订阅广播消息topicstompClient.subscribe("/topic/boardCast/hello", function (response) {// do something});
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;public void echo2(Msg msg) {log.info("收到的消息为:{}", msg.getContent());msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");}

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {//do something});

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate@Overridepublic void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,@Nullable MessagePostProcessor postProcessor) throws MessagingException {//对消息进行转换,对象转字符串,或者字节数组之类的Message<?> message = doConvert(payload, headers, postProcessor);//调用Send发送send(destination, message);}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Overridepublic void send(D destination, Message<?> message) {doSend(destination, message);}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate@Overrideprotected void doSend(String destination, Message<?> message) {Assert.notNull(destination, "Destination must not be null");SimpMessageHeaderAccessor simpAccessor =MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);if (simpAccessor != null) {if (simpAccessor.isMutable()) {simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);simpAccessor.setImmutable();sendInternal(message);return;}else {// Try and keep the original accessor typesimpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);initHeaders(simpAccessor);}}else {simpAccessor = SimpMessageHeaderAccessor.wrap(message);initHeaders(simpAccessor);}simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());sendInternal(message);}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());Assert.notNull(destination, "Destination header required");long timeout = this.sendTimeout;boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));if (!sent) {throw new MessageDeliveryException(message,"Failed to send message to destination '" + destination + "' within timeout: " + timeout);}}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel@Overridepublic final boolean send(Message<?> message, long timeout) {Assert.notNull(message, "Message must not be null");Message<?> messageToUse = message;ChannelInterceptorChain chain = new ChannelInterceptorChain();boolean sent = false;try {messageToUse = chain.applyPreSend(messageToUse, this);if (messageToUse == null) {return false;}sent = sendInternal(messageToUse, timeout);chain.applyPostSend(messageToUse, this, sent);chain.triggerAfterSendCompletion(messageToUse, this, sent, null);return sent;}catch (Exception ex) {chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);}catch (Throwable err) {MessageDeliveryException ex2 =new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);throw ex2;}}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel@Overridepublic boolean sendInternal(Message<?> message, long timeout) {for (MessageHandler handler : getSubscribers()) {SendTask sendTask = new SendTask(message, handler);if (this.executor == null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {Message<?> message = this.inputMessage;try {message = applyBeforeHandle(message);if (message == null) {return;}this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由MessageChannel
  • MessageChannel将会调用MessageHandler来处理消息,有以下两个MessageHandler
    • SimpleBrokerMessageHandler
    • UserDestinationMessageHandler
  • 经过MessageHandler的处理,destination由/user/topic/answer,变成了/topic/answer-usero2zuy4zg。
  • 改写后的destination可以找到用户会话,将此消息发送出去

在这里插入图片描述


http://www.mrgr.cn/p/02312807

相关文章

【方法】PPT的“只读方式”如何设置?

PPT以“只读方式”打开&#xff0c;可以防止被随意更改&#xff0c;那PPT的“只读方式”如何设置呢&#xff1f;下面分享3种方法&#xff0c;一起来看看吧&#xff01; 方法1&#xff1a; 我们可以通过“属性”来设置“只读方式”。 首先&#xff0c;选中PPT文件&#xff0c;…

【攻防技术系列】-- JNDI注入

JNDI概念首先第一个问题,什么是 JNDI?JNDI (Java Naming and Directory Interface),是Java平台提供的一个API,它允许Java应用程序访问不同的命名和目录服务。简而言之,JNDI为Java应用提供了一种统一的方式来查询和访问外部资源,如数据库、文件系统、远程对象等。 虽然有点…

游戏行业干货分享 | 游戏行业非技术岗盘点

在游戏行业&#xff0c;除了技术开发岗位外&#xff0c;还有许多非技术岗位对游戏的成功至关重要。以下是一些游戏行业非技术岗位的盘点&#xff0c;以及它们各自的职责和对游戏项目的贡献&#xff1a; 以游戏上线为目的&#xff0c;从游戏研发到游戏发行会有如下岗位配置 这里…

[转]Cocos2dlua手游 Lua解密与资源解密实战

来自看雪:Cocos2dlua手游 Lua解密与资源解密实战 https://mp.weixin.qq.com/s/WeYxlXZvCPv_3nGgeKdunwCocos2dlua 逆向相关学习,略有删减,直接进入正文 APK大致如下:. ├── assets │ ├── res │ │ ├── ani │ │ │ └── logo │ │ │ …

JVM认识之垃圾收集算法

一、标记-清除算法 1、定义 标记-清除算法是最基础的垃圾收集算法。它分为标记和清除两个阶段。先标记出所有需要回收的对象&#xff08;即垃圾&#xff09;&#xff0c;在标记完成后再统一回收所有垃圾对象。 2、优点和缺点 优点&#xff1a;实现简单缺点&#xff1a; 可能…

Spring如何控制Bean的加载顺序

正常情况下,Spring 容器加载 Bean 的顺序是不确定的,那么我们如果需要按顺序加载 Bean 时应如何操作?本文将详细讲述我们如何才能控制 Bean 的加载顺序。前言正常情况下,Spring 容器加载 Bean 的顺序是不确定的,那么我们如果需要按顺序加载 Bean 时应如何操作?本文将详细…

MySQL数据库---增删查改汇总

前言 欢迎来到我的博客 个人主页:北岭敲键盘的荒漠猫-CSDN博客 本文着重整理MySQL数据库增删查改功能 主要是整理语法 争取做到要用什么语法 可以快速找到复制粘贴 增添语法 INSERT into tab(列名,列名,列名) values(内容,内容,内容); 插入一行数据 INSERT into tab(列名,…

印染工厂5G智能制造数字孪生可视化平台,推进行业数字化转型

印染工厂5G智能制造数字孪生可视化平台&#xff0c;推进行业数字化转型。印染工厂正迈入一个全新的时代&#xff0c;这个时代以5G智能制造数字孪生可视化平台为核心&#xff0c;推动整个行业的数字化转型。不仅是一场技术革命&#xff0c;更是一次产业变革&#xff0c;为印染工…

Flink架构与原理

Flink集群运行剖析 Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。 Client:Client是提交作业的客户端,虽然不是运行时和作业执行时的一部分,但它负责准备和提交作业到JobManager,它可以运行在任何机器上,只要与JobManager环境连通即可。…

Nexpose v6.6.250 for Linux Windows - 漏洞扫描

Nexpose v6.6.250 for Linux & Windows - 漏洞扫描Nexpose v6.6.250 for Linux & Windows - 漏洞扫描 Rapid7 Vulnerability Management, Release May 02, 2024 请访问原文链接:Nexpose v6.6.250 for Linux & Windows - 漏洞扫描,查看最新版。原创作品,转载请保…

Metasploit Pro 4.22.3-2024050201 (Linux, Windows) - 专业渗透测试框架

Metasploit Pro 4.22.3-2024050201 (Linux, Windows) - 专业渗透测试框架Metasploit Pro 4.22.3-2024050201 (Linux, Windows) - 专业渗透测试框架 Rapid7 Penetration testing, Release May 03, 2024 请访问原文链接:Metasploit Pro 4.22.3-2024050201 (Linux, Windows) - 专…

Linux(openEuler、CentOS8)常用的IP修改方式(文本配置工具nmtui+配置文件+nmcli命令)

----本实验环境为openEuler系统<以server方式安装>&#xff08;CentOS类似&#xff0c;可参考本文&#xff09;---- 一、知识点 &#xff08;一&#xff09;文本配置工具nmtui(openEuler已预装) nmtui&#xff08;NetworkManager Text User Interface&#xff09;是一…

[转]vscode必备插件,美化、炫酷、实用-留着防丢

vscode必备插件,美化、炫酷、实用-留着防丢 - 知乎 (zhihu.com) https://zhuanlan.zhihu.com/p/112016680本篇文章只推荐看起来不错的插件,并不详细介绍插件的使用方法,插件的具体使用方法可以单独对其进行百度搜索。当然,有啥问题也可以在下面评论,但我觉得可以百度的地方…

应急响应web1

应急响应的过程 目的:分析攻击时间、攻击操作、攻击结果、安全修复等并给出合理的解决方案。 保护阶段:直接断网,保护现场,看是否能够恢复数据; 分析阶段:对入侵过程进行分析,常见的方法为指纹库搜索、日志时间分析、后门追查分析、漏洞检查分析等; 复现阶段:还原攻击…

二叉树习题汇总

片头 嗨&#xff01;大家好&#xff0c;今天我们来练习几道二叉树的题目来巩固知识点&#xff0c;准备好了吗&#xff1f;Ready Go ! ! ! 第一题&#xff1a;二叉树的最大深度 解答这道题&#xff0c;我们采用分治思想 1. 递归子问题&#xff1a;左子树的高度和右子树的高度 …

FTP主动模式和被动模式(3)NAT对FTP的影响 - NAT ALG

NAT对FTP的影响 NAT环境下FTP存在的问题 FTP主动模式 FTP服务器在外部网络 在FTP主动模式下,如果网络中存在NAT,且FTP客户端在NAT内部网络中,那么FTP数据连接会出现下面的问题,如图:内部网络中的FTP客户端和外部网络中的FTP服务器端通过NAT地址转换是可以正常建立控制连接…

python读写json文件

1. 新建json文件打开记事本,重命名为.json后缀 使用的样例如下,注意看json文件格式:{"server":{"host": "example.com","port": 443,"protocol": "https"},"authentication":{"username":…

FPGA ov5640视频以太网传输

1 实验任务 使用DFZU4EV MPSoC 开发板及双目OV5640摄像头其中一个摄像头实现图像采集&#xff0c;并通过开发板上的以太网接口发送给上位机实时显示。 2 Verilog代码 2.1 顶层模块 timescale 1ns / 1ps //以太网传输视频顶层模块module ov5640_udp_pc (input sys_cl…