当前位置: 首页 > news >正文

开源限流组件分析(二):uber-go/ratelimit

文章目录

  • 漏桶限流算法
  • uber的漏桶算法
  • 使用
  • mutex版本
    • 数据结构
    • 获取令牌
    • 松弛量
  • atomic版本
    • 数据结构
    • 获取令牌
    • 测试漏桶的松弛量
  • 总结

漏桶限流算法

漏桶限流算法思路很简单,水(数据或者请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,漏桶算法能强行限制请求的处理速度
在这里插入图片描述


相比于令牌桶中只要桶内还有剩余令牌,调用方就可以马上消费令牌的策略。漏桶相对来说更加严格,调用方只能按照预定的间隔顺序消费令牌

例如:假设漏桶出水的间隔是10ms,上次在0ms时刻出水,那么下次是10ms时刻,再下次是20ms时刻

uber的漏桶算法

uber对标准的漏桶算法做了一些优化,加了一些松弛量,这么做了后就能应对突发流量,达到和令牌桶一样的效果

关于什么是松弛量,在介绍获取令牌流程时再详细分析

阅读的源码:https://github.com/uber-go/ratelimit,版本:v0.3.1

使用

下面展示一个没有使用松弛量的漏桶使用示例:

func Example_default() {// 参数100:代表每秒放行100个请求,即每10ms放行一个请求rl := ratelimit.New(100)prev := time.Now()for i := 0; i < 10; i++ {// Take获取令牌,返回获取令牌成功的时间now := rl.Take()if i > 0 {// 打印每次放行间隔fmt.Println(i, now.Sub(prev))}prev = now}
}

打印结果:

1 10ms
2 10ms
3 10ms
4 10ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms

可以看出每次放行间隔10ms,符合预期

uber的漏桶提供了mutex版本atomic版本两种实现,主要区别在于怎么控制并发,以及怎么记录松弛量


mutex版本

数据结构

type mutexLimiter struct {sync.Mutex// 上次请求放行的时刻last time.Time// 桶中积累的松弛量sleepFor time.Duration// 每个请求之间的间隔perRequest time.Duration// 最大松弛量maxSlack   time.Durationclock      Clock
}

初始化桶:

func newMutexBased(rate int, opts ...Option) *mutexLimiter {config := buildConfig(opts)perRequest := config.per / time.Duration(rate)l := &mutexLimiter{perRequest: perRequest,// config.slack默认值=10, 例如当perRequest=10ms时// maxSlack就是 -10 * 100ms = -1smaxSlack: -1 * time.Duration(config.slack) * perRequest,clock:    config.clock,}return l
}func buildConfig(opts []Option) config {c := config{clock: clock.New(),// 最大松弛量默认是perRequest的10倍slack: 10,per:   time.Second,}for _, opt := range opts {opt.apply(&c)}return c
}

主要设置两个变量:

  • 每次放行时间间隔perRequest:根据参数rate计算,rate表示每单位时间per能放行多少请求

    • 例如当per=1s,rate=100时,每次放行时间间隔perRequest=10ms
  • 最大松弛量maxSlack:config.slack默认值=10

    • 例如当perRequest=10ms时,maxSlack就是 -10 * 100ms = -1s

获取令牌

要实现标准的漏桶算法,其实比较简单:

记录上次放行时间last,当本次请求到来时, 如果此刻的时间与last相比并没有达到 perRequest 规定的间隔大小, sleep 一段时间即可

在这里插入图片描述

对应代码为(删除了松弛相关代码):

func (t *mutexLimiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果第一次请求,直接放行if t.last.IsZero() {t.last = nowreturn t.last}// 当前时间距离上次放行时间的间隔,sleepFor := t.perRequest - now.Sub(t.last)// 如果比规定的间隔小,需要sleepif sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0return t.last
}

松弛量

如果不引入松弛量,按照标准漏桶算法的流程:

假设现在有三个请求,req1req2req3,获取令牌间隔perRequest规定为 10ms

  1. req1 先到来

  2. req1 完成之后 20msreq2 才到来,此时距离上次放行有20ms,大于10ms,可以对req2 放行

  3. req2 放行后5msreq3 到来,此时距离上次放行才5ms,不足 规定的间隔10ms,因此还需要等待 5ms 才能继续放行req3

在这里插入图片描述

这种策略有什么问题?无法应对任何突发流量,没有弹性,定死了相邻两次请求放行的间隔必须大于等于规定的值

于是引入了松弛量的概念:当较长时间(超过perRequest)没有请求到来时,会在桶中积累一些松弛量,这样接下来的请求可以先消耗松弛量,而不会被漏洞的获取令牌间隔限制。直到把松弛量耗尽为止

当然松弛量也不是无限积累,这样当很长时间没有请求后,就跟没有限流没区别了。因此桶中规定了最多积累多少松弛量maxSlack

加上松弛量后代获取令牌代码如下:

  1. 计算perRequest - now.Sub(t.last)

    1. 如果大于0,说明当前时间距离上次放行时间不足规定间隔,需要等待,或者需要消耗松弛量
    2. 如果小于0,说明当前时间距离上次放行时间超过了规定间隔,那么当前请求一定可以放行,并且可以在桶中积累一些松弛量,给下次请求使用
    3. 将结果累加到t.sleepFor
  2. 假设累加的松弛值超过了maxSlack,修正为maxSlack,默认为10倍的规定间隔

  3. 如果t.sleepFor > 0,说明本次请求应该等待的时间,在使用完桶中的松弛量后还不够,需要sleep这段时间,结束后将sleepFor置为0

  4. 否则t.sleepFor <= 0,说明桶中的存储松弛余量可以满足本次请求,本次请求放行

func (t *mutexLimiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果第一次请求,直接放行if t.last.IsZero() {t.last = nowreturn t.last}// 假设perRequest=10ms,now.Sub(t.last)=5ms,那么需要睡5ms// 假设perRequest=10ms,now.Sub(t.last)=15ms,说明距离上传请求的时间超过了漏桶的限流间隔10ms,那么sleepFor变为-5t.sleepFor += t.perRequest - now.Sub(t.last)// 默认最多松弛 10倍的perRequest// 如果累加的松弛值超过了maxSlack,修正为maxSlackif t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}// 需要sleepif t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0// 如果sleepFor <= 0,说明还有存储的松弛余量,可以放行} else {t.last = now}return t.last
}

atomic版本

数据结构

type atomicInt64Limiter struct {// 上次在哪个时刻发放许可state      int64// 每次请求间隔perRequest time.Duration// 最大松弛量maxSlack   time.Durationclock      Clock
}func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {config := buildConfig(opts)perRequest := config.per / time.Duration(rate)l := &atomicInt64Limiter{perRequest: perRequest,// 最大松弛量:10倍的perRequestmaxSlack: time.Duration(config.slack) * perRequest,clock:    config.clock,}atomic.StoreInt64(&l.state, 0)return l
}

获取令牌

atomic版本的漏桶不是用一个变量存储桶中的松弛量,而是记录了上次在哪个时刻发放许可timeOfNextPermissionIssue,那松弛量用什么表示呢?当前时间now减去上次发放许可时间timeOfNextPermissionIssue就代表松弛量

怎么理解?这个值代表应该在什么时刻发放令牌:

  • 如果该值 比now大,说明需要sleep一段时间等待时间流逝,本次才能获取到令牌
  • 如果该值 比now小,说明应该在更早的时间就可以获取令牌,本次请求不需要等待

每次请求时会在timeOfNextPermissionIssue的基础上加上perRequest,并更新timeOfNextPermissionIssue,表示本次发放许可的时间比上次前进了perRequest时间

如果较长时间没有请求,那天然就积累了一些松弛量,因为上次发放许可时间就会比较小,即便加上本次的消耗perRequest也没有now大,就可以直接放行

  • 如果上次发放许可时间,加上固定间隔perRequest小于now,说明本次可以放行,并更新上次发放许可时间 += perRequest

    • 当然timeOfNextPermissionIssue不能比now小太多,否则限流就没意义了。因此规定该值最多比now小10倍的perRequest,表示松弛量最多为perRequest的10倍
  • 否则不能放行,需要等待时间流逝到timeOfNextPermissionIssue + perRequest为止

func (t *atomicInt64Limiter) Take() time.Time {var (newTimeOfNextPermissionIssue int64now                          int64)for {now = t.clock.Now().UnixNano()// 上次在哪个时刻发放许可timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)switch {// 第一次请求,或者不是第一次请求,且(没有松弛量,且now 比 上次发放许可时间 多了超过perRequest )case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):// 本次放行,本次在now时刻发放许可newTimeOfNextPermissionIssue = now// 可以有松弛量,且 当前时间 - 上次发放许可时间 > 11倍perRequestcase t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack)+int64(t.perRequest):// 本次在 now - 最大松弛量 时刻发放许可newTimeOfNextPermissionIssue = now - int64(t.maxSlack)/**1.没有松弛量,且当前时间距离 上次发放许可时间 超过了 不到perRequest,本次应该在上次发放许可时间 + perRequest 再发放2.有松弛量,且 当前时间 - 上次发放许可时间 <= 11倍perRequest,那本次应该在上次发放许可时间 + perRequest 再发放*/default:// 更新上次发放许可时间,+= perRequestnewTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)}if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {break}}sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)// 没有余量了,需要sleepif sleepDuration > 0 {t.clock.Sleep(sleepDuration)// 返回放行时的时间return time.Unix(0, newTimeOfNextPermissionIssue)}return time.Unix(0, now)
}

测试漏桶的松弛量

下面对漏桶的松弛量进行测试:如果先积累一些松弛量,就会起到因对突发流量的效果,例如让时间先走45ms

func Example_default() {// 每10ms放行一个请求, 最大松弛量=100msrl := ratelimit.New(100)// 在0ms发放许可rl.Take()// 此时时间来到45mstime.Sleep(time.Millisecond * 45)prev := time.Now()for i := 0; i < 10; i++ {now := rl.Take()if i > 0 {fmt.Println(i, now.Sub(prev))}prev = now}
}

打印结果:

1 1µs
2 103µs
3 4µs
4 4.79ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms

可以看出:前4次都没等,直接获取到令牌,第4次等了大约5ms,之后每次放行间隔都是10ms

执行到for循环时,上次发放许可时间10ms,当前时刻45ms

拆解for循环中的每次take中的漏桶状态:

  1. i=0, 下次应该在10ms发放许可,当前时刻45ms,放行
  2. i=1, 下次应该在20ms发放许可,当前时刻45ms,放行
  3. i=2, 下次应该在30ms发放许可,当前时刻45ms,放行
  4. i=3, 下次应该在40ms发放许可,当前时刻45ms,放行
  5. i=4, 下次应该在50ms发放许可,当前时刻45ms,更新timeOfNextPermissionIssue=50,需要sleep 5ms
  6. i=5, 下次应该在60ms发放许可,当前50ms,需要sleep10ms,更新timeOfNextPermissionIssue=60

如下图所示:

在这里插入图片描述

总结

加上松弛量后,这个漏桶和标准的令牌桶区别就不大了:

  • 都能应对一定的突发流量
  • 当桶中没有松弛量(没有令牌时),都按照固定时间间隔放行请求

这个库有个问题是:对桶中等待中的请求数没有限制,这样当并发量非常大时,会导致请求一直在桶中堆积

因此工程中要使用的话,最好对源码进行改造:当等待中的请求数超过阈值就快速返回失败


http://www.mrgr.cn/news/53352.html

相关文章:

  • 5、JavaScript(二) 对象+DOM
  • Ping百度,出现“ping:baidu.com: Temporary failure in name resolution“解决方案
  • NCCL报错
  • C++ 类的基础用法与详细说明:简单易懂的入门指南
  • PHP 表达式 Exception::__toString
  • (42)MATLAB中使用fftshift绘制以零为中心的功率谱
  • VideoCLIP-XL:推进视频CLIP模型对长描述的理解
  • 数据结构与算法 - 树 #数的概念 #二叉树 #堆 - 堆的实现/堆排序/TOP-K问题
  • 韩信走马分油c++
  • java缓存技术介绍
  • 如何实现采购数字化?
  • C语言笔记20
  • IO编程--多线程实现文件拷贝
  • 爬虫逆向-js进阶(续写,搭建网站)
  • 刘诗诗亮相 VOGUE 时尚盛典,三套造型出尘绝伦,美丽再惊艳众人
  • 业务诊断简介
  • [强网杯 2019]随便注1
  • 实验21:红外遥控实验
  • 通过比较list与vector在简单模拟实现时的不同进一步理解STL的底层
  • Hikyuu教程 | 滚动回测与滚动寻优系统