当前位置: 首页 > news >正文

Netty代码阅读

阅读Netty官方文档的时候,提到了Netty主要有三大核心,分别是buffer、channel、Event Model,接下来我们就从阅读Netty代码来理解这三大核心。

示例程序

先给出示例程序,方便自己也方便读者进行debug调试。

Server端代码

# Server.java文件package org.example;public class Server {public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new DiscardServer(port).run(); // ref-1}}

ref-1处的代码创建的DiscardServer对象如下所示。

// DiscardServer.java文件
package org.example;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler()); // ref-2}}).option(ChannelOption.SO_BACKLOG, 128)          .childOption(ChannelOption.SO_KEEPALIVE, true);// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync();// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}}

ref-2处会将ChannelHandler的实现类TimeEncoder和TimeServerHandler的对象添加到pipeline的最后位置。这个两个类的代码如下所示:

// TimeEncoder.java
package org.example;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt((int)msg.value()); // ref-3}}
// TimeServerHandler.java文件
package org.example;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class TimeServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ChannelFuture f = ctx.writeAndFlush(new UnixTime());f.addListener(ChannelFutureListener.CLOSE);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

ref-3处代码就是将数据写入到ByteBuf中,后文会详细讲解。

Client端代码

package org.example;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class Client {public static void main(String[] args) throws Exception {String host = "127.0.0.1";int port = 8080;EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); // ref-4}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}}

ref-4处会将ChannelHandler的实现类TimeDecoder和TimeClientHandler的对象注册到pipeline的最后处,这两个类的实现代码如下所示:

package org.example;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;public class TimeDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}out.add(new UnixTime(in.readUnsignedInt())); // ref-5}
}
package org.example;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class TimeClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {UnixTime m = (UnixTime) msg;System.out.println(m);ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

ref-5处的代码会从ByteBuf中读取数据,详细内容后文会解析。

运行Server和Client

先运行Server的main方法,然后运行Client的main方法,会得到如下的输出:

Sat Aug 24 15:06:11 CST 2024

ByteBuf解析

写入数据到ByteBuf

ref-3代码会向ByteBuf写入一个Int类型的数据,先看一下在抽象类ByteBuf中的申明:

// io.netty.buffer.ByteBuf.java文件
/*** Sets the specified 32-bit integer at the current {@code writerIndex}* and increases the {@code writerIndex} by {@code 4} in this buffer.* If {@code this.writableBytes} is less than {@code 4}, {@link #ensureWritable(int)}* will be called in an attempt to expand capacity to accommodate.*/
public abstract ByteBuf writeInt(int value);

这个方法的大意就是会将32位的整数设置到当前写指针(writerIndex)的位置,并且将writerIndex增加4。如果可以写的空间少于4,那么就会调用ensureWritable(int)方法尝试扩大容量以容纳32位整数数据。

然后我们在看一下具体的实现:

// io.netty.buffer.AbstractByteBuf.java文件
@Override
public ByteBuf writeInt(int value) {ensureWritable0(4);_setInt(writerIndex, value);writerIndex += 4;return this;
}

在实现类中,这个写入32位整数的代码就是在完成上述申明中的步骤。先调用ensureWritable0(4)确保有足够的空间写入32位整数,然后调用_setInt(writerIndex, value)执行写入操作,最后将writerIndex增加4。

接下来我们追一下写入数据的步骤,如下所示:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
@Override
protected void _setInt(int index, int value) {UnsafeByteBufUtil.setInt(addr(index), value); // ref-6
}

继续跟一下,可以发现是调用了Java的Unsafe方法:

// io.netty.buffer.UnsafeByteBufUtil.java文件
static void setInt(long address, int value) {if (UNALIGNED) {PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value));} else {PlatformDependent.putByte(address, (byte) (value >>> 24));PlatformDependent.putByte(address + 1, (byte) (value >>> 16));PlatformDependent.putByte(address + 2, (byte) (value >>> 8));PlatformDependent.putByte(address + 3, (byte) value);}
}
// io.netty.util.internal.PlatformDependent.java文件
public static void putInt(long address, int value) {PlatformDependent0.putInt(address, value);
}
// io.netty.util.internal.PlatformDependent0.java文件
static void putInt(long address, int value) {UNSAFE.putInt(address, value);
}

我们看一下Unsafe类的说明,直接上jdk文档内容:


/*** A collection of methods for performing low-level, unsafe operations.* Although the class and all methods are public, use of this class is* limited because only trusted code can obtain instances of it.** <em>Note:</em> It is the responsibility of the caller to make sure* arguments are checked before methods of this class are* called. While some rudimentary checks are performed on the input,* the checks are best effort and when performance is an overriding* priority, as when methods of this class are optimized by the* runtime compiler, some or all checks (if any) may be elided. Hence,* the caller must not rely on the checks and corresponding* exceptions!** @author John R. Rose* @see #getUnsafe*/public final class Unsafe {......
}

第一句话就说明了,这个类提供了一系列方法来执行底层的、不安全的操作。简单点说,就是这个类直接操作的内存。

ref-6 处有个细节,就是计算地址的方法调用addr(index),我们下面详细看一下:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
private long addr(int index) {return memoryAddress + index;
}

计算地址就是起始地址加一个偏移量index,这个index就是我们在上层传递的writerIndex。这儿就体现了写指针的作用,它就是记录数据已经写到哪个位置了,下一次写数据就从这个位置开始写。

从ByteBuf读取数据

写入数据分析完了,我们再分析一下读取数据。ref-5处的代码就是在从ByteBuf中读取数据in.readUnsignedInt(),我们先看一下这个方法的申明。

// io.netty.buffer.ByteBuf.java文件
/*** Gets an unsigned 32-bit integer at the current {@code readerIndex}* and increases the {@code readerIndex} by {@code 4} in this buffer.** @throws IndexOutOfBoundsException*         if {@code this.readableBytes} is less than {@code 4}*/
public abstract long  readUnsignedInt();

这个方法会在readerIndex位置读取32位的整数,然后将readerIndex增加4。

我们再看一下具体实现:

// 会进入到io.netty.buffer.AbstractByteBuf.java中的这个方法。
@Override
public int readInt() {checkReadableBytes0(4);int v = _getInt(readerIndex);readerIndex += 4;return v;
}

接下来看看_getInt(readerIndex)方法的调用:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java
@Override
protected int _getInt(int index) {return UnsafeByteBufUtil.getInt(addr(index));
}

这个方法是不是很熟悉啊,和写入数据一样,都是先计算地址,再进行操作,底层也是依赖的Unsafe类。

到这儿也能体现出来readerIndex的作用了,它就是记录读取数据到哪儿了,然后下一次读取的时候就从readerIndex开始读取。

ByteBuf总结

结合ByteBuf类上的注释,对它进行一个总结。ByteBuf是底层byte数组或者java NIO Buffer的一个视图,它维护了两个指针,分别是读指针(readerIndex)和写指针(writerIndex),这两个指针分别记录读取和写入数据的位置。

具体示意图如下:

       +-------------------+------------------+------------------+| discardable bytes |  readable bytes  |  writable bytes  ||                   |     (CONTENT)    |                  |+-------------------+------------------+------------------+|                   |                  |                  |0      <=      readerIndex   <=   writerIndex    <=    capacity

这两个指针将对应byte数组分成了三个区域。readable bytes区域是实际存储数据的区域,writable bytes是需要填充的未定义区域,discardable bytes区域包含的是已经被读操作获取了的数据。

Channel解析

接下来我们看一下核心组件Channel,它代表的是与Socket或者有能力进行I/O操作的组件的连结,比如读、写、连接或者绑定。

Channel为用户提供如下能力:

  • 获取channel的当前状态。
  • 获取Channel的配置参数。
  • Channel支持的I/O操作。
  • ChannelPipeling会处理和channel相关的所有I/O事件和请求。

由于使用Netty时并不直接使用Channel,所以对于Channel的理解,目前就到这儿。

Event Model解析

用下面的图对Netty事件模型进行一个总结:

在这里插入图片描述

Java NIO负责处理客户端的请求,每来一个请求就会创建一个channel进行处理。

channel会附带一个channelPipeline,里面有添加的ChannelHandler。

EventLoop其实就是一个被封装了的线程池,ChannelHandler的执行就是在EventLoop中的线程上完成的。

总结

自己的水平有限,对于Netty的源码就只能分析到这儿了。

做个简单的总结,Netty底层是基于Java NIO的,在其上创造了三个重要的概念,(1)Channel,接收客户端请求的通道;(2)ByteBuf 对底层内存进行直接操作的缓冲区;(3)Event Model,主要是EventLoop对线程池的封装,还有对各个生命周期函数的调用。


http://www.mrgr.cn/news/7944.html

相关文章:

  • 依赖包更新了但是没有release,如何安装更新的依赖包
  • 链表OJ题——相交链表
  • 电脑ip地址为什么会自己变更?电脑ip怎么改
  • python socket 发生UDP 和 UDPServer接受UDP实例
  • 二叉树的介绍
  • Kali Linux 秘籍 中文版
  • 安全面试常见问题任意文件下载
  • IP进程间的通信方式以及不同主机间的通信方式
  • MySQL 学习笔记之约束与外键
  • 编程思维模式比编程语言内容等更重要也更难传授-2024-机器人篇
  • SpringBoot接口内部从sftp服务器获取文件流实现文件下载
  • 什么是持续集成(持续交付、部署)
  • 组合数dfs
  • Ruby宝石光芒:探索SEO优化的瑰宝工具与库
  • 【Python】家庭用电数据分析Prophet预测
  • 【C++ Primer Plus习题】4.10
  • (贪心) LeetCode 45. 跳跃游戏 II
  • PV、UV、IP:网站流量分析的关键指标
  • viscode 自定义片段,快速生成自己的开发模板
  • java 字符串判断非空工具类 不用依赖