当前位置: 首页 > news >正文

kratos源码分析:滑动窗口

前言

在限流器、熔断器中,需要根据过去一段时间内的请求数据来判断本次请求是否可以放行

  • 在熔断器中,如果过去一段时间请求失败率高,就不放行
  • 在限流器中,如果过去一段时间负载超过阈值,就不放行

此时就需要一个滑动窗口,用于平滑收集并统计这些数据。对于在窗口时间范围外的数据自动丢弃

数据结构

bucket

底层由每个桶bucket存储,多个bucket构成一个window

Bucket 提供了 Append 方法,用于向 Points 中添加数据,Points 是 float64 类型的 slice,主要存放单个指标的值,如延迟,错误次数等等

type Bucket struct {Points []float64 // 单个节点中的统计数据Count  int64  // 总数next   *Bucket // 指向下一个桶,用于遍历桶
}

Bucket 提供了两种数值添加的接口:AppendAdd,这两个方法会被上层 Window 的方法调用

// 在point中直接追加数据
func (b *Bucket) Append(val float64) {b.Points = append(b.Points, val)b.Count++
}// 在Bucket 的 Points 数组中的指定 index 位置累加值
func (b *Bucket) Add(offset int, val float64) {b.Points[offset] += valb.Count++
}

window

window由多个桶组成

type Window struct {buckets []Bucketsize    int
}

初始化滑动窗口:

func NewWindow(opts Options) *Window {buckets := make([]Bucket, opts.Size)for offset := range buckets {// 初始化每个bucketsbuckets[offset].Points = make([]float64, 0)nextOffset := offset + 1if nextOffset == opts.Size {// 最后一个节点要连上第一个节点nextOffset = 0}// 每个节点都指向下一个节点,构建成环buckets[offset].next = &buckets[nextOffset]}return &Window{buckets: buckets, size: opts.Size}
}

window也提供了append和add方法:

// 往第offset个桶 append
func (w *Window) Append(offset int, val float64) {w.buckets[offset%w.size].Append(val)
}// 往第offset个桶 的0号位置累加值
func (w *Window) Add(offset int, val float64) {offset %= w.sizeif w.buckets[offset].Count == 0 {// 如果没有第0个,先初始化第0个w.buckets[offset].Append(val)return}w.buckets[offset].Add(0, val)
}

window的迭代器

迭代器用于滑动窗口的遍历,遍历的目的是为了对窗口的数据做提取和计算

比如,要计算截至当前时间滑动窗口的请求失败率,就需要遍历从窗口 start 位置到目前时间的所有 Bucket 的 错误总数/请求总数

为啥需要迭代器?方便外层使用:要遍历桶时,只用不断调迭代器的next()方法判断还有没有下个桶,如果有就调迭代器的bucket()方法拿到下个桶即可,不用关心内部的细节

迭代器结构:

// 遍历完成的条件: i.count == i.iteratedCount
type Iterator struct {count         int // 要遍历多少个桶iteratedCount int // 已经遍历了多少个桶cur           *Bucket // 从哪个桶开始遍历
}

构造迭代器:

/** 参数
offset: 从哪个桶开始遍历
count: 要遍历多少个桶
*/
func (w *Window) Iterator(offset int, count int) Iterator {return Iterator{count: count,cur:   &w.buckets[offset%w.size],}
}

next方法规定了迭代器退出条件:已经遍历了count个桶

func (i *Iterator) Next() bool {return i.count != i.iteratedCount
}

获取要遍历的桶:

// Bucket gets current bucket.
func (i *Iterator) Bucket() Bucket {if !(i.Next()) {panic()}// 获取当前bucketbucket := *i.cur// 累加计数器i.iteratedCount++// 把cur指向下一个bucketi.cur = i.cur.Next()return bucket
}

滑动窗口RollingPolicy

数据结构

业务上真正可用的滑动窗口结构是RollingPolicy,封装了滑动窗口,加入了互斥锁、单个桶代表多长时间跨度、最近一次的有效的桶在哪个位置,最近一次有效桶的开始时间

// 外部可调用的结构体
type RollingPolicy struct {mu     sync.RWMutex// 滑动窗口有多少个桶size   int window *Window// 上次的有效最后一个位置offset int // 一个桶代表多长时间bucketDuration time.Duration // 上次的有效最后一个桶的开始时间lastAppendTime time.Time     
}// 初始化RollingPolicy
func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {return &RollingPolicy{window: window,size:   window.Size(),offset: 0,bucketDuration: opts.BucketDuration,lastAppendTime: time.Now(),}
}

来看几个核心方法:

timespan计算当前调用此方法的时刻,距离上一次所在桶 滑过了几个 Bucket

func (r *RollingPolicy) timespan() int {v := int(time.Since(r.lastAppendTime) / r.bucketDuration)if v > -1 {return v}return r.size
}

添加数据

当有新的数据需要加入桶时:

  1. 如果当前时间距离上次最后一个桶的开始时间不足一个桶的跨度,就还是在上次那个桶操作
  2. 否则,就需要把一些已经过期的桶清空
  3. 然后在最新的桶中增加统计数据

例如:假设有4个桶,编号为0 1 2 3
在这里插入图片描述

  • 上次有效的最后一个位置r.offset=1,因此上次有效开始位置start= offset + 1 = 2

    • 为啥?因为整个是个,结束位置的下一个就是开始微信
  • 本次跨过了1个桶,需要从2(上次有效开始位置)开始,将往后的1个桶清空

    • 也就是将最早过期的1个桶清空
  • 本次执行后,有效最后一个位置更新为2

func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) {r.mu.Lock()defer r.mu.Unlock()// 跨过了几个buckettimespan := r.timespan()oriTimespan := timespan// 有跨度if timespan > 0 {// offset:上次的有效最后一个位置,+1就是上次的有效开始位置start := (r.offset + 1) % r.size// end:本次执行后,有效的最后一个位置end := (r.offset + timespan) % r.sizeif timespan > r.size {timespan = r.size}// 从start开始,往后timespan个bucket置空r.window.ResetBuckets(start, timespan)r.offset = end// 更新lastAppendTime为当前时刻所在bucket的开始时间r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan * int(r.bucketDuration)))}// 添加到offset位置f(r.offset, val)
}

统计数据

当需要统计桶中的数据时,调Reduce方法,核心是计算两个问题:

  1. 从哪个桶开始遍历
  2. 需要遍历多少个桶

首先计算当前时间距离最近一次写入跨过了多少桶timespan,因此offset + 1等于上次有效的开始位置,那么offset + 1 + timespan就是跨过已过期的桶后,有效的开始位置

那么遍历多少个桶呢?总数size - timespan,即总数减去过期的桶数,就是合法的桶数量

func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {r.mu.RLock()defer r.mu.RUnlock()// 计算距离最近一次写入跨过了多少桶timespan := r.timespan()// 需要遍历的桶数量 = size - 跨过的桶if count := r.size - timespan; count > 0 {// 从哪个桶开始遍历呢?// 假设总共有4个桶:0 1 2 3。假设之前offset=2, 最新一条数据在offset=2的桶中// 本次timeSpan = 1,跨过了1个桶。那就从0位置开始遍历,遍历count = 3个桶// 此时offset=0,就是第一个有效的桶offset := r.offset + timespan + 1if offset >= r.size {offset = offset - r.size}// 迭代器从offset开始,遍历count个bucketval = f(r.window.Iterator(offset, count))}return val
}

现在知道要从哪个桶开始遍历,要遍历多少个桶后,就需要执行具体的业务统计操作

举个例子:当要通过所有桶中的总数时,传入f如下:

func Count(iterator Iterator) float64 {var result int64for iterator.Next() {bucket := iterator.Bucket()result += bucket.Count}return float64(result)
}

调迭代器从offset开始,遍历count个桶,把这些桶的Count值累加


http://www.mrgr.cn/news/50979.html

相关文章:

  • 主流RTOS系统
  • 锂锰电池和锂电池区别
  • 专题:哈希结构
  • Java 二分查找算法详解及通用实现模板案例示范
  • 【分布式微服务云原生】《探秘分布式系统基石:CAP、BASE 理论与 Soft 状态》
  • 腾讯云视立方·音视频通话 SDK 个人信息保护规则
  • Leetcode 第 141 场双周赛题解
  • 深入理解C++ STL中的 vector
  • (亲测可行)windows安装msys2配置c++opencv
  • Excel使用技巧:筛选2组数据;条件格式突出显示数据
  • Zsh 安装与配置
  • 小程序开发设计-模板与配置:WXML模板语法⑨
  • win11安装不了msi文件解决办法
  • 利士策分享,美国“假旗”行动,是否成为了网络空间的阴霾?
  • 机器学习:opencv--人脸检测以及微笑检测
  • HCIP-HarmonyOS Application Developer 习题(十)
  • Python 工具库每日推荐 【sqlparse】
  • leetcode128最长连续序列 golang版
  • mysql 实用命令
  • Rust默认使用UTF-8编码来解析源代码文件。如果在代码中包含无法用UTF-8编码表示的字符,编译器会报错!