限流器是服务治理的重要一环。但常见的讨论集中于对 频率
的限制。本文结合笔者最近的需求,分析 Go 官方限流器 time/rate 的实现原理,结合实践对 带宽/流量限制
可能遇到的问题进行讨论。
注:本文分析的源代码为
go get golang.org/x/time/rate@v0.3.0
Table of Contents
0 背景
我们为 api 请求限制频率时,可以使用限流器可以保护服务不被流量冲垮。但是,如果在大量数据传输的场景中,直接使用限流器,是否可行?需求如下:
- 应用层需要自己控制数据同步速度
- 限流值可以随时更新,包括比较大的带宽变化。比如 10Gb/s -> 1Mb/s
1 限流器基础原理
常见限流器为 令牌桶(Token Bucket) 和 漏桶(Leaky bucket)。
Go 的官方限流库 time/rate 实现了令牌桶, Uber 开源的限流库实现的是漏桶。
1.1 令牌桶
(图 1 令牌桶原理示意 [1, 来源请求])
想象一个桶,
- 向桶内添加令牌,速度 T token/秒
- 桶的容量最大为 Burst 个
- 根据请求大小,尝试取出 N 个 token
- 若剩余 token 不足 则等待
1.2 漏桶
想象一个队列,
- 该队列最大处理速度为 N/s
- 请求进行排队,逐步处理
1.3 频率限制/带宽限制
我们进行带宽限制,可以以 Byte 为单位设置限流器,每次请求用实际 Byte 数申请通过或等待。
原理看似简单,经过笔者实践,需要考虑以下问题
a. 是否容忍突发?
令牌桶最大的特点之一就是容纳了 burst 个令牌,可以提供临时的突发份额。
如果不能容忍突发,漏桶更加合适。
b. 最小的请求是否 < burst(桶大小) ?
在 Go time/rate 中,请求的 N 不允许大于 burst。
这意味着限流值不能随意调整为很小的值。
c. wait 的阻塞时间!
对于 QPS
来讲,每次 take 的数量一般为 1
。如果限流器限流不低于 1 QPS
,阻塞时间在 1s 以内,基本可以忽略。更新限流器,也能较快生效。
但对于 Bandwidth
,如果限流值和 take 数量相差较大,将会导致意料外的长时间阻塞!。
除了阻塞请求本身外,如果限流器内部长时间地持有锁,也会导致更新
操作阻塞异常!。
比如限流 1KB/s
,请求 10MB
,下一个请求将阻塞 ~1000s
。更新限流值也将在 1000s
后才能生效!很多时候这是我们不想看到的 —— 我们宁愿退让 10s
后重试。
3 golang.org/x/time/rate
golang.org/x/time/rate 是 Go 官方提供的限流库。提供了以下方式使用
3.1 基本使用
// 创建每秒 r 个token,桶大小为 b 的限流器
func NewLimiter(r Limit, b int)
Wait/WaitN 会尝试请求,如果被限流则阻塞。注意,如果请求的 N>burst,请求将失败!
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Allow/AllowN 检测是否允许,会立即返回
func (lim *Limiter) AllowN(t time.Time, n int) bool
Reserve/ReserveN 会返回一个 Reservation
,告知用户是否可以执行,需要等待多少时间。并提供 cancel 取消并归还预留的 tokens。
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation
使用例
r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act()
3.2 源码
lim.reserveN
Wait/WaitN, Allow/AllowN, Reserve/ReserveN 实际上都是通过内部方法 reserveN 实现的。
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
// 获取全局锁
// 意味着该限流器所有的请求都会争抢这个锁
lim.mu.Lock()
defer lim.mu.Unlock()
...
// 处理 limit = Inf 和 0 的情况
// limit=Inf 放行
// limit=0,burst 满足则放行,否则不 ok
...
// lazy 计算到目标时间 token 个数
// 以当前的 limite 值 * 经过时间
t, tokens := lim.advance(t)
// 扣减请求的 N
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// 如果为负,得等!计算等待时间
// 注意:在此后,时间已经返回了。
// 这意味着,无论是 Wait 还是 Reserve 后自行 sleep,
// 这期间更新限流器限流值,不能更改 sleep 时间了,
// 这可能导致带宽限流的长时间 sleep
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
...
return r
}
WaitN, AllowN
都是使用 reserveN 实现
WaitN:
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
... 处理ctx,burst
... ctx 到期、N 大于 burst 报错
... limit 为 Inf 时直接返回通过
// 使用 Reserve 判断
r := lim.reserveN(t, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// ctx 和 timer 共同在 select 等待
// 虽然此时未占用锁,但意味着不会接收 limiter 更新
// Wait if necessary
delay := r.DelayFrom(t)
if delay == 0 {
return nil
}
ch, stop, advance := newTimer(delay)
defer stop()
advance() // only has an effect when testing
select {
case <-ch:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
SetLimit
SetLimit 会和请求共同争抢 mutex。改变 Limit/Burst 可能导致已经拿到 reserve 还未执行的请求限速不准。
func (lim *Limiter) SetLimit(newLimit Limit) {
lim.SetLimitAt(time.Now(), newLimit)
}
func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
t, tokens := lim.advance(t)
lim.last = t
lim.tokens = tokens
lim.limit = newLimit
}
实际代码中,只用到了 SetLimit(time.now, Limit)
。函数接口入参留了 time.Time
,仅用于单元测试。
这也提示我们,编写需要真实时间才能验证的库时,可以加入 time.Time
,用于在单元测试中模拟时间串行。
4 注意点
4.1 请求阻塞时发生了什么
- 请求拿
limit
和burst
时候,会争夺一次全局锁 - 请求在计算
reserve
的时候,会争夺第二次全局锁 - 拿到
reserve
后即释放,进入 select 等待状态 - 阻塞等待计数器唤醒。这时更新
limit
不会触发重新计算 - 其他请求继续重复上部分过程。若
limit
设置得很小,导致大部分请求都 hang 在等待计数器唤醒。
因此,针对带宽这种情形,笔者认为应该有判断和退让措施,比如
r := lim.ReserveN(time.Now(), 1000)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
if r.Delay() > time.Second * 10 {
// Delay is too large! Shouldn't act.
// return or backoff sleep
r.Cancel()
return
}
time.Sleep(r.Delay())
Act()
4.2 更新限流值,导致 hang 死?
比如日常带宽10Gb/s
限制到 1KB/s
,再变化到 10Gb/s
,会有问题。
此种场景可能常见,比如运维同学见压力大时,将后台的同步任务关得很小。
此时如果使用 Reserve
+ 退让措施 + 告警,会更加合理。
4.3 单个请求大小 > burst?
受限于 time/rate 库设计,此情况不能通过。因此应当视情况开大 Burst
。
5 小结
本文主要围绕 带宽限制
,分析了 Go 官方限流库。并分析了直接使用其作为限流器,可能遇到的问题和隐患
- 了解请求被限流阻塞时的行为
- 注意限流器更新是否及时,大幅度限流值变化引起的假死
- 判断阻塞时间,加入必要的退让措施
参考资料
[1][2] (来源请求: 实在没找到原始来源,看起来是个很经典的教材)
[3] Go 的官方限流库 time/rate