Kafka源码分析(四) - Server端-请求处理框架

news/2024/5/15 0:18:37

系列文章目录

Kafka源码分析-目录

一. 总体结构

先给一张概览图:
在这里插入图片描述

服务端请求处理过程涉及到两个模块:kafka.networkkafka.server

1.1 kafka.network

该包是kafka底层模块,提供了服务端NIO通信能力基础。

有4个核心类:SocketServer、Acceptor、Processor、RequestChannel。各自角色如下:

  • SocketServer:服务端的抽象,是服务端通信的入口;

  • Acceptor:Reactor通信模式中处理连接ACCEPT事件的线程/线程池所执行的任务;

  • Processor:Reactor通信模式中处理连接可读/可写事件的线程/线程池所执行的任务;

  • RequestChannel:请求队列,存储已经解析好的请求以等待处理;

对于上层模块而言,该基础模块有两个输入和一个输出

  1. 输入:IP+端口号,该模块会对目标端口实现监听;

  2. 输出:解析好的请求,通过RequestChannel进行输出;

  3. 输入:待发送的Response,通过Processor.responseQueue来完成输入;

1.2 kafka.server

该包在kafka.network的基础上实现各种请求的处理逻辑,主要包含KafkaServer和KafkaApis两个类。其中:

  • KafkaServer:Kafka服务端的抽象,统一维护Kafka服务端的各流程和状态;

  • KakfaApis:维护了各类请求对应的业务逻辑,通过KafkaServer.apis字段组合到KafkaServer之中;

二. Server的端口监听

整体流程如图:
在这里插入图片描述

接下来按调用顺序依次分析各方法

2.1 KafkaServer.startup()

关于端口监听的核心逻辑分4步,代码如下(用注释说明各部分的目的):

def startup() {// 省略无关代码... ...// 1. 创建SocketServersocketServer = new SocketServer(config, metrics, time, credentialProvider)// 2. 启动端口监听// (在这里完成了Acceptor的创建和端口ACCEPT事件的监听)// (startupProcessors = false表示暂不启动Processor处理线程)socketServer.startup(startupProcessors = false)// 3. 启动请求处理过程中的相关依赖// (这也是第2步中不启动Processor处理线程的原因,有依赖项需要处理)... ...// 4. 启动端口可读/可写事件处理线程(即Processor线程)socketServer.startProcessors()// 省略无关代码... ...
}

2.2 SocketServer.startup(Boolean)

代码及说明性注释如下:

def startup(startupProcessors: Boolean = true) {this.synchronized {// 省略无关代码... ...// 1. 创建Accetpor和Processor的实例,// 同时页完成了Acceptor对端口ACCEPT事件的监听createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)// 2. [可选]启动各Acceptor对应的Processor线程if (startupProcessors) {startProcessors()}}
}

2.3 ScocketServer.createAcceptorAndProcessor()

直接上注释版的代码,流程分3步:

// 入参解释
// processorsPerListener: 对于每个IP:Port, 指定Reactor模式子线程池大小, 
//                        即处理端口可读/可写事件的线程数(Processor线程);
// endpoints: 接收请求的IP:Port列表;
def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {// 省略无关代码... ...endpoints.foreach { endpoint =>// 省略无关代码... ...// 1. 创建Acceptor对象// 在此步骤中调用Acceptor.openServerSocket, 完成了对端口ACCEPT事件的监听val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)// 2. 创建了与acceptor对应的Processor对象列表// (这里并未真正启动Processor线程)addProcessors(acceptor, endpoint, processorsPerListener)// 3. 启动Acceptor线程KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()// 省略无关代码... ...}}

2.4 Acceptor.openServerSocket()

该方法中没什么特殊点,就是java NIO的标准流程:

def openServerSocket(host: String, port: Int): ServerSocketChannel = {// 1. 构建InetSocketAddress对象val socketAddress =if (host == null || host.trim.isEmpty)new InetSocketAddress(port)elsenew InetSocketAddress(host, port)// 2. 构建ServerSocketChannel对象, 并设置必要参数值val serverChannel = ServerSocketChannel.open()serverChannel.configureBlocking(false)if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)serverChannel.socket().setReceiveBufferSize(recvBufferSize)// 3. 端口绑定, 实现事件监听try {serverChannel.socket.bind(socketAddress)info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))} catch {case e: SocketException =>throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)}// 4. 返回ServerSocketChannel对象, 用于后续register到Selector中serverChannel
}

2.5 SocketServer.startProcessor()

从这步开始,仅剩的工作就是启动Processor线程,代码都非常简单。比如本方法只是遍历Acceptor列表,并调用Acceptor.startProcessors()

def startProcessors(): Unit = synchronized {acceptors.values.asScala.foreach { _.startProcessors() }info(s"Started processors for ${acceptors.size} acceptors")
}

2.6 Acceptor.startProcessors()

该方法很简明,直接上代码

def startProcessors(): Unit = synchronized {if (!processorsStarted.getAndSet(true)) {startProcessors(processors)}
}def startProcessors(processors: Seq[Processor]): Unit = synchronized {processors.foreach { processor =>KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",processor).start()}
}

三. 请求/响应的格式

3.1 格式概述

在这里插入图片描述
请求和响应都由两部分组成:Header和Body。RequestHeader中包含ApiKey、ApiVersion、CorrelationId、ClientId;ResponseHeader中只包含CorrelationId字段。接下来逐个讲解这些字段。

  • ApiKey

    2字节整型,指明请求的类型;比如0代表Produce请求,1代表Fetch请求;具体id和请求类型之间的映射关系可在 org.apache.kafka.common.protocol.ApiKeys 中找到;

  • ApiVersion

    随着API的升级迭代,各类型请求的请求体格式可能有变更;这个2字节的整型指明了请求体结构的版本;

  • CorrelationId

    4字节整型,在Response中传回,Kafka Server端不处理,用于客户端内部关联业务数据;

  • ClientId

    可变长字符串,标识客户端;

3.2 请求体/响应体的具体格式

各业务操作(比如Produce、Fetch等)对应的请求体和响应体格式都维护在 org.apache.kafka.common.protocol.ApiKeys 中。接下来以Produce为例讲解ApiKeys是如何表达数据格式的。

ApiKeys是个枚举类,其核心属性如下:

public enum ApiKeys {// 省略部分代码... ...// 上文提到的请求类型对应的idpublic final short id;// 业务操作名称public final String name;// 各版本请求体格式public final Schema[] requestSchemas;// 各版本响应体格式public final Schema[] responseSchemas;// 省略部分代码... ...
}

其中PRODUCE枚举项的定义如下

PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions())

可以看到各版本的请求格式维护在 ProduceRequest.schemaVersions(),代码如下

public static Schema[] schemaVersions() {return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
}

这里只是简单返回了一个Schema数组。一个Schema对象代表了一种数据格式。请求头中的ApiVersion指明了请求体的格式对应数组的第几项(从0开始)。

接下来我们看看Schema是如何表达数据格式的。其结构如下
在这里插入图片描述
Schema有两个字段:fields和fieldsByName。其中fields是体现数据格式的关键,它指明了字段的排序和各字段类型;而fieldsByName只是按字段名重新组织的Map,用于根据名称查找对应字段。

BoundField只是Field的简单封装。Field有两个核心字段:name和type。其中name表示字段名称,type表示字段类型。常见的Type如下:

Type.BOOLEAN;
Type.INT8;
Type.INT16;
Type.INT32;// 可通过org.apache.kafka.common.protocol.types.Type查看全部类型
... ...

回到PRODUCE API,通过查看Schema的定义,能看到其V0版本的请求体和响应体的结构如下:
在这里插入图片描述

四. 请求的处理流程

在这里插入图片描述

  1. Acceptor监听到ACCEPT事件(TCP创建连接"第一次握手"的SYN);

  2. Acceptor将将连接注册到Processor列表内的其中一个,由该Processor监听这个连接的后续可读可写事件;

  3. Processor接收到完整请求后,会将Request追加到RequestChannel中进行排队,等待后续处理;

  4. KafkaServer中有个requestHandlerPool的字段,KafkaRequestHandlerPool类型,代表请求处理线程池;KafkaRequestHandler就是其中的线程,会从RequestChannel拉请求进行处理;

  5. KafkaRequestHandler将拉到的Request传入KafkaApis.handle(Request)方法进行处理;

  6. KafkaApis根据不同的ApiKey调用不同的方法进行处理,处理完毕后会将Response最终写入对应的Processor的ResponseQueue中等待发送;KafkaApis.handle(Request)的方法结构如下:

    def handle(request: RequestChannel.Request) {try {// 省略部分代码... ...request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)case ApiKeys.FETCH => handleFetchRequest(request)case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)case ApiKeys.METADATA => handleTopicMetadataRequest(request)case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)// 省略部分代码... ...}} catch {case e: FatalExitError => throw ecase e: Throwable => handleError(request, e)} finally {request.apiLocalCompleteTimeNanos = time.nanoseconds}
    }
    
  7. Processor从自己的ResponseQueue中拉取待发送的Respnose;

  8. Processor将Response发给客户端;

五. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。


http://www.mrgr.cn/p/18608836

相关文章

FIR滤波器——DSP学习笔记三(包含一个滤波器设计的简明案例)

​​​​​​ 背景知识 FIR滤波器的特性与优点 可精确地实现线性相位响应(Linear phase response),无相位失真; 总是稳定的,所有极点都位于原点 线性相位FIR滤波器的性质、类型及零点位置 冲击响应满足:奇…

在React函数组件中使用错误边界和errorElement进行错误处理

在React 18中,函数组件可以使用两种方式来处理错误: 使用 ErrorBoundary ErrorBoundary 是一种基于类的组件,可以捕获其子组件树中的任何 JavaScript 错误,并记录这些错误、渲染备用 UI 而不是冻结的组件树。 在函数组件中使用 ErrorBoundary,需要先创建一个基于类的 ErrorB…

【GitHub】主页简历优化

【github主页】优化简历 写在最前面一、新建秘密仓库二、插件卡片配置1、仓库状态统计2、Most used languages(GitHub 常用语言统计)使用细则 3、Visitor Badge(GitHub 访客徽章)4、社交统计5、打字特效6、省略展示小猫 &#x1f…

智慧浪潮下的产业园区:洞察智慧化转型如何打造高效、绿色、安全的新园区

目录 一、引言 二、智慧化转型的内涵与价值 三、打造高效园区的智慧化策略 1、建设智能化基础设施 2、推广智能化应用 3、构建智慧化服务平台 四、实现绿色园区的智慧化途径 1、推动绿色能源应用 2、实施绿色建筑设计 3、加强环境监测与治理 五、保障园区安全的智慧…

文化旅游3D数字孪生可视化管理平台推动文旅产业迈向更加美好的未来

随着数字化、智能化管理成为文旅产业发展的必然趋势,数字孪生公司深圳华锐视点创新性地推出了景区三维可视化数字孪生平台,将线下的实体景区与线上的虚拟世界完美融合,引领智慧文旅新潮流。 我们运用先进的数字孪生、web3D开发和三维可视化等…

【无监督+自然语言】 GPT,BERT, GPT-2,GPT-3 生成式预训练模型方法概述 (Generative Pre-Traning)

主要参考 【GPT,GPT-2,GPT-3 论文精读【李沐论文精读】-2022.03.04】 https://www.bilibili.com/video/BV1AF411b7xQ/ 大语言模型综述: https://blog.csdn.net/imwaters/article/details/137019747 GPT与chatgpt的关系 图源:L…

时间序列生成数据,TransformerGAN

简介:这个代码可以用于时间序列修复和生成。使用transformer提取单变量或者多变时间窗口的趋势分布情况。然后使用GAN生成分布类似的时间序列。 此外,还实现了基于prompt的数据生成,比如指定生成某个月份的数据、某半个月的数据、某一个星期的…

【树莓派Linux内核开发】入门实操篇(虚拟机Ubuntu环境搭建+内核源码获取与配置+内核交叉编译+内核镜像挂载)

【树莓派Linux内核开发】入门实操篇(虚拟机Ubuntu环境搭建内核源码获取与配置内核交叉编译内核镜像挂载) 文章目录 【树莓派Linux内核开发】入门实操篇(虚拟机Ubuntu环境搭建内核源码获取与配置内核交叉编译内核镜像挂载)一、搭建…

WEB攻防-ASP安全-MDB下载

MDB下载漏洞主要涉及到早期ASPAccess构架的数据库文件。当Web站点提供文件下载功能时,如果没有对下载请求进行充分的验证和过滤,或者服务器配置不当,就可能产生文件下载漏洞。攻击者可以利用这个漏洞,通过修改请求参数或尝试猜测或…

「React Native」为什么要选择 React Native 作为的跨端方案

文章目录 前言一、常见因素二、举个栗子2.1 项目背景2.2 为什么选择 React Native2.3 项目实施2.4 成果总结 前言 没有完美的跨端技术,只有适合的场景。脱离适用场景去谈跨端技术没有什么意义。 一、常见因素 共享代码库: React Native 允许开发者编写…

[C++基础学习]----02-C++运算符详解

前言 C中的运算符用于执行各种数学或逻辑运算。下面是一些常见的C运算符及其详细说明:下面详细解释一些常见的C运算符类型,包括其原理和使用方法。 正文 01-运算符简介 算术运算符: a、加法运算符():对两个…

【QT】ROS2 Humble联合使用QT教程

【QT】ROS2 Humble联合使用QT教程 文章目录 【QT】ROS2 Humble联合使用QT教程1. 安装ROSProjectManager插件2. 创建ROS项目3.一个快速体验的demoReference 环境的具体信息如下: ubunt 22.04ros2 humbleQt Creator 13.0.0ROS ProjectManager 13.0.0 本文建立在已经…

重生之我是Nginx服务专家

nginx服务访问页面白色 问题描述 访问一个域名服务返回页面空白,非响应404。报错如下图。 排查问题 域名解析正常,网络通讯正常,绕过解析地址访问源站IP地址端口访问正常,nginx无异常报错。 在打开文件时,发现无法…

在IDEA中使用.env文件导入系统配置的图文教程

JetBrains的IDEA是一款功能强大的集成开发环境,为开发人员提供了丰富的功能和工具。使用.env文件来管理配置信息在IDEA中非常简单。 旧版本默认支持,新版本idea需要安装插件才可以。 这里我们可以安装EnvFile插件,步骤如下: 在弹…

2017年全国职业院校技能大赛高职组“信息安全管理与评估”样题

培训、环境、资料、考证 公众号:Geek极安云科 网络安全群:624032112 网络系统管理群:223627079 网络建设与运维群:870959784 移动应用开发群:548238632 极安云科专注于技能提升,赋能 2024年广东省高校的技…

项目部署总结

1、安装jdk 第一步:上传jdk压缩安装包到服务器 第二步:将压缩安装包解压 tar -xvf jdk-8uXXX-linux-x64.tar.gz 第三步:配置环境变量 编辑/etc/profile文件,在文件末尾添加以下内容: export JAVA_HOME/path/to/j…

GPT学术优化推荐(gpt_academic )

GPT学术优化 (GPT Academic):支持一键润色、一键中英互译、一键代码解释、chat分析报告生成、PDF论文全文翻译功能、互联网信息聚合GPT等等 ChatGPT/GLM提供图形交互界面,特别优化论文阅读/润色/写作体验,模块化设计,支持自定义快捷按钮&…

Treiber Stack简单分析

Treiber Stack简单分析 Treiber Stack Algorithm是一个可扩展的无锁栈,利用细粒度的并发原语CAS来实现的,Treiber Stack在 R. Kent Treiber在1986年的论文Systems Programming: Coping with Parallelism中首次出现. 基本原理 该算法的基本原理是&…

构建RAG应用-day06: 个人知识库助手项目

个人知识库助手 本文基于datawhale开源学习项目:llm-universe/docs/C6 at main datawhalechina/llm-universe (github.com) 获取数据库 该项目llm-universe个人知识库助手选用 Datawhale 一些经典开源课程、视频(部分)作为示例,具体包括:《机器学习公式详解》PDF版本 《面…

IDEA代码重构

重构 重构的目的: 提高代码的可读性、可维护性、可扩展性和性能。 重命名元素 重命名类 当我们进行重命名操作的时候可以看到第六行存在一个R(rename),点击后就会弹出所偶有引用,这样可以避免我们在修改后存在遗漏引用处未修改。 我们可以通过…