golang waitgroup

news/2024/5/14 14:16:11

案例

WaitGroup 可以解决一个 goroutine 等待多个 goroutine 同时结束的场景,这个比较常见的场景就是例如 后端 worker 启动了多个消费者干活,还有爬虫并发爬取数据,多线程下载等等。
我们这里模拟一个 worker 的例子

package mainimport ("fmt""sync"
)func worker(i int) {fmt.Println("worker: ", i)
}func main() {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(i int) {defer wg.Done()worker(i)}(i)}wg.Wait()
}

问题: 反过来支持多个 goroutine 等待一个 goroutine 完成后再干活吗? 看我们接下来的源码分析你就知道了

源码分析

type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers do not ensure it. So we allocate 12 bytes and then use// the aligned 8 bytes in them as state, and the other 4 as storage// for the sema.state1 [3]uint32
}

WaitGroup 结构十分简单,由 nocopystate1 两个字段组成,其中 nocopy 是用来防止复制的

type noCopy struct{}// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

由于嵌入了 nocopy 所以在执行 go vet 时如果检查到 WaitGroup 被复制了就会报错。这样可以一定程度上保证 WaitGroup 不被复制,对了直接 go run 是不会有错误的,所以我们代码 push 之前都会强制要求进行 lint 检查,在 ci/cd 阶段也需要先进行 lint 检查,避免出现这种类似的错误。

~/project/Go-000/Week03/blog/06_waitgroup/02 main*go run ./main.go~/project/Go-000/Week03/blog/06_waitgroup/02 main*go vet .
# github.com/mohuishou/go-training/Week03/blog/06_waitgroup/02
./main.go:7:9: assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy

state1 的设计非常巧妙,这是一个是十二字节的数据,这里面主要包含两大块,counter 占用了 8 字节用于计数,sema 占用 4 字节用做信号量
可以看出 state1 是一个元素个数为 3 个数组,且每个元素都是 占 32 bits
在 64 位系统里面,64位原子操作需要64位对齐
那么高位的 32 bits 对应的是 counter 计数器,用来表示目前还没有完成任务的协程个数
低 32 bits 对应的是 waiter 的数量,表示目前已经调用了 WaitGroup.Wait 的协程个数
那么剩下的一个 32 bits 就是 sema 信号量的了(后面的源码中会有体现)
在这里插入图片描述

为什么要这么搞呢?直接用两个字段一个表示 counter,一个表示 sema 不行么?
不行,我们看看注释里面怎么写的。

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count. > // 64-bit atomic operations require 64-bit alignment, but 32-bit > // compilers do not ensure it. So we allocate 12 bytes and then use > // the aligned 8 bytes in them as state, and the other 4 as storage > // for the sema.

这段话的关键点在于,在做 64 位的原子操作的时候必须要保证 64 位(8 字节)对齐,如果没有对齐的就会有问题,但是 32 位的编译器并不能保证 64 位对齐所以这里用一个 12 字节的 state1 字段来存储这两个状态,然后根据是否 8 字节对齐选择不同的保存方式。

此处我们可以看到 , state 函数是 返回存储在 wg.state1 中的状态和 sema字段 的指针
这里需要重点注意 state() 函数的实现,有 2 种情况

第 1 种 情况是,在 64 位系统下面,返回 sema字段 的指针取的是 &wg.state1[2] ,说明 64 位系统时,state1 数据排布是 : counter , waiter,sema

第 2 种情况是,32 位系统下面,返回 sema字段 的指针取的是 &wg.state1[0] ,说明 64 位系统时,state1 数据排布是 : sema ,counter , waiter

在 32 位机器上,uint64 类型的变量通常会被编译器按照 4 字节对齐,而不是 8 字节对齐。因此,如果 uint64
类型的变量没有按照 4 字节对齐,就可能会导致原子操作失败。

在 32 位机器上,64 位原子操作需要使用两个 32 位的寄存器来完成,如果 uint64 类型的变量没有按照 4字节对齐,那么在读取或者写入 uint64 类型变量时,就可能会跨越两个 32位寄存器,从而导致原子操作失败。这种情况下,编译器可能会将多个 32 位读写操作组合成一个 64 位操作,或者使用特殊的汇编指令来实现原子性,但这样会增加代码的复杂度和性能开销。

为了避免这种问题,sync.WaitGroup 在 32 位机器上使用了一个包含 3 个 uint32
元素的数组来表示状态,其中前两个元素占用了 8 字节,可以按照 uint64 对齐,从而可以使用 64
位原子操作来保证状态的原子性。这种设计方式既可以在 32 位机器上保证状态的原子性,也可以在 64 位机器上提高程序的性能。

这个操作巧妙在哪里呢?

  • 如果是 64 位的机器那肯定是 8 字节对齐了的,所以是上面第一种方式
  • 如果在 32 位的机器上
    • 如果恰好 8 字节对齐了,那么也是第一种方式取前面的 8 字节数据
    • 如果是没有对齐,但是 32 位 4 字节是对齐了的,所以我们只需要后移四个字节,那么就 8 字节对齐了,所以是第二种方式

所以通过 sema 信号量这四个字节的位置不同,保证了 counter 这个字段无论在 32 位还是 64 为机器上都是 8 字节对齐的,后续做 64 位原子操作的时候就没问题了。
这个实现是在 state 方法实现的

golang 这样用,主要原因是 golang 把 counter 和 waiter 合并到一起统一看成是 1 个 64位的数据了,因此在不同的操作系统中
由于字节对齐的原因,64位系统时,前面 2 个 32 位数据加起来,正好是 64 位,正好对齐
对于 32 位系统,则是 第 1 个 32 位数据放 sema 更加合适,后面的 2 个 32 位数据就可以统一取出,作为一个 64 位变量

为什么要counter和waiter合一起?不能用三个变量吗

  1. 在并发编程中,多个 goroutine可能会同时访问共享的变量,这种并发访问可能会导致竞态条件,从而导致程序出现意料之外的结果。为了保证并发程序的正确性,需要使用同步原语来协调不同
  2. 首先,sync.WaitGroup 的状态包含两个值:计数器和等待的 goroutine 数量。在并发程序中,对于这两个值的修改必须是原子的,否则会导致竞态条件。如果使用两个单独的 uint32 变量来表示这两个值,那么在对它们进行增减操作时,必须使用互斥锁或原子操作来保证它们的原子性。而使用一个 uint32 数组,则可以使用原子操作来同时修改这两个值,从而避免了互斥锁的开销。
  3. goroutine 的访问,其中原子操作是一种常用的同步原语。
    原子操作是一种基本的操作,它可以在一个步骤内完成读取和修改操作,从而保证了操作的原子性。在 Go 中,原子操作主要通过
    sync/atomic 包提供。

sync/atomic 包提供了一系列原子操作,包括原子读写、原子增减、原子比较交换等等。这些原子操作可以被多个 goroutine
并发调用,而不会导致竞态条件。在底层实现上,sync/atomic 包使用了 CPU 提供的原子指令,通过锁总线或者其他硬件机制来保证多个
CPU 同时访问一个共享变量时的原子性。

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]} else {return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]}
}

state 方法返回 counter 和信号量,通过 uintptr(unsafe.Pointer(&wg.state1))%8 == 0 来判断是否 8 字节对齐

Add

func (wg *WaitGroup) Add(delta int) {// 先从 state 当中把数据和信号量取出来statep, semap := wg.state()// 在 waiter 上加上 delta 值state := atomic.AddUint64(statep, uint64(delta)<<32)// 取出当前的 counterv := int32(state >> 32)// 取出当前的 waiter,正在等待 goroutine 数量w := uint32(state)// counter 不能为负数if v < 0 {panic("sync: negative WaitGroup counter")}// 这里属于防御性编程// w != 0 说明现在已经有 goroutine 在等待中,说明已经调用了 Wait() 方法// 这时候 delta > 0 && v == int32(delta) 说明在调用了 Wait() 方法之后又想加入新的等待者// 这种操作是不允许的if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 如果当前没有人在等待就直接返回,并且 counter > 0if v > 0 || w == 0 {return}// 这里也是防御 主要避免并发调用 add 和 waitif *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 唤醒所有 waiter,看到这里就回答了上面的问题了*statep = 0for ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}

Add 函数主要功能是将 counter +delta ,增加等待协程的个数:

我们可以看到 Add 函数,通过 state 函数获取到 上述 64位的变量(counter 和 waiter) 和 sema 信号量后,通过 atomic.AddUint64 函数 将 delta 数据 加到 counter 上面

这里为什么是 delta 要左移 32 位呢?

上面我们有说到嘛, state 函数拿出的 64 位变量,高 32 bits 是 counter,低 32 bits 是waiter,此处的 delta 是要加到 counter 上,因此才需要 delta 左移 32 位

Wait

wait 主要就是等待其他的 goroutine 完事之后唤醒

func (wg *WaitGroup) Wait() {// 先从 state 当中把数据和信号量的地址取出来statep, semap := wg.state()for {// 这里去除 counter 和 waiter 的数据state := atomic.LoadUint64(statep)v := int32(state >> 32)w := uint32(state)// counter = 0 说明没有在等的,直接返回就行if v == 0 {// Counter is 0, no need to wait.return}// waiter + 1,调用一次就多一个等待者,然后休眠当前 goroutine 等待被唤醒if atomic.CompareAndSwapUint64(statep, state, state+1) {runtime_Semacquire(semap)if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}return}}
}

Done

这个只是 add 的简单封装

func (wg *WaitGroup) Done() {wg.Add(-1)
}

总结

  • WaitGroup 可以用于一个 goroutine 等待多个 goroutine 干活完成,也可以多个 goroutine 等待一个 goroutine 干活完成,是一个多对多的关系
    • 多个等待一个的典型案例是 singleflight,这个在后面将微服务可用性的时候还会再讲到,感兴趣可以看看源码
  • Add(n>0) 方法应该在启动 goroutine 之前调用,然后在 goroution 内部调用 Done 方法
  • WaitGroup 必须在 Wait 方法返回之后才能再次使用
  • Done 只是 Add 的简单封装,所以实际上是可以通过一次加一个比较大的值减少调用,或者达到快速唤醒的目的。

http://www.mrgr.cn/p/87685136

相关文章

振弦采集仪完整链条的岩土工程隧道安全监测

振弦采集仪完整链条的岩土工程隧道安全监测 隧道工程是一种特殊的地下工程&#xff0c;其建设过程及运行期间&#xff0c;都受到各种内外力的作用&#xff0c;如水压、地震、地质变形、交通荷载等&#xff0c;这些因素都会对隧道的安全性产生影响。因此&#xff0c;对隧道的安…

机器学习-Gradient Descent

机器学习(Gradient Descent) videopptblog 梯度下降(Gradient Descent) optimization problem: 损失函数最小化 假设本模型有两个参数&#x1d703;1和&#x1d703;2&#xff0c;随机取得初始值 求解偏微分&#xff0c;梯度下降对参数进行更新 Visualize: 确定梯度方向&…

强化学习QLearning 进行迷宫游戏和代码

强化学习是机器学习里面的一个分支。它强调基于环境而探索行动、学习&#xff0c;以取得最大化的预期收益。其灵感来源于心理学中的行为主义理论&#xff0c;既有机体如何在环境给予的奖励或者惩罚的刺激下&#xff0c;逐步形成对刺激的预期&#xff0c;产生能够最大利益的习惯…

七、Kafka源码分析之网络通信

1、生产者网络设计 架构设计图 2、生产者消息缓存机制 1、RecordAccumulator 将消息缓存到RecordAccumulator收集器中, 最后判断是否要发送。这个加入消息收集器&#xff0c;首先得从 Deque 里找到自己的目标分区&#xff0c;如果没有就新建一个批量消息 Deque 加进入 2、消…

【网络安全带你练爬虫-100练】第15练:模拟用户登录

目录 一、目标1&#xff1a;理清逻辑 二、目标2&#xff1a;将每一步用代码进行表示 三、网络安全O 一、目标1&#xff1a;理清逻辑 模拟登录的基本流程 1、进入入口程序 2、读取目标URL 3、请求加上线程 4、确定请求数据包 5、请求格式的确认 6、数据的处理与判断 二、目标…

Shell 排序法 - 改良的插入排序

说明 插入排序法由未排序的后半部前端取出一个值&#xff0c;插入已排序前半部的适当位置&#xff0c;概念简单但速度不快。 排序要加快的基本原则之一&#xff0c;是让后一次的排序进行时&#xff0c;尽量利用前一次排序后的结果&#xff0c;以加快排序的速度&#xff0c;Shel…

深度学习与神经网络

人工智能&#xff0c;机器学习&#xff0c;深度学习&#xff0c;神经网络&#xff0c;emmmm&#xff0c;傻傻分不清楚&#xff0c;这都啥呀&#xff0c;你知道吗&#xff1f;我不知道。你知道吗&#xff1f;我不知道。 来来来&#xff0c;接下来&#xff0c;整硬菜&#xff1a…

vue3自定义dialog createApp setup语法组件使用element

目录 index.vue <template><el-dialogcenterv-model"isVisible"width"650px":title"title":append-to-body"true"><div id"dialogMap_container"></div><template #footer><span class&q…

Chrome浏览器中的vue插件devtools的下载方式(使用Chrome应用商店/科学上网情况下)

目录 devtools对前端来说的好处——开发预览、远程调试、性能调优、Bug跟踪、断点调试等 下载步骤&#xff1a; 测试阶段&#xff1a; 最近做项目要使用devtools这个vue插件。 devtools对前端来说的好处——开发预览、远程调试、性能调优、Bug跟踪、断点调试等 下载步骤…

IT职场笔记

MySQL笔记之一致性视图与MVCC实现 一致性读视图是InnoDB在实现MVCC用到的虚拟结构&#xff0c;用于读提交&#xff08;RC&#xff09;和可重复度&#xff08;RR&#xff09;隔离级别的实现。 一致性视图没有物理结构&#xff0c;主要是在事务执行期间用来定义该事物可以看到什…

[JavaScript游戏开发] 2D二维地图绘制、人物移动、障碍检测

系列文章目录 第一章 2D二维地图绘制、人物移动、障碍检测 第二章 跟随人物二维动态地图绘制、自动寻径、小地图显示(人物红点显示) 第三章 绘制冰宫宝藏地图、人物鼠标点击移动、障碍检测 文章目录 系列文章目录前言一、列计划1.1、目标1.2、步骤 二、使用步骤2.1、准备素材(…

探索AI图像安全,助力可信AI发展

探索AI图像安全&#xff0c;助力可信AI发展 0. 前言1. 人工智能发展与安全挑战1.1 人工智能及其发展1.2 人工智能安全挑战 2. WAIC 2023 多模态基础大模型的可信 AI2.1 WAIC 2023 专题论坛2.2 走进合合信息 3. AI 图像安全3.1 图像篡改检测3.2 生成式图像鉴别3.3 OCR 对抗攻击技…

环境搭建-Ubuntu20.04.6系统TensorFlow BenchMark的GPU测试

1. 下载Ubuntu20.04.6镜像 登录阿里云官方镜像站&#xff1a;阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区 2. 测试环境 Server OS&#xff1a;Ubuntu 20.04.6 LTS Kernel: Linux 5.4.0-155-generic x86-64 Docker Version&#xff1a;24.0.5, build ced0996 docker-com…

【SQL应知应会】表分区(二)• Oracle版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 分区表 • Oracle版 前言一、分区表1.什么是表分区…

Oracle 迁移 Hive 过程中遇到的问题总结

前言 最近一个小伙伴在做从 Oracle 到 Hive 的业务迁移工作,在迁移过程中属实遇到了一些坑,今天就来汇总一下这些坑,避免以后大家其他业务迁移的时候再出现类似的问题,即使出现了也可以拿过来进行对照解决。 问题1:Distinct window functions are not supported: count(…

Python案例分析|使用Python图像处理库Pillow处理图像文件

本案例通过使用Python图像处理库Pillow&#xff0c;帮助大家进一步了解Python的基本概念&#xff1a;模块、对象、方法和函数的使用 使用Python语言解决实际问题时&#xff0c;往往需要使用由第三方开发的开源Python软件库。 本案例使用图像处理库Pillow中的模块、对象来处理…

uniapp小程序,根据小程序的环境版本,控制的显页面功能按钮的示隐藏

需求&#xff1a;根据小程序环境控制控制页面某个功能按钮的显示隐藏&#xff1b; 下面是官方文档和功能实现的相关代码&#xff1a; 实现上面需要&#xff0c;用到了uni.getAccountInfoSync()&#xff1a; uni.getAccountInfoSync() 是一个 Uniapp 提供的同步方法&#xff0c…

微服务如何治理

微服务远程调用可能有如下问题&#xff1a; 注册中心宕机&#xff1b; 服务提供者B有节点宕机&#xff1b; 服务消费者A和注册中心之间的网络不通&#xff1b; 服务提供者B和注册中心之间的网络不通&#xff1b; 服务消费者A和服务提供者B之间的网络不通&#xff1b; 服务提供者…

操作系统_进程与线程(三)

目录 3. 同步与互斥 3.1 同步与互斥的基本概念 3.1.1 临界资源 3.1.2 同步 3.1.3 互斥 3.2 实现临界区互斥的基本方法 3.2.1 软件实现方法 3.2.1.1 算法一&#xff1a;单标志法 3.2.1.2 算法二&#xff1a;双标志法先检查 3.2.1.3 算法三&#xff1a;双标志法后检查 …

mysql的json处理

写在前面 需要注意&#xff0c;5.7以上版本才支持&#xff0c;但如果是生产环境需要使用的话&#xff0c;尽量使用8.0版本&#xff0c;因为8.0版本对json处理做了比较大的性能优化。你你可以使用select version();来查看版本信息。 本文看下MySQL的json处理。在正式开始让我们先…