
进击的Matrix|饮茶鲜
2023/05/04阅读:14主题:凝夜紫
Netty的对象引用计数
从Netty 4
版本开始,某些对象的生命周期由其引用计数管理,因此Netty
可以在不再使用时立即将它们(或共享资源)返回到对象池中(或对象分配器)。垃圾回收和引用队列不能提供无法访问的有效实时保证,但是引用计数提供了一种替代机制,代价是有些轻微的不便。
ByteBuf
是最值得注意的类型,它利用引用计数来提高内存分配和释放内存的性能,本文档将解释Netty
中的ByteBuf
如何使用引用计数来工作的。
引用计数的基础知识
引用计数对象的初始引用计数为 1
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
释放引用计数对象时,其引用计数将减少 1。如果引用计数达到 0,则释放被分配引用计数的对象或将其返回到它来自的对象池中:
assert buf.refCnt() == 1;
// release() returns true only if the reference count becomes 0.
boolean destroyed = buf.release();
assert destroyed;
assert buf.refCnt() == 0;
悬空引用
尝试访问引用计数为0的对象,将触发非法引用计数异常:
assert buf.refCnt() == 0;
try {
buf.writeLong(0xdeadbeef);
throw new Error("should not reach here");
} catch (IllegalReferenceCountExeception e) {
// Expected
}
增加引用计数
只要对象尚未销毁,引用计数值也可以通过retain()
操作递增:
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
buf.retain();
assert buf.refCnt() == 2;
boolean destroyed = buf.release();
assert !destroyed;
assert buf.refCnt() == 1;
谁去销毁ByteBuf?
「一般的经验法则是,最后访问引用计数对象的一方负责销毁该引用计数对象,具体来说」:
-
如果一个[发送]组件将一个引用计数的对象传递给另一个[接收]组件,则发送组件通常不需要销毁它,而是由接收组件进行销毁。 -
如果一个组件使用了一个引用计数的对象,并且知道没有其他对象将再访问它(即,不会将引用传递给另一个组件),则该组件应该销毁它。
下面是一个简单的示例:
public ByteBuf a(ByteBuf input) {
input.writeByte(42);
return input;
}
public ByteBuf b(ByteBuf input) {
try {
output = input.alloc().directBuffer(input.readableBytes() + 1);
output.writeBytes(input);
output.writeByte(42);
return output;
} finally {
input.release();
}
}
public void c(ByteBuf input) {
System.out.println(input);
input.release();
}
public void main() {
...
ByteBuf buf = ...;
// This will print buf to System.out and destroy it.
c(b(a(buf)));
assert buf.refCnt() == 0;
}
上面的main( )
方法调用函数中,我可以的可得到以下的调用过程,并列出程序相应动作:
Action | Who should release? | Who released? |
---|---|---|
1. main() creates buf | buf→main() | |
2. main() calls a() with buf | buf→a() | |
3. a() returns buf merely. | buf→main() | |
4. main() calls b() with buf | buf→b() | |
5. b() returns the copy of buf | buf→b(), copy→main() | b() releases buf |
6. main() calls c() with copy | copy→c() | |
7. c() swallows copy | copy→c() | c() releases copy |
派生缓冲区
ByteBuf.duplicate()
、ByteBuf.slice()
和 ByteBuf.order(ByteOrder)
这些方法创建一个派生缓冲区,该缓冲区共享父缓冲区的内存区域。派生缓冲区没有自己的引用计数,但共享父缓冲区的引用计数。
ByteBuf parent = ctx.alloc().directBuffer();
ByteBuf derived = parent.duplicate();
// Creating a derived buffer does not increase the reference count.
assert parent.refCnt() == 1;
assert derived.refCnt() == 1;
相比之下,ByteBuf.copy()
和 ByteBuf.readBytes(int)
不是派生缓冲区,返回的字节空间是重新分配,需要释放。
请注意,父缓冲区及其派生缓冲区共享相同的引用计数,并且在创建派生缓冲区时,引用计数不会增加。因此,如果要将派生缓冲区传递给应用程序的其他组件,则必须首先对其调用 retain()
——增加引用计数。
//创建一个派生缓冲区
ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);
try {
while (parent.isReadable(16)) {
ByteBuf derived = parent.readSlice(16);
//derived需要传递process()方法,需手动调用retain()
derived.retain();
process(derived);
}
} finally {
parent.release();
}
...
public void process(ByteBuf buf) {
...
buf.release();
}
ByteBufHolder接口
有时,ByteBuf
由缓冲区持有者包含,例如 DatagramPacket
、HttpContent
和 WebSocketframe
。这些类型扩展了一个名为ByteBufHolder
的通用接口。
缓冲区持有者共享其包含的缓冲区的引用计数,就像派生的缓冲区一样。
ChannelHandler中的引用计数器
Inbound messages
当事件循环将数据读入ByteBuf
并随之触发 channelRead()
事件时,由相应管道中的 ChannelHandler
负责释放缓冲区。因此,使用接收到的数据的处理程序应在 channelRead()
处理程序方法中的数据上调用 release()
。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
...
} finally {
buf.release();
}
}
如本文档的“「谁去销毁ByteBuf?」”部分所述,如果您的处理程序将缓冲区(或任何引用计数的对象)传递给下一个处理程序,则无需释放它:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
...
ctx.fireChannelRead(buf);
}
请注意,ByteBuf
并不是 Netty
中唯一的引用计数类型。如果您正在处理解码器生成的消息,则消息很可能也是引用计数的。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
...
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
try {
...
} finally {
content.release();
}
}
}
如果您有任何疑问,或者想要简化消息的发布,可以使用ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
...
} finally {
ReferenceCountUtil.release(msg);
}
}
或者,您可以考虑扩展SimpleChannelHandler
,它为您收到的所有消息调用 ReferenceCountUtil.release(msg)
。
Outbound messages
与入队消息不同,出队消息由应用程序创建,Netty
有责任在将这些消息写出到网络后释放这些消息。但是,拦截写入请求的处理程序应确保正确释放任何中间对象。 (e.g. encoders)
// Simple-pass through
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
System.err.println("Writing: " + message);
ctx.write(message, promise);
}
// Transformation
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
if (message instanceof HttpContent) {
// Transform HttpContent to ByteBuf.
HttpContent content = (HttpContent) message;
try {
ByteBuf transformed = ctx.alloc().buffer();
....
ctx.write(transformed, promise);
} finally {
content.release();
}
} else {
// Pass non-HttpContent through.
ctx.write(message, promise);
}
}
缓冲区泄漏疑难解答
「引用计数的缺点是很容易泄漏引用计数的对象,由于 JVM 不知道 Netty 实现的引用计数,因此一旦它们变得无法访问,它将自动对它们进行垃圾回收,即使它们的引用计数不为零。一旦垃圾回收,对象就无法复活,因此无法返回到它来自的池中,因此会产生内存泄漏。」
*幸运的是,尽管很难找到泄漏,但默认情况下,Netty 会对大约 1% 的缓冲区分配进行采样,以检查应用程序中是否存在泄漏。如果泄漏,您会发现以下日志消息*:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel()
中文: ByteBuf.release() 在垃圾回收之前没有被调用。启用高级泄漏报告以找出泄漏发生的位置。要启用高级泄漏报告,请指定 JVM 选项“-Dio.netty.leakDetectionLevel=advanced”或调用 ResourceLeakDetector.setLevel()
使用上面提到的 JVM 选项重新启动应用程序,您将看到访问泄漏缓冲区的应用程序的最新位置。以下输出显示了我们的单元测试(XmlFrameDecoderTest.testDecodeWithXml()
)的泄漏:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.UnpooledUnsafeDirectByteBuf.copy(UnpooledUnsafeDirectByteBuf.java:465)
io.netty.buffer.WrappedByteBuf.copy(WrappedByteBuf.java:697)
io.netty.buffer.AdvancedLeakAwareByteBuf.copy(AdvancedLeakAwareByteBuf.java:656)
io.netty.handler.codec.xml.XmlFrameDecoder.extractFrame(XmlFrameDecoder.java:198)
io.netty.handler.codec.xml.XmlFrameDecoder.decode(XmlFrameDecoder.java:174)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:227)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:140)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
io.netty.channel.embedded.EmbeddedEventLoop.invokeChannelRead(EmbeddedEventLoop.java:142)
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:317)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:176)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:147)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
如果您使用 Netty 5
或更高版本,则会提供其他信息来帮助您找到最后处理泄漏缓冲区的处理程序。下面的示例显示泄漏的缓冲区由名称为EchoServerHandler#0
的处理程序处理,然后被垃圾回收,这意味着 EchoServerHandler#0
可能忘记释放缓冲区。
12:05:24.374 [nioEventLoop-1-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 2
#2:
Hint: 'EchoServerHandler#0' will handle the message from this point.
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:329)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:133)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:589)
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:208)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
泄漏检测级别
目前有 4 个泄漏检测级别的:
-
DISABLED - 完全禁用泄漏检测。不推荐。 -
SIMPLE - 抽样 1% 的缓冲区是否有泄漏。默认。 -
ADVANCED - 抽样 1% 的缓冲区是否泄漏,以及缓冲区泄漏的代码位置。 -
PARANOID - 与 ADVANCED 相同,只是它适用于每个缓冲区,适用于自动化测试阶段。如果生成输出包含“LEAK:”,则可能会使生成失败。
您可以将泄漏检测级别指定为 JVM 选项 -Dio.netty.leakDetection.level
。
java -Dio.netty.leakDetection.level=advanced ...
NOTE: 此属性以前称为 io.netty.leakDetectionLevel。
避免泄漏的最佳实践
-
在 PARANOID 泄漏检测级别以及 SIMPLE 级别运行单元测试和集成测试。 -
在 SIMPLE 级别向整个集群推出应用程序之前,请先在相当长的时间内查看是否存在泄漏。 -
如果有泄漏,灰度发布中使用 ADVANCED 级别,以获得有关泄漏来源的一些提示。 -
不要将泄漏的应用程序部署到整个群集。
修复单元测试中的泄漏
在单元测试中很容易忘记释放缓冲区或消息。它将生成泄漏警告,但并不一定意味着您的应用程序存在泄漏。您可以使用ReferenceCountUtil.releaseLater()
实用程序方法,而不是使用 try-finally
块包装单元测试以释放所有缓冲区:
import static io.netty.util.ReferenceCountUtil.*;
@Test
public void testSomething() throws Exception {
// ReferenceCountUtil.releaseLater() will keep the reference of buf,
// and then release it when the test thread is terminated.
ByteBuf buf = releaseLater(Unpooled.directBuffer(512));
...
}
原文链接: https://netty.io/wiki/reference-counted-objects.html#wiki-h2-10
「扩展阅读链接」
-
Why do we need to manually handle reference counting for Netty ByteBuf if JVM GC is still in place? -
Buffer ownership in Netty 4: How is buffer life-cycle managed?
最后欢迎大家点赞、收藏、评论,转发!
欢迎大家关注我的微信公众号!随机分享无用的计算机知识,
微信搜索:进击的Matrix

作者介绍

进击的Matrix|饮茶鲜
软件开发工程师