Go 并发的几个模式-Pipeline、Fan_in_Fan_out模式

RenXin Lv3

这段时间由于项目的需要,本人正在研究关于如何优雅的进行go的并发,以下是结合资料和视频的结果,文末会给出参考资料

Go语言的并发模型主要通过goroutine和channel实现,通过这个我们可以更有效地使用IO和CPU

这里我们围绕生成一个随机数并且返回他的平方数的场景来讲解

Generator模式

这里我们设置两个函数,一个是生成随机数,一个是生成平方数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

这就是generator模型,将每个并发阶段分开执行,最后汇总到一起

Pipeline模式

顾名思义,就是像一个管道一样联通在一起,从上面流通到下面,此处有一张图,可以更好的说明关系

我们通过channel来传递数据,把功能细分化,从生成一个数到处理一个数最后再到输出,这便是Pipeline模型。在特殊情况下,比如说channel是无缓冲的,那这又是一个同步的函数,只有前一个函数处理了才能到后面一个函数,特别像一个流水线,但是有很多工人,大家都在争相处理着数据

然后我们在main函数里面把他联通起来:

1
2
3
4
5
6
7
8
9
func main() {
// 设置pipeline。
c := gen(2, 3)
out := sq(c)

// 消费输出。
fmt.Println(<-out) // 输出 4
fmt.Println(<-out) // 输出 9
}

甚至我们可以设置一个take函数,专门处理输出

1
2
3
4
5
func consumer(in <-chan int) {
for result := range in {
fmt.Println(result)
}
}

Fan in Fan out 模型

Fan-in 是指将多个输入合并(多路复用)到一个单一的流中,或者将来自一个源的输入流式传输到多个管道。简单来说,这个模式可以被视为生产者和消费者架构,其中我们有多个生产者向单一消费者发送输入,或者单一生产者向多个消费者发送输入。

类似于这张图

Fan-out 是指从同一个channel读取的多个函数。这通常用于在一组工作器之间分配工作,以并行化CPU和I/O。例如,你可能有一个生成器函数,它有一个goroutine迭代一系列数字,并将每个数字发送到一个channel中。然后,生成器函数返回存储这些数字的channel。

在Go语言中,fan-infan-out 模式是处理并发任务时常用的设计模式。这些模式特别适用于可以分解为多个可以并行执行的小任务的耗时任务

下面是一个简单的fan-out和fan-in的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

// Fan-out
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

// Worker
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

// Fan-in
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs.
// output copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are done.
// This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

func main() {
// Set up the pipeline.
in := gen(2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

这个Fan-in的启动量我们可以通过CPU来评估。

参考资料

  1. Go Concurrency Patterns: Pipelines and cancellation:这篇文章来自Go官方博客,详细介绍了pipeline模型和如何处理失败情况。
  2. Concurrency in Go: A Practical Guide with Hands-On Examples:这篇文章提供了一系列实用的例子,帮助您理解Go中的并发。
  3. Go Language and AI: Pioneering Concurrent Programming and Analysis
  4. Concurrent Programming in Go 这篇较为简单
  • Title: Go 并发的几个模式-Pipeline、Fan_in_Fan_out模式
  • Author: RenXin
  • Created at : 2024-04-05 00:00:00
  • Updated at : 2024-04-08 21:43:47
  • Link: https://blog.renxin.space/Go/goroutine_mode/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments
On this page
Go 并发的几个模式-Pipeline、Fan_in_Fan_out模式