Luckylau's Blog

Zookeeper的原理(3)

​ 本章开始讲解Zookeeper重要的技术实现。之前一篇我们讲解了Zookeeper的系统模型,并详细讲解了Watch机制,其中提到了客户端与服务端通信的问题,就此我们深入挖掘Zookeeper的客户端与会话原理。

客户端

核心组件

客户端是开发人员使用ZooKeeper最主要的途径。包括以下核心组件:

ZooKeeper实例:客户端的入口;

ClientWatchManager:客户端Watcher管理器;

HostProvider:客户端地址列表管理器;

ClientCnxn:客户端核心线程,其内部又包含两个线程,即SendThread和EventThread。前者是一个I/O线程,主要负责ZooKeeper客户端和服务端之间的网络I/O通信;后者是一个事件线程,主要负责对服务端事件进行处理。

ZooKeeper实例

ZooKeeper客户端的构造方法如下,初始化的过程主要设置默认Watcher,设置ZooKeeper服务器地址列表,创建ClientCnxn。

1
2
3
4
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);

ClientWatchManager

ZKWatchManager实现ClientWatchManager接口,内部定义了dataWatches,existWatches,childWatches,主要用于存储Watcher。

1
2
3
4
5
6
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();

HostProvider

​ 在使用ZooKeeper构造方法时,用户传入的ZooKeeper服务器地址列表,即connectString参数,例如192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181,ConnectStringParser解析器首先对传入的connectString做两个主要处理:解析chrootPath保存服务器地址列表

​ 解析chrootPath:3.2.0及其之后版本的ZooKeeper中,添加了“Chroot”特性,该特性允许每个客户端为自己设置一个命名空间(Namespace)。如果一个ZooKeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在其自己的命名空间下。

​ 保存服务器地址列表: ConnectStringParser解析器会对服务器地址做一个简单的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddress属性中。然后,经过处理的地址列表会被进一步封装到StaticHostProvider类(StaticHostProvider是对HostProvider的默认实现)中。

HostProvider接口:

接口方法 说明
int size() 该方法用于返回当前服务器地址列表的个数, 不能返回0,也就是说 HostProvider中必须至少有一个服务器地址
InetSocketAddress next(long spinDelay) 该方法用于返回一个 已被解析的服务器地址InetSocketAddress,以便客户端进行服务器连接
void onConnected() 这时一个回调方法,如果客户端与服务器成功创建连接,就通过调用这个方法来通知HostProvider

​ StaticHostProvider实现了next()方法,获取可用的服务器。这个next()方法并非简单的从serverAddress中依次获取一个服务器地址,而是先将随机打散后的服务器地址列表拼装成一个环形循环队列。

​ 简单来说,有2个指针lastIndex和currentIndex,初始化时候均为-1。每当尝试获取一个可用的服务器,currentIndex向前移动1位。客户端与服务器成功创建连接后调用onConnected,lastIndex = currentIndex。当再次获取一个可用的服务器,currentIndex继续向前移动1位,当currentIndex等于serverAddresses.size(),currentIndex =0;如果服务器的数目为极小,容易出现currentIndex+1之后依然等于lastIndex ,此时做了个小优化,会进行spinDelay毫秒时间的等待,依然使用尝试使用原先的服务器。

ClientCnxn

​ ClientCnxn是ZooKeeper客户端的核心工作类,负责维护客户端与服务端之间的网络连接并进行一系列网络通信。 Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体,其数据结构如下图所示

​ Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文,不会进行与服务端之间的网络传输。

OutgoingQueue和PendingQueue

​ ClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。Outgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。在正常情况下(即客户端与服务端之间的TCP连接正常且会话有效地情况下),会从outgoingQueue队列中提取出一个可发送的Packet对象,同时生成一个客户端请求序号XID并将其设置到Packet请求头中去,然后将其序列化后进行发送。请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。

客户端获取到来自服务器的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理 :1. 如果检测到当前客户端还尚未进行初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么就直接接收到的ByteBuffer(incomingBuffer)序列化成ConnectResponse对象;2. 如果当前客户端已经处于正常的会话周期,并且接收到的服务端响应是一个事件,那么ZooKeeper客户端会将接收到的ByteBuffer(incomingBuffer)序列化成WatcherEvent对象,并将该事件放入待处理队列中;3. 如果是一个常规的请求响应(指的是Create、GetData和Exist等操作请求),那么会从pendingQueue队列中取出一个Packet来进行相应的处理。ZooKeeper客户端首先会通过检验服务端响应中包含的XID值来确保请求处理的顺序性,然后再将接收到的ByteBuffer(incomingBuffer)序列化成相应的Response对象。

SendThread和EventThread

​ SendThread是客户端ClientCnxn内部一个核心的I/O调度线程,用于管理客户端和服务端之间的所有网络I/O操作。在ZooKeeper客户端的实际运行过程中,一方面,SendThread维护了客户端与服务端之间的会话生命周期,其通过在一定的周期频率内向服务端发送一个PING包来实现心跳检测。同时,在会话周期内,如果客户端与服务端之间出现TCP连接断开的情况,那么就会自动且透明化的完成重连操作。SendThread管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。同时,SendThread还负责将来自服务端的事件传递给EventThread去处理。

​ EventThread是客户端ClientCnxn内部的另一个核心线程,负责客户端的事件处理,并触发客户端注册的Watcher监听。EventThread中有一个waitingEvents队列,用于临时存放那些需要被触发的Object,包括那些客户端注册的Watcher和异步接口中注册的回调器AsyncCallback。同时,EventThread会不断地从waitingEvents这个队列中取出Object,识别出其具体类型(Watcher或者AsyncCallback),并分别调用process和processResult接口方法来实现对事件的触发和回调。

ClientCnxnSocket

​ ClientCnxn中,ClientCnxnSocket定义了底层Socket通信的接口。3.4.0版本开始,抽取出了这个接口类。在使用ZooKeeper客户端的时候,可以通过在zookeeper.clientCnxnSocket这个系统变量中配置ClientCnxnSocket实现类的全类名,以指定底层Socket通信层的自定义实现,例如,-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO。在ZooKeeper中,其默认的实现是ClientCnxnSocketNIO。

会话

会话状态

在理解会话的创建,管理,清理以及重连之前,先看看一个会话有哪些状态。ZooKeeper会话在整个运行期间的生命周期中,会在不同的会话状态之间进行切换,这些状态一般可以分为CONNECTING、CONNECTED、RECONNECTING、RECONNECTED和CLOSE

会话创建流程

前面讲解了客户端一些核心组件,下面我们看一下一次会话创建的流程。

初始化阶段

​ 基本上就是上面核心组件的初始过程,包括ZooKeeper对象的实例化,根据传入的Watcher或者默认的Watcher创建ClientWatchManger对象,HostProvider的构造,以及ClientCnxn的创建初始化的同时还有outgoingQueue和pendingQueue,SendThread和EventThread实例化。

会话创建阶段

​ SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送“会话创建”请求做准备,然后获取一个ZooKeeper服务器的目标地址,委托给ClientCnxnSocket去创建与ZooKeeper服务器的TCP连接。在TCP连接创建完毕后,SendThread会负责根据当前客户端的实际设置,构造出一个ConnectionRequest请求,该请求代表了客户端试图与服务器创建一个会话。同时ZooKeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入请求发送队列outgoingQueue中去。当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。

响应处理阶段

​ ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。ClientCnxnSocket会对接收到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到ZooKeeper服务端分配的会话sessionId。连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态;另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。另外为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程,EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,直接找出存储的默认Watcher,将其放到EventThread的waitingEvents队列中去。 EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。

Luckylau wechat
如果对您有价值,看官可以打赏的!