0%

服务限流 原理与实现

本文简述限流的原理,代码走读分享

常见的限流算法

给定单位时间 T(统一设为1s)内的最大访问量 MR,下面讨论几种限流算法

不平滑的限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private AtomicInteger count;
private Long nextTime;
private Long maxReqNum;

private boolean limit() {
long now = System.currentTimeMillis();
if (now > nextTime) {
count = new AtomicInteger(1);
nextTime = now + 1000;
return true;
}
int num = count.incrementAndGet();
if (num > maxReqNum) {
return false;
}
return true;
}
  1. 单位时间 T 内维护计数器 count
  2. 当请求到达时,判断时间是否进入下一个单位时间
  3. 如果是,重置 count = 1
  4. 如果不是,count++,判断 count 是否超过最大访问量 MR,如果超过,则拒绝访问

不平滑的限流算法在两个单位时间的临界值上可能失效,如下图,在每个 T 内可以满足 MR <= 3,但在临界的 T 内存在 MR = 5

不平滑限流器.drawio

漏桶

漏桶算法思路是先将请求进入到桶内,漏桶以固定的速度出水,出水意味着处理请求,如果桶已满,则水会溢出,也就是拒绝请求。漏桶可以控制请求的处理速率

漏桶不能处理突发请求,比如 MR = 1000,T 时刻我们希望处理 1000 条请求,漏桶会把这 1000 个请求平铺在 1 秒内处理完,而不会在 T 时刻同时处理这 1000 个请求

令牌桶

令牌桶以恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。相比于漏桶,令牌桶允许处理突发请求

如何实现

我的思路:设一个计数器 count 表示桶容积,是写一个线程向一个桶里循环的放令牌(count++),每次 sleep 1000/限流值 ms,当有请求来的时候 count--

看起来是很直观的方法,但是存在两个严重的问题:

  1. 如果有多个接口都要限流,要有多个限流桶,也就有多个线程在一直运行
  2. 为了处理同时投放和取令牌的场景,每次放/取令牌都要抢锁/加锁

显然这个方案是不行的,下面看下 guava 的实现方式(guava:23.0),大致思路是:请求时再取令牌+放令牌

guava 的实现,分为 SmoothBurstry 和 SmoothWarmingUp

SmoothBurstry

创建令牌桶
1
2
3
4
5
6
7
8
9
10
11
// 创建一个令牌桶,每秒投放 premitsPerSecond 个令牌
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

maxBurstSeconds:最多存储多少秒的令牌,默认值为1s

permitsPerSecond:每秒生产的令牌数

1
2
3
4
5
6
7
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

设置限流

1
2
3
4
5
6
7
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

stableIntervalMicros:生产一个令牌的毫秒数

重新计算桶内令牌数、下一次可以取令牌时间

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
double coolDownIntervalMicros() {
return stableIntervalMicros;
}

void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

nowMicros:当前时间戳(毫秒)

nextFreeTicketMicros:下一次可以取令牌的时间戳

newPermits:新产生的令牌数

maxPermits:最大容积(也就是1秒产生的令牌数)

storedPermit:桶内的令牌数

设置最大容积和已有令牌数

第一次创建或修改限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// oldMaxPermits 为 0,说明第一次创建,storedPermits = 0
// oldMaxPermit 不为 0,说明已创建过,本次是修改限流,做等比例缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

oldMaxPermits:之前的容积

maxBurstSeconds:装满令牌桶需要的时间(秒)

从令牌桶里取令牌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 通常取一个令牌
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}

/**
* permits: 要取的令牌数,通常为 1
* 返回值:0 表示不限流,非0 表示等待的时长(毫秒)
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
// 预约,如果当前不能直接获取到 permits,需要等待
long microsToWait = reserve(permits);
// sleep 阻塞
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

计算取到下一个令牌需要的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 预约 permits(通常为1)个令牌需要的时间
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 可获取令牌的时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 还需等待的时间,0 表示不需等待
return max(momentAvailable - nowMicros, 0);
}

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 更新 storedPermits
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 桶内令牌不足时,还需要添加的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// 计算需要等待的时间,storedPermitsToWaitTime 在 SmoothBursty 中恒为 0,在 SmoothWarmingUp 有其他实现
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 计算下一次发令牌的时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 更新桶内令牌数
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}

本次不管 storedPermits 够不够,返回的是 nextFreeTicketMicros 的旧值,当次的 acquire 是可以成功取到令牌的,如果 storedPermit 不够,将 nextFreeTicketMicros 往后推一段时间

SmoothBursty 可以处理突发请求,它会缓存最多 1 秒的 permits

SmoothWarmingUp

SmoothWarmingUp 适用于资源需要预热的场景,比如我们的某个接口业务,需要使用到线程池、数据库连接、缓存,如果我们的系统刚启动,池子中的线程还没有创建,或者长时间处于低负载或零负载状态,线程被慢慢释放掉了,此时池子是冷的,如果此时有大量请求,线程池需要瞬间创建大量的线程,可能会导致系统负载过高甚至压垮系统

这里导致系统负载高的原因,不是由于请求太大线程池处理不过来,线程池处理不过来可以进等待队列或者直接丢弃,这个过程不会消耗什么资源

所以线程池是冷的,我们不能让大流量都进入线程池,要给线程池一个预热过程

在 SmoothBurstry 中,由于桶里已经有1秒的 permits,突发请求会直接通过,不给预热的时间;在 SmoothWarmingUp 中,取令牌需要耗时的,这样对取令牌的速度进行控制

在下图中,X 轴代表 storedPermits,Y 轴代表获取一个 permit 需要的时间(在令牌足够的情况下),storedPermits 越大,代表越冷,获取令牌耗时越多

smooth-warm-up

如图分析三种场景:

  1. 系统繁忙时,storedPermits <= thresholdPermits,需要的时间为 stable interval,最短时间
  2. 系统空闲时,storedPermits = maxPermits,需要时间为 Cold interval,最大时间
  3. 系统预热过程中,thresholdPermit < storedPermits < maxPermits,需要时间在 stable interval 到 cold interval 之间

图片引用于 https://www.javadoop.com/post/rate-limiter

参数值设定

直接介绍下代码中给出的参数值,设长方形面积(RS) = stableInterval * thresholdPermits,梯形面积(TS) = (stableInteval + coldInteval) * (maxPermits - thresholdPermits) / 2,coldInteval = stableInterval * coldFactor,其中 coldFactor 为常数 3

有关系如下:

1
2
3
TS = 2 * RS;
thresholdPermits = TS * 0.5 / stableInterval;
maxPermits = 2 * RS / (stableInterval + coldInteval) + thresholdPermits;
如何实现

只看下和 SmoothBurstry 差异的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 梯形边的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

计算取令牌需要等待的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
// 大于 thresholdPermits,需要预热,计算梯形面积
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
// 梯形面积
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
// 矩形x轴边长
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
// 总时间 = 梯形面积 + 矩形面积
micros += (stableIntervalMicros * permitsToTake);
return micros;
}

其他流程与 SmoothBurstry 相同

由于 SmoothWarmingUp 每次取令牌都有等待时间,会让每个请求之间保持一段时间,所以 SmoothWarmingUp 可以看作漏桶

Sentinel

Reference

流量控制算法——漏桶算法和令牌桶算法

限流-RateLimiter 源码分析 (Guava 和 Sentinel 实现)