0%

Disruptor 介绍与原理解析

Disruptor 是 LMAX 公司开发的一个高性能队列,开发初衷是解决 Java 提供的内存队列的延迟问题。目前有很多知名项目都应用了 Disruptor 以获得高性能,包括 Apache Storm、Camel、Log4j2 等,大量项目借鉴了它的设计机制,故了解 Disruptor 的实现原理是有必要的

这里所说的队列,是系统内部的内存队列,不是分布式队列

本文代码基于 disruptor:4.0.0 源码地址:https://github.com/LMAX-Exchange/disruptor

Java 内存队列存在的延迟问题

并发场景下,多个线程操作一个队列,需要考虑线程安全问题。jdk 提供的线程安全队列,Java的内置队列如下表所示

内置队列 有界性 数据结构 线程安全方案
ArrayBlockingQueue 有界 加锁 arraylist
LinkedBlockingQueue 可选 加锁 linkedlist
ConcurrentLinkedQueue 无界 无锁 linkedlist
LinkedTransferQueue 无界 无锁 linkedlist
PriorityBlockingQueue 无界 加锁 heap
DelayQueue 无界 加锁 heap

java 线程安全的内置队列,由于锁争抢、伪共享存在一定的性能问题,原因如下

GC 与访问速度

由于链表结构相比于数组,更容易出现内存碎片,更容易触发垃圾回收,而且一般队列在使用中,都是 FIFO 生产消费场景,很少会遇到中间插入/删除的情况,所以数组比链表有更高的内存利用率和更少的 GC 次数

另外,连续的内存空间可以通过索引定位,比链表有更快的访问速度

加锁与CAS的性能比较

加锁采用悲观锁的思想,认为线程会冲突,对数据操作之前,先加锁再执行,执行之后解锁,同时只有一个线程持有锁和访问数据

CAS 采用乐观锁的思想,认为线程不会冲突,先执行,记录数据最初的值,执行完给数据赋值时,比较数据当前值与最初值是否一致,如果一致,则这期间没有其他线程操作,或无效操作,可以赋值;如果不一致,则放弃赋值,重新执行,直到赋值成功。CAS 是 CPU 的一个指令,由 CPU 保证原子性

可以发现,如果执行动作很轻量,CAS 的成本更低,如果执行动作很耗时,应该选择加锁

ArrayBlockingQueue 通过加锁控制并发,竞争锁会导致线程挂起、等待、恢复,这个过程存在很大开销,对性能有严重的影响

这里直接参考 Disruptor 论文中的实验数据:

image-20240225155345326

从上图可知:

单线程:不加锁的性能 > CAS操作的性能 > 加锁的性能

多线程:CAS操作的性能 > 加锁的性能

所以加锁的性能是最差的

伪共享

多级缓存

为了解决 cpu 处理速度与内存访问速度的差异,cpu 设置了 L1、L2、L3 三级缓存,越靠近 cpu 的缓存,速度越快,容量越小。当 CPU 读取数据时,会先到 L1 查找,如果没有再去 L2、L3,最后是内存,走的越远耗时越长

CPU 访问不同级别缓存/内存的耗时如下

位置 耗时
内存 60-80 ns
L3 cache 15 ns
L2 cache 3 ns
L1 cache 1 ns

可见 CPU 从内存读数据比 L1 慢了 2 个数量级

缓存写入与失效

缓存由多个缓存行(cache line)组成,每个 cache line 为 64 字节,可以存 8 个 long 类型变量

CPU 在缓存中没有找到数据,到内存中读到数据后,会写入缓存中。CPU 每次从内存拉数据,会把相邻的 64 字节(将内存空间按 64 字节分成小块)一起写入缓存的一个 cache line 中

比如在访问一个 long 数组 arr 时,假设 arr[0] ~ arr[7] 在一个 64 字节内存块中,读取了 arr[0],会把 arr[1] ~ arr[7] 也加载到缓存,这样再访问 arr[1] 时,直接在 L1 中读,不需要到内存中,提高了读取速度

当 cache line 对应的内存数据被修改,由于缓存和内存数据不一致,则 cache line 失效,需要重新到内存读,在多 CPU 下,需要让所有 CPU 的缓存中的 cache line 都失效

伪共享的验证

这样会出现这样一个场景:CPU1 不断读 arr[0],CPU2 不断写 arr[1],CPU1 本可以从自己的 L1 cache 中拿到 arr[0] 的值,却由于 CPU2 的写操作,导致自己的 L1 cache 中对应的 cache line 不停失效,不得不到内存中读数据。看似两个 CPU 在处理不同的变量,却互相影响了,这种无法充分利用 cache line 的现象,称为伪共享

对于伪共享,一般的解决方案,是增大变量之间的间隔,使两个变量不可能出现在一个 cache line 上,以空间换时间

实验验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Share {

private static final long turn = 500L * 1000L * 1000L;
private static final int len = 10;
private static final long[] arr = new long[len * 8];

public static void runInFalseShare(int i) {
for (int j = 0; j < turn; j++) {
arr[i] = j;
}
}

public static void runInShare(int i) {
for (int j = 0; j < turn; j++) {
// 让数据不在一个缓存行中
arr[i * 8] = j;
}
}

public static void test() throws InterruptedException {
Thread[] threads = new Thread[len];
for (int i = 0; i < len; i++) {
int finalI = i;
threads[i] = new Thread(() -> {
// runInFalseShare(finalI); // 测试伪共享时执行
runInShare(finalI); // 测试共享时执行
});
}
long start = System.nanoTime();
for (int i = 0; i < len; i++) {
threads[i].start();
}
for (int i = 0; i < len; i++) {
threads[i].join();
}
long end = System.nanoTime();
System.out.println(end - start);
}

public static void main(String[] args) throws InterruptedException {
test();
}
}

上面的例子,runInFlaseShare(伪共享)方法让每个线程访问的数据存在位于同一个缓存行中的情况,runInShare(共享)方法通过设置了 8 * 8 = 64 byte,让每个线程访问的数据一定不在同一个缓存行中

分别执行 runInFlaseShare 和 runInShar 的测试方法,运行 10 次取均值结果如下:

方法 耗时
runInFlaseShare 5737 ms
runInShare 765 ms

可以看到伪共享耗时是共享耗时的 7.5 倍

问题讨论

可以看到三个问题:GC次数、线程安全方案 和 伪共享,会让 java 内置线程安全队列,产生一定性能问题,如果让我们解决这些问题,应该如何设计队列呢

思路:

  1. 使用数组结构
  2. 使用 CAS 控制并发
  3. 让队列的元素不在同一个缓存行中,避免伪共享发生

按照这个思路,把 ArrayBlockingQueue 的并发控制从加锁改为 CAS,再对其中所有元素前后都插入 4 个 8 字节,是不是就实现了一个高效的内存队列呢,下面看下 Disruptor 的做法

Disruptor 的设计方案

在了解 Disruptor 如何解决上面问题之前,先要了解一些术语

术语介绍

先看一个简单的栗子

1
2
3
4
5
6
7
8
9
// 创建Disruptor
Disruptor<Event> disruptor = new Disruptor<>(new EventFactory(), 1024, Executors.defaultThreadFactory());
// 连接事件处理器
disruptor.handleEventsWith(new EventProcessor());
// 启动Disruptor
RingBuffer<Event> ringBuffer = disruptor.start();
// 生产事件
EventProducer eventProducer = new EventProducer(ringBuffer);
eventProducer.produce("Hello Disruptor!");

下面解释栗子中出现的术语

  • Event:存放消息的单位
  • Ring Buffer:环形数据缓冲区,一个首尾相接的环,用于存放 Event ,供生产者存入数据,和消费者拉取数据
  • Sequence:序列,用于记录消息位置
  • Sequencer:生产者,有单生产者和多生产者两种实现
  • Sequence Barrier:序列屏障,控制消费者消费进度,防止消费到不可用消息
  • Wait Strategy:等待策略,消费者等待生产者的策略
  • Event Processor:消息处理器,循环从 RingBuffer 批量获取 Event 并执行 Event Handler
  • Event Handler:执行消费逻辑

Event

存放消息,如:

1
2
3
4
@Data
class Event {
private String message;
}

Event Factory

创建消息的工厂

1
2
3
4
5
6
class EventFactory implements com.lmax.disruptor.EventFactory<Event> {
@Override
public Event newInstance() {
return new Event();
}
}

Sequence

一个数组 arr,我们通过 arr[i] 可以取到一个位置的元素,但是这样是线程不安全的,比如线程 A 想取第 i 个元素,在读数据前,其他线程修改了 i 的值,这样线程 A 取到了错误的值;Sequence 用来代替 i ,线程安全的从数组中取数

用来记录 Ring Buffer 中槽的位置,结构如下

sequence.drawio

前面提到,一个缓存行是 64 字节,Sequence 通过在 value 前后填充 56 字节,使得 value 一定独享一个缓存行,避免了伪共享

更新 Sequence 的 value:

1
2
3
4
5
6
7
8
9
public long addAndGet(long increment) {
long currentValue;
long newValue;
do {
currentValue = this.get();
newValue = currentValue + increment;
} while(!this.compareAndSet(currentValue, newValue));
return newValue;
}

通过 CAS 的方式,更新 value 的值,即当 currentValue 等于内存中 value 的值,才为 value 赋值

Sequence 通过填充字节和CAS,优化了并发场景下读写 Sequence 的性能

令 sequence B 是可消费的最后一个槽的索引,那么消费者线程 N 再每次消费时,都要读 B 的值,判断当前索引是否小于 B,假设 B 没有填充字节,则有可能 B 所在 cache line 的其他数据被写,导致缓存失效,N 需要从内存中读 B 的值;有了填充字节,只要 B 没有被写,缓存就不会失效

Sequence 的功能与 AtomicLong 的功能几乎一致,在 AtomicLong 的基础上优化了缓存行伪共享的问题

Sequencer

生产者,有两种实现:单生产者(SingleProducerSequencer)和多生产者(MultiProducerSequencer)

SingleProducerSequencer

一个 Ring Buffer 只有一个生产者

SingleProduerSequencer 的结构如下 sequencer.drawio

nextValue:下一个可申请的序列

cachedValue:缓存最小已被占用的序列,用来计算剩余序列是否足够被申请

nextValue.drawio

如图,nextValue = 6,cachedValue = 3,说明序列 3、4、5 已被占用,6、7、0、1、2 可申请

下面看几个重要方法:

方法 功能
SingleProducerSequencer(final int bufferSize, final WaitStrategy waitStrategy) 构造方法bufferSize:ring buffer 的大小,必须是 2 的 n 次幂waitStrategy:生产消息时发现队列已满的等待策略
boolean hasAvailableCapacity(int requiredCapacity) 是否有可用容量,通过 nextValue + requiredCapacity 与 cachedValue 比较,判断剩余的序列是否够用
long next(int n) 声明后面 n 个序列的状态是发布中
void publish(long sequence) 发布一个序列,意味着这个序列已经被充填了数据(已生产)

SingleProducerSequencer 生产 n 个消息的执行流程是:

  1. 创建一个大小为 bufferSize 的 ring buffer(构造方法)
  2. 查看当前容量是否足够(hasAvailableCapacity 方法)
  3. 如果容量足够,声明 n 个序列为发布中(next 方法)
  4. 为这 n 个序列填充数据,将已填充数据的序列的状态改为已发布(publish 方法)
  5. 如果容量不足,执行等待策略
MultiProducerSequencer

一个 Ring Buffer 有多个生产者

MultiProducerSequencer 的结构如下

1
2
3
4
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;

gatingSequenceCache:记录生产的位置

availableBuffer:记录每个槽当前数据所在的圈数,初始值 -1,用来判断数据是否可用(可以被消费),大小为 buffer size

indexMask = bufferSize - 1,用来计算 sequence 在 availableBuffer 中的位置:index = sequence & indexMask = sequence & (2^n - 1)

indexShift = log2(bufferSize),用来计算 availableBuffer[i] 的值,flag = availableBuffer[i] = sequence >>> indexShift

下面介绍几个重要方法

1
2
3
4
5
6
7
8
9
10
// 计算 sequence 的槽位号
private int calculateIndex(final long sequence)
{
return ((int) sequence) & indexMask;
}
// 计算 sequence 的圈数
private int calculateAvailabilityFlag(final long sequence)
{
return (int) (sequence >>> indexShift);
}

比如 ring buffer 的 buffer size = 8,indexMask = 7(0111),indexShift = log2(8) = 3

sequence(13) 的槽位号 index = 1101 & 0111 = 0101 = 5,圈数 flag = 13 >>> 3 = 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public long tryNext(final int n) throws InsufficientCapacityException
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;

if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
return next;
}

tryNext 方法将 cursor + n,cursor 是 ring buffer 上可用的最高位 sequence,比如一个线程 A 想向队列写 n 个消息,通过 tryNext(n) 即可将 cursor 向前移动 n 位,则 [cursor + 1, curosr + n] 区间内的槽只能由线程 A 写入,其他线程再写入,需要从 cursor + n + 1 开始写,从而避免了每次写都要处理并发冲突;这么看来,在写入 ring buffer 时,先在线程内部积累一些数据,再批量写入

不过这样带来一个问题:ring buffer 被分成多份同时写,不能通过比较消费最高位和生产最高位的大小,来判断槽位是否可用了,因为中间可能存在没生产完的槽位,availableBuffer 就是用来解决这个问题的

生产者在 sequence 的槽位 i 上生产完消息,执行 publish,将 availableBuffer[i] 的值设为 flag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void publish(long sequence) {
this.setAvailable(sequence);
this.waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(long sequence) {
this.setAvailableBufferValue(this.calculateIndex(sequence), this.calculateAvailabilityFlag(sequence));
}
// 计算 sequence 是否可用(即有数据/可消费)
public boolean isAvailable(final long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
// availableBuffer[index] == flag,认为 sequence 是可用的
return (int) AVAILABLE_ARRAY.getAcquire(availableBuffer, index) == flag;
}

例如 buffer size = 8,生产者写完 sequence(13),availableBuffer[5] = 1

消费者来读 sequence(5),flag = 0 不等于 availableBuffer[5],说明 sequence(5) 不可用,已经被 sequence(13) 覆盖掉了

sequence(14) 的 index = 6,flag = 1,availableBuffer[6] = 0,不相等,说明 sequence(14) 不可用,因为 sequence(14) 还是第一圈的旧数据,已经被消费过了,第二圈的还没有写入

MultiProducerSequencer 生产 n 个消息的执行流程是:

  1. 创建一个大小为 bufferSize 的 ring buffer、availableBuffer,indexMask,indexShift
  2. tryNext 申请 n 个连续槽位(CAS)
  3. 如果剩余槽位足够,将 cursor 后移 n 位
  4. 向槽位写入数据,通过 publish 修改 availableBuffer 对应位置的状态(flag)
  5. 如果槽位不足,执行等待策略

Ring Buffer

Ring Buffer 是一个环形数据缓冲区,用于存放 Event,数据结构如下

ring_buffer.drawio

说明:

  1. 前后的 56 byte 用于填充缓存行,防止 indexMask 和 Sequencer 发生伪共享
  2. indexMask 用于计算 sequence 的槽位号,同 Sequencer
  3. entries 是 Event 对象引用的数组,长度为 2 的 n 次幂,每个元素大小为 4 byte(开启指针压缩),前后 32 个元素被指定为 BUFFER_PAD(128 byte)
  4. BufferSize:Sequencer 的长度

不是很理解 BUFFER_PAD 的长度为什么设计为 32,缓存行填充只需要 56 byte,14 个元素即可

指针压缩:64位系统中,每个指针默认大小 8 byte,为了节省内存空间,开启指针压缩可以让指针大小缩小为 4 byte(默认开启);32 位系统指针都是 4 byte,不存在指针压缩

重要方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 向后申请一个 sequence
public long next()
{
return sequencer.next();
}
// 取 sequence 上的数据
public E get(final long sequence)
{
return elementAt(sequence);
}
// 将 sequence 位置上数据的状态改为已发布
public void publish(final long sequence)
{
sequencer.publish(sequence);
}

可以看到,ring buffer 都是通过 sequencer 来操作槽位的状态

Sequence Barrier

屏障是用来控制消费者,消费者只能消费屏障内的消息,防止消费到不可用的消息

消费者消费的模式分为两种:

  1. 依赖生产者最大可达数据
  2. 消费者依赖图

为了更简单明白原理,只对模式1分析

Sequence Barrier 的结构如下

sequencer:生产者

waitStrategy:等待策略

cursorSequence:当前生产到的消息

dependentSequences:依赖的消息

消费者希望消费 sequence 位置上的消息前,要通过屏障的 waitFor 方法来获取可用的位置(屏障边界),消费者拿到可用位置后,就可以消费这个位置以内的消息

比如:连续可用消息的最大位置是 100,消费者 A 希望消费位置 90,则屏障返回 100,说明此刻消费者 A 可以消费位置 100 以内的消息;消费者 B 希望消费位置 101,由于 101 还未被生产,屏障会执行等待策略,阻止 B 消费

代码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 入参:sequence 希望消费的位置
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// 等待策略有多种,这里用默认的 BlockingWaitStrategy 来介绍:比较期望位置(sequence)和当前生产位置(cursorSequence)的大小,如果 sequence > cursorSequence 等待,如果 sequence <= cursorSequence,返回 cursorSequence,如果有依赖节点,则需 sequence <= dependentSequence,否则等待
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
// 对于 BlockingWaitStrategy 返回的是一个大于等于 sequence 的值(比如休眠一段时间起来发现,cursorSequence 已遥遥领先 sequence,得抓紧干活了),故不会进入下面的分支
if (availableSequence < sequence)
{
return availableSequence;
}
// 当 BlockingWaitStrategy 返回一个大于 sequence 的值,在 multiProducerSequener 并发生产时,可能 cursorSequence 内有不可用的位置,所以要从 sequence 开始向后遍历,找到最大的可用位置
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

当消费者线程进入休眠,生产者生产了一条消息,就要通知所有消费者起来干活,是通过屏障的 alert 方法

1
2
3
4
5
public void alert()
{
alerted = true;
waitStrategy.signalAllWhenBlocking();
}

上面两个方法都用到了 WaitStrategy 的方法,下面继续介绍

WaitStrategy

消费者消费时,可能遇到当前 sequence 是不可用状态,需要按照给定的策略等待

有以下几个等待策略

策略 适用场景
BlockingWaitStrategy CPU 资源宝贵,吞吐量和低延迟次要
BusySpinWaitStrategy CPU 性能比较好,追求高吞吐量和低延迟
TimeoutBlockingWaitStrategy CPU 资源宝贵,吞吐量和低延迟次要
LiteTimeoutBlockingWaitStrategy CPU 资源宝贵,吞吐量和低延迟次要
PhasedBackoffWaitStrategy CPU 资源宝贵,吞吐量和低延迟次要

下面分别介绍

BlockingWaitStrategy

因为消息之间可能存在依赖,需要等待 dependentSequence 被消费,为了简化理解,我们先认为消息之间没有依赖,即 dependentSequence = cursorSequence(当前消息)

该策略使用系统调用让消费线程等待,在生产一条消息后再唤醒消费线程,当吞吐量和低延迟不如 CPU 资源重要时,可以使用此策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public long waitFor(final long sequence, final Sequence cursorSequence, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// 期望位置大于生产最大位置,期望位置消息不可用,需等待 cursorSequce 增大
if (cursorSequence.get() < sequence)
{
// wait 方法必须在同步代码块中使用
synchronized (mutex)
{
while (cursorSequence.get() < sequence)
{
// 监听是否有新消息被生产
barrier.checkAlert();
// 释放锁,开始等待 barrier 的通知(alert方法)
mutex.wait();
}
}
}
// 在依赖生产者最大可达数据情况下,dependentSequence 就是生产最大位置,也就是 cursorSequence
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
Thread.onSpinWait();
}
// 返回最大生产位置(>= sequence),由于中间可能存在未生产完的情况,在 barrier 中要对 [sequence, avaiableSequence] 的可用性做检查,找到最大可用位置
return availableSequence;
}

在上面代码中,消费者期望位置不可用,会先占有 mutex 锁,通过 mutex.wait() 方法让线程进入等待状态

当生产者调用 barrier 的 alert 方法,会调用 waitStrategy 的 signalAllWhenBlocking 来唤醒 mutex 上等待的消费者

1
2
3
4
5
6
7
8
@Override
public void signalAllWhenBlocking()
{
synchronized (mutex)
{
mutex.notifyAll();
}
}
BusySpinWaitStrategy

与 BlockWaitStrategy 的区别是,不需要线上等待,而是周期性轮询直到位置可用

这个策略避免了系统调用,不过增大了 CPU 的 busy 程度,如果 CPU 性能比较好可以考虑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long waitFor(
final long sequence, final Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
Thread.onSpinWait();
}
return availableSequence;
}

public void signalAllWhenBlocking()
{
}
TimeoutBlockingWaitStrategy

在 BlockWaitStrategy 基础上加入了线程等待的最大时间,防止线程无限期的等待下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (cursorSequence.get() < sequence)
{
synchronized (mutex)
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
timeoutNanos = awaitNanos(mutex, timeoutNanos);
if (timeoutNanos <= 0)
{
throw TimeoutException.INSTANCE;
}
}
}
}
LiteTimeoutBlockingWaitStrategy

在 TimeoutBlockingWaitStrategy 基础上,加上了是否需要通知校验的优化,只有在有消费者等待的情况下才会 notifyAll

1
2
3
4
5
6
7
8
9
10
public void signalAllWhenBlocking()
{
if (signalNeeded.getAndSet(false))
{
synchronized (mutex)
{
mutex.notifyAll();
}
}
}
PhasedBackoffWaitStrategy

先自旋,自旋次数达到阈值后,采用 backup 策略

更多的策略可阅读源码

Event Processor

消费处理器,用于执行消费逻辑

实现:BatchEventProcessor

结构:

  • dataProvider:用于读取某个位置上的数据
  • sequenceBarrier:屏障
  • eventHandler:消息处理器
  • maxBatchSize:消费一批消息的最大数量
  • batchRewindStrategy:重试策略
  • sequence:当前已消费的位置

执行消费方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void processEvents()
{
T event = null;
// 已消费的下一个位置
long nextSequence = sequence.get() + 1L;
while (true)
{
// 要消费的起始位置
final long startOfBatchSequence = nextSequence;
try
{
try
{
// 获取包含 nextSequence 的可消费的最大位置
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 计算本消费批次的结束位置
final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);

if (nextSequence <= endOfBatchSequence)
{
// 实现为空,不太理解这里是做什么
eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);
}

while (nextSequence <= endOfBatchSequence)
{
event = dataProvider.get(nextSequence);
// 执行消费逻辑
eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);
nextSequence++;
}

retriesAttempted = 0;
// 更新已消费位置
sequence.set(endOfBatchSequence);
}
...
}
...
}
}

流程汇总

通过下面几个步骤串起 disruptor 生产和消费过程(多生产者,基于最大生产消息)

步骤1. 生产者 P1、P2、P3 调用 MultiProducerSequencer.tryNext(n) 使用 CAS 修改 cursor 的值,分别申请 11、15、25 个位置,并将 ring buffer 的 cursor 后移至 50,同时生产消息;生产完位置 sequence,令 availableBuffersequence = 0

disruptor-flow.drawio

假设P1生产完 sequence = 7 后阻塞

步骤2. 消费者 C1 开始消费

C1 调用 barrier.waitFor(1),maxBatchSize = 10,获取可消费的最大位置 = 7,C1 开始从 0 开始消费到 7。消费完,C1 的 sequence = 7

步骤3. C1 调用 barrier.waitFor(8),发现位置 8 的消息不可用(availableByffer[8] = -1 不等于 0),执行等待策略

步骤4. P1 此时阻塞结束,生产完消息 8 的消息,执行 publish,将 availableBuffer[8] = 0,通过 barrier.alert 方法通知 C1

步骤5. C1 被唤醒,barrier.waitFor(8) = 8,消费位置 8 的消息

总结

Disruptor 是通过下面设计思想,解决内存队列并发性能问题的

  • 环形数组:环形数组可以覆盖过期的数据,减少 GC 次数;同时,数组可以充分利用 CPU 的缓存共享机制
  • 预申请:提前申请一段独享空间,避免每次写都争抢资源
  • 无锁:通过 CAS 保证线程安全
  • 属性填充:避免伪共享

Reference

Disruptor: High performance alternative to bounded queues for exchanging data between concurrent threads

高性能队列 - Disruptor

Disruptor介绍及原理讲解

Disruptor(二)Sequencer

高并发数据结构Disruptor解析(3)

高并发数据结构Disruptor解析(5)