Disruptor之概览

Diruptor

概述

“多核危机”驱动了并发编程的复兴,然后并发编程和一般的系统相比,复杂性有个很大梯度的上升。多线程开发很大困难在于:多个线程间存在依赖关系时,如何进行协调。依赖一方面是执行顺序的依赖,如某个线程执行需要依赖其他线程执行或其它线程的某些阶段执行结果,Java为我们提供的解决方案是:wait/notify、lock/condition、join、yield、Semaphore、CountDownLatch、CyclicBarrier以及JDK7新增的一个Phaser等;数据依赖主要是多个线程对同一资源并发修改导致的数据状态不一致问题,Java中主要依靠Lock和CAS两种方案,也就是我们熟知的悲观锁、乐观锁。

然而,当你在并发编程方面慢慢有些经验并开始在项目中使用时,你会发现仅仅依赖JDK提供的上面所说开发工具类是远远不够的, JDK提供的工具类都只能解决一个个功能“点”的问题。并发编程复杂性一个体现就是:多个顺序执行流在多核CPU中同时并行执行与我们已经习惯的单个数据顺序流执行的方式产生了很大的冲突。

好比:现在你开车从A地到B地去,传统的开发模式就像从A地到B地之间只存在一条公路,你只需要延着这个公路一直开下去就可以达到B地;假如经过多年发展,现在A地到B地横起有10条公路,纵起有10条公路,它们之间相互交叉形成错综复杂的公路网,你再开车从A地到B地就会存在太多的选择,可能从东南西北任何方向出发最终都能到达B地。这就体现了并发编程和传统编程复杂性的对比:传统编程由于只存在一个顺序执行流,可以很好的预判程序的执行流程;而并发编程存在太多的顺序执行流导致很难准确的预判出它们真正的执行流程,一旦出现问题也很难排查,就好比上面的例子第二种情况,你很难预判你开车的真正路线,而且可能存在每次路线都不一样情况。

我认为一个并发编程项目好坏其中一个关键核心就是:项目的整体结构是否清晰。很简单的一个例子,调用notify()方法唤醒挂起在指定对象上的休眠线程,如果没有一个清晰简单的架构设计,可能会导致在该对象上进行休眠的对象散落到系统中各处代码上,很难把控具体唤醒的是哪个线程从而与你的业务逻辑发生偏差导致bug的出现。当然,项目结构清晰在传统编程中也是非常看重的,只有结构清晰的架构才会让人易于理解,同时和他人沟通探讨时方便描述,但是在并发编程中这点尤为重要,因为并发编程的复杂性更高,没有一个清晰的结构设计,你可能经过大量测试修改暂时做出了一个看似没有bug的项目,但是后期需求变更或者是其他人来维护这个项目时,很难下手导致后期会引入大量的bug,而且不利于项目功能的扩展。

常用的并发编程使用的模型有并行模型、流水线模型、生产者/消费者模型、Actor模型等,采用模型设计一方面是因为这些模型都是大牛们经过长时间实际生产经验的积累总结出的并发编程方面一些好的解决方案;另一方面,采用模型设计可以解决相关人员之间沟通信息不对等问题,降低沟通学习成本。

并行模型是JDK8中Stream所采用的实现并发编程的方式,并行模型非常简单,就是为每个任务分配一个线程直到该任务执行结束,示意图如下:

并行模型

并行模型太过简单导致对任务的精细化控制不足,一个任务可能会被分解为多个阶段,而每个阶段的子任务特性可能差别很大,这时并行模型就无能为力了。并行模型只适合于CPU密集型且任务中不含IO阻塞等情况的任务。这时,就演进出流水线模型,示意图如下:

流水线模型

流水线模型在实际的并发编程中使用比较常见,我们所说的Pipeline设计模型、Netty框架等都是这一思想的体现。

生产者/消费者模型在并发编程中也是使用频度非常高的一个模型,生产者/消费者模型可以很容易地将生产和消费进行解耦,优化系统整体结构,并且由于存在缓冲区,可以缓解两端性能不匹配的问题。

Actor模型其典型代表就是Akka,基于Akka可以轻松实现一个分布式异步数据处理集群系统,非常强大,后期我们有机会可以再深入讨论下Akka。

好了,说了这么多,终于要开始正题:Disruptor,官方宣传基于该框架构建的系统单线程可以支撑每秒处理600万订单,此框架真乃惊为天人。Disruptor在生产者/消费者模型上获得尽量高的吞吐量和尽量低的延迟,其目标就是在性能优化方面做到极致。国内国外都存在大量的知名项目在广泛使用,比如我们所熟知的strom底层就依赖Disruptor的实现,其在并发、缓存区、生产者/消费者模型、事务处理等方面都存在一些性能优秀的方案,因此是非常值得深入研究的。

生产者/消费者模型

生产者/消费者模型在编程中使用频度非常高的一个模型,生产者/消费者模型可以很容易地将生产和消费进行解耦,优化系统整体结构,并且由于存在缓冲区,可以缓解两端性能不匹配的问题。生产者/消费者和我们所熟悉的设计模式中的观察者模型很相似,生产者类似于被观察者,消费者类似于观察者,被观察者的任何变动都以事件的方式通知到观察者;同理,生产者生产的数据都要传递给消费者最终都要被消费者处理。

一般项目开发中,我们可以使用JDK提供的阻塞队列BlockingQueue很简单的实现一个生产者/消费者模型,其中生产者线程负责提交需求,消费者线程负责处理任务,二者之间通过共享内存缓冲区进行通信。

生产者消费者模型

BlockingQueue实现类主要有两个:ArrayBlockingQueue和LinkedBlockingQueue,底层实现一个是基于数组的,一个是基于链表的,这种实现方式的差异导致了它们使用场景的不一样。在生产者/消费者模型中的缓存设计上肯定优先使用ArrayBlockingQueue,但是查看ArrayBlockingQueue底层源码会发现,读写操作通过重入锁实现同步,而且读写操作使用的是同一把锁,并没有实现读写锁分离;另外,锁本身的成本还是比较高的,锁容易导致线程上下文频繁的发生切换,了解CPU核存储硬件架构的可能会知道,每核CPU都会存在一个独享的高速缓存L1,假如线程切换到其它CPU上执行会导致之前CPU高速缓存L1中的数据不能再被使用,降低了高速缓存使用效率。因此,在高并发场景下,性能不是很优越。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//向Queue中写入数据
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中断方式获取锁,实现同步
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
//从Queue中取出数据
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中断方式获取锁,实现同步
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

Disruptor消息生产模型

生产模型

Producer生产出一个消息事件Event,需要放入到RingBuffer中,流程大致如下:

​ 1、首先调用Sequencer.next()方法,获取RingBuffer上可用的序号用于将新生成的消息事件放入;

​ 2、Sequencer首先对nextValue+1代表当前需要申请的RingBuffer序号(nextValue标记了之前已经申请过的序号,nextValue+1就是下一个可申请的序号),但是nextValue+1指向的RingBuffer槽位存放的消息可能并没有被消费,如果直接返回这个序号给生产者,就会导致生产一方将该槽位的消息事件重新填充覆盖导致之前数据丢失,这里就需要一个判断:判断申请的RingBuffer序号代表的槽位之前的消息事件是否已被消费,判断逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public long next(int n) 
{
if (n < 1) //n表示此次生产者期望获取多少个序号,通常是1
{
throw new IllegalArgumentException("n must be > 0");
}

long nextValue = this.nextValue;
//这里n一般是1,代表申请1个可用槽位,nextValue+n就代表了期望申请的可用槽位序号
long nextSequence = nextValue + n;
//减掉RingBuffer的bufferSize值,用于判断是否出现‘绕圈覆盖’
long wrapPoint = nextSequence - bufferSize;
//cachedValue缓存之前获取的最慢消费者消费到的槽位序号
long cachedGatingSequence = this.cachedValue;
//如果申请槽位序号-bufferSize比最慢消费者序号还大,代表生产者绕了一圈后又追赶上了消费者,这时候就不能继续生产了,否则把消费者还没消费的消息事件覆盖
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{

/**
cursor代表当前已经生产完成的序号,了解多线程可见性可能会知道:
1、CPU和内存间速度不匹配,硬件架构上一般会在内存和CPU间还会存在L1、L2、L3三级缓存
2、特别是L1高速缓存是CPU间相互独立不能共享的,线程操作可以看着基于L1缓存进行操作,就会导致线程间修改不会立即被其它线程感知,只有L1缓存的修改写入到主存然后其它线程将主存修改刷新到自己的L1缓存,这时线程1的修改才会被其它线程感知到
3、线程修改对其它线程不能立即可见特别是在高并发下可能会带来些问题,JAVA中使用volatile可以解决可见性问题
4、这里就是采用UNSAFE.putLongVolatile()插入一个StoreLoad内存屏障,具体可见JMM模型,主要保证cursor的真实值对所有的消费线程可见,避免不可见下消费线程无法消费问题
*/
cursor.setVolatile(nextValue);
long minSequence;
//Util.getMinimumSequence(gatingSequences, nextValue)获取当前时刻所有消费线程中,消费最慢的序号
//上面说过cachedValue是缓存的消费者最慢的序号
//这样做目的:每次都去获取真实的最慢消费线程序号比较浪费资源,而是获取一批可用序号后,生产者只有使用完后,才继续获取当前最慢消费线程最小序号,重新获取最新资源
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//如果获取最新最慢消费线程最小序号后,依然没有可用资源,做两件事:
// 1、唤醒waitStrategy上所有休眠线程,这里即是消费线程(避免因消费线程休眠而无法消费消息事件导致生产线程一直获取不到资源情况)
// 2、自旋休眠1纳秒
//可以看到,next()方法是一个阻塞接口,如果一直获取不到可用资源,就会一直阻塞在这里
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L);
}
//有可用资源时,将当前最慢消费线程序号缓存到cachedValue中,下次再申请时就可不必再进入if块中获取真实的最慢消费线程序号,只有这次获取到的被生产者使用完才会继续进入if块
this.cachedValue = minSequence;
}
//申请成功,将nextValue重新设置,下次再申请时继续在该值基础上申请
this.nextValue = nextSequence;
//返回申请到RingBuffer序号
return nextSequence;
}

​ 3、申请到可用序号后,提取RingBuffer中该序号中的Event,并重置Event状态为当前最新事件状态

​ 4、重置完成后,调用Sequencer.publish()提交序号,提交序号主要就是修改cursor值,cursor标记已经生产完成序号,这样消费线程就可以来消费事件了

1
2
3
4
5
6
7
8
@Override
public void publish(long sequence)
{
//修改cursor序号,消费者就可以进行消费
cursor.set(sequence);
//唤醒消费线程,比如消费线程消息到无可用消息时可能会进入休眠状态,当放入新消息就需要唤醒休眠的消费线程
waitStrategy.signalAllWhenBlocking();
}

总结:消息事件生产主要包含三个步骤:

​ 1、申请序号:表示从RingBuffer上获取可用的资源

​ 2、填充事件:表示获取到RingBuffer上可用资源后,将新事件放入到该资源对应的槽位上

​ 3、提交序号:表示第二部新事件放入到RingBuffer槽位全部完成,提交序号可供消费线程开始消费

Disruptor消息处理模型

消费模型

消息处理端需要从RingBuffer中提取可用的消息事件,并注入到用户的业务逻辑中进行处理,流程大致如下:

​ 1、消费端核心类是EventProcessor,它实现了Runnable接口,Disruptor在启动的时候会将所有注册上来的EventProcessor提交到线程池中执行,因此,一个EventProcessor可以看着一个独立的线程流用于处理RingBuffer上的数据

​ 2、EventProcessor通过调用SequenceBarrier.waitFor()方法获取可用消息事件的序号,其实SequenceBarrier内部还是调用WaitStrategy.waitFor()方法,WaitStrategy等待策略主要封装如果获取消息时没有可用消息时如何处理的逻辑信息,是自旋、休眠、直接返回等,不同场景需要使用不同策略才能实现最佳的性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ProcessingSequenceBarrier:
WaitStrategy waitStrategy;
Sequence dependentSequence;
boolean alerted = false;
Sequence cursorSequence;//可供消费消息的sequence
Sequencer sequencer;

ProcessingSequenceBarrier中核心方法只有一个:waitFor(long sequence),传入希望消费得到起始序号,返回值代表可用于消费处理的序号,一般返回可用序号>=sequence,但也不一定,具体看WaitStrategy实现
/**
* 总结:
* 1、sequence:EventProcessor传入的需要进行消费的起始sequence
* 2、这里并不保证返回值availableSequence一定等于given sequence,他们的大小关系取决于采用的WaitStrategy
* a.YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence
* b.BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=
* c.SleepingWaitStrategy:首选会自旋100次,然后执行100次Thread.yield(),还是不行则LockSupport.parkNanos(1L)直到availableSequence >= given sequence
*/
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
//调用WaitStrategy获取RingBuffer上可用消息序号,无可消费消息是该接口可能会阻塞,具体逻辑由WaitStrategy实现
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

if (availableSequence < sequence)
{
return availableSequence;
}

//获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。
//当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。
//最小值为:sequence-1
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

​ 3、通过waitFor()返回的是一批可用消息的序号,比如申请消费7好槽位,waitFor()返回的可能是8表示从6到8这一批数据都已生产完毕可以进行消费

​ 4、EventProcessor按照顺序从RingBuffer中取出消息事件,然后调用EventHandler.onEvent()触发用户的业务逻辑进行消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
while (true)
{
try
{
//读取可消费消息序号
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}

while (nextSequence <= availableSequence)
{
//循环提取所有可供消费的消息事件
event = dataProvider.get(nextSequence);
//将提取的消息事件注入到封装用户业务逻辑的Handler中
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
}
}

​ 5、当这批次的消息处理完成后,继续重复上面操作调用waitFor()继续获取可用的消息序号,周而复始

好了,这节主要对Disruptor的生产模型和消费模型进行了一个简单的介绍,后面会逐渐对Disruptor涉及到的每个核心组件进行分析,了解它们优秀的设计思想。

坚持原创技术分享,您的支持将鼓励我继续创作!