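Channels are one of Go's most important features and a large part of what sets Go apart from other languages: they make concurrent programming simple, approachable, and fun. This article walks through how Go channels are implemented internally, covering the channel data structure and the code behind the related operations.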

Go version

➜  ~ go version
go version go1.8.3 darwin/amd64

The underlying data structure of a Go channel

Source: runtime/chan.go

type hchan struct {
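    // total number of elements currently in the queue
    qcount uint // total data in the queue
    // capacity of the channel (size of the circular buffer)
    dataqsiz uint // size of the circular queue
    // points to the circular buffer array
    buf unsafe.Pointer // points to an array of dataqsiz elements
    // size of one element of the channel's element type
    elemsize uint16
    // closed flag, non-zero once the channel has been closed
    closed uint32
    // element type
    elemtype *_type // element type
    // send and receive indexes used to implement the circular buffer
    sendx uint // send index
    recvx uint // receive index
    // queue of goroutines blocked receiving (<-ch)
    recvq waitq // list of recv waiters
    // queue of goroutines blocked sending (ch <-)
    sendq waitq // list of send waiters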
    lock mutex
}

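Reading the source above, we can see that a channel is essentially a queue plus a lock, where the lock is the runtime's own lightweight mutex. recvq is the list of goroutines blocked reading from the channel, and sendq is the list of goroutines blocked writing to it. The list elements are sudog values; a sudog is essentially a wrapper around a g (a goroutine) that also records the element being sent or received.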
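To make "a queue plus a lock" concrete, here is a minimal user-level sketch of the buffered part of hchan. It is a fixed-size ring buffer indexed by sendx/recvx and counted by qcount, guarded by a sync.Mutex. The type ringChan and its methods are hypothetical names chosen for illustration. The real runtime uses its own mutex, an unsafe.Pointer buffer, and typedmemmove, and it also handles blocking, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
)

// ringChan is a hypothetical, simplified model of hchan's buffer:
// a circular queue plus a lock. It never blocks; trySend/tryRecv
// only report whether the operation could proceed.
type ringChan struct {
	lock     sync.Mutex
	buf      []int // plays the role of hchan.buf
	dataqsiz uint  // capacity of the circular queue
	qcount   uint  // number of elements currently queued
	sendx    uint  // next slot to write
	recvx    uint  // next slot to read
}

func newRingChan(size uint) *ringChan {
	return &ringChan{buf: make([]int, size), dataqsiz: size}
}

// trySend mirrors step ⑤ of chansend: copy into the slot at sendx,
// advance sendx with wrap-around, and bump qcount.
func (c *ringChan) trySend(v int) bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.qcount == c.dataqsiz {
		return false // buffer full: the real chansend would block here
	}
	c.buf[c.sendx] = v
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	return true
}

// tryRecv is the mirror image: read at recvx, clear the slot, advance with wrap-around.
func (c *ringChan) tryRecv() (int, bool) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.qcount == 0 {
		return 0, false // buffer empty: the real chanrecv would block here
	}
	v := c.buf[c.recvx]
	c.buf[c.recvx] = 0 // the runtime clears the slot too (typedmemclr)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	return v, true
}

func main() {
	c := newRingChan(2)
	fmt.Println(c.trySend(1), c.trySend(2), c.trySend(3)) // true true false
	v, ok := c.tryRecv()
	fmt.Println(v, ok, c.trySend(3))        // 1 true true (sendx wrapped to 0)
	fmt.Println(c.sendx, c.recvx, c.qcount) // 1 1 2
}
```

The wrap-around on sendx and recvx is what lets a fixed array serve as a FIFO queue without ever moving elements.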

waitq is simply a queue: a linked list of sudogs with head and tail pointers.

type waitq struct {
    first *sudog
    last  *sudog
}
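In the runtime, the sudogs in a waitq are linked through their next/prev fields and served in FIFO order. The sketch below models only the enqueue/dequeue logic. It uses a hypothetical waiter type in place of sudog. The real dequeue also skips sudogs that belong to a select whose other case has already won (the selectdone field), and that part is omitted here.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for runtime.sudog, which wraps a
// blocked goroutine plus a pointer to the element being sent or received.
type waiter struct {
	id         int
	next, prev *waiter
}

// waitq mirrors the runtime's waitq: just head and tail pointers.
type waitq struct {
	first *waiter
	last  *waiter
}

// enqueue appends w at the tail.
func (q *waitq) enqueue(w *waiter) {
	w.next = nil
	x := q.last
	if x == nil {
		w.prev = nil
		q.first = w
		q.last = w
		return
	}
	w.prev = x
	x.next = w
	q.last = w
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitq) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	y := w.next
	if y == nil {
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		w.next = nil
	}
	return w
}

func main() {
	var q waitq
	for i := 1; i <= 3; i++ {
		q.enqueue(&waiter{id: i})
	}
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		fmt.Print(w.id, " ") // waiters come out in FIFO order: 1 2 3
	}
	fmt.Println()
}
```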

What make does when creating a channel

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    // check that the element size is within limits (elemsize is a uint16, so it must be < 64 KB)
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    // check alignment constraints
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // check that size is neither negative nor too large (the buffer must fit within _MaxMem, the maximum heap arena size minus 1)
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    // the element type contains no pointers, or the buffer size is 0 (e.g. an unbuffered channel)
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
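        // allocate hchan and buf as one contiguous block on the heap
        // tip: small objects (<= 32 KB) come from the per-P cache's free lists; large objects (> 32 KB) are allocated directly from the heap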
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
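        // buffered channel whose element type contains pointers
        c = new(hchan)
        // allocate the buffer separately on the heap, with type information so the GC can scan it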
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

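The first two ifs in makechan are sanity checks: the element size limit and the alignment constraints. The third if is also straightforward: it rejects a size that is too small (negative) or too large. Note that int64(uintptr(size)) != size does not test for a negative size (size < 0 already does that). It tests whether size fits in a uintptr, which can fail on 32-bit platforms. The last condition deserves a closer look: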

uintptr(size) > (_MaxMem-hchanSize)/elem.size
    // _MaxMem is the maximum heap arena size minus 1.
    //
    // On 32-bit, this is also the maximum heap pointer value,
    // since the arena starts at address 0.
    _MaxMem = 1<<_MHeapMap_TotalBits - 1

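_MaxMem is the maximum heap arena size minus 1, so this condition ensures that the hchan header plus the buffer fit in the heap. In other words, channels are allocated on the heap.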
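These size checks can be observed from ordinary code. A negative size that is known only at run time makes makechan panic; a negative constant size is rejected by the compiler instead. The sketch below uses recover to capture the panic message. makeWithSize is a hypothetical helper, and the expected message is the one used by the runtime's makechan.

```go
package main

import "fmt"

// makeWithSize calls make(chan int, n) and converts a runtime panic
// into its message so the caller can inspect it.
func makeWithSize(n int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, n)
	return fmt.Sprintf("ok, cap=%d", cap(ch))
}

func main() {
	fmt.Println(makeWithSize(4))  // ok, cap=4
	fmt.Println(makeWithSize(-1)) // makechan: size out of range
}
```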

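Next comes the allocation itself. If the element type contains no pointers, or size == 0, hchan and its buffer are allocated together in one contiguous block of memory. When size == 0 (or the element size is 0), no buffer space is actually allocated, and buf simply points at the hchan itself.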

Here is the comment and signature of mallocgc:

// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer

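The comment is clear: small objects are allocated from the free lists of the per-P cache, and large objects (> 32 kB) are allocated directly from the heap.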

In the remaining case (size > 0 and the element type contains pointers), hchan and its buf are allocated separately:

func newarray(typ *_type, n int) unsafe.Pointer {
    if n < 0 || uintptr(n) > maxSliceCap(typ.size) {
        panic(plainError("runtime: allocation size out of range"))
    }
    return mallocgc(typ.size*uintptr(n), typ, true)
}

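newarray also calls mallocgc to allocate the memory. Unlike the single-block path, it passes the element type, so the GC knows the buffer contains pointers and scans it.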
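The branching in makechan can be summarized as a small decision function. This is a hypothetical model for illustration only: allocPlan is not a runtime function. It leaves out the hchan header size because that value is platform-dependent, and it assumes 8-byte elements in the example calls.

```go
package main

import "fmt"

// allocPlan is a hypothetical model of the branches in makechan (Go 1.8).
// It reports which allocation strategy would be used and how many bytes
// the element buffer needs.
func allocPlan(elemSize uintptr, hasPointers bool, size int) (plan string, bufBytes uintptr) {
	bufBytes = uintptr(size) * elemSize
	if !hasPointers || size == 0 {
		// One mallocgc call with typ == nil: the block is not scanned by the GC.
		if size > 0 && elemSize != 0 {
			return "one block: header + buffer", bufBytes
		}
		// No buffer space at all; buf points at the hchan itself.
		return "one block: header only", bufBytes
	}
	// new(hchan) plus newarray(elem, size): the buffer keeps its type so the GC scans it.
	return "two blocks: header, typed buffer", bufBytes
}

func main() {
	fmt.Println(allocPlan(8, false, 0))  // like chan int
	fmt.Println(allocPlan(8, false, 10)) // like make(chan int, 10)
	fmt.Println(allocPlan(0, false, 10)) // like make(chan struct{}, 10)
	fmt.Println(allocPlan(8, true, 10))  // like make(chan *T, 10) with 8-byte pointers
}
```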

To summarize: make on a channel allocates on the heap and returns a pointer to an hchan.

send

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil { //① sending on a nil (never made) channel blocks forever
        if !block {
            return false
        }
        // gopark parks the current goroutine; unlockf is nil and the goroutine is on no wait queue, so nothing ever calls goready on it and it sleeps forever
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {//②
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // acquire the channel lock
    lock(&c.lock)
    // sending on a closed channel panics
    if c.closed != 0 {//③
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // if a receiver is waiting in recvq, skip the buffer and hand the value directly to that receiver
    if sg := c.recvq.dequeue(); sg != nil { //④
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // the buffer still has room
    if c.qcount < c.dataqsiz { //⑤
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // copy the value into the buffer slot at sendx
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // the buffer is full (or the channel is unbuffered and no receiver is waiting): block on sendq
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog() // get a sudog for the current goroutine
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // enqueue on sendq
    c.sendq.enqueue(mysg)
    // park the goroutine and release the channel lock
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)//⑥

    // someone woke us up.
    // we have been woken up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

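First, the nil-channel case, i.e. sending on a channel that was never made. We know that if no other goroutine can make progress, the program dies with fatal error: all goroutines are asleep - deadlock!. Why is that? Let's take a look: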

if c == nil { // sending on a nil (never made) channel blocks forever
    if !block {
        return false
    }
    // gopark parks the current goroutine; unlockf is nil and no wait queue holds it, so nothing ever calls goready and it sleeps forever
    gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
    throw("unreachable")
}
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) 
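The nil-channel case is easy to observe. A plain send on a nil channel never completes. A select with a single case plus default is compiled into a non-blocking send (block == false), so chansend takes the `if !block { return false }` branch and default runs. In this sketch, trySend is a hypothetical helper, and the 50 ms timeout is an arbitrary choice so that main can give up instead of deadlocking.

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts a non-blocking send via select with default.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	var nilCh chan int             // never made: c == nil inside chansend
	fmt.Println(trySend(nilCh, 1)) // false

	// A blocking send on a nil channel parks forever, so run it in a goroutine.
	done := make(chan struct{})
	go func() {
		nilCh <- 1 // never returns
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("unexpected: send completed")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("still blocked after 50ms")
	}
}
```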

Note also the following function:

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

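Its comment states it plainly: it puts the current goroutine into a waiting state and releases the lock, and the goroutine becomes runnable again only when someone calls goready(gp).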
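At user level, the closest analogue to goparkunlock/goready is sync.Cond. Wait atomically unlocks the associated Mutex and suspends the goroutine, and Signal makes one waiter runnable again (Wait re-acquires the lock before returning). The sketch below is only an analogy, not how channels are implemented; the slot type is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// slot is a hypothetical one-element mailbox built on sync.Cond to
// illustrate "park while releasing the lock, then get woken up".
type slot struct {
	mu    sync.Mutex
	cond  *sync.Cond
	value int
	full  bool
}

func newSlot() *slot {
	s := &slot{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// take waits until a value is available, like a receiver parked in recvq.
func (s *slot) take() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	for !s.full {
		s.cond.Wait() // unlocks s.mu and parks; relocks before returning
	}
	s.full = false
	return s.value
}

// put stores a value and wakes one waiter, the role goready plays in the runtime.
func (s *slot) put(v int) {
	s.mu.Lock()
	s.value, s.full = v, true
	s.mu.Unlock()
	s.cond.Signal()
}

func main() {
	s := newSlot()
	result := make(chan int)
	go func() { result <- s.take() }()
	s.put(99)
	fmt.Println(<-result) // 99
}
```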

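① For a nil channel, gopark is called with unlockf == nil. The goroutine is parked, but it is not on any channel's wait queue, so no one can ever call goready on it and it sleeps forever.
② Fast path for a non-blocking send (e.g. select with default): if the channel is not closed and it is either unbuffered with no waiting receiver (recvq.first == nil) or buffered and full, return false without taking the lock.
③ Sending on a closed channel panics directly with panic(plainError("send on closed channel")).
④ If a goroutine is waiting in recvq (which implies hchan.buf is empty), skip the buffer and hand the value directly to the receiver goroutine.
⑤ If the buffer is not full, copy the value into the buffer at sendx and return.
⑥ If the buffer is full, enqueue the current goroutine's sudog on sendq and park the goroutine until a receiver wakes it.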
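Points ②, ③ and ⑤ can all be observed from user code. In this sketch (the helper names are hypothetical), a buffered channel accepts sends until qcount == dataqsiz. After that, a select-with-default send fails without blocking, and a send on a closed channel panics with the message from ③ even when the buffer has room.

```go
package main

import "fmt"

// nonBlockingSend returns false instead of blocking when the send cannot proceed.
func nonBlockingSend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendOnClosed sends on a closed channel and returns the recovered panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // step ③: panics even though the buffer has room
	return "no panic"
}

func main() {
	unbuf := make(chan int)
	fmt.Println(nonBlockingSend(unbuf, 0)) // false: unbuffered, no receiver waiting (②)

	ch := make(chan int, 2)
	fmt.Println(nonBlockingSend(ch, 1), nonBlockingSend(ch, 2)) // true true (⑤)
	fmt.Println(len(ch), cap(ch))                               // 2 2: qcount == dataqsiz
	fmt.Println(nonBlockingSend(ch, 3))                         // false: buffer full (②)

	fmt.Println(sendOnClosed()) // send on closed channel (③)
}
```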

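To summarize:
When a goroutine is waiting in recvq (so hchan.buf is empty), the sender skips the buffer and hands the value directly to that receiver. When the buffer is not full, the value is copied into the buffer. When the buffer is full (or the channel is unbuffered and no receiver is waiting), the sender is enqueued on sendq and the current goroutine blocks.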
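The whole branch order of chansend can be condensed into a pure function. This is a hypothetical model (sendAction and its parameters are made up for illustration). It ignores locking, the race detector, and profiling, and only returns which branch the Go 1.8 code above would take.

```go
package main

import "fmt"

// sendAction models the branch order of chansend. The inputs mirror hchan
// state: nilChan (c == nil), closed, whether recvq has a waiter, qcount and
// dataqsiz; block is false for a select with default.
func sendAction(nilChan, closed, receiverWaiting bool, qcount, dataqsiz uint, block bool) string {
	switch {
	case nilChan && !block:
		return "return false"
	case nilChan: // ①
		return "park forever"
	case !block && !closed && ((dataqsiz == 0 && !receiverWaiting) || (dataqsiz > 0 && qcount == dataqsiz)): // ②, no lock taken
		return "return false"
	case closed: // ③
		return "panic: send on closed channel"
	case receiverWaiting: // ④
		return "hand off directly to receiver"
	case qcount < dataqsiz: // ⑤
		return "copy into buffer"
	case !block:
		return "return false"
	default: // ⑥
		return "enqueue on sendq and park"
	}
}

func main() {
	fmt.Println(sendAction(false, false, true, 0, 0, true))  // unbuffered, receiver waiting
	fmt.Println(sendAction(false, false, false, 1, 2, true)) // buffer has room
	fmt.Println(sendAction(false, false, false, 2, 2, true)) // buffer full, blocking send
}
```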

recv

Receiving from a channel (<-c) works much like sending.

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
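The contract in this comment is visible from user code through the comma-ok form v, ok := <-c. On a closed channel, values still in the buffer are delivered first (ok == true). Only after the buffer is drained does a receive return the zero value with ok == false. A non-blocking receive on an empty open channel, the (false, false) case, shows up as the default branch of a select. drain is a hypothetical helper.

```go
package main

import "fmt"

// drain performs n receives on ch and records each (value, ok) pair.
func drain(ch chan int, n int) []string {
	var out []string
	for i := 0; i < n; i++ {
		v, ok := <-ch
		out = append(out, fmt.Sprintf("%d/%t", v, ok))
	}
	return out
}

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)
	fmt.Println(drain(ch, 4)) // [10/true 20/true 0/false 0/false]

	open := make(chan int)
	select {
	case v := <-open:
		fmt.Println("got", v)
	default:
		fmt.Println("nothing available") // block == false and no element available
	}
}
```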

When the channel is nil:

if c == nil {
    if !block {
        return
    }
    gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
    throw("unreachable")
}
// ... the rest of chanrecv mirrors chansend and is left for the reader to explore

close

func closechan(c *hchan) {
    if c == nil {//①
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 { //② closing an already-closed channel panics
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc(unsafe.Pointer(&c))
        racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
        racerelease(unsafe.Pointer(c))
    }

    c.closed = 1

    var glist *g

    // release all readers
    for {//③
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // release all writers (they will panic)//④
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

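① Closing a nil (never made) channel panics directly with panic(plainError("close of nil channel")).
② Closing an already-closed channel panics directly with panic(plainError("close of closed channel")).
③ Dequeue every goroutine on recvq, clear the element it was waiting to receive (so it sees the zero value), and add it to glist.
④ Dequeue every goroutine on sendq and add it to glist. Their gp.param is nil, so when they wake up chansend sees that the channel is closed and panics with "send on closed channel".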
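Both panics in closechan are easy to reproduce. closeAndRecover is a hypothetical helper that closes a channel and returns the recovered panic message, or an empty string if close succeeded.

```go
package main

import "fmt"

// closeAndRecover closes ch and reports the panic message, or "" on success.
func closeAndRecover(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return ""
}

func main() {
	var nilCh chan int
	fmt.Printf("%q\n", closeAndRecover(nilCh)) // "close of nil channel" (①)

	ch := make(chan int)
	fmt.Printf("%q\n", closeAndRecover(ch)) // "" (the first close succeeds)
	fmt.Printf("%q\n", closeAndRecover(ch)) // "close of closed channel" (②)
}
```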

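To summarize:
closechan walks the recvq and sendq queues and moves every waiting goroutine onto the glist list (linked through g.schedlink). After releasing the channel lock, it calls goready on each goroutine in glist to wake it.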
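Because closechan wakes every goroutine in recvq, closing a channel acts as a broadcast. The sketch below starts several receivers on an unbuffered channel and releases them all with a single close. Each receiver sees ok == false, whether it was already parked in recvq or ran only after the close. broadcastClose is a hypothetical helper, and the receiver count is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastClose starts n receivers on done, closes it once, and counts
// how many receivers observed the closed state (ok == false).
func broadcastClose(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, ok := <-done // parks in recvq if it runs before close
			if !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(done) // one close releases every waiting receiver
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(broadcastClose(5)) // 5
}
```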

Reference: Go Channel 源码剖析 ("Go Channel source code analysis")
