kratos源码分析:滑动窗口
前言
在限流器、熔断器中,需要根据过去一段时间内的请求数据来判断本次请求是否可以放行
- 在熔断器中,如果过去一段时间请求失败率高,就不放行
- 在限流器中,如果过去一段时间负载超过阈值,就不放行
此时就需要一个滑动窗口,用于平滑收集并统计这些数据。对于在窗口时间范围外的数据自动丢弃
数据结构
bucket
底层由每个桶bucket
存储,多个bucket构成一个window
Bucket 提供了 Append
方法,用于向 Points
中添加数据,Points
是 float64 类型的 slice,主要存放单个指标的值,如延迟,错误次数等等
type Bucket struct {Points []float64 // 单个节点中的统计数据Count int64 // 总数next *Bucket // 指向下一个桶,用于遍历桶
}
Bucket 提供了两种数值添加的接口:Append
和 Add
,这两个方法会被上层 Window 的方法调用
// 在point中直接追加数据
func (b *Bucket) Append(val float64) {b.Points = append(b.Points, val)b.Count++
}// 在Bucket 的 Points 数组中的指定 index 位置累加值
func (b *Bucket) Add(offset int, val float64) {b.Points[offset] += valb.Count++
}
window
window由多个桶组成
type Window struct {buckets []Bucketsize int
}
初始化滑动窗口:
func NewWindow(opts Options) *Window {buckets := make([]Bucket, opts.Size)for offset := range buckets {// 初始化每个bucketsbuckets[offset].Points = make([]float64, 0)nextOffset := offset + 1if nextOffset == opts.Size {// 最后一个节点要连上第一个节点nextOffset = 0}// 每个节点都指向下一个节点,构建成环buckets[offset].next = &buckets[nextOffset]}return &Window{buckets: buckets, size: opts.Size}
}
window也提供了append和add方法:
// 往第offset个桶 append
func (w *Window) Append(offset int, val float64) {w.buckets[offset%w.size].Append(val)
}// 往第offset个桶 的0号位置累加值
func (w *Window) Add(offset int, val float64) {offset %= w.sizeif w.buckets[offset].Count == 0 {// 如果没有第0个,先初始化第0个w.buckets[offset].Append(val)return}w.buckets[offset].Add(0, val)
}
window的迭代器
迭代器用于滑动窗口的遍历,遍历的目的是为了对窗口的数据做提取和计算
比如,要计算截至当前时间滑动窗口的请求失败率,就需要遍历从窗口 start 位置到目前时间的所有 Bucket 的 错误总数/请求总数
为啥需要迭代器?方便外层使用:要遍历桶时,只用不断调迭代器的next()
方法判断还有没有下个桶,如果有就调迭代器的bucket()
方法拿到下个桶即可,不用关心内部的细节
迭代器结构:
// 遍历完成的条件: i.count == i.iteratedCount
type Iterator struct {count int // 要遍历多少个桶iteratedCount int // 已经遍历了多少个桶cur *Bucket // 从哪个桶开始遍历
}
构造迭代器:
/** 参数
offset: 从哪个桶开始遍历
count: 要遍历多少个桶
*/
func (w *Window) Iterator(offset int, count int) Iterator {return Iterator{count: count,cur: &w.buckets[offset%w.size],}
}
next方法规定了迭代器退出条件:已经遍历了count个桶
func (i *Iterator) Next() bool {return i.count != i.iteratedCount
}
获取要遍历的桶:
// Bucket gets current bucket.
func (i *Iterator) Bucket() Bucket {if !(i.Next()) {panic()}// 获取当前bucketbucket := *i.cur// 累加计数器i.iteratedCount++// 把cur指向下一个bucketi.cur = i.cur.Next()return bucket
}
滑动窗口RollingPolicy
数据结构
业务上真正可用的滑动窗口结构是RollingPolicy,封装了滑动窗口,加入了互斥锁、单个桶代表多长时间跨度、最近一次的有效的桶在哪个位置,最近一次有效桶的开始时间
// 外部可调用的结构体
type RollingPolicy struct {mu sync.RWMutex// 滑动窗口有多少个桶size int window *Window// 上次的有效最后一个位置offset int // 一个桶代表多长时间bucketDuration time.Duration // 上次的有效最后一个桶的开始时间lastAppendTime time.Time
}// 初始化RollingPolicy
func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {return &RollingPolicy{window: window,size: window.Size(),offset: 0,bucketDuration: opts.BucketDuration,lastAppendTime: time.Now(),}
}
来看几个核心方法:
timespan计算当前调用此方法的时刻,距离上一次所在桶 滑过了几个 Bucket
func (r *RollingPolicy) timespan() int {v := int(time.Since(r.lastAppendTime) / r.bucketDuration)if v > -1 {return v}return r.size
}
添加数据
当有新的数据需要加入桶时:
- 如果当前时间距离上次最后一个桶的开始时间不足一个桶的跨度,就还是在上次那个桶操作
- 否则,就需要把一些已经过期的桶清空
- 然后在最新的桶中增加统计数据
例如:假设有4个桶,编号为0 1 2 3
-
上次有效的最后一个位置
r.offset=1
,因此上次有效开始位置start= offset + 1 = 2
- 为啥?因为整个是个
环
,结束位置的下一个就是开始微信
- 为啥?因为整个是个
-
本次跨过了1个桶,需要从2(上次有效开始位置)开始,将往后的1个桶清空
- 也就是将最早过期的1个桶清空
-
本次执行后,有效最后一个位置更新为2
func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) {r.mu.Lock()defer r.mu.Unlock()// 跨过了几个buckettimespan := r.timespan()oriTimespan := timespan// 有跨度if timespan > 0 {// offset:上次的有效最后一个位置,+1就是上次的有效开始位置start := (r.offset + 1) % r.size// end:本次执行后,有效的最后一个位置end := (r.offset + timespan) % r.sizeif timespan > r.size {timespan = r.size}// 从start开始,往后timespan个bucket置空r.window.ResetBuckets(start, timespan)r.offset = end// 更新lastAppendTime为当前时刻所在bucket的开始时间r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan * int(r.bucketDuration)))}// 添加到offset位置f(r.offset, val)
}
统计数据
当需要统计桶中的数据时,调Reduce方法,核心是计算两个问题:
- 从哪个桶开始遍历
- 需要遍历多少个桶
首先计算当前时间距离最近一次写入跨过了多少桶timespan,因此offset + 1等于上次有效的开始位置,那么offset + 1 + timespan
就是跨过已过期的桶后,有效的开始位置
那么遍历多少个桶呢?总数size - timespan
,即总数减去过期的桶数,就是合法的桶数量
func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {r.mu.RLock()defer r.mu.RUnlock()// 计算距离最近一次写入跨过了多少桶timespan := r.timespan()// 需要遍历的桶数量 = size - 跨过的桶if count := r.size - timespan; count > 0 {// 从哪个桶开始遍历呢?// 假设总共有4个桶:0 1 2 3。假设之前offset=2, 最新一条数据在offset=2的桶中// 本次timeSpan = 1,跨过了1个桶。那就从0位置开始遍历,遍历count = 3个桶// 此时offset=0,就是第一个有效的桶offset := r.offset + timespan + 1if offset >= r.size {offset = offset - r.size}// 迭代器从offset开始,遍历count个bucketval = f(r.window.Iterator(offset, count))}return val
}
现在知道要从哪个桶开始遍历,要遍历多少个桶后,就需要执行具体的业务统计操作
举个例子:当要通过所有桶中的总数时,传入f如下:
func Count(iterator Iterator) float64 {var result int64for iterator.Next() {bucket := iterator.Bucket()result += bucket.Count}return float64(result)
}
调迭代器从offset开始,遍历count个桶,把这些桶的Count值累加