对Go Channel 的一点认识

对 GopherCon 2017 Kavya Joshi 的Talk的总结

本文是对GopherChon 2017上Kavya Joshi的对Go Channel的Talk一个小小的总结。原视频的链接是: GopherCon 2017: Kavya Joshi - Understanding Channels

并发性

众所周知并发特性是Go的一个核心特性。

为了实现并发性,Go 有两个重要的组件:1

  • goroutines

    • 独立执行任务,并尽可能保持并行
  • channels

    • 用于goroutines之间的通信和同步

channel 的性质

Channel有几种令人感兴趣的性质

  • 线程安全
  • 可以在goroutines之间传递信息
  • channel内的元素是先进先出的
  • channel会导致goroutine的阻塞和恢复

那么channel是如何达成这些性质的呢? 我将在接下来从3个方面来帮助各位理解channel是如何工作的。

  • Channel的构建
  • Goroutiine 如何发送和接收

分析

构建Channel

在我们进行别的分析之前,我们先来介绍一下如何创建一个Channel以及它的结构。

众所周知,channel有两种不同的类型:有缓存的和无缓存,在这里我们主要把注意力集中在有缓存的channel。

根据前面提到的特性,channel是goroutine安全的,我们很容易想到用一个带锁的队列来实现。现实也确实如此,channel的底层是一个叫做hchan的结构体,从chan.go的源代码中,我们很容易了解到 hcan的结构如下2

type hchan struct {
    // chan 里元素数量
    qcount   uint
    // chan 底层循环数组的长度
    dataqsiz uint
    // 指向底层循环数组的指针
    // 只针对有缓冲的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被关闭的标志
    closed   uint32
    // chan 中元素类型
    elemtype *_type // element type
    // 已发送元素在循环数组中的索引
    sendx    uint   // send index
    // 已接收元素在循环数组中的索引
    recvx    uint   // receive index
    // 等待接收的 goroutine 队列
    recvq    waitq  // list of recv waiters
    // 等待发送的 goroutine 队列
    sendq    waitq  // list of send waiters

    // 保护 hchan 中所有字段
    lock mutex
}

我们先把目光带到buf, buf是一个指向环形队列的指针。 然后sendxrecvx这两个变量则是用于记录已发送的元素在队列中的位置,以及已经接受的元素在队列中的位置。 同时lock 是一个互斥锁,来确保没有竞争。

当我们调用make chan的时候,我们会把创建hchan结构,并分配在堆上,然后返回一个指向该hchan的指针。因为chan本身是一个指向hchan的指针,所以我们才可以在函数间直接的传递他,而不是使用&chan等形式, 因为值传递传递的是地址,都指向同一个内存空间。。

Goroutiine如何使用Channel发送和接收

简单的聊完了chan的创建,那么在这一个部分我们来聊一下,我们如何在不同的goroutine之间,向chan发送,接受数据。

这里我们简单的举几个例子,用G1代表一个发送者,用G2代表一个接受者。

G1

func mai() {
    ...

    for _, task := range tasks {
        taskCh <- task 
    }
    ...
}

G2

func worker() {
    for {
        task := <- taskCh  
        process(task
    }
}

这里的taskCh 是一个长度为3的有缓存的chan

  • 简单的例子

在这里首先给大家介绍一个简单的说明,我们是如何的发送和接收。

  1. G1首先到来,并发送信息到channe

    1. 那么我们首先会获取锁,
    2. 并把值放到队列中。入队的过程本质是一个内存复制的过程(通过增减sendx与recvx)
    3. 释放锁
  2. 随后,G2到达,想要从channel中获取数据

    1. 这时候我们也会先上锁
    2. 让数值出队列,同样的使用内存复制来完成(通过增减sendx与recvx)

我们可以看到,我们都是通过简单的内存复制来完成数据的传输,这养的过程非常的安全。同时,我们通过mutex保证了没有数据被共享。在上述的步骤中,我们唯一共享的内存就是hchan, 这符合我们的原则, "Do not communication by sharing memory; instead, share memory by communciation"。

阻塞与恢复

现在,我们考虑一个新的情况,G2在调用process()函数的时候需要长时间的处理,但在此期间,我们的G1还是在不断的像channel发送任务:

  1. ch <- tasd1
  2. ch <- task2
  3. ch <- task3

当我们有三个任务的时候,我们的大小为三的带缓存的channel就已经满了。这时候,G1 又发送了第四个任务:

ch <- task4

由于ch的缓存已满,G1就会被阻塞,直到有人从ch中取走一个任务,G1才能重新的恢复过来。那么这些操作是如何做到的呢?

在这里我们要引入Go的调度器。Go的调度器是一个M:N的调度模型,换句话说就是有N个goroutine 运行在 M个系统线程上,两者不是一一对应的关系。 Go的调度器是一个非常复杂的组件,也是我们随意使用goroutine的保证,因为这篇是讲channel,关于调度器这里我就不过多赘述。

  • goroutine的阻塞

ch <- task 4 执行的时候,由于channel已经满员,我们需要阻塞G1。 这时,

  1. 我们会使用运行时的gopark函数
  2. 这时go的runtime scheduler 会接管函数
  3. G1设置为waiting的状态
  4. 同时吧G1从M中调度出,并从P取得新的runnableG
  5. 建立新的 G 和 M 的关系
  6. 当调度器返回时,新的G已经开始运行,我们的G1已经不在被运行

我们可以看到在这个过程中,G1虽然被阻塞了,但我们的系统线程并没有。

既然我们停止了G1的运行, 那么我们如何将它恢复呢?

  • goroutine的恢复

我们把目光重新转到hchan这个在channel底层的结构,我们知道他有两个队列:sendqrecvq,而这两个变量就时让goroutine恢复的功臣。

type hchan struct {
    ...
    // 等待接收的 goroutine 队列
    recvq    waitq  // list of recv waiters
    // 等待发送的 goroutine 队列
    sendq    waitq  // list of send waiters
    ...
}

waitq是一个双向链表,其底层结构是sudogsudog可以大致看成对goroutine的封装,其结构如下:

type sudog struct {
g *g // 指向groutine的指针 
    ..
    elem unsafe.Pointer //指向元素的指针(可能指向栈空间内)
    ...
}

在之前阻塞的时候,

  • G1会给自己创建一个sudo类型的变量,
  • 并把它放到sendq中, 来方便接受者来恢复G1

我们上面提到的这些过程都在调用调度器之前。

那么,我们究竟是如何恢复的?

G2调用t := <-ch 的时候,执行了一下的操作

  1. G2 先使用出队操作,从buf中获得了一个任务。
  2. G2 之后会从sendq中弹出一个sudog
  3. 将弹出来的sudogelem的值通过入队操作加入到buf (G2完成该过程)
  4. 然后我们讲sudog指向的goroutineG1的状态修改为runable
    1. G2 还需要告诉调度器G1已经好了,调用goready(G1)
    2. 这会调用调度器,调度器负责将G1的状态修改
    3. 随后调度器把G1放到P的运行队列中
    4. 返回到G2

上面我们讲的阻塞与恢复是发送者先向channel发送信息,那么当接受者先到达呢?

  • 接收者先到达

大致的过程与前面相似:

  1. G2给自己创建一个sudog结构的变量,然后把自己放到recq
  2. 再调用gopark 来让调度器接管
    1. 调度器修改goroutine的状态为wait
    2. 断开GM
    3. 从P中取出新的M,建立新关系
    4. 运行新的groutine

但在这之后G1开始发送数据流程,与之前部分不同。

G1在开始运行的时候,会直接把数据通过内存复制,复制给G2对应的sudogt。大家可能会很奇怪,这样goroutine不是互相访问内存了嘛?说好的不访问别人的栈内数据呢?但其实你考虑一下,当G2 停止运行的时候,这个操作肯定是安全的。且当我们这么操作之后,当G2恢复的时候,它不需要再去获取锁,访问buf队列。从而,我们能够节省出内存复制和锁操作的开销。

非阻塞模式的channel的读写 (使用select)

这个部分是talk讲述不多的一个部分,我在这里做一些简单的补充。

slect操作3

  1. 先把所有需要操作的 channel 上锁
  2. 给自己创建一个 sudog,然后添加到所有 channel 的 sendq或recvq
  3. 把所有的 channel 解锁,然后 pause 当前调用 select 的 goroutine(gopark())
  4. 然后当有任意一个 channel 可用时,select 的这个 goroutine 就会被调度执行。
  • 接收数据

对于非阻塞模式下(select)下的channel, 我们其实是和会阻塞的不同的。对于会阻塞的channel来说,当读取不到数据时候,我们只需调用gopark来让线程阻塞,但非阻塞这样是不可以的。有下面几点不同4

  • 如果channel是nil
    • 阻塞,会快速挂起goroutine
    • 非阻塞,会直接返回
  • 读取不到数据
    • 阻塞,挂起
    • 非阻塞,会直接返回
  • channel 已经关闭
    • 阻塞,返回 对应的零值与false
    • select 返回 对应的零值或者buf中的值与false 但仍能被select 选中
  • 发送数据
  • 如果channel是nil
    • 阻塞,会快速挂起goroutine
    • 非阻塞,会直接返回
  • 如果channel未关闭,且buf已经满了(有缓冲是buf满,无缓冲则是已经有人在等待)
    • 阻塞,挂起
    • 非阻塞, 返回false
  • 如果 channel 已经关闭
    • 两者都是直接panic

关闭channel

  1. 判断是否为空,如果未空,则panic
  2. 上锁
  3. 如果已经为空,解锁,panic
  4. 修改为关闭状态
  5. 循环 释放所有等接收队列里的sudog,
    • 如果elem不为空,赋0值
    • 将goroutine相连成链表
  6. 循环 释放所有等待发送队列的sudog
    • 发送者 会panic
    • 将goroutine 形成链表
  7. 解锁,遍历循环链表唤醒所有的sudog
  1. https://github.com/gophercon/2017-talks/blob/master/KavyaJoshi-UnderstandingChannels/Kavya%20Joshi%20-%20Understanding%20Channels.pdf

  2. https://golang.org/src/runtime/chan.go

  3. https://blog.lab99.org/post/golang-2017-10-04-video-understanding-channels.html#shi-pin-xin-xi

  4. https://golang.org/src/runtime/chan.go