Luckylau's Blog

Netty之ByteBuf

什么是ByteBuf

​ 数据在网络上是以字节流的形式进行传输的。Java官方的NIO提供了一个ByteBuffer类作为字节的容器。但是ByteBuffer的使用比较复杂,尤其是需要通过flip()方法对读写进行切换。因此netty重新设计了一个字节容器,即ByteBuf,没有了ByteBuf,Netty就失去了灵魂,其他所有的都将变得毫无意义。

ByteBuf与ByteBuffer的对比

​ Netty的ByteBuf采用了读写索引分离的策略(readerIndex与writerIndex),一个初始化(里面尚未有任何数据)的ByteBuf的readerIndex与writerIndex值都为0;

当读索引与写索引处于同一个位置时,如果继续读取,那么就会抛出IndexOutOfBoundsException;

对于ByteBuf的任何读写操作都会分别单独维护读索引与写索引。maxCapacity最大容量默认的限制时Integer.MAX_VALUE;

存储字节的数组时动态的,其最大值默认是Integer.MAX_VALUE。这里的动态性时体现在write方法中的,write方法在执行时会判断buffer容量,如果不足则自动扩容;

ByteBuf的读写索引是完全分开的,使用起来更加方便;

一旦分配好之后不能动态扩容与收缩;而且当待存储的数据字节很大时就很有可能出现IndexOutOfBoundsException。如果要预防这个异常,那就需要在存储之前完全确定好待存储的字节大小。如果ByteBuffer的空间不足,我们只有一种解决方案:创建一个全新的ByteBuffer对象,然后再将之前的ByteBuffer的数据复制过去,这一切操作都需要由开发者自己来手动完成;

ByteBuffer只使用一个position指针来标识位置信息,在进行读写切换时就需要调用flip方法或是rewind方法,使用起来很不方便;

ByteBuf的结构

discardable bytes

表示已经被读过的字节,可以被丢弃了,当前readerIndex已经到达了被丢弃字节的索引位置,0~readerIndex的就被视为discard的,调用discardReadBytes方法,可以释放这部分空间,它的作用类似ByteBuffer的compact方法,将字节往前移动,readerIndex设置为0,writerIndex设置为oldWriterIndex - oldReaderIndex;

readable bytes

表示还没有被读过的字节,是当前ByteBuf对象中保存的内容。需要注意的是,如果可读字节数已经耗尽了,这时再次尝试从ByteBuf中读取内容的话,将会抛出IndexOutOfBoundsException的异常,所以在读数据之前最好先通过byteBuf.isReadable()判断一下;

writable bytes

表示当前ByteBuf对象中剩余可写的字节空间,当writerIndex移动到capacity-1的位置时,就不可再写了,ByteBuf默认的最大容量是Integer.MAX_VALUE。如果没有更多的可写空间了,但是仍然还在往ByteBuf中写数据的话,会抛出IndexOutOfBoundsException的异常,所以在往ByteBuf中写数据之前最好先通过byteBuf.isWriteable()判断一下;

ByteBuf的扩容原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}

首先设置门限阈值为4M,当需要的新容量正好等于门限阈值时,使用阈值作为新的缓冲区容量。如果新申请的内存空间大于阈值,不能采用倍增的方式(防止内存膨胀和浪费)扩张内存,而采用每次倍增4M的方式进行内存扩张。扩张的时候需要对扩张后的内存和最大内存(maxCapacity)进行比较,如果大于缓冲区的最大长度,则使用maxCapacity作为扩容后的缓冲区容量。如果扩容后的新容量小于阈值,则以64为计数进行倍增,直到倍增后的结果大于或等于需要的容量值。

分析原因:

采用倍增或者步进算法的原因如下:如果以minNewCapacity作为目标容量,则本次扩容后的可写字节数刚好够本次写入使用。写入完成后,它的可写字节数会变为0,下次做写入操作的时候需要做动态扩张。这样就会形成第一次动态扩张,每次写入操作都会进行动态扩张,由于动态扩张需要进行内存复制,频繁的内存复制会导致性能下降。

采用先倍增后步进的原因如下:当内存比较小的情况下,倍增操作并不会带来太多的内存浪费,例如64字节–>128字节–>256字节,这样的内存扩张方式对于大多数应用系统是可以接受的。但是,当内存增长到一定阈值后,在进行倍增就会带来额外的内存浪费,例如10MB,采用倍增后变为20MB。但很有可能系统只需要12MB,则扩张到20M会带来8MB的内存浪费。由于每个客户端连接都可能维护自己独立的接收和发送缓冲区,这样随着客户读的线性增长,内存浪费也会成比例地增加,因此,达到某个阈值后就需要以步进的方式对内存进行平滑的扩张。

这个阈值是个经验值,不同的应用场景,这个值可能不同,此处,ByteBuf取值为4MB。

重新计算完动态扩张后的目标容量后,需要重新创建个新的缓冲区,将原缓冲区的内容复制到新创建的ByteBuf中,最后设置读写索引和mark标签等。由于不同的子类会对应不同的复制操作,所以该方法依然是个抽象方法,由子类负责实现。

ByteBuf的相关子类

从内存分配的角度看,ByteBuf 可以分为两类:

​ 堆内存字节缓冲区 HeapByteBuf:优点是内存分配和回收速度快,可以被 JVM 自动回收。缺点是,如果进行 Socket 的 I/O 读写,需要额外做一次内存复制,在堆内存缓冲区和内核 Channel 之间进行复制,性能会有一定程度下降。

​ 直接内存字节缓冲区 DirectByteBuf:非堆内存,直接在堆外进行分配。相比于堆内存,内存分配和回收稍慢,但是可以减少复制,提升性能。

两种内存,各有利弊。Netty 最佳实践表明:在 I/O 通信线程的读写缓冲区使用 DirectByteBuf,后端业务消息的编解码模块使用 HeapByteBuf,这样组合可以达到性能最优。

从内存回收的角度看,ByteBuf 也分为两类:基于对象池的 ByteBuf 和普通 ByteBuf。两者区别在于基于对象池的 ByteBuf 可以重用 ByteBuf 对象,它自己维护了一个内存池,可以循环利用创建的 ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁 GC。内存池的缺点是管理和维护比较复杂,使用时需要更加谨慎。

由上图我们发现:

ByteBuf有3个主要的子类:EmptyByteBuf,WrappedByteBuf,AbstractByteBuf;其中比较常用的是AbstractByteBuf,而AbstractByteBuf类又有一个子类AbstractReferenceCountedByteBuf,该类主要是实现了引用计数,然后AbstractReferenceCountedByteBuf又有很多的子类。下面我们对一些关键类进行分析和解读。

CompositeByteBuf

​ CompositeByteBuf允许将多个ByteBuf的实例组装到一起,形成一个统一的视图,有点类似于数据库将多个表的字段组装到一起统一用视图展示。假设我们有一份协议数据, 它由头部和消息体组成, 而头部和消息体是分别存放在两个 ByteBuf 中的当需要对消息进行编码的时候需要进行整合,如果使用JDK的默认能力,有以下2种方式:1)将某个BygeBuffer复制到另一个ByteBuffer中,或者创建一个新的ByteBuffer,将2者复制到新建的ByteBuffer中; 2)通过List或数组等容器,将消息头和消息体放到容器中进行统一维护和处理;上面的做法非常别扭,实际上遇到的问题跟数据库中视图解决的问题一致——缓冲区有多个,但是需要统一展示和处理,必须有存放它们的统一容器。为了解决这个问题,Netty提供了CompositeByteBuf。CompositeByteBuf 里面有个ComponentList,继承ArrayList,聚合的bytebuf都放在ComponentList里面,最小容量为16,当超过16时候,CompositeByteBuf是创建一个新的bytebuf,把数组里的16个bytebuf 写到这个新创建bytebuf里,然后把ComponentList清空掉,并把新创建的一个大的bytebuf添加到该ComponentList里,这时候就会涉及到多次copy。所以在使用的时候务必要设置好maxNumComponents,避免不必要的聚合。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//cIndex是指存放在ComponentList中的索引下标
public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) {
addComponent0(cIndex, buffer);
consolidateIfNeeded();
return this;
}
private int addComponent0(int cIndex, ByteBuf buffer) {
checkComponentIndex(cIndex);
if (buffer == null) {
throw new NullPointerException("buffer");
}
int readableBytes = buffer.readableBytes();
if (readableBytes == 0) {
return cIndex;
}
//构建一个Component
//字节序分为两种:BIG-ENDIAN—-大字节序,LITTLE-ENDIAN—-小字节序;
//BIG-ENDIAN就是最低地址存放最高有效字节,LITTLE-ENDIAN是最低地址存放最低有效字节;
//java字节序:JAVA虚拟机中多字节类型数据的存放顺序,JAVA字节序也是BIG-ENDIAN;
//现在大多平台都是小端序,网络字节序是大端序,jvm默认也是大端序;
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
updateComponentOffsets(cIndex);
}
return cIndex;
}
private void consolidateIfNeeded() {
// Consolidate if the number of components will exceed the allowed maximum by the current
// operation.
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = allocBuffer(capacity);
// We're not using foreach to avoid creating an iterator.
// noinspection ForLoopReplaceableByForEach
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
}

UnpooledHeapByteBuf 和UnpooledDirectByteBuf

​ 首先看看它们的继承关系:UnpooledHeapByteBuf 和 UnPooledHeapByteBuf –> AbstractReferenceCountedByteBuf –> AbstractByteBuf;基本上的特性都来自于AbstractReferenceCountedByteBuf和AbstractByteBuf;AbstractByteBuf 都做了哪些事儿呢?我们先看一下其主要的成员变量:读写指针;用于标记回滚的 marked 读写指针;最大容量 maxCapacity,用于进行内存保护;与本 ByteBuf 大小端属性相反的 ByteBuf:SwappedByteBuf,除此之外还提供了操作索引:修改读写指针、mark & reset;重用缓冲区:discardReadBytes;丢弃部分数据:skipBytes等子类的通用功能;AbstractReferenceCountedByteBuf实现了计数相关的功能,我们看一下其成员变量:refCnt:记录对象引用次数;refCntUpdater:用于对 refCnt 进行原子更新。

​ UnpooledHeapByteBuf 是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池实现,意味着每次 I/O 读写都会创建一个新的 UnpooledHeapByteBuf 对象,频繁进行内存的分配和释放对性能会有一定的影响,但是相对堆外内存的申请和释放,成本稍低。它的成员变量主要有负责内存分配的 ByteBufAllocator;缓冲区实现 byte 数组和从 ByteBuf 到 NIO 的 ByteBuffer 的转换对象 tmpNioBuf。

UnpooledDirectByteBuf是通过ByteBuffer.allocateDirect()方法来分配直接内存,并指向变量ByteBuffer。

PooledByteBuf

池化的好处在于能快速分配内存,同时复用对象,降低gc频率。PooledByteBuf有3个子类,分别是PooledDirectByteBuf,PooledHeapByteBuf,PooledUnsafeDirectByteBuf,主要关注其池化的原理,包括内存的设计和Recycler对象池的设计。

我们从PoolThreadCache入手,由下面的代码我们分析其内存设计

1
2
3
4
5
6
7
8
9
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

内存池的数据结构(arena、chunk、page、subpage)

netty里面,内存单元是以Chunk为单位的向操作系统申请的,一个内存单元16M;但是如果我们要分配10k的内存,用一个chunk就太浪费了。所以chunk下面又有page,一个page的大小8k,所以一个chunk一共有16m/8k=2^11个page。如果要找到10k的内存,只要找到两个page就可以;如果我们每次只要分配10b的内存,一个page又显得太浪费了,所以又开始有了subpage;

首先我们看arena,arena分为heapArena和directArena。

​ 一个Arena由多个ChunkList组成,每个ChunkList之间通过双向链表的方式连接。一个ChunkList又由多个Chunk通过双向链表的方式组成。也就是各个Chunlist组成双向链表,ChunkList里面的Chunk又各自组成双向链表。

​ ChunList之间为什么要通过双向链表的方式连接?先给出答案:每个ChunkList里面的Chunk的使用率是不同的,Chunk使用率相同的归位一个ChunList,这样netty就能通过一定的算法找到合适的Chunk,提高内存分配的效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
//q100里面的Chunk的内存已经使用100%以上
//q075里面的Chunk的内存已经使用75%到100%之间
//q050代表它里面的Chunk的内存已经使用50%到100%之间
//依此类推
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
//qInit<=>q00<=>q025<=>q050<=>q075<=>q100
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}

如果我们要分配的内存小于Chunk也就是16M,我们用chunk来分配不划算,浪费空间,所以就有了page。一个page的大小是8k,如果我们要分配内存是10k,那么我们用chunk里面的两个page来分配就很划算了。但是如果只要2k呢,一个page也显得不划算,所有又有了subpage。subpage的大小介于0~8k之间。我们看到PoolArena里面有个PoolSubpage,PoolSubpage有个elemSize,就是subpage的大小,可以为0~8k之间的值,有个bitmap,如果是1说明分配了,是0则未分配。

prev和next指针,说明subpage之间也就通过双向链表链接的,第一个成员变量final PoolChunk chunk,说明了这个subpage属于哪个chunk。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
final class PoolSubpage<T> implements PoolSubpageMetric {
final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int runOffset;
private final int pageSize;
private final long[] bitmap;
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;

缓存的数据结构

MemoryRegionCache就是其缓存的数据结构,它是个抽象类,有2个子类,分别是NormalMemoryRegionCache和SubPageMemoryRegionCache;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
...
}
//queue里面的Entry
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}

queue 里面的Entry有recyclerHandle和chunk两个属性,chunk就是内存,handler是指向内存的指针(指向唯一一块连续的内存),通过chunk和handler就可以找到唯一 一块内存;

sizeClass 大小有tiny(0~512B)、small(512B~8K)、normal(8K~16M),由于16M以上的数据不加入缓存,所以没有16M以上的大小;

size 如果是tiny的,有32个,每个的size是16B的倍数,也就是,tiny大小的MemoryRegionCache组合成一个长度为32的数组,分别有:0,16B,32B,48B,72B…496B。不包括512B。个数就是496/16+1=32;如果是small的,有4个,每个的size分别为512B,1K,2K,4K,也就是,small大小的MemoryRegionCache组合成一个长度为4的数组;如果是normal的,有3个,每个的size分别为8K,16K,32K,也就是normal大小的MemoryRegionCache组合成一个长度为3的数组,所以有32+4+3=39种内存规格的MemoryRegionCache。

缓存的分配

我们回到PoolArena的allocate来看看缓存分配的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//1.如果这个请求的内存reqCapacity是大于chunkSize
//2.如果不是tiny,也就是大于512b,就翻倍返回;
//3.如果是tiny,那就加16
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
//我们以这个为例子
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}

我们以cache.allocateTiny(this, buf, reqCapacity, normCapacity)分配tiny的内存为例子,其他small或者normal的流程大致相同,流程如下:

1
2
3
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

找到对应size的MemoryRegionCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
//normCapacity是我需要分配的内存的大小,右移4也就是除以16
//tiny数组是这样子的:
//tiny[0]=0,tiny[1]=16,tiny[2]=32,tiny[3]=48,tiny[4]=64...tiny[15]=496。如果是496,除以16等于15,那就是下标为15的;如果是32,除以2,就是下标为2的,这也验证了我们tiny的数组的分布规则。
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}

从queue中弹出一个entry给ByteBuf初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
//chunk.initBufWithSubpage(buf, handle, reqCapacity);
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}

将探出的entry扔到对象池里面复用。

netty为了减少对象的回收消耗的资源,专门做了一个对象池,不需要的对象可以回收,下次使用

1
2
3
4
5
6
7
8
9
10
11
12
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}

内存的分配(page && subpage && huge)

我们继续看这个代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
if ((normCapacity & 0xFFFFFE00) == 0) { // < 512
tableIdx = normCapacity >>> 4;
table = tinySubpagePools;
} else {
tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
synchronized (this) {
final PoolSubpage<T> head = table[tableIdx];
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
return;
}
}
} else if (normCapacity > chunkSize) {
allocateHuge(buf, reqCapacity);
return;
}
allocateNormal(buf, reqCapacity, normCapacity);
}

page级别内存分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//代码入口
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
}
//具体实现
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
//尝试在现有的chunk上面分配内存
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// 如果现有的chunk分配不成功,那就创建一个chunk进行内存分配
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity);
assert handle > 0;
// 初始化PooledByteBuf,也就是将PooledByteBuf需要的内存指向chunk的内存
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
//尝试在现有的chunk上面分配内存实现
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity);
//分配失败
if (handle < 0) {
cur = cur.next;
if (cur == null) {
return false;
}
} else {
cur.initBuf(buf, handle, reqCapacity);
//分配完之后,如果使用率超过最大使用率,就放到下一个chunklist里面去
if (cur.usage() >= maxUsage) {
remove(cur);
nextList.add(cur);
}
return true;
}
}
}

ByteBuf的创建

官方的建议是通过ByteBufAllocator来创建,ByteBufAllocator是个接口,我们来看一下类的继承图:

同时又分为堆内和堆外,关于ByteBufAllocator,参考Netty之ByteBufAllocator。

ByteBuf的释放

UnpooledHeapByteBuf 和UnpooledDirectByteBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
//UnpooledDirectByteBuf
@Override
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
if (!doNotFree) {
freeDirect(buffer);
}
}
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
//UnpooledHeapByteBuf
@Override
protected void deallocate() {
array = null;
}

PooledHeapByteBuf和PooledDirectByteBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean release() {
boolean deallocated = super.release();
if (deallocated) {
leak.close();
}
return deallocated;
}
public boolean release() {
return buf.release();
}
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
//内存释放
chunk.arena.free(chunk, handle);
//加入内存池复用
recycle();
}
}

参考项目

https://github.com/Luckylau/Netty-Learn

参考

https://www.jianshu.com/p/5959aa4ab1a9

https://www.jianshu.com/p/bdd37e2750da

https://www.jianshu.com/p/854b855bd198

https://blog.csdn.net/fst438060684/article/details/82532093

https://blog.csdn.net/fst438060684/article/details/82563423

Luckylau wechat
如果对您有价值,看官可以打赏的!