本文是对GopherChon 2017上Kavya Joshi的对Go Channel的Talk一个小小的总结。原视频的链接是: GopherCon 2017: Kavya Joshi - Understanding Channels
并发性
众所周知并发特性是Go的一个核心特性。
为了实现并发性,Go 有两个重要的组件:1
goroutines
- 独立执行任务,并尽可能保持并行
channels
- 用于goroutines之间的通信和同步
channel 的性质
Channel有几种令人感兴趣的性质
- 线程安全
- 可以在goroutines之间传递信息
- channel内的元素是先进先出的
- channel会导致goroutine的阻塞和恢复
那么channel是如何达成这些性质的呢? 我将在接下来从3个方面来帮助各位理解channel是如何工作的。
- Channel的构建
- Goroutiine 如何发送和接收
分析
构建Channel
在我们进行别的分析之前,我们先来介绍一下如何创建一个Channel以及它的结构。
众所周知,channel有两种不同的类型:有缓存的和无缓存,在这里我们主要把注意力集中在有缓存的channel。
根据前面提到的特性,channel是goroutine安全的,我们很容易想到用一个带锁的队列来实现。现实也确实如此,channel的底层是一个叫做hchan
的结构体,从chan.go的源代码中,我们很容易了解到 hcan的结构如下2:
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
我们先把目光带到buf
, buf
是一个指向环形队列的指针。 然后sendx
与recvx
这两个变量则是用于记录已发送的元素在队列中的位置,以及已经接受的元素在队列中的位置。 同时lock
是一个互斥锁,来确保没有竞争。
当我们调用make chan
的时候,我们会把创建hchan
结构,并分配在堆上,然后返回一个指向该hchan
的指针。因为chan
本身是一个指向hchan
的指针,所以我们才可以在函数间直接的传递他,而不是使用&chan
等形式, 因为值传递传递的是地址,都指向同一个内存空间。。
Goroutiine如何使用Channel发送和接收
简单的聊完了chan
的创建,那么在这一个部分我们来聊一下,我们如何在不同的goroutine
之间,向chan
发送,接受数据。
这里我们简单的举几个例子,用G1
代表一个发送者,用G2
代表一个接受者。
G1
func mai() {
...
for _, task := range tasks {
taskCh <- task
}
...
}
G2
func worker() {
for {
task := <- taskCh
process(task
}
}
这里的taskCh
是一个长度为3的有缓存的chan
。
- 简单的例子
在这里首先给大家介绍一个简单的说明,我们是如何的发送和接收。
G1
首先到来,并发送信息到channe- 那么我们首先会获取锁,
- 并把值放到队列中。入队的过程本质是一个内存复制的过程(通过增减sendx与recvx)
- 释放锁
随后,
G2
到达,想要从channel中获取数据- 这时候我们也会先上锁
- 让数值出队列,同样的使用内存复制来完成(通过增减sendx与recvx)
我们可以看到,我们都是通过简单的内存复制来完成数据的传输,这养的过程非常的安全。同时,我们通过mutex保证了没有数据被共享。在上述的步骤中,我们唯一共享的内存就是hchan
, 这符合我们的原则, "Do not communication by sharing memory; instead, share memory by communciation"。
阻塞与恢复
现在,我们考虑一个新的情况,G2
在调用process()
函数的时候需要长时间的处理,但在此期间,我们的G1还是在不断的像channel发送任务:
ch <- tasd1
ch <- task2
ch <- task3
当我们有三个任务的时候,我们的大小为三的带缓存的channel就已经满了。这时候,G1
又发送了第四个任务:
ch <- task4
由于ch
的缓存已满,G1
就会被阻塞,直到有人从ch
中取走一个任务,G1
才能重新的恢复过来。那么这些操作是如何做到的呢?
在这里我们要引入Go的调度器。Go的调度器是一个M:N的调度模型,换句话说就是有N个goroutine 运行在 M个系统线程上,两者不是一一对应的关系。 Go的调度器是一个非常复杂的组件,也是我们随意使用goroutine的保证,因为这篇是讲channel,关于调度器这里我就不过多赘述。
- goroutine的阻塞
当ch <- task 4
执行的时候,由于channel已经满员,我们需要阻塞G1
。 这时,
- 我们会使用运行时的
gopark
函数 - 这时go的runtime scheduler 会接管函数
- 把
G1
设置为waiting
的状态 - 同时吧
G1
从M中调度出,并从P取得新的runnable
的G
- 建立新的 G 和 M 的关系
- 当调度器返回时,新的
G
已经开始运行,我们的G1
已经不在被运行
我们可以看到在这个过程中,G1
虽然被阻塞了,但我们的系统线程并没有。
既然我们停止了G1
的运行, 那么我们如何将它恢复呢?
- goroutine的恢复
我们把目光重新转到hchan
这个在channel底层的结构,我们知道他有两个队列:sendq
和recvq
,而这两个变量就时让goroutine恢复的功臣。
type hchan struct {
...
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
...
}
waitq
是一个双向链表,其底层结构是sudog
。sudog
可以大致看成对goroutine
的封装,其结构如下:
type sudog struct {
g *g // 指向groutine的指针
..,
elem unsafe.Pointer //指向元素的指针(可能指向栈空间内)
...
}
在之前阻塞的时候,
G1
会给自己创建一个sudo
类型的变量,- 并把它放到
sendq
中, 来方便接受者来恢复G1
我们上面提到的这些过程都在调用调度器之前。
那么,我们究竟是如何恢复的?
当G2
调用t := <-ch
的时候,执行了一下的操作
G2
先使用出队操作,从buf
中获得了一个任务。G2
之后会从sendq
中弹出一个sudog
- 将弹出来的
sudog
中elem
的值通过入队操作加入到buf
(G2完成该过程) - 然后我们讲
sudog
指向的goroutineG1
的状态修改为runable
G2
还需要告诉调度器G1
已经好了,调用goready(G1)
- 这会调用调度器,调度器负责将
G1
的状态修改 - 随后调度器把
G1
放到P的运行队列中 - 返回到
G2
上面我们讲的阻塞与恢复是发送者先向channel发送信息,那么当接受者先到达呢?
- 接收者先到达
大致的过程与前面相似:
G2
给自己创建一个sudog
结构的变量,然后把自己放到recq
中- 再调用
gopark
来让调度器接管- 调度器修改goroutine的状态为
wait
- 断开
G
和M
- 从P中取出新的
M
,建立新关系 - 运行新的
groutine
- 调度器修改goroutine的状态为
但在这之后G1
开始发送数据流程,与之前部分不同。
G1
在开始运行的时候,会直接把数据通过内存复制,复制给G2
对应的sudog
的t
。大家可能会很奇怪,这样goroutine不是互相访问内存了嘛?说好的不访问别人的栈内数据呢?但其实你考虑一下,当G2
停止运行的时候,这个操作肯定是安全的。且当我们这么操作之后,当G2
恢复的时候,它不需要再去获取锁,访问buf
队列。从而,我们能够节省出内存复制和锁操作的开销。
非阻塞模式的channel的读写 (使用select)
这个部分是talk讲述不多的一个部分,我在这里做一些简单的补充。
slect操作3:
- 先把所有需要操作的 channel 上锁
- 给自己创建一个 sudog,然后添加到所有 channel 的 sendq或recvq
- 把所有的 channel 解锁,然后 pause 当前调用 select 的 goroutine(gopark())
- 然后当有任意一个 channel 可用时,select 的这个 goroutine 就会被调度执行。
- 接收数据
对于非阻塞模式下(select)下的channel, 我们其实是和会阻塞的不同的。对于会阻塞的channel来说,当读取不到数据时候,我们只需调用gopark
来让线程阻塞,但非阻塞这样是不可以的。有下面几点不同4:
- 如果channel是nil
- 阻塞,会快速挂起goroutine
- 非阻塞,会直接返回
- 读取不到数据
- 阻塞,挂起
- 非阻塞,会直接返回
- channel 已经关闭
- 阻塞,返回 对应的零值与false
- select 返回 对应的零值或者buf中的值与false 但仍能被select 选中
- 发送数据
- 如果channel是
nil
- 阻塞,会快速挂起goroutine
- 非阻塞,会直接返回
- 如果channel未关闭,且
buf
已经满了(有缓冲是buf满,无缓冲则是已经有人在等待)- 阻塞,挂起
- 非阻塞, 返回
false
- 如果 channel 已经关闭
- 两者都是直接
panic
- 两者都是直接
关闭channel
- 判断是否为空,如果未空,则panic
- 上锁
- 如果已经为空,解锁,panic
- 修改为关闭状态
- 循环 释放所有等接收队列里的sudog,
- 如果elem不为空,赋0值
- 将goroutine相连成链表
- 循环 释放所有等待发送队列的sudog
- 发送者 会panic
- 将goroutine 形成链表
- 解锁,遍历循环链表唤醒所有的sudog