深入浅出Netty:read

本系列:


boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,由worker线程来监听并处理read事件,本节主要分析Netty如何处理read事件。

accept->read

当work线程的selector检测到OP_READ事件发生时,触发read操作。

该read方法定义在类NioByteUnsafe中。

1、allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看看AdaptiveRecvByteBufAllocator内部实现:

SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。
从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。
DEFAULT_MINIMUM:最小缓存(64),在SIZE_TABLE中对应的下标为3。
DEFAULT_MAXIMUM :最大缓存(65536),在SIZE_TABLE中对应的下标为38。
DEFAULT_INITIAL :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。
INDEX_INCREMENT:上次预估缓存偏小,下次index的递增值。
INDEX_DECREMENT :上次预估缓存偏大,下次index的递减值。

2、allocHandle.allocate(allocator) 申请一块指定大小的内存。

通过ByteBufAllocator的ioBuffer方法申请缓存。

根据平台是否支持unsafe,选择使用直接物理内存还是堆上内存。

direct buffer方案:

UnpooledUnsafeDirectByteBuf是如何实现缓存管理的?对Nio的ByteBuffer进行了封装,通过ByteBuffer的allocateDirect方法实现缓存的申请。

memoryAddress = PlatformDependent.directBufferAddress(buffer) 获取buffer的address字段值,指向缓存地址。
capacity = buffer.remaining() 获取缓存容量。

方法toLeakAwareBuffer(buf)对申请的buf又进行了一次包装:

Netty中使用引用计数机制来管理资源,ByteBuf实现了ReferenceCounted接口,当实例化一个ByteBuf时,引用计数为1, 代码中需要保持一个该对象的引用时需要调用retain方法将计数增1,对象使用完时调用release将计数减1。当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池。

3、方法doReadBytes(byteBuf) 将socketChannel数据写入缓存。

最终底层采用ByteBuffer实现read操作,这里有一块逻辑不清楚,为什么要用tmpNioBuf?

int localReadAmount = doReadBytes(byteBuf);
1、如果返回0,则表示没有读取到数据,则退出循环。
2、如果返回-1,表示对端已经关闭连接,则退出循环。
3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的ChannelRead事件,byteBuf作为参数进行后续处理,这时自定义Inbound类型的handler就可以进行业务处理了。

其中参数msg,就是对应的byteBuf,当请求的数据量比较大时,会多次触发channelRead事件,默认最多触发16次,可以通过maxMessagesPerRead字段进行配置。
如果客户端传输的数据过大,可能会分成好几次传输,因为TCP一次传输内容大小有上限,所以同一个selectKey会触发多次read事件,剩余的数据会在下一轮select操作继续读取。

在实际应用中,应该把所有请求数据都缓存起来再进行业务处理。
所有数据都处理完,触发pipeline的ChannelReadComplete事件,并且allocHandle记录这次read的字节数,进行下次处理时缓存大小的调整。

到此为止,整个NioSocketChannel的read事件已经处理完成。

打赏支持我写出更多好文章,谢谢!

打赏作者

打赏支持我写出更多好文章,谢谢!

1 2 收藏 评论

关于作者:占小狼

我是占小狼。在魔都艰苦奋斗,白天是上班族,晚上是知识服务工作者。如果读完觉得有收获的话,记得关注和点赞哦。非要打赏的话,我也是不会拒绝的。 个人主页 · 我的文章 · 9 ·  

相关文章

可能感兴趣的话题



直接登录
跳到底部
返回顶部