netty4-数据读取性能大于处理时“直接内存水位处理器”使用admin2年前更新关注020 PS:禁止拷贝形式转载,转载请以URL形式 1 简介 实现程序 Tcp-Client Udp-Server Udp-Client Tcp-Server 背景: 业务需要实现大致如上图场景程序, Udp Server 读取Udp Client 的数据然后通过 Tcp client 发送出去 。 问题: 使用netty4 实现了一版程序,发现会出现下述场景:Udp Server读取出了大量数据但是Tcp client处理速度小于读取,导致数据积压抛出io.netty.util.internal.OutOfDirectMemoryError无法申请直接内存DirectBuf进而无法读取数据,最终UDP Server断开。 结果: 最终借鉴netty 写入时的WRITE_BUFFER_WATER_MARK高低水位限流实现了一个作用在UDP Server读取时我称为直接内存水位处理器DirectMemoryWaterHandler用来处理数据积压的场景 2 参考 超详细图文详解神秘的 Netty 高性能内存管理 深入理解Netty—从偶现宕机看Netty流量控制 3 环境 java:1.8 netty:4.1.90.Final 4 分析 PS:当前我使用的ByteBufAllocator实际为PooledByteBufAllocator(总共就两种池化和非池化,后续例子都是以池化进行解释) 协议上Udp在设计上无需连接、会话、ACK等额外限制只考虑发或收的情况下UDP读写都必然要高于TCP,所以无程序或业务限制下必然出现上述问题 netty类库读取数据(TCP,UDP都是)强制使用直接内存DirectBuf,写入的话最终也是使用直接内存DirectBuf写入(如果写入是堆内存HeapBuf最终在写入前会复制到直接内存DirectBuf在写入),即读写都是DirectBuf且这个DirectBuf由ByteBufAllocator进行创建管理。 当前我使用的ByteBufAllocator为PooledByteBufAllocator,基本当前程序不去额外配置的话只会存在一个ByteBufAllocator什么你配置了多个自己 PooledByteBufAllocator创建管理内存DirectBuf但是最终调用PlatformDependent去创建内存,PooledByteBufAllocator.pinnedDirectMemory()查询当前buf分配器内pinned住的内存, PlatformDependent.maxDirectMemory()和PlatformDependent.usedDirectMemory()查询当前系统下netty最大直接内存和以用直接内存。 netty 直接内存DirectBuf再写入socket channel时会被pinned住,即netty 管理的DirectBuf会有如下关系 最大直接内存:maxDirectMemory 已用直接内存:usedDirectMemory pinned住内存:pinnedDirectMemory 当前我使用的ByteBufAllocator实际为PooledByteBufAllocator(总共就两种池化和非池化,后续例子都是以池化进行解释),因为池化会缓存DirectBuf且缓存DirectBuf也被统计为已用直接内存,会出现已用内存占最大内存50%但是我们实际可用任是100%,结合我的业务场景使用的DirectBuf都是写入socket 中即会pinned住,所以后续判断是否触发水位线时使用已用直接内存usedDirectMemory,判断是否解除水位线时使用pinnedDirectMemory 5 实现 其实根据上述分析,实现重点 创建直接内存水位处理器DirectMemoryWaterHandler,设置大小水位线条 每次UDP Server UdpServerHandler读取数据时,由直接内存水位处理器DirectMemoryWaterHandler判断当前实际使用直接内存大小进行判断是否触发水位,根据状态进行相应数据处理(当前演示代码处理实现为丢弃) DirectMemoryWaterHandler:直接内存水位处理器 import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.socket.DatagramPacket; import io.netty.util.internal.PlatformDependent; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicBoolean; /** * @ClassName * @Description * @Author dyf * @Date 2023/6/14 * @Version 1.0 */ @Slf4j public class DirectMemoryWaterHandler { //低水位线百分比:当前固定为0.2 private final float lowWaterMark = 0.2F; //高水位线百分比:当前固定为0.95 private final float highWaterMark = 0.95F; private final PooledByteBufAllocator pooledByteBufAllocator; private final AtomicBoolean waterState = new AtomicBoolean(false); public DirectMemoryWaterHandler(PooledByteBufAllocator pooledByteBufAllocator) { this.pooledByteBufAllocator = pooledByteBufAllocator; } public boolean handle(DatagramPacket packet){ //当前netty 最大直接内存大小 long max = PlatformDependent.maxDirectMemory(); //当前netty 以用直接内存大小 long use = PlatformDependent.usedDirectMemory(); double percentage = use / (double) max; //水位限制 if (waterState.get()) { //处于水位控制中,当前演示代码处理实现为丢弃 packet.release(); //为了处理PooledByteBufAllocator,会缓存直接内存导致usedDirectMemory不是实际使用内存 //结合当前业务场景使用pinnedDirectMemory这个值更符合实际场景,当然该值会比正常的实际使用usedDirectMemory小一点,但是我当前业务场景可以忽略 percentage = pooledByteBufAllocator.pinnedDirectMemory() / (double) max; if (percentage < lowWaterMark) { waterState.set(false); log.warn("UDP Server 通道水位限制解除,当前水位[{}%-{}] 限制水位[{}%] 最大容量:[{}]", percentage, use, highWaterMark, max); } return false; } else if (percentage > highWaterMark) { //处于水位控制中,当前演示代码处理实现为丢弃 packet.release(); waterState.set(true); log.warn("UDP Server 通道水位限制生效,当前水位[{}%-{}] 限制水位[{}%] 最大容量:[{}]", percentage, use, highWaterMark, max); return false; } return true; } } UdpServerHandler:UDP读取时限流使用 import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import lombok.extern.slf4j.Slf4j; /** * @ClassName * @Description * @Author dyf * @Date 2023/5/6 * @Version 1.0 */ @Slf4j public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { private DirectMemoryWaterHandler handler; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); handler = new DirectMemoryWaterHandler((PooledByteBufAllocator) ctx.alloc()); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception { if (handler.handle(packet)){ //业务处理:比如当前我的业务场景TCP客户端发送数据 } } } © 版权声明文章版权归作者所有,未经允许请勿转载,侵权请联系 admin@trc20.tw 删除。THE END后端# Java# Netty 喜欢就支持一下吧点赞0收藏 Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYz6kUEd' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345 admin关注 2.2W+0234.8W+ 这家伙很懒,什么都没有写... 上一篇 Go 语言之 SQLX 高级操作 sqlx.In 下一篇 Spring Security6 全新写法,大变样! 相关推荐 Camunda实战教程之员工请假流程Camunda实战教程之员工请假流程2年前 1104Aviator这么丝滑,怎么实现的呢?Aviator这么丝滑,怎么实现的呢?2年前 165阿里多线程值传递 transmittable-thread-local 教程阿里多线程值传递 transmittable-thread-local 教程2年前 127MybatisPlus方法详细使用,实现无SQL式开发MybatisPlus方法详细使用,实现无SQL式开发3年前 118MCSM面板一键搭建我的世界服务器 – 外网远程联机【内网穿透工具】MCSM面板一键搭建我的世界服务器 – 外网远程联机【内网穿透工具】2年前 112java与es8实战之四:SpringBoot应用中操作es8(无安全检查)java与es8实战之四:SpringBoot应用中操作es8(无安全检查)2年前 111 评论 抢沙发 欢迎您留下宝贵的见解!提交 昵称 图形验证码 取消 提交评论 昵称请填写用户信息:昵称(必填)邮箱(必填)代码请输入代码:确认图片请填写图片地址:确认 文章目录PS:禁止拷贝形式转载,转载请以URL形式1 简介2 参考3 环境4 分析5 实现 文章目录PS:禁止拷贝形式转载,转载请以URL形式1 简介2 参考3 环境4 分析5 实现热门推荐 登录没有帐号?立即注册用户名或邮箱登录密码图形验证码记住登录找回密码登录注册已有帐号,立即登录设置用户名设置密码重复密码图形验证码注册