Zookeeper源码2-ZAB协议之Leader选举

zookeeper源码2-ZAB协议之Leader选举

ZAB协议

ZooKeeper服务器会在本地处理只读请求(exists、getData和getChildren)。假如一个服务器接收到客户端的getData请求,服务器读取该状态信息,并将这些信息返回给客户端。因为服务器会在本地处理请求,所以ZooKeeper在处理以只读请求为主要负载时,性能会很高。我们还可以增加更多的服务器到ZooKeeper集群中,这样就可以处理更多的读请求,大幅提高整体处理能力。

那些会改变ZooKeeper状态的客户端请求(create、delete和setData)将会被转发给群首,集群在同一时刻只会存在一个群首,其他服务器追随群首被称为追随者(follower)。群首作为中心点处理所有对ZooKeeper系统变更的请求,它就像一个定序器,建立了所有对ZooKeeper状态的更新的顺序。Leader接收到客户端的请求后,会将请求构建成一个提议(Proposal),同时会为该提议绑定一个zxid(zxid可以表示执行顺序),然后将该提议广播到集群上的所有服务器,Leader等待Follwer反馈,当有过半数(>=N/2+1) 的Follower反馈信息后,Leader将再次向集群内Follower广播Commit信息,Commit为将之前的Proposal提交。

zxid:事务请求唯一标记,由leader服务器负责分配对事务请求进行定序,是8字节的long类型,由两部分组成:前4字节代表epoch,后4字节代表counter,即zxid=epoch+counter。

epoch可以认为是Leader编号,每一次重新选举出一个新Leader时,都会为该Leader分配一个epoch,该值也是一个递增的,可以防止旧Leader活过来后继续广播之前旧提议造成状态不一致问题,只有当前Leader的提议才会被Follower处理。Leader没有进行选举期间,epoch是一致不会变化的。

counter:ZooKeeper状态的每一次改变, counter就会递增加1.

zxid=epoch+counter,其中epoch不会改变,counter每次递增1,,这样zxid就具有递增性质, 如果zxid1小于zxid2, 那么zxid1肯定先于zxid2发生。

这就是ZAB协议在处理数据一致性大致的原理流程,由于请求间可能存在着依赖关系,ZAB协议保证Leader广播的变更序列被顺序的处理:一个状态被处理那么它所依赖的状态也已经提前被处理;ZAB协议支持的崩溃恢复可以保证在Leader进程崩溃的时候可以重新选出Leader并且保证数据的完整性。

ZAB协议分为4个阶段:Election、Discovery、Synchronization、Broadcast。这一节我们主要来研究Election,即Leader选举阶段。

选举阶段Zookeeper实现又可以大致分为两部分:选举初始化阶段和Leader选举。

选举初始化

Leader选举初始化入口:QuorumPeer.startLeaderElection(),其核心代码逻辑如下:

1
2
3
4
5
6
7
8
QuorumCnxManager qcm = createCnxnManager();//创建QuorumCnxManager,QuorumCnxManager负责选举时网络IO通信
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();//Listener是一个线程,主要完成选举监听端口开启及维护网络连接
le = new FastLeaderElection(this, qcm);//选举算法使用FastLeaderElection
} else {
LOG.error("Null listener when initializing cnx manager");
}

大致工作如下:
​ 1、创建一个QuorumCnxManager实例;
​ 2、启动QuorumCnxManager.Listener线程;
​ 3、构建一种选举算法FastLeaderElection,早期Zookeeper实现了四种选举算法,但是后面废弃了三种,最新版本只保留FastLeaderElection这一种选举算法;

Leader选举期间集群中各节点之间互相进行投票,就会涉及到网络IO通信,QuorumCnxManager就是用来管理维护选举期间网络IO通信的工具类。

网络IO

1521034934473

如上图所示,Leader选举涉及到两个核心类:QuorumCnxManager和FastLeaderElection,红色线之上的是QuorumCnxManager工作区域,红色线之下的是FastLeaderElection工作区域。搞懂这两个类基本上对Zookeeper的选举流程及原理就比较清楚了。选举算法逻辑被封装在FastLeaderElection类中,后面会进行分析;而QuorumCnxManager则用于管理维护选举期间的网络IO。

上图描述了QuorumCnxManager维护选举期间的网络IO的大致流程:
​ 1、QuorumCnxManager有一个内部类Listener,其继承了Thread,初始化阶段就会启动该线程,Listener的run方法实现也非常简单:初始化一个ServerSocket,然后在一个while循环中调用accept接收客户端(注意:这里的客户端指的是集群中其它服务器)连接;
​ 2、当有客户端连接进来后,会将该客户端Socket封装成RecvWorker和SendWorker,它们都是线程,分别负责和该Socket所代表的客户端进行读写;RecvWorker和SendWorker是成对出现的,每对负责维护和集群中的一台服务器进行网络IO通信;

现在假设这个场景:集群中存在A、B两个节点:
​ 1、当A节点连接B节点时,在B节点上会维护一对RecvWorker和SendWorker用于B节点和A节点进行通信;
​ 2、同理,如果B节点连接A节点,在A节点上会维护一对RecvWorker和SendWorker用于A节点和B节点进行通信;
​ 3、A和B之间创建了两条通道,实际上A和B间通信只需要一条通道即可,为避免浪费资源,Zookeeper采用如下原则:myid小的一方作为服务端,否则连接无效会被关闭;
​ 4、比如A的myid是1,B的myid是2,如果A去连接B,B收到连接请求后B发现对端myid小于自己,判定该连接无效,会关闭该连接;如果是B连接A,A收到连接请求后发现对端myid大于自己,则认为该连接有效,并会为该连接创建一对RecvWorker和SendWorker线程并启动

​ 3、FastLeaderElection负责Leader选举核心规则算法实现,注意FastLeaderElection类中也包含了两个内部类WorkerSender和WorkerReceiver,类似于QuorumCnxManager中的SendWorker和RecvWorker,也是用于发送和接收线程;
​ 4、FastLeaderElection中进行选举时广播投票信息时,将投票信息写入到对端服务器大致流程如下:
​ a、将数据封装成ToSend格式放入到sendqueue;
​ b、WorkerSender线程会一直轮询提取sendqueue中的数据,当提取到ToSend数据后,会获取到集群中所有参与Leader选举节点(除Observer节点外的节点)的sid,如果sid即为本机节点,则转成Notification直接放入到recvqueue中,因为本机不再需要走网络IO;否则放入到queueSendMap中,key是要发送给哪个服务器节点的sid,ByteBuffer即为ToSend的内容,queueSendMap维护的着当前节点要发送的网络数据信息,由于发送到同一个sid服务器可能存在多条数据,所以queueSendMap的value是一个queue类型;
​ c、QuorumCnxManager中的SendWorkder线程不停轮询queueSendMap中是否存在自己要发送的数据,每个SendWorkder线程都会绑定一个sid用于标记该SendWorkder线程和哪个对端服务器进行通信,因此,queueSendMap.get(sid)即可获取该线程要发送数据的queue,然后通过queue.poll()即可提取该线程要发送的数据内容;
​ d、然后通过调用SendWorkder内部维护的socket输出流即可将数据写入到对端服务器
​ 5、FastLeaderElection中进行选举时广播投票信息时,从对端服务器读取投票信息的大致流程如下:
​ a、QuorumCnxManager中的RecvWorker线程会一直从Socket的输入流中读取数据,当读取到对端发送过来的数据时,转成Message格式并放入到recvQueue中;
​ b、FastLeaderElection.WorkerReceiver线程会轮询方式从recvQueue提取数据并转成Notification格式放入到recvqueue中;
​ c、FastLeaderElection从recvqueu提取所有的投票信息进行比较 最终选出一个Leader

Leader选举

上面已经介绍了Leader选举期间网络IO的大致流程,下面介绍下具体选举算法如何实现。

Leader选举策略使用的是FastLeaderElection,到底上面时候调用选举策略执行选举呢?上篇文章已介绍过Zookeeper启动的大致流程(见下图),最下方QuorumPeer线程中会有一个Loop循环,获取serverState状态后进入不同分支,当分支退出后继续下次循环,FastLeaderElection选举策略调用就是发生在检测到serverState状态为LOOKING时进入到LOOKING分支中调用的。

1521130535349

进入到LOOKING分支执行的代码逻辑:

1
2
3
4
5
6
7
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());//调用FastLeaderElection.lookForLeader()
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}

从上面代码可以看出,Leader选举策略入口方法为:FastLeaderElection.lookForLeader()方法。当QuorumPeer.serverState变成LOOKING时,该方法会被调用,表示执行新一轮Leader选举。下面来看下lookForLeader方法的大致实现逻辑:
​ 1、更新自己期望投票信息,即自己期望选哪个服务器作为Leader(用sid代替期望服务器节点)以及该服务器zxid、epoch等信息,第一次投票默认都是投自己当选Leader,然后调用sendNotifications方法广播该投票到集群中所有可以参与投票服务器,广播涉及到网络IO流程前面已讲解,这里就不再细说;

updateProposal()方法有三个参数:a.期望投票给哪个服务器(sid)、b.该服务器的zxid、c.该服务器的epoch,在后面会看到这三个参数是选举Leader时的核心指标,后面再介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
synchronized(this){
//logicalclock是一个AtomicLong类型,默认是0,执行incrementAndGet累加操作变成1
//logicalclock维护electionEpoch,即选举轮次,在进行投票结果赛选的时候需要保证大家在一个投票轮次
logicalclock.incrementAndGet();
//getInitId()用于获取当前myid
//getInitLastLoggedZxid()提取lastProcessedZxid值,lastProcessedZxid是最后一次commit的事务请求的zxid
//getPeerEpoch():获取epoch值,每个leader任期内都要有一个epoch代表该Leader轮次,同时把该epoch同步到集群送的所有其它节点,并会被保存到本地硬盘dataLogDir目录下currentEpoch文件中,这里的getPeerEpoch()就是获取最近一次Leader的epoch,如果是第一次部署启动则默认从0开始
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

/**
* 1、将proposedLeader、proposedZxid、electionEpoch、peerEpoch、sid(要发送给哪个节点的sid)等信息封装为一个ToSend对象,并放入到LinkedBlockingQueue<ToSend> sendqueue队列中,注意遍历集群中所有参与投票节点的sid,为每个sid封装成一个ToSend
* 2、WorkerSender线程将会从sendqueue队列中获取要发送消息根据sid发送给集群中指定的节点
*/
sendNotifications();//,发送给集群中所有可参与投票节点,注意也包括自身节点

​ 2、让后就开始等待其它服务器发送给自己的投票信息,接收投票涉及的网络IO流程看前面”网络IO”一节介绍:

1
2
3
4
5
6
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
//从recvqueue队列中取Notification
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
......
}

​ 3、将接收到投票的state进行判断确定执行哪个分支逻辑:
​ a.如果是FOLLOWING或LEADING,则说明对端已选举出Leader,这时只需要验证下这个Leader是否有效即可,有效则代表选举结束,否则继续接收投票信息
​ b.OBSERVING:忽略该投票信息,因为Observer不能参与投票
​ c.LOOKING:则表示对端也还处于Leader选举状态,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//判断对端节点ServerState
// 1、如果是LOOKING,则表示还处于选举投票状态下
// 2、如果是OBSERVING则表示不能参与投票节点,忽略消息
// 3、如果是FOLLOWING或LEADING,则表示集群中选举已经完成
switch (n.state) {
case LOOKING:
LOOKING分支逻辑
case OBSERVING:
OBSERVING分支逻辑
case FOLLOWING:
FOLLOWING分支逻辑
case LEADING:
LEADING分支逻辑
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid);
break;
}

​ 4、下面来重点看下LOOKING分支情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

首先对之前提到的选举轮次electionEpoch进行判断,这里分为三种情况:
​ a.只有对方发过来的投票的electionEpoch和当前节点相等表示是同一轮投票,即投票有效,然后调用totalOrderPredicate()对投票进行PK,返回true代表对端胜出,则表示第一次投票是错误的(第一次都是投给自己),更新自己投票期望对端为Leader,然后调用sendNotifications()将自己最新的投票广播出去。返回false则代表自己胜出,第一次投票没有问题,就不用管
​ b.如果对端发过来的electionEpoch大于自己,则表明重置自己的electionEpoch,然后清空之前获取到的所有投票recvset,因为之前获取的投票轮次落后于当前则代表之前的投票已经无效了,然后调用totalOrderPredicate()将当前期望的投票和对端投票进行PK,用胜出者更新当前期望投票,然后调用sendNotifications()将自己期望头破广播出去。注意:这里不管哪一方胜出,都需要广播出去,而不是步骤a中己方胜出不需要广播,这是因为由于electionEpoch落后导致之前发出的所有投票都是无效的,所以这里需要重新发送
​ c.如果对端发过来的electionEpoch小于自己,则表示对方投票无效,直接忽略不进行处理

totalOrderPredicate()实现了对投票进行PK规则:

1
2
3
4
5
6
7
8
9
/**
* 对端投票胜出返回true情况:
* 1、对端peerEpoch > 当前peerEpoch
* 2、对端peerEpoch == 当前peerEpoch时,对端zxid > 当前zxid
* 3、对端peerEpoch == 当前peerEpoch,对端zxid == 当前zxid时,对端serverID > 当前serverID
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));

下面简单说下这个PK逻辑原理(胜出一方代表更有希望成为Leader):
​ 1、首先比较epoch,哪个epoch哪个胜出,前面介绍过epoch代表了Leader的轮次,是一个递增的,epoch越大就意味着数据越新,Leader数据越新则可以减少后续数据同步的效率,当然应该优先选为Leader;
​ 2、然后才是比较zxid,由于zxid=epoch+counter,第一步已经把epoch比较过了,其实这步骤只是相当于比较counter大小,counter越大则代表数据越新,优先选为Leader。注:其实第1和第2可以合并到一起,直接比较zxid即可,因为zxid=epoch+counter,第1比较显的有些多余
​ 3、如果前两个指标都没法比较出来,只能通过sid来确定,zxid相等说明两个服务器的数据是一致的,所以选哪个当Leader其实没有区别,这里就随机选择一个sid大的当Leader

下面来看下LOOKING分支的最后一部分逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//将接收到的有效投票放入到recvset中,该集合保存接收到的所有投票数据
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

//termPredicate判断是否已经选举出Leader,其原理很简单:前面已经通过PK修正了自己期望选举的Leader投票,termPredicate要做的就是从所有投票recvset中赛选出期望选举的Leader投票,然后看票数是否大于集群一半以上,超过则表示选举结束,否则则表明当前投票不足以选出Leader,需要继续接收投票
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {

// Verify if there is any change in the proposed leader
//这里保险起见,将recvqueue接收到还没处理的投票继续进行验证下,确定当前选出的Leader是否有效
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
//如果进入这个if中,则表明当前选出的Leader和对端PK被PK掉了,即当前选出的Leader是无效的
//将recvqueue提取的数据还放回去,然后跳出继续下一次从recvqueue提取数据进行投票处理逻辑
recvqueue.put(n);
break;
}
}

if (n == null) {//执行到这里则表明当前已经选出Leader,并且验证后也是有效的
//判断选出的Leader的sid是否就是自己,是则将peerState设置成LEADING,否则则设置成FOLLOWING或OBSERVING
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);//将recvqueue清空,因为接受到的数据已经没有作用,所以要清空
return endVote;//返回选举出来的Leader信息
}
}

总结

至此,Leader的选举流程已经全部完成,Leader的选举流程会在系统刚启动时或Leader挂掉后,系统进入选举阶段,选举出来的Leader最终能否真正成为Leader,还需要进行数据恢复阶段考验,只有保证集群中的数据状态一致后,才算度过实习期真正成为Leader并对外提供服务。数据如何进行数据恢复保证集群状态一致性再后续博文中再继续进行分析。

坚持原创技术分享,您的支持将鼓励我继续创作!