当前位置: 首页 > news >正文

golang并发编程—— 并发模式

并发模式

并发模式是指在并发编程中常用的设计模式和方法,用于有效地管理和协调多个并发任务。以下是一些常见的并发模式,结合 Go 语言的示例代码来介绍它们的应用。

1. 工作池(Worker Pool)

工作池模式通过一组固定数量的工作 Goroutine 来处理大量的任务,避免因为过多的 Goroutine 而导致资源的过度消耗。

package mainimport ("fmt""sync""time"
)// Worker 函数,模拟处理任务
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {defer wg.Done()for j := range jobs {fmt.Printf("Worker %d started job %d\n", id, j)time.Sleep(time.Second) // 模拟工作fmt.Printf("Worker %d finished job %d\n", id, j)results <- j * 2 // 返回结果}
}func main() {const numJobs = 5const numWorkers = 3jobs := make(chan int, numJobs)results := make(chan int, numJobs)var wg sync.WaitGroup// 启动 worker Goroutinefor w := 1; w <= numWorkers; w++ {wg.Add(1)go worker(w, jobs, results, &wg)}// 发送任务到 jobs 通道for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 等待所有 worker 完成wg.Wait()close(results)// 打印结果for result := range results {fmt.Println("Result:", result)}
}

删除close(results)语句会出现fatal error: all goroutines are asleep - deadlock!错误

2. 扇入(Fan-in)

扇入模式将多个输入通道合并到一个通道,从而可以在单个 Goroutine 中处理来自多个来源的数据。

package mainimport ("fmt""time"
)// generator 生成一系列数据并发送到通道
func generator(start int, end int, c chan<- int) {for i := start; i <= end; i++ {c <- itime.Sleep(time.Millisecond * 500) // 模拟延迟}close(c)
}// fanIn 将多个通道合并为一个通道
func fanIn(channels ...<-chan int) <-chan int {out := make(chan int)var wg sync.WaitGroupoutput := func(c <-chan int) {for n := range c {out <- n}wg.Done()}for _, c := range channels {wg.Add(1)go output(c)}go func() {wg.Wait()close(out)}()return out
}func main() {c1 := make(chan int)c2 := make(chan int)go generator(1, 5, c1)go generator(6, 10, c2)for n := range fanIn(c1, c2) {fmt.Println(n)}
}
  • 如何将close语句移到Wait前面,则提前关闭通道,没有数据。

  • 如果删除close语句,会出现all goroutines are asleep的错误。

    fatal error: all goroutines are asleep - deadlock!
    

3. 扇出(Fan-out)

扇出模式将一个输入通道的数据分发到多个 Goroutine 进行并行处理。

package mainimport ("fmt""sync""time"
)// worker 函数处理来自 jobs 通道的任务
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {defer wg.Done()for j := range jobs {fmt.Printf("Worker %d processing job %d\n", id, j)time.Sleep(time.Second) // 模拟工作}
}func main() {jobs := make(chan int, 10)var wg sync.WaitGroup// 启动 worker Goroutinefor w := 1; w <= 3; w++ {wg.Add(1)go worker(w, jobs, &wg)}// 发送任务到 jobs 通道for j := 1; j <= 10; j++ {jobs <- j}close(jobs)// 等待所有 worker 完成wg.Wait()
}

如果调换wait与close的顺序或者删除close语句,会出现all goroutines are asleep的错误

fatal error: all goroutines are asleep - deadlock!

4. 管道(Pipeline)

管道模式将任务分解为一系列的处理步骤,每个步骤由一个 Goroutine 处理,并将结果传递到下一个步骤的 Goroutine。

package mainimport ("fmt""time"
)// 生成器生成一系列数字
func generator(nums ...int) <-chan int {out := make(chan int)go func() {for _, n := range nums {out <- n}close(out)}()return out
}// 阶段1:数字加倍
func doubler(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * 2}close(out)}()return out
}// 阶段2:数字平方
func squarer(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n}close(out)}()return out
}func main() {// 生成数据并通过管道传递nums := generator(1, 2, 3, 4, 5)doubled := doubler(nums)squared := squarer(doubled)// 打印结果for n := range squared {fmt.Println(n)}
}

5. 多路复用(Multiplexing)

多路复用模式通过 select 语句从多个通道中选择可用的通道进行处理。

package mainimport ("fmt""time"
)// 生成器函数生成数据并发送到通道
func generator(name string, interval time.Duration) <-chan string {c := make(chan string)go func() {for i := 1; ; i++ {c <- fmt.Sprintf("%s: %d", name, i)time.Sleep(interval)}}()return c
}func main() {c1 := generator("Channel 1", 2*time.Second)c2 := generator("Channel 2", 3*time.Second)for i := 0; i < 5; i++ {select {case msg1 := <-c1:fmt.Println(msg1)case msg2 := <-c2:fmt.Println(msg2)}}
}

6. 生产者-消费者(Producer-Consumer)

生产者-消费者模式通过缓冲通道协调生产者和消费者的工作,确保数据的安全传递和处理。

以下代码有错误

package mainimport ("fmt""sync""time"
)// 生产者函数生成数据并发送到通道
func producer(id int, jobs chan<- int, wg *sync.WaitGroup) {defer wg.Done()for j := 1; j <= 5; j++ {fmt.Printf("Producer %d producing job %d\n", id, j)jobs <- jtime.Sleep(time.Millisecond * 500)}
}// 消费者函数从通道接收数据并处理
func consumer(id int, jobs <-chan int, wg *sync.WaitGroup) {defer wg.Done()for j := range jobs {fmt.Printf("Consumer %d processing job %d\n", id, j)time.Sleep(time.Second)}
}func main() {jobs := make(chan int, 10)var wg sync.WaitGroup// 启动生产者 Goroutinefor p := 1; p <= 2; p++ {wg.Add(1)go producer(p, jobs, &wg)}// 启动消费者 Goroutinefor c := 1; c <= 2; c++ {wg.Add(1)go consumer(c, jobs, &wg)}// 等待所有生产者完成wg.Wait()close(jobs)
}

fatal error: all goroutines are asleep - deadlock

多生产者,多消费者正确代码如下

package mainimport ("fmt""math/rand""sync""time"
)type Product struct {name  intvalue int
}func producer(wg *sync.WaitGroup, products chan<- Product, name int, stop *bool) {for !*stop {product := Product{name: name, value: rand.Intn(100)}products <- productfmt.Printf("producer %v produce a product: %#v\n", name, product)time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)}wg.Done()
}func consumer(wg *sync.WaitGroup, products <-chan Product, name int) {for product := range products {fmt.Printf("consumer %v consume a product: %#v\n", name, product)time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)}wg.Done()
}func main() {var wgp sync.WaitGroupvar wgc sync.WaitGroupstop := falseproducts := make(chan Product, 10)// 创建 5 个生产者和 5 个消费者for i := 0; i < 5; i++ {go producer(&wgp, products, i, &stop)go consumer(&wgc, products, i)wgp.Add(1)wgc.Add(1)}time.Sleep(time.Duration(1) * time.Second)stop = true     // 设置生产者终止信号wgp.Wait()      // 等待生产者退出close(products) // 关闭通道wgc.Wait()      // 等待消费者退出
}

参考资料

golang如何优雅地关闭通道
并发模式
Go Channel 详解
总结了才知道,原来channel有这么多用法
生产者消费者模式 (Producer-Consumer Pattern)

补充资料

Channel 语法

ch <- x     // sends (or writes ) x through channel ch
x = <-ch   // x receives (or reads) data sent to the channel ch
<-ch      // receives data, but the result is discarded

分别是:

  • 通过通道ch发送(或写入)x
  • x接收(或读取)发送到通道ch的数据
  • 接收数据,但结果被丢弃

WaitGroup介绍

WatiGroup是sync包中的一个struct类型,用来收集需要等待执行完成的goroutine。下面是它的定义:

// WaitGroup用于等待一组线程的结束。
// 父线程调用Add方法来设定应等待的线程的数量。
// 每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。
type WaitGroup struct {// 包含隐藏或非导出字段
}// Add方法向内部计数加上delta,delta可以是负数;
// 如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。
// 注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。
// 一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
func (wg *WaitGroup) Add(delta int)// Done方法减少WaitGroup计数器的值,应在线程的最后执行。
func (wg *WaitGroup) Done()// Wait方法阻塞直到WaitGroup计数器减为0。
func (wg *WaitGroup) Wait()

http://www.mrgr.cn/news/14501.html

相关文章:

  • UI自动化测试的边界怎么定义?
  • 【秋招笔试】8.21华为秋招-三语言题解
  • PbootCMS程序安全设置建议
  • kafka操作
  • 海康二次开发学习笔记5-二次开发小技巧
  • C语言实现经典排序算法
  • Excel十进制度转为度分秒格式
  • 奇安信渗透测试岗位三面经验分享
  • JVM调优原理
  • 不同语言的转义字符
  • 最新高仿拼夕夕源码/拼单系统源码/拼单商城/类目功能齐全
  • 2024年“羊城杯”粤港澳大湾区网络安全大赛 MISC部分
  • 适用于 Visual Studio 的 C++ 万能头
  • SpringBoot集成kafka接收对象消息
  • 全程云OA UploadEditorFile接口存在任意文件上传漏洞 附POC
  • 【蓝桥杯青少组】第十五届省赛python(2024)
  • 排序题目:颜色分类
  • Android Webview 详解
  • 如何利用命令模式实现一个手游后端架构|命令模式|手游后端|架构设计
  • 最新国内Docker 安装