首页 > 文章列表 > 详解Go channel管道的运行原理

详解Go channel管道的运行原理

golang
150 2023-05-18

前言

Go推荐通过通信来共享内存,而channel就实现了这一理念。那channel是怎么运行的呢?

功能

举个例子看下channel的使用效果:

package main



import (

   "fmt"

   "math/rand"

   "time"

)



func write(c chan int) {

   for {

      num := rand.Intn(100)

      c <- num

   }

}



func read(c chan int) {

   for {

      num := <-c

      fmt.Println("读取管道的随机数:", num)

      time.Sleep(time.Second)

   }

}



func main() {

   var c = make(chan int, 8)

   go read(c)

   for i := 0; i < 5; i++ {

      go write(c)

   }

   time.Sleep(time.Minute)

}

以上代码新建了一个缓冲区为8的管道,然后开启read和五个write读写协程。写协程写入一个随机数,读协程每隔一秒读取并打印,效果如下:

说明协程间可以通过管道来互相通信。接着了解下channel的结构。

channel结构

channel结构体位于GOROOT/src/runtime/chan.go下的hchan,源码如下:

type hchan struct {

   qcount   uint           // 队列中元素总数

   dataqsiz uint           // 环型队列大小

   buf      unsafe.Pointer // 指向dataqsize的数组(即缓冲区)

   elemsize uint16 

   closed   uint32

   elemtype *_type        // 元素类型

   sendx    uint             // 发送到缓冲区的位置索引

   recvx    uint             // 接收到缓冲区的位置索引

   recvq    waitq           // 接收者队列

   sendq    waitq          // 发送者队列



   lock mutex // 锁,用于保护channel数据

}

其中发送者和接收者队列是一个waitq类型,具体如下:

type waitq struct {

   first *sudog

   last  *sudog

}

waitq里有队头first,队尾last的指针,指向sudog结构体。

也就是说,waitq是一个列表队列,队列里每个元素都是一个sudog结构体,sudog中包装着一个协程。

解析一个hchan各部分结构:

  • 头部
type hchan struct {

   qcount   uint           // 队列中元素总数

   dataqsiz uint           // 环型队列大小

   buf      unsafe.Pointer // 指向dataqsize的数组(即缓冲区)

   elemsize uint16 

   closed   uint32

   elemtype *_type        // 元素类型

   ...

}

这部分表示一个环型缓冲区。图解如下:

  • 尾部
type hchan struct {

   ...

   sendx    uint             // 发送到缓冲区的位置索引

   recvx    uint             // 接收到缓冲区的位置索引

   recvq    waitq           // 接收者队列

   sendq    waitq          // 发送者队列

   ...

}

这部分把协程分为两个身份,使用chan <- 语法的协程为发送者,使用<- chan 语法的协程为接收者,并放到各自队列中。图解如下:

结合示例代码。运行结构如下:

由于写协程一直写,读协程每隔一秒才读一次,因此很快将缓冲区写满了,这时:

  • 写协程被装入sudog进行休眠等待
  • 读协程每隔一秒从缓冲区读取数据

运行原理

使用chan <- 为发送者,对发送者来说:

  • 先查看是否有接收者,有则优先唤醒并拷贝数据给接收者,然后结束
  • 无接收者再查看缓冲区,数据未满则将数据放入缓冲区,然后结束
  • 缓冲区也满了,则封装成sudog,休眠等待

使用<- chan 为接收者,对接收者来说:

  • 优先接收缓冲区的值
  • 再接收发送者的值
  • 否则休眠等待

思考下:

有休眠的接收者,且缓冲区数据已满的情况是否存在?为什么?

有休眠的发送者,且缓冲区为空的情况是否存在?为什么?

以上答案:

有休眠的接收者,缓冲区不会出现数据已满情况。因为接收者要休眠,得缓冲区没数据才行。

有休眠的发送者,缓冲区不会出现为空情况。因为发送者要休眠,得缓冲区数据已满才行。

源码分析

使用chan <-后,会调用GOROOT\src\runtime\chan.go下的chansend1方法

func chansend1(c *hchan, elem unsafe.Pointer) {

   chansend(c, elem, true, getcallerpc())

}

然后调用chansend方法

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

   ...

   lock(&c.lock)



   if c.closed != 0 {

      unlock(&c.lock)

      panic(plainError("send on closed channel"))

   }



   // 尝试接收者队列出队,若有接收者,则直接拷贝数据给接收者

   if sg := c.recvq.dequeue(); sg != nil {

      send(c, sg, ep, func() { unlock(&c.lock) }, 3)

      return true

   }



   // 判断缓冲区是否还有空余

   if c.qcount < c.dataqsiz {

      // Space is available in the channel buffer. Enqueue the element to send.

      qp := chanbuf(c, c.sendx) // 有的话获得缓冲区要存放数据的地址

      if raceenabled {

         racenotify(c, c.sendx, nil)

      }

      typedmemmove(c.elemtype, qp, ep) // 将数据拷贝到缓冲区扩容地址qp上

      c.sendx++

      if c.sendx == c.dataqsiz {

         c.sendx = 0

      }

      c.qcount++

      unlock(&c.lock)

      return true

   }



   ...

   

   // 否则封装成sodug休眠自己,加入发送者等待队列

   gp := getg()

   mysg := acquireSudog()

   mysg.releasetime = 0

   if t0 != 0 {

      mysg.releasetime = -1

   }

   // No stack splits between assigning elem and enqueuing mysg

   // on gp.waiting where copystack can find it.

   mysg.elem = ep

   mysg.waitlink = nil

   mysg.g = gp

   mysg.isSelect = false

   mysg.c = c

   gp.waiting = mysg

   gp.param = nil

   c.sendq.enqueue(mysg)

   // Signal to anyone trying to shrink our stack that we're about

   // to park on a channel. The window between when this G's status

   // changes and when we set gp.activeStackChans is not safe for

   // stack shrinking.

   gp.parkingOnChan.Store(true)

   

   // 主动挂起

   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

   ...

   

   // 被唤醒后释放sudog

   gp.waiting = nil

   gp.activeStackChans = false

   closed := !mysg.success

   gp.param = nil

   if mysg.releasetime > 0 {

      blockevent(mysg.releasetime-t0, 2)

   }

   mysg.c = nil

   releaseSudog(mysg) // 释放sudog

   if closed {

      if c.closed == 0 {

         throw("chansend: spurious wakeup")

      }

      panic(plainError("send on closed channel"))

   }

   return true

}

使用<- chan后,会调用GOROOT\src\runtime\chan.go下的chanrecv1方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {

   chanrecv(c, elem, true)

}

然后调用chanrecv方法

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

   ...

   

   lock(&c.lock)



   if c.closed != 0 {

      if c.qcount == 0 {

         if raceenabled {

            raceacquire(c.raceaddr())

         }

         unlock(&c.lock)

         if ep != nil {

            typedmemclr(c.elemtype, ep)

         }

         return true, false

      }

      // The channel has been closed, but the channel's buffer have data.

   } else {

      // 如果有发送者在休眠,则调用recv

      if sg := c.sendq.dequeue(); sg != nil {

         recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

         return true, true

      }

   }



   // 无发送者,但缓冲区有数据

   if c.qcount > 0 {

      // Receive directly from queue

      qp := chanbuf(c, c.recvx)

      if raceenabled {

         racenotify(c, c.recvx, nil)

      }

      if ep != nil {

         typedmemmove(c.elemtype, ep, qp)

      }

      typedmemclr(c.elemtype, qp)

      c.recvx++

      if c.recvx == c.dataqsiz {

         c.recvx = 0

      }

      c.qcount--

      unlock(&c.lock)

      return true, true

   }



   if !block {

      unlock(&c.lock)

      return false, false

   }



   // 休眠自己

   gp := getg()

   mysg := acquireSudog()

   mysg.releasetime = 0

   if t0 != 0 {

      mysg.releasetime = -1

   }

   // No stack splits between assigning elem and enqueuing mysg

   // on gp.waiting where copystack can find it.

   mysg.elem = ep

   mysg.waitlink = nil

   gp.waiting = mysg

   mysg.g = gp

   mysg.isSelect = false

   mysg.c = c

   gp.param = nil

   c.recvq.enqueue(mysg) // 封装成sudog入队

 

   gp.parkingOnChan.Store(true)

   

   // 主动挂起

   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) 



   // 被唤醒后释放sudog

   if mysg != gp.waiting {

      throw("G waiting list is corrupted")

   }

   gp.waiting = nil

   gp.activeStackChans = false

   if mysg.releasetime > 0 {

      blockevent(mysg.releasetime-t0, 2)

   }

   success := mysg.success

   gp.param = nil

   mysg.c = nil

   releaseSudog(mysg)

   return true, success

}

当有发送者,会调用recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

   if c.dataqsiz == 0 {

      if raceenabled {

         racesync(c, sg)

      }

      if ep != nil {

         // copy data from sender

         recvDirect(c.elemtype, sg, ep)

      }

   } else {

      // 获取缓冲区数据的位置

      qp := chanbuf(c, c.recvx)

      if raceenabled {

         racenotify(c, c.recvx, nil)

         racenotify(c, c.recvx, sg)

      }

      // copy data from queue to receiver

      if ep != nil {

         // 将缓冲区数据拷贝到

         typedmemmove(c.elemtype, ep, qp) 

      }

      // 将发送者的数据拷贝到缓冲区

      typedmemmove(c.elemtype, qp, sg.elem)

      c.recvx++

      if c.recvx == c.dataqsiz {

         c.recvx = 0

      }

      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

   }

   sg.elem = nil

   gp := sg.g

   unlockf()

   gp.param = unsafe.Pointer(sg)

   sg.success = true

   if sg.releasetime != 0 {

      sg.releasetime = cputicks()

   }

   goready(gp, skip+1) // 唤醒发送者协程

}

因此,接收者还是先接收缓冲区数据,再接收发送者的数据。其实就是按队列的先进先出顺序。

总结

留下两个问题:

发送者分别遇到无有休眠接收协程,有休眠接收协程,无接收协程且缓冲区没满,缓冲区满了四种情况该如何处理?

接收者分别遇到无休眠发送协程且缓冲区为空,无发送协程且缓冲区有数据,有休眠发送协程且缓冲区已满,缓冲区满了四种情况该如何处理?