zookeeper源码-ZAB协议之集群同步_3

zookeeper源码-ZAB协议之集群同步_3

上一节主要分析了Leader选举的流程,Zookeeper集群在Leader选举完成后,集群中的各个节点就确定了自己的角色信息:Leader、Follower或Observer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//判断对端节点ServerState
// 1、如果是LOOKING,则表示还处于选举投票状态下
// 2、如果是OBSERVING则表示不能参与投票节点,忽略消息
// 3、如果是FOLLOWING或LEADING,则表示集群中选举已经完成
switch (n.state) {
case LOOKING:
LOOKING分支逻辑 //选举分支
case OBSERVING:
OBSERVING分支逻辑
case FOLLOWING:
FOLLOWING分支逻辑
case LEADING:
LEADING分支逻辑 //当前节点被选为Leader执行的逻辑分支
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid);
break;
}

如上述代码所述,节点确定了自己的角色后,就会进入自己的角色分支:对于Leader而言创建Leader实例并调用其lead()函数,对于Follower而言创建Follower实例并调用其followLeader()函数,对于Observer而言创建Observer实例并调用其observeLeader()函数。在这三个函数中,服务器会进行相关的初始化并完成最终的启动。

对于Follower和Observer而言,主要的初始化工作是要建立与Leader的连接并同步epoch信息,最后完成与Leader的数据同步。而Leader会启动LearnerCnxAcceptor线程,该线程会接受来自Follower和Observer(统称为Learner)的连接请求并为每个连接创建一个LearnerHandler线程,该线程会负责包括数据同步在内的与learner的一切通信。

Learn(Follower或Observer)节点会主动向Leader发起连接,Zookeeper就会进入集群同步阶段,集群同步主要完成集群中各节点状态信息和数据信息的一致。选出新的Leader后的流程大致分为:计算epoch、统一epoch、同步数据、广播模式等四个阶段。其中其前三个阶段:计算epoch、统一epoch、同步数据就是这一节主要介绍的集群同步阶段的主要内容,这三个阶段主要完成新Leader与集群中的节点完成同步工作,处于这个阶段的zk集群还没有真正做好对外提供服务的能力,可以看着是新leader上任后进行的内部沟通、前期准备工作等,只有等这三个阶段全部完成,新leader才会真正的成为leader,这时zk集群会恢复正常可运行状态并对外提供服务。

总体流程

被选举为Leader角色的节点,会创建一个Leader实例,然后执行Leader.lead()进入到Leader角色的任务分支中,其流程大致如下所示:

1521034934473

Leader分支大致可以分为5个阶段:启动LearnerCnxAcceptor线程、计算newEpoch、广播newEpoch、数据同步和集群状态监测。

Leader.lead()方法控制着Leader角色节点的主体流程,其实现较为简单,大致模式都是通过阻塞方法阻塞当前线程,直到该阶段完成Leader线程才会被唤醒继续执行下一个阶段;而每个阶段实现的具体细节及大量的网络IO操作等都在LearnerHandler中实现。比如计算newEpoch,Leader中只会判断newEpoch计算完成没,没有计算完成就会进入阻塞状态挂起当前Leader线程,直到集群中一半以上的节点同步了epoch信息后newEpoch正式产生才会唤醒Leader线程继续向下执行;而计算newEpoch会涉及到Leader去收集集群中大部分Learner服务器的epoch信息,会涉及到大量的网络IO通信等内容,这些细节部分都在LearnerHandler中实现。

涉及到网络IO就会存在Server和Client,这里的Server就是Leader,Client就是Learner(Follower和Observer统称Learner),对于Server端,主要关注Leader和LearnerHandler这两个类,而对于Client端,根据角色分类主要关注Follower或Observer这两个类。

Zookeeper中主要存在三个端口:
​ 1、客户端请求端口:对应于配置中的clientPort,默认是2181,就是客户端连接ZK对其进行增删改操作的端口;
​ 2、集群选举端口:之前分析过的集群中Leader选举涉及到网络IO使用的端口,对应于配置中“server.0=10.80.8.3:2888:2999”这里的2999就是集群选举端口;
​ 3、集群同步端口:Leader选举出后就会涉及到Leader和Learner之间的数据同步问题,集群同步端口的作用就是做这个使用的,对应于配置中”server.0=10.80.8.3:2888:2999“这里的2888;

启动LearnerCnxAcceptor线程

Leader首先会启动一个LearnerCnxAcceptor线程,这个线程做的工作就非常简单了,就是不停的循环accept接收Learner端的网络请求(这里的监听端口就是上面说的同步监听端口,而不是选举端口),Leader选举结束后被分配为Follower或Observer角色的节点会主动向Leader发起连接,Leader端接收到一个网络连接就会封装成一个LearnerHandler线程。

Leader类可以看成一个总管,和每个Learner服务器的交互任务都会被分派给LearnerHandler这个助手完成,当Leader检测到一个任务被一半以上的LearnerHandler处理完成,即认为该阶段结束,进入下一个阶段。

计算epoch

epoch在Zookeeper中是一个很重要的概念,前面也介绍过了:epoch就相当于Leader的身份编号,就如同身份证编号一样,每次选举产生一个新Leader时,都会为该Leader重新计算出一个新epoch。epoch被设计成一个递增值,比如上一个Leader的epoch是1,假如重新选举新的Leader就会被分配epoch=1。

epoch作用:可以防止旧Leader活过来后继续广播之前旧提议造成状态不一致问题,只有当前Leader的提议才会被Follower处理。Zookeeper集群所有的事务请求操作都要提交由Leader服务器完成,Leader服务器将事务请求转成一个提议(Proposal)并分配一个事务ID(zxid)后广播给Learner,zxid就是由epoch和counter(递增)组成,当存在旧leader向follower发送命令的时候,follower发现zxid所在的epoch比当前的小,则直接拒绝,防止出现不一致性。

Leader中代码如下:

1
2
3
4
/**
1.getEpochToPropose():计算新epoch,该方法会阻塞,直到集群中过半的Follower节点同步完epoch并计算出一个新的epoch才会继续向下执行
*/
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

Leader中代码比较简单,具体如何计算newEpoch及其细节逻辑可以查看LearnerHandler中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
QuorumPacket qp = new QuorumPacket();
/**
* 第一步、接收FOLLOWERINFO或OBSERVERINFO数据包,该数据包主要包含Learner节点的sid、protocolVersion、type(区分Follower还是Observer)和epoch,该数据包主要功能:获取到集群中过半节点的epoch,然后找出一个最大的epoch,新epoch就是在这个最大epoch的基础上加1
*/
ia.readRecord(qp, "packet");//接收Follower发送过来的FOLLOWERINFO数据包
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
//接收learner发送过来的LearnerInfo,包含sid
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
//正常会执行到这里
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();//对端sid
this.version = li.getProtocolVersion();//默认版本一般是65536
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}

LOG.info("Follower sid: " + sid + " : info : " + leader.self.quorumPeers.get(sid));
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
//获取对端Follower节点发送过来的epoch
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
//计算出新的epoch,计算原则:上一任epoch基础上加1,计算结束标志:过半服务器参与计算,如果未过半参与则当前线程执行到这里会进入休眠状态
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

总结:newEpoch计算逻辑比较简单的,大致流程如下:Learner服务器连接到Leader服务器时,会向Leader发送FOLLOWERINFO或OBSERVERINFO数据包,该数据包主要包含Learner节点的sid、protocolVersion、type(区分Follower还是Observer)和epoch,这里的epoch可以认为是上一代Leader的epoch值。Leader会收集集群中一半以上的服务器epoch信息后,选出一个最大值然后加1就是newEpoch值。

Leader线程进入阻塞直到过半节点参与计算并产生新的newEpoch,该阶段任务才算完成,Leader线程才会被唤醒继续执行下一个阶段。为避免一直阻塞,会被设置一个超时时间,如果超时后epoch仍未计算结束则抛出InterruptedException,当前服务器会退出Leader角色,重新进入Leader选举状态。

过半原则是ZAB协议中的一个核心思想,后面分析的每个阶段任务基本都是只要保证过半节点参与完成即可以代表该任务完成。Zookeeper集群正常工作的前提是过半服务器处于正常运行状态,因此,只要保证过半服务器参与事务变更就能保证正常集群中至少有一台服务器数据是正常的,又由于事务ID(zxid)递增性保证了事务的执行顺序,通过数据同步很容易实现集群中数据一致性。

统一epoch

newEpoch计算完成后,该值只有Leader知道,现在需要将newEpoch广播到集群中所有的服务器节点上,让他们都更新下新Leader的epoch信息,这样他们在处理请求时会根据epoch判断该请求是不是当前新Leader发出的,可以防止旧Leader活过来后继续广播之前旧提议造成状态不一致问题,只有当前Leader的提议才会被Follower处理。

Leader中代码:

1
2
3
4
/**
* 2.waitForEpochAck():将新epoch广播给集群中所有Learner节点,该方法会阻塞,直到新epoch广播到集群中,并收到一半以上节点ack反馈时才会继续向下执行
*/
waitForEpochAck(self.getId(), leaderStateSummary);

LearnerHandler中代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 第二步、计算出新epoch后,接下来就通过构建一个LEADERINFO数据包,该数据包中带有新newEpoch值,并将该数据包广播到集群中的所有Learner节点,完成newEpoch同步工作
*/
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();

/**
* 第三步、Learner服务器在收到带有newEpoch的LEADERINFO数据包后,会回复一个ACKEPOCH数据包,ACKEPOCH数据包同时会把Learner服务器的lastZxid带过来,接下来Leader和Learner数据同步采用哪种方式就是根据这个lastZxid进行匹配判断确定
*/
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);//阻塞直到收到过半的ACKEPOCH数据包

总结:广播newEpoch流程也比较简单,就是将之前计算出来的newEpoch封装到LEADERINFO数据包中,然后广播到集群中的所有节点,同时会收到ACKEPOCH回复数据包,当集群中一半以上的节点进行了回复则可以认为newEpoch广播完成,则进入下一阶段。同样,为避免线程一直阻塞,休眠线程依然会被添加超时时间,超时后仍未完成则抛出InterruptedException异常重新进入Leader选举状态。

数据同步

之前分析过Leader的选举策略:lastZxid越大越会被优先选为Leader。lastZxid是节点上最大的事务ID,由于zxid是递增的,lastZxid越大,则表示该节点处理的数据越新,即数据越完整。所以,被选为Leader的节点数据完整性越高,为了数据一致性,这时就需要其它节点和Leader进行数据同步保持数据一致性。

Leader中代码:

1
2
3
4
5
6
7
8
/**
* 3.新epoch计算完成并广播同步到集群中后,由于选举策略决定,新选出的Leader一定是数据最新的节点,其它节点要和Leader之间进行数据同步,同步大致流程:
1、根据Learner节点的lastZxid和Leader节点的lastZxid对比,确定同步方式DIFF、SNAP、TRUNC以及DIFF+TRUNC
2、数据同步Leader发送数据包大致分为三类:先发送同步方式数据包(DIFF+SNAP+TRUNC)、然后发送同步Proposal+Committed数据包、最后发送NewLeader数据包表示Leader已经将同步数据发送完成
3、Learner接收到同步方式后进行相关的准备工作,然后根据Proposal+Committed进行同步,最后收到NewLeader表示同步数据包发送完毕了,需要对NewLeader数据包进行回复Ack数据包
4、waitForNewLeaderAck()方法功能就是进入阻塞直到收到过半Follower节点回复的ack(包括自身)才会继续向下执行
*/
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);

Leader中代码依然很简单,就是阻塞当前线程直到同步完成,具体实现细节在LearnerHandler中。由于这部分涉及到代码较多,这里就将大致流程梳理下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1、Leader和Learner间同步可以分为四种模式:SNAP(全量同步)、DIFF(差异同步)、TRUNC(回滚同步)、TRUNC+DIFF(回滚+差异同步)

2、具体采用何种方式同步,关键由如下几个参数判断确定:
peerLastZxid:该Learner服务器最后处理的ZXID
minCommittedLog:Leader服务器提议缓存队列committedLog中的最小ZXID
maxCommittedLog:Leader服务器提议缓存队列committedLog中的最大ZXID

a、peerLastZxid就是上面介绍的ACKEPOCH带过来的Learner端最大事务ID,
b、Leader端会在zk服务器上维护一个committedLog集合,该集合保存了最近事务请求的议案Proposal,默认保存最近500条;minCommittedLog就是这个集合中最小的zxid,而maxCommittedLog就是这个集合中最大的zxid

下面来分析下各种情况的逻辑:
a、全量同步(SNAP同步)
场景1:peerLastZxid < minCommittedLog
场景2:committedLog集合为空,且Leader和Learner的lastZxid不相等

先来看下场景1:由于peerLastZxid < minCommittedLog,说明Learner和Leader之间的数据差异较大,即使将committedLog集合中全部的议案同步个Learner也无法保持他们之间的数据一直,因此必须采用全量同步方式
再来看看场景2:由于committedLog集合为空表示没有可用于进行差异化同步的议案,但是Leader和Learner的lastZxid又不相等,说明他们之前的数据不是一致的,因此也只能采取全量同步方式

所谓全量同步就是Leader服务器将本机上的全量内存数据构建出一个镜像快照snapshot,通过网络IO同步给Learner,Learner节点获取到Leader的镜像快照数据即可用于覆盖当前内存数据,这样就保证同步后Learner和Leader之间数据的一致。

b、差异化同步(DIFF同步)
场景:minCommittedLog <= peerLastZxid <= maxCommittedLog

这种场景说明Learner和Leader之间数据存在数据差异,且差异的数据不大,可以进行差异化同步,即找出committedLog集合中在Learner端没有执行的议案,然后将这些议案发送过去,让Learner重新执行下这个议案,同时在每个议案后面都会被追加一个COMMIT议案进行提交(zk中的议案Proposal就相当于Oracle中的redo或MySql中binlog,通过将这些日志逐条重新执行一次可以完成数据的恢复),最后达到Leader和Learner间数据一致

c、回滚+差异化同步(TRUNC+DIFF)
上面分析minCommittedLog <= peerLastZxid <= maxCommittedLog情况,会进行差异化同步,但是这里会存在一类特殊情况,即:Learner存在Leader上不存在的数据,这里需要先对Learner进行回滚,然后再进行DIFF操作

这种场景是如何出现的呢?
1、假设有A、B、C三台机器,假如某一时刻A是Leader服务器,其epoch=1,其上已经存在的事务ID包括:0x10000001和0x10000002,然后客户端发送一个事务请求q1,其事务ID为:0x10000003,A将事务请求q1写入到本地事务日志文件后,准备将其广播出去,但还未广播出去时A服务器奔溃导致zk集群重新进入Leader选举状态
2、这时B和C进行选举,假设C被选为新的Leader后,其epoch变更为2,接收到客户端新的事务请求q2,其事务ID为:0x20000001
3、这时之前奔溃得到A节点活过来了,得知当前Leader为B节点,节点A就会连接节点B进行数据同步,这时B节点作为Leader其维护的committedLog中包括的事务ID有:0x10000001、0x10000002和0x20000001,而获取到节点A的peerLastZxid为
0x10000003,其符合minCommittedLog <= peerLastZxid <= maxCommittedLog,但是0x10000003在Leader上并不存在,这时就需要将节点A上的0x10000003回滚掉,然后将0x20000001发送给A进行差异化同步,即可完成数据一致性同步

还会存在另一种场景同样会导致这种情况出现:
1、假设A、B、C、D、E五台服务器,假如某一时刻A是Leader服务器,其epoch=1,现在集群中已存在的事务有2个:0x10000001和0x10000002,然后客户端发送一个事务请求q1,其事务ID为:0x10000003,A将事务请求q1写入到本地事务日志文件后,就会将其广播出去,如果这时只广播给了节点B后就挂了,然后进入选举状态
2、虽然节点B的lastZxid最大会被优先选为Leader,但是可能节点B的网络较差最终C、D和E三台节点都选举E节点作为新Leader,3大于集群(5)的一半,选举是有效的,这时依然会存在上述的情况

d、回滚同步(TRUNC同步)
场景:peerLastZxid大于maxCommittedLog。
这种场景其实就是上述先回滚再差异化同步的简化模式,Learner只需要回滚到ZXID值为maxCommitedLog即可和Leader保持数据一致

3、同步大致流程:
1、首先确定同步方式,然后将同步方式并附带一些额外的参数封装成一个数据包发送给Learner节点,让Learner知道接下来如何进行数据同步,以及Learner可能会进行一些前期的准备工作,比如TRUNC+DIFF方式则需要先回滚一些数据,SNAP方式则需要先清理掉内存数据等等
2、然后发送Proposal+Commit或Snapshot镜像数据进行同步工作
3、最后Leader会向Learner发送NEWLEADER数据包,代表Leader已经将所有需要同步的数据包发送完毕了,Learner收到NEWLEADER数据包会回复一个ACK数据包
4、当收到过半回复的ACK数据包后,表示Leader端进行数据同步完成,Leader最后会向Learner发送UPTODATE数据包,告知Learner集群同步任务已经完成,Learner也会退出同步流程

详细代码逻辑可以参见:LearnerHandler、Leader、Follower、Observer等这几个类。

总结

新Leader被选举出来后,这个新Leader并不一定就是真正的Leader,只能称为准Leader。因为,其刚上任政权还不太稳固,随时可能会被推翻下课重新进入Leader选举状态,这时的准Leader就需要和集群中的节点进行沟通,如确定自己的政权代号epoch并广播告知整个集群,同时将数据和集群中其它节点进行同步保持数据的一致性等。注意:每个任务只需要保证集群中的大部分节点(超过一半以上节点)参与同意即可,不需要集群中所有节点都参与同意,大大降低了任务难度。当这些工作完成后,即可认为这个准Leader获取了大部分节点信任,这时就可以登上宝座成为真正的Leader,并将zk集群恢复至正常可运行状态并开始对外提供服务。

坚持原创技术分享,您的支持将鼓励我继续创作!