Luckylau's Blog

Netty之ByteBufAllocator

ByteBufAllocator接口的继承图如下:

它提供了一系列接口给用户调用, 包括生成各种ByteBuf 的操作,例如 ioBuffer(), heapBuffer(), directBuffer(), compositeBuffer()等等,并且提供了一个默认工厂ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; 在ByteBufUtil工具类里定义了一个默认工厂, 默认工厂的具体类型需要由具体实现的子类确定,默认情况下, 根据当前应用所在的平台, 判断需要使用池化工厂还是非池化工厂。

AbstractByteBufAllocator

AbstractByteBufAllocator抽象类首先实现了ByteBufAllocator接口,这是对工厂实现的一层抽象, 设计巧妙。

内存泄漏检测功能

​ 首先抽象层实现了内存泄漏检测功能, 原因是对于DirectBuf,其内存不受VM垃圾回收控制只有在调用release导致计数为0时才会主动释放内存,而PooledByteBuf只有在release后才能被回收到池中以循环利用。netty定义了4种层级的内存泄漏检测,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public enum Level {
/**
* Disables resource leak detection.
*/
//取消内存泄露检测
DISABLED,
/**
* Enables simplistic sampling resource leak detection which reports there is a leak or not,
* at the cost of small overhead (default).
*/
//使用抽样检测的方式(抽样间隔为:samplingInterval),并且仅仅只是打印是否发生了内存泄露,
SIMPLE,
/**
* Enables advanced sampling resource leak detection which reports where the leaked object was accessed
* recently at the cost of high overhead.
*/
//使用抽样检测的方式(抽样间隔为:samplingInterval),并且打印哪里发生了内存泄露
ADVANCED,
/**
* Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
* at the cost of the highest possible overhead (for testing purposes only).
*/
//对每一个对象都进行检测,并且打印内存泄露的地方,我就是这么任性,这么偏执。负载较高,适合测试模式
PARANOID;
/**
* Returns level based on string value. Accepts also string that represents ordinal number of enum.
*
* @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case.
* @return corresponding level or SIMPLE level in case of no match.
*/
static Level parseLevel(String levelStr) {
String trimmedLevelStr = levelStr.trim();
for (Level l : values()) {
if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) {
return l;
}
}
return DEFAULT_LEVEL;
}
}

默认的监测等级是SIMPLE;所以启用内存泄露检测之后,内存得到的ByteBuf对象都是经过toLeakAwareBuffer()方法封装的,该方法作用就是对ByteBuf对象进行引用计数,使用SimpleLeakAwareByteBuf或者AdvancedLeakAwareByteBuf来包装ByteBuf。首先通过toLeakAwareBuffer(ByteBuf )方法根据不同的等级调用会AbstractByteBuf.leakDetector.track(buf),它的目的在于返回一个DefaultResourceLeak对象,用于构建SimpleLeakAwareByteBuf或者AdvancedLeakAwareByteBuf。DefaultResourceLeak继承了虚引用WeakReference,实现了ResourceLeakTracker和ResourceLeak接口。我们看一下代码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//创建ResourceLeakTracker逻辑
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
if (level == Level.DISABLED) {
//如果跟踪登记是Disable,直接返回null
return null;
}
// 如果等级小于Paranoid
if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
// 如果这次随机触发了采样间隔
// 就报告现有的泄漏
// 并返回一个DefaultResourceLeak示例来跟踪当前资源
// 注意为了性能,这里使用了ThreadLocalRandom
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
// 否则如果没触发采样间隔
// 则直接返回null 表示不用跟踪这次资源
return null;
}
//每次资源创建都需要跟踪上一次的内存泄漏
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
private void reportLeak() {
// 如果没有启用error日志,仅仅清空当前ReferenceQueue即可
if (!logger.isErrorEnabled()) {
clearRefQueue();
return;
}
// 检查和报告之前所有的泄漏
for (;;) {
@SuppressWarnings("unchecked")
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
// 如果这个DefaultResourceLeak对象的dispose方法返回false
// 说明它所跟踪监控的资源已经被正确释放,不存在泄露
if (!ref.dispose()) {
continue;
}
// 到这里说明已经产生泄露了
// 获取这个泄露的相关记录的字符串
String records = ref.toString();
if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
if (records.isEmpty()) {
// 如果字符串为空说明没有任何记录
// 就需要报告为untracked的泄漏
// 这个方法就直接记录日志,没什么可看的
reportUntracedLeak(resourceType);
} else {
// 否则就是报告为tracked的泄漏
// 这个方法就直接记录日志就好,没什么可看的
reportTracedLeak(resourceType, records);
}
}
}
}
boolean dispose() {
clear();
return allLeaks.remove(this, LeakEntry.INSTANCE);
}

PooledByteBufAllocator

heap内存的分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}

direct内存的分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}

UnpooledByteBufAllocator

heap内存的分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
}
setIndex0(readerIndex, writerIndex);
return this;
}
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}

UnpooledHeapByteBuf和UnpooledUnsafeHeapByteBuf存储的时候,都是通过一个array。但是_getByte(array,index)的时候,UnpooledUnsafeHeapByteBuf底层是直接操作array:

1
2
3
4
5
6
7
8
9
10
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}

而 UnpooledHeapByteBuf是通过UNSAFE操作这个数组:

1
2
3
4
5
6
7
8
9
10
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}

UnpooledHeapByteBuf和UnpooledUnsafeHeapByteBuf存储的时候,都是通过一个array。但是_getByte(array,index)的时候,UnpooledHeapByteBuf底层是直接操作array;而 UnpooledUnsafeHeapByteBuf是通过UNSAFE操作这个数组。

direct内存的分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
//ByteBuffer.allocateDirect方法
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

UnpooledDirectByteBuf和UnpooledUnsafeDirectByteBuf存储的时候,都通过了一个buffer存储(而且UnpooledUnsafeDirectByteBuf还会计算这个buffer在内存的地址memoryAddress,以后通过操作这个memoryAddress+index操作buffer)。_getByte(index)的时候UnpooledDirectByteBuf通过操作这个buffer操作数据;

1
2
3
4
5
6
7
8
9
10
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return buffer.get(index);
}

而UnpooledUnsafeDirectByteBuf通过操作unsafe操作index(这个index是memoryAddress+index得来的)来操作数据。(但是其实操作jdk底层buffer的时候都是通过unsafe操作的,所以direct的数据最终都是通过unsafe操作的)。

1
2
3
4
5
6
7
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
long addr(int index) {
return memoryAddress + index;
}

参考

https://www.jianshu.com/p/af5419b7dc0a

Luckylau wechat
如果对您有价值,看官可以打赏的!