【Golang教程】go 并发之 chan

零 Golang教程评论81字数 22870阅读76分14秒阅读模式

一、简述

  • 传统的多线程间通信方式是通常是以共享内存实现的,多个线程同时读写某个内存上的变量,并通过互斥锁解决并发读写问题,<span style="color: red;">需要开发人员手动管理锁,可能会导致竞态条件和死锁问题,以及性能问题;
  • chan 实现了 channel 管道,多个 goroutine 之间可以通过向管道读写数据来进行通信,管道有自己的缓冲区和读写等待队列,<span style="color: red;">即使多个 goroutine 同时发送或接收数据,也不会产生竞态条件和死锁问题,其也是通过锁来解决一些并发问题,但是锁的粒度非常小;
  • 另外,chan 还可以通过 select 语句实现非阻塞式通信,当无法从 chan 读取或写入数据时,<span style="color: red;">可以不阻塞 goroutine,继续处理后续逻辑;共享内存通信时,<span style="color: red;">可能因为竞态条件会存在某个线程长时间获取不到锁,导致整个程序长时间阻塞住无法处理后续逻辑(高并发时的性能瓶颈);

二、基本原理

  • chan 能够创建指定大小的环形缓冲区,该大小也可以是 0,并分别创建了用于 goroutine 阻塞等待的生产队列和消费队列;
  • 当 goroutine 从 chan 中读写数据时,如果能够直接读写数据成功,则操作完后直接返回,并更新索引;
  • 如果因为 chan 中没有数据,或是没有等待的生产者写数据,那么消费者 goroutine 就会进入消费队列阻塞等待,直到管道关闭或有消费者写数据时被唤醒,并继续后续操作;
  • 如果因为 chan 中数据满了,或是没有等待的消费者读数据,那么生产者 goroutine 就会进入生产队列阻塞等待,直到管道关闭或有生产者写数据时被唤醒,并继续后续操作;
  • 如果是通过 select 非阻塞的方式从 chan 中读写数据,即使无法读写,也不会阻塞等待,直接返回;

三、基本用法

1,应用场景

1.1 任务定时

  • 与 timer 结合,实现超时控制,等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束;

go

复制代码
select {
	case <-time.After(100 * time.Millisecond):
	case <-s.stopc:
		return false
}

1.2 解耦生产与消费

  • 服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行;
  • 5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方;

go

复制代码
func main() {
	taskCh := make(chan int, 100)
	go worker(taskCh)

    // 塞任务
	for i := 0; i < 10; i++ {
		taskCh <- i
	}

    // 等待 1 小时
	select {
	case <-time.After(time.Hour):
	}
}

func worker(taskCh <-chan int) {
	const N = 5
	// 启动 5 个工作协程
	for i := 0; i < N; i++ {
		go func(id int) {
			for {
				task := <- taskCh
				fmt.Printf("finish task: %d by worker %dn", task, id)
				time.Sleep(time.Second)
			}
		}(i)
	}
}

1.3 控制并发数

  • 构建一个缓冲型的 channel,容量为 3,接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务在 w() 中完成,在执行 w() 之前,要先往 channel 中写数据,如果阻塞住了,则说明当前执行 w() 任务的 goroutine 已经达到上限;如果写入成功,则说明未达到上线,能够占一个可执行的位置,直接执行,执行完后将数据从 chan 中读取,让出占位给其他 goroutine 继续执行;这样就可以控制同时运行的 w() 数量;
  • 这里,limit <- 1 放在 func 内部而不是外部,如果在外层就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑;如果 w() 发生 panic,那可能就无法让出占位,因此需要使用 defer 来保证;

go

复制代码
var limit = make(chan int, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}

2,注意事项

  • <span style="color: red;">关闭一个 nil 管道会 panic;
  • <span style="color: red;">关闭一个已经关闭的管道会 panic;
  • <span style="color: red;">向一个已经关闭的管道写数据会 panic;
  • <span style="color: red;">向一个 nil 管道读写数据会永久阻塞;

3,chan 的常见关闭方式

3.1 M 个接收者和一个发送者

  • <span style="color: red;">发送者通过关闭用来传输数据的通道来传递发送结束信号

go

复制代码
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano()) // Go 1.20之前需要
	log.SetFlags(0)

	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	dataCh := make(chan int)

	// 发送者
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// 此唯一的发送者可以安全地关闭此数据通道。
				close(dataCh)
				return
			}
			dataCh <- value
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 接收数据直到通道dataCh已关闭并且dataCh的缓冲队列已空。
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

3.2 一个接收者和 N 个发送者

  • 不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了通道关闭原则;
  • <span style="color: red;">可以让接收者关闭一个额外的信号通道来通知发送者不要再发送数据了;
  • 对于此额外的信号通道 stopCh,它只有一个发送者,即 dataCh 数据通道的唯一接收者,dataCh 数据通道的接收者关闭了信号通道 stopCh,这是不违反通道关闭原则的;
  • 在此例中,数据通道 dataCh 并没有被关闭,当一个通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭;

go

复制代码
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano()) // Go 1.20之前需要
	log.SetFlags(0)

	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	dataCh := make(chan int)
	stopCh := make(chan struct{})
	// stopCh是一个额外的信号通道。它的发送者为dataCh数据通道的接收者。它的接收者为dataCh数据通道的发送者。

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// 这里的第一个尝试接收用来让此发送者协程尽早地退出。对于这个特定的例子,此select代码块并非必需。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已经关闭,此第二个select代码块中的第一个分支仍很有可能在若干个循环步内依然不会被选中。如果这是不可接受的,则上面的第一个select代码块是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// 接收者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// 此唯一的接收者同时也是stopCh通道的唯一发送者。尽管它不能安全地关闭dataCh数据通道,但它可以安全地关闭stopCh通道。
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	wgReceivers.Wait()
}

3.3 M 个接收者和 N 个发送者

  • 不能让接收者和发送者中的任何一个关闭用来传输数据的通道,也不能让多个接收者之一关闭一个额外的信号通道,这两种做法都违反了通道关闭原则;
  • <span style="color: red;">可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作;
  • 信号通道 toStop 的容量必须至少为 1,如果它的容量为 0,则在中间调解者还未准备好的情况下就已经有某个协程向 toStop 发送信号时,此信号将被抛弃;
  • 也可以不使用尝试发送操作向中间调解者发送信号,但信号通道 toStop 的容量必须至少为数据发送者和数据接收者的数量之和,以防止向其发送数据时(有一个极其微小的可能)导致某些发送者和接收者协程永久阻塞;

go

复制代码
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano()) // Go 1.20之前需要
	log.SetFlags(0)


	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	dataCh := make(chan int)
	// stopCh是一个额外的信号通道。它的发送者为中间调解者。它的接收者为dataCh数据通道的所有的发送者和接收者。
	stopCh := make(chan struct{})
	// toStop是一个用来通知中间调解者让其关闭信号通道stopCh的第二个信号通道。
	// 此第二个信号通道的发送者为 dataCh 数据通道的所有的发送者和接收者,它的接收者为中间调解者,它必须为一个缓冲通道。
	toStop := make(chan string, 1)

	var stoppedBy string

	// 中间调解者
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// 为了防止阻塞,这里使用了一个尝试发送操作来向中间调解者发送信号。
					select {
					case toStop <- "发送者#" + id:
					default:
					}
					return
				}

				// 此处的尝试接收操作是为了让此发送协程尽早退出。标准编译器对尝试接收和尝试发送做了特殊的优化,因而它们的速度很快。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块中第二个分支的发送操作是非阻塞的,则第一个分支仍很有可能在若干个循环步内依然不会被选中。
				// 如果这是不可接受的,则上面的第一个尝试接收操作代码块是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// 和发送者协程一样,此处的尝试接收操作是为了让此接收协程尽早退出。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块中第二个分支的接收操作是非阻塞的,则第一个分支仍很有可能在若干个循环步内依然不会被选中。
				// 如果这是不可接受的,则上面尝试接收操作代码块是必需的。
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// 为了防止阻塞,这里使用了一个尝试发送操作来向中间调解者发送信号。
						select {
						case toStop <- "接收者#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	wgReceivers.Wait()
	log.Println("被" + stoppedBy + "终止了")
}

四、源码解读

1,hchan

go

复制代码
type hchan struct {
	qcount   uint           // chan 中的数据量
	dataqsiz uint           // chan 循环队列的长度,底层是数组
	buf      unsafe.Pointer // chan 指向底层缓冲数组的指针
	elemsize uint16         // chan 中元素大小
	closed   uint32         // chan 是否被关闭,非0表示关闭
	elemtype *_type         // chan 中元素类型
	sendx    uint           // 生产队列可发送的元素在数组中的索引,即从此处开始写入
	recvx    uint           // 消费队列可接收的元素在数组中的索引,即从此处开始消费
	recvq    waitq          // 等待接收数据的goroutine队列,消费队列
	sendq    waitq          // 等待发送数据的goroutine队列,生产队列

	// 锁定保护 hchan 中的所有字段,以及阻塞在此通道上的 sudogs 中的多个字段。
	// 在保持此锁时不要更改另一个 G 的状态(特别是不要把 G 转为准备状态),因为这可能会导致堆栈收缩而死锁。
	lock mutex
}
const (
	// 用来设置内存最大对齐值,对应就是64位系统下cache line的大小
	maxAlign = 8
	//  向上进行 8 字节内存对齐,假设hchan{}大小是14,hchanSize是16。
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
  • hchan 是 chan 的结构体类型,其底层是一个数组,包含数组的地址、长度、数组中元素的个数、元素的大小、元素的类型,以及生产队列、消费队列及其对应的索引,其基本结构图如下所示;
  • hchanSize 表示了 hchan 结构体所占内存字节数,进行了字节对齐,对齐单位为 maxAlign;chan基本结构图

2,waitq

go

复制代码
// goroutine 的生产队列或消费队列
type waitq struct {
	first *sudog // 指向goroutine队列的第一个
	last  *sudog // 指向goroutine队列的最后一个
}
// 将 sudog 添加到协程的等待队列的尾部
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}
// 从协程的等待队列首部取出 sudog
func (q *waitq) dequeue() *sudog {
	for {
		// 获取队列中的首个协程
		sgp := q.first
		if sgp == nil {
			// 为空则直接返回
			return nil
		}
		y := sgp.next
		if y == nil {
			// 如果该协程下个协程为空,则整个队列都为空
			q.first = nil
			q.last = nil
		} else {
			// 否则将下个协程的前置指针置空
			y.prev = nil
			// 将下个赋给首位
			q.first = y
			// 将要出队的协程的后置指针置空,切断与其他协程的联系
			sgp.next = nil
		}

		// 如果一个 goroutine 因为 select 而被放到这个队列上,那么在被不同情况唤醒的 goroutine 和它抓取通道锁之间有一个小窗口。
		// 一旦它有了锁,它就会将自己从队列中删除,所以之后我们不会看到它。
		// 在 G 结构中使用一个标志来发出这个 goroutine 的信号,其他人何时赢得了竞争但 goroutine 还没有将自己从队列中删除
		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
			continue
		}

		return sgp
	}
}
  • waitq 定义了一个goroutine 的队列结构,分别存储了指向队列头部和尾部的 goroutine 指针,并定义了进出队列的方法;
  • enqueue 将 goroutine 添加到该队列的尾部,并更新指向尾部的指针;
  • dequeue 从队列头部中取出一个 goroutine,并更新指向队列头部的指针;

3,makechan

go

复制代码
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// 编译器校验类型,但是是非安全的
	if elem.size >= 1<<16 { // 元素类型判断
		throw("makechan: invalid channel element type")
	}
	// 元素对齐方式校验
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// chan大小乘以元素大小,得到要分配的内存大小,需要判断乘积是溢出,以及是否超过了最大内存分配等
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
	var c *hchan
	switch {
	case mem == 0:
		// 当chan为无缓冲或元素为空结构体时,需要分配的内存为0,仅分配需要存储chan的内存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 元素不包含指针。在一次调用中分配 hchan 和 buf。
		// 当通道数据元素不含指针,hchan和buf内存空间调用mallocgc一次性分配完成,hchanSize用来分配通道的内存,mem用来分配buf的内存
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// c 为指向 hchan 的指针,再加上该结构的大小 hchanSize,即可得到指向 buf的指针,它们在内存上是连续的
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 通道中的元素包含指针,分别创建 chan 和 buf 的内存空间,方便 gc 进行内存回收
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)   // 元素大小
	c.elemtype = elem                // 元素类型
	c.dataqsiz = uint(size)          // chan 的容量
	lockInit(&c.lock, lockRankHchan) // todo ?
	return c
}
  • 此函数用于根据元素类型和管道长度申请内存以创建 chan,通过 make 创建 chan 时会调用此函数;
  • 首先会对元素类型和元素大小进行校验,以及根据元素大小和管道长度计算需要申请的数组内存大小,并判断是否越界;
  • 当需要申请的数组内存大小为 0 时,说明创建是无缓冲或元素为空结构体的管道,此时只需要申请 hchan 结构体的内存
  • <span style="color: red;">当需要申请的数组内存大小不为 0 时,且元素是不含指针的类型,则直接将 hchan 结构体和数组一起申请内存,只需要调用 mallocgc 一次即可分配完成,数组的内存在 hchan 之后,并将数组内存的首地址赋给 buf 字段
  • <span style="color: red;">当需要申请的数组内存大小不为 0 时,且元素是含指针的类型,对 hchan 和数组分别申请内存,以方便 gc 进行内存回收

4,chansend

go

复制代码
// go:nosplit 编译代码中 c <- X 的入口点
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

// 返回 false 表示写入失败
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果 block 为 false,goroutine 将不允许被阻塞,不等于非缓冲
	if c == nil {
		// 非阻塞模式下直接返回
		if !block {
			return false
		}
		// 阻塞模式下,直接阻塞当前写入协程
		// 调用 gopark 将当前 goroutine 休眠,关闭 nil 的 chan 会 panic,故当前 goroutine 会一直休眠,陷入死锁
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// c 不为 nil,非阻塞模式,且chan没有关闭,但已经满了
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// 执行到此处说明是以下3种情况中的某一种或两种
	// 1,阻塞模式,block==true;
	// 2,chan 已经关闭;
	// 3,管道非满的情况
	// 	3.1 无缓冲管道,且接收队列不为空;
	// 	3.2 缓冲管道,但缓冲管道未满
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	// 2,chan 已经关闭;
	if c.closed != 0 { // todo 向一个关闭的通道写入数据会panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 执行到此处,说明管道是未关闭的,阻塞模式或管道非满
	// 从接收者队列 recvq 中取出一个接收者,接收者不为空情况下,直接将数据传递给该接收者
	// 3.1 无缓冲管道,且接收队列不为空;即使非阻塞能写则写
	if sg := c.recvq.dequeue(); sg != nil {
		// todo 非常细节,找到一个等待的接收器,将要发送的值直接传递给接收器,绕过通道缓冲区(如果有的话)。
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 3.2 缓冲管道,但没有接收者,且缓冲管道未满;即使非阻塞能写则写
	if c.qcount < c.dataqsiz {
		// 找到最新能够写入元素的位置
		qp := chanbuf(c, c.sendx)
		// 将要写入的元素的值拷贝到该处
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++                  // 写入的位置往后移
		if c.sendx == c.dataqsiz { // 如果等于数组长度,则跳转到首位(循环队列)
			c.sendx = 0
		}
		c.qcount++ // chan 中的元素个数加一
		unlock(&c.lock)
		return true
	}

	// 执行到此处,说明如果是无缓冲管道则没有接收者,是缓冲管道则已经满了
	if !block {
		// 不允许阻塞,直接返回 false
		unlock(&c.lock)
		return false
	}
	// channel 未关闭,且已满,没有接收者,goroutine允许被阻塞,接下来让该 goroutine 阻塞等待
	// 获取当前发送数据的 goroutine,然后绑定到一个 sudog 结构体 (包装为运行时表示)
	gp := getg()
	mysg := acquireSudog()
	// 获取 sudog 结构体,并且设置相关字段 (包括当前的 channel,是否是 select 等)
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// 在 gp.waiting 上分配 elem 和将 mysg 排队之间没有堆栈拆分,copystack 可以找到它。
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// 当前 goroutine 进入发送等待队列
	c.sendq.enqueue(mysg)
	// 向任何试图缩小我们的堆栈的人发出信号,表明我们将停在通道上。
	// 从这个 G 的状态更改到我们设置 gp.activeStackChans 之间的窗口对于堆栈收缩是不安全的。
	atomic.Store8(&gp.parkingOnChan, 1)
	// 挂起当前 goroutine, 进入休眠 (等待接收)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// 确保发送的值保持活动状态,直到接收方将其复制出来。sudog 具有指向堆栈对象的指针,但 sudog 不被视为堆栈跟踪器的根。
	KeepAlive(ep)

	// 从这里开始被唤醒了(channel 有机会可以发送了)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success // 如果 goroutine 因为通过通道 c 传递了值而被唤醒,则为 true,如果因为 c 被关闭而唤醒,则为 false。
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// 取消 sudog 和 channel 绑定关系
	mysg.c = nil
	releaseSudog(mysg) // 去掉 mysg 上绑定的 channel
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		// 被唤醒后,管道关闭了,todo 向一个关闭的管道发送数据会panic
		panic(plainError("send on closed channel"))
	}
	return true
}
  • chansend 用于向管道中写入数据,对于当前 goroutine 是否允许被阻塞,分为不同的处理方式;
  • <span style="color: red;">如果 chan 为 nil,非阻塞模式下直接返回 false,阻塞模式下 goroutine 被挂起,会一直休眠陷入死锁,故直接抛出错误并退出,错误内容:"fatal error: all goroutines are asleep - deadlock!";
  • 如果 chan 不为 nil,非阻塞模式,且 chan 没有关闭,但已经满了,则直接返回 false,即在不获取锁的情况下快速处理;
  • <span style="color: red;">如果 chan 已经关闭,则直接 panic,即向一个关闭的通道写入数据会 panic
  • 尝试从阻塞等待的消费者队列中取出一个 goroutine:
    • 如果能够能够取到,无论是阻塞模式还是非阻塞模式,<span style="color: red;">直接将数据从生产者的 goroutine 拷贝到消费者的 goroutine 并提前返回 true;
    • 如果当前管道是缓冲管道且又能够从消费者队列获取 goroutine 则说明缓冲区为空,<span style="color: red;">直接拷贝能够避免将数据先从生产者 goroutine 拷贝到缓冲区再从缓冲区拷贝到消费者 goroutine 的二次拷贝带来的性能损耗
  • 如果没有消费者,且缓冲区未满,即使非阻塞也是能写则写,直接将数据从生产者 goroutine 拷贝到缓冲区写入索引处,并更新写入索引和元素个数,提前返回 true;
  • 如果没有消费者,且缓冲区已满,无论是缓冲管道还是非缓冲管道,生产者 goroutine 此时是无法写入数据的,对于非阻塞模式,直接返回 false;
  • 对于阻塞模式,<span style="color: red;">将该 goroutine 添加到该 chan 的生产者队列,并挂起阻塞等待直到被唤醒,唤醒的方式分为两种:
    • <span style="color: red;">消费者 goroutine 从阻塞等待的生产者队列中获取到该 goroutine 并从其拷贝数据,然后将其唤醒
    • <span style="color: red;">该 chan 被关闭了,唤醒所有的等待的 goroutine
  • 在被其他 goroutine 唤醒前,会先将其从 chan 的生产队列中出队;
  • 被唤醒后,该 goroutine 会与当前的 chan 解除绑定,并判断被唤醒的原因,<span style="color: red;">如果是因为 chan 关闭而被唤醒则直接 panic,即无法向一个关闭的 chan 写数据,如果是被其他 goroutine 唤醒则返回 true;

go

复制代码
// send 通过 unlockf 解锁 c,sg 必须已从 c 的消费者队列中出队,ep 必须为非 nil 并指向堆或调用方的堆栈,c 必须是无缓冲或是空的
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
	if sg.elem != nil {
		// 直接拷贝内存(从发送者到接收者)
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true // 因为用数据写入而唤醒 goroutine
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 唤醒接收的 goroutine. skip 和打印栈相关
	// 调用 goready 函数将接收方 goroutine 唤醒并标记为可运行状态
	// 并把其放入发送方所在处理器 P 的 runnext 字段等待执行,runnext 字段表示最高优先级的 goroutine
	goready(gp, skip+1)
}
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// 一旦我们从 sg 中读取 sg.elem,如果 dst 的堆栈被复制(缩小),它将不再更新。
	// 因此,请确保在读取和使用之间不会发生抢占点,需要在读和写之前加上一个屏障

	// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
	// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
	// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
	// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// 不需要 cgo 写入屏障检查,因为 dst 始终是 Go 内存。
	memmove(dst, src, t.size)
}
  • send 处理无缓冲区或是空管道 c 上的写入操作,生产者 goroutine 上待写入的值 ep 被复制到消费者 sg,管道 c 必须为空并锁定(此时才能直接在生产者和消费者两个 goroutine 之间拷贝数据)
  • 同时将 <span style="color: red;">sg.success 设置为 true,表示该消费者 goroutine 是因为读到了数据而被唤醒,然后唤醒该 goroutine

1,对于缓冲管道,当消费队列不为空时,说明管道中没有数据;当生产队列不为空时,说明管道已经满了;(不可能同时存在消费队列和生产队列都不为空的情况???)<br> 2,对于非缓冲管道,当消费队列不为空时,说明此时生产队列为空,消费队列被阻塞等待中;当生产队列不为空时,说明消费队列为空,生产队列被阻塞等待中;

5,chanrecv

go

复制代码
// <- c 代码的编译入口
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
// selected 表示该 goroutine 是否被选中,用于 select 函数的 case 判断
// recevied 表示是否成功从 c 中接收到数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c == nil {
		if !block {
			return // channel 为 nil,非阻塞模式下直接返回
		}
		// 阻塞模式下,调用 gopark 将当前 goroutine休眠,调用 gopark 时候,将传入 unlockf 设置为 nil,当前 goroutine 会一直休眠
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// empty 函数返回 true 的情况:
	//    1. 无缓冲 channel 并且没有发送方正在阻塞
	//    2. 有缓冲 channel 并且缓冲区没有数据
	if !block && empty(c) {
		// 非阻塞模式,且 c 为空
		if atomic.Load(&c.closed) == 0 {
			// 非阻塞、无数据、且未关闭,直接返回
			// 因为 channel 关闭后就无法再打开,所以只要 channel 未关闭,上述方法都是原子操作 (看到的结果都是一样的)
			return
		}

		// channel 已经关闭,重新检查 channel 是否存在等待接收的数据
		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep) // 没有任何等待接收的数据,清理 ep 指针中的数据
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// channel 已经关闭,且没有数据
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock) // 解锁
		if ep != nil {
			typedmemclr(c.elemtype, ep) // 清理 ep 指针中的数据
		}
		return true, false
	}

	// channel 未关闭,或关闭了但是还有数据
	// 还有阻塞的发送者协程,说明没有缓冲区或是缓冲区已满
	if sg := c.sendq.dequeue(); sg != nil {
		// 从发送队列获取第一个发送者协程
		// 如果是无缓冲区,直接从发送 goroutine 拷贝数据到接收数据的地址
		// 否则,缓冲区已满,从接收队列头部的 goroutine 开始接收数据,并将数据添加到发送队列尾部的 goroutine
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 没有阻塞的发送者协程,但是channel里面还有数据
	if c.qcount > 0 {
		// 根据可消费数据的索引直接获取到要消费的数据的地址
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			// 直接从缓冲区的地址上拷贝数据到接收数据的地址
			typedmemmove(c.elemtype, ep, qp)
		}
		// 清除已经消费的数据
		typedmemclr(c.elemtype, qp)
		// 消费索引往后移
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// 元素数量减一
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// 没有等待的发送者协程,缓冲区没有数据,且非阻塞的,直接返回
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 没有等待的发送者协程,缓冲区没有数据,且阻塞的
	// 获取当前接收的协程 goroutine,然后绑定到一个 sudog 结构体 (包装为运行时表示)
	gp := getg()
	mysg := acquireSudog() // 获取 sudog 结构体,并设置相关参数
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep // 设置接收数据的地址
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp           // 设置 goroutine
	mysg.isSelect = false // 设置是否 select
	mysg.c = c            // 设置当前的 channel
	gp.param = nil
	c.recvq.enqueue(mysg) // 进入接收队列等待

	// 给任何试图缩小我们堆栈的人发信号告诉他们我们要停在一个 chan 上。当这个G的状态改变时和我们设置 gp.activeStackChans 之间的窗口对于堆栈收缩是不安全的。
	atomic.Store8(&gp.parkingOnChan, 1)
	// 挂起当前 goroutine, 进入休眠 (等待发送方发送数据),阻塞中
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// 因为某种原因而被唤醒,重新获取gp
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// todo 被唤醒的原因,true,因为写入了数据,false,因为关闭了管道
	success := mysg.success
	gp.param = nil
	mysg.c = nil       // 取消 sudog 和 channel 绑定关系
	releaseSudog(mysg) // 释放 sudog
	return true, success
}
  • chanrecv 用于从管道中接收数据,与 chansend 一样,对于当前 goroutine 是否允许被阻塞,分为不同的处理方式:
  • <span style="color: red;">如果 chan 为 nil,非阻塞模式下直接返回 false、false,阻塞模式下 goroutine 被挂起,会一直休眠陷入死锁,故直接抛出错误并退出
  • 如果 chan 不为 nil,非阻塞模式下,且 chan 为空,如果 chan 没关闭,则直接返回 false、false;如果 chan 已经关闭,则清理 ep 指针中的数据,并返回 true、false;
  • 如果 chan 已经关闭,且无数据,对阻塞模式,同样是清理 ep 指针中的数据,并返回 true、false;
  • 如果 chan 未关闭,或是已经关闭了但还有数据,则尝试从阻塞等待的生产队列中取出一个 goroutine:
    • 如果能够能够取到,无论是阻塞模式还是非阻塞模式,<span style="color: red;">直接将数据从生产者的 goroutine 拷贝到消费者的 goroutine 并提前返回 true、true,同上能够避免二次拷贝;
  • 如果没有生产者,且缓冲区还有数据,即使非阻塞型也是能读则读,直接将数据从缓冲区读出索引出拷贝到消费者 goroutine,并封信读出索引和元素个数,提前返回 true、true;
  • 没有生产者,缓冲区没有数据,无论是缓冲管道还是非缓冲管道,消费者 goroutine 此时是无法读取到数据的,对于非阻塞模式,直接返回 false、false;
  • 对于阻塞模式,<span style="color: red;">将该 goroutine 添加到该 chan 的消费者队列,并挂起阻塞等待直到被唤醒,唤醒的方式分为两种:
    • <span style="color: red;">生产者 goroutine 从阻塞等待的消费者队列中获取到该 goroutine 并拷贝数据给它,然后将其唤醒
    • <span style="color: red;">该 chan 被关闭了,唤醒所有的等待的 goroutine
  • 在被其他 goroutine 唤醒前,会先将其从 chan 的消费队列中出队;
  • 被唤醒后,该 goroutine 会与当前的 chan 解除绑定,并判断被唤醒的原因,<span style="color: red;">如果是因为 chan 关闭而被唤醒则返回 true、false,即无法向一个关闭的 chan 写数据,如果是被其他 goroutine 唤醒则返回 true、true;

go

复制代码
// recv 用 unlockf 解锁 c,sg 必须已经从 c 出队,非 nil 的 ep 必须指向堆或调用者的堆栈,c 必须是无缓冲或是满的
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 还有阻塞的发送者协程,说明没有缓冲区或是缓冲区已满
	if c.dataqsiz == 0 {
		// 无缓冲区
		if ep != nil {
			recvDirect(c.elemtype, sg, ep) // 直接从发送者 goroutine 将数据拷贝到接收者 goroutine 的 ep 上
		}
	} else {
		// 缓冲区已满,从消费索引处获取数据的指针
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			// 将消费索引处的数据拷贝到接收数据的指针
			typedmemmove(c.elemtype, ep, qp)
		}

		// 因为缓冲区已经满了,所以生产索引和消费索引是同一个位置,直接将发送者协程的数据拷贝到消费索引处
		typedmemmove(c.elemtype, qp, sg.elem)

		c.recvx++ // 消费索引加一
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// 缓冲区是满的,两者相等,元素数量不变
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil // 发送者协程的数据指针置空
	gp := sg.g
	unlockf() // 解锁
	gp.param = unsafe.Pointer(sg)
	sg.success = true // 因为写入值成功而被唤醒
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 调用 goready 函数将接收方 goroutine 唤醒并标记为可运行状态,并把其放入发送方所在处理器 P 的 runnext 字段等待执行
	goready(gp, skip+1)
}

// dst 在当前 goroutine 的栈上,src 是另一个 goroutine 的栈
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// channel 被锁定,因此 src 在此操作期间不会移动
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}
  • recv 处理无缓冲区或是满管道 c 上的读出操作,对于无缓冲管道,直接从生产者 goroutine 拷贝数据到消费者 goroutine
  • 对于缓冲性满管道,写入和读出索引是相同的,直接从管道的读出索引处拷贝数据到消费者者 goroutine,并从生产者 goroutine 拷贝数据到管道的该处,同时更新生产和写入索引;
  • 将发送者协程的数据指针置空,并将 sg.success 置为 true,表示该生产者 goroutine 是因为写入值成功而被唤醒,然后唤醒该 goroutine;

6,closechan

go

复制代码
func closechan(c *hchan) {
	if c == nil { // todo 关闭一个空的 chan 会 panic
		panic(plainError("close of nil channel"))
	}
	// 加锁,这个锁的粒度比较大,会持续到释放完所有的 sudog 才解锁
	lock(&c.lock)
	if c.closed != 0 { // todo 关闭一个已经关闭的 chan 会 panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	c.closed = 1    // 设置 channel 状态为已关闭
	var glist gList // 用于存放发送+接收队列中的所有 goroutine

	// 将接收队列中所有 goroutine 加入 gList 列表
	for {
		// 如果此通道的接收数据协程队列不为空(这种情况下,缓冲队列必为空),
		// 此队列中的所有协程将被依个弹出,并且每个协程将接收到此通道的元素类型的一个零值,然后恢复至运行状态
		sg := c.recvq.dequeue()
		if sg == nil {
			// 出队的 sudog 为 nil,说明发送队列为空,直接跳出循环
			break
		}
		// 如果 elem 不为空,说明此 receiver 未忽略接收数据,给它赋一个相应类型的零值
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 取出 goroutine
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false // todo import 因为关闭而唤醒,设为false
		glist.push(gp) // 将 sg 对应的 goroutine 添加到 glist 列表

	}

	// 将发送队列中所有 goroutine 加入 gList 列表
	// todo 如果存在,这些 goroutine 将会 panic,在写入处引发panic
	for {
		// 如果此通道的发送数据协程队列不为空,此队列中的所有协程将被依个弹出,并且每个协程中都将产生一个 panic(因为向已关闭的通道发送数据)。
		sg := c.sendq.dequeue()
		if sg == nil {
			break // 出队的 sudog 为 nil,说明发送队列为空,直接跳出循环
		}

		sg.elem = nil // 忽略发送协程的值
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false // todo import 因为关闭而唤醒,设为false
		glist.push(gp) // 将 sg 对应的 goroutine 添加到 glist 列表
	}

	unlock(&c.lock) // 解锁

	// 准备好所有 G,现在我们已经删除了通道锁。
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// 唤醒所有线程,接收队列里的协程获取零值,继续后续执行,发送队列里的协程,触发panic
		goready(gp, 3)
		// 	唤醒发送和接收协程,发送协程从 chansend 中的 gopark 后开始执行;接收协程从 chanrecv 中的 gopark 后开始执行
	}
}

  • close 函数用于关闭管道 c,如果 c 是 nil 管道,直接 panic,关闭一个 nil 的 chan 会 panic;
  • close 函数先上一把大锁,将 c 设置为关闭状态,将所有阻塞等待的消费者 goroutine 出队,赋给其一个相应类型的零值,并设置 sg.success 为 false,表示是因为管道关闭而被唤醒的,然后将该 goroutine 加入到一个链表中;将所有阻塞等待的生产者 goroutine 出队,同样设置 sg.success 为 false,表示是因为管道关闭而被唤醒的,然后将该 goroutine 加入到一个链表中;解锁,然后将该链表中的所有 goroutine 唤醒;
  • 被唤醒后,生产者 goroutine 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,检测到自己是因为 chan 关闭而被唤醒的,直接 panic;消费者 goroutine 同样会继续执行 chanrecv 函数里 goparkunlock 函数之后的代码,检测到自己是因为 chan 关闭而被唤醒的,返回 received 为 false,表示读取的是零值;
  • 关闭 chan 后,对于阻塞等待的消费者而言,会收到一个相应类型的零值,对于阻塞等待的生产者而言,会直接 panic,所以,在不了解 chan 还有没有接收者的情况下,不能贸然关闭 chan;

7,selectnb*

go

复制代码
// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
// select case 编译时,发送数据为非阻塞,即非阻塞型
// todo must important, 没有default时,select case 编译成 chansend1(c *hchan, elem unsafe.Pointer),即阻塞型
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}
  • 在 select 语句中通过 case 分支监听向管道 c 中写入数据:
    • 如果该 select 语句中没有 default 分支,则该语句会编译成 chansend1,阻塞型的,函数 chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)中的 block 参数为 true,如果无法写入数据时,该 goroutine 会在 chansend 函数中被阻塞并等待;
    • 如果有 default 分支,则该语句会被编译成 selectnbsend,即非阻塞型的,block 参数为 false,如果无法写入数据时,该 goroutine 会立即从 chansend 函数中返回,case 分支不会被命中,执行 default 分支代码;

go

复制代码
// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
// select case 编译时,接收数据为非阻塞,即非阻塞型
// todo must important, 没有default时,select case 编译成 chanrecv1(c *hchan, elem unsafe.Pointer),即阻塞型
// 3. 所有case都未ready,且没有default语句
//   3.1 将当前协程加入到所有channel的等待队列
//   3.2 当将协程转入阻塞,等待被唤醒
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}
  • 在 select 语句中通过 case 分支监听从管道 c 中读取数据:
    • 如果该 select 语句中没有 default 分支,如果没有变量接收数据或忽略接收数据该语句会编译成 chanrecv1,只要有变量接收数据该语句会编译成 chanrecv2,即阻塞型的,函数 chanrecv(c *hchan, ep unsafe.Pointer, block bool)中的 block 参数为 true,如果无法读出数据时,该 goroutine 会在 chanrecv 函数中被阻塞并等待;
    • 如果有 default 分支,则该语句会被编译成 selectnbrecv,即非阻塞型的,block 参数为 false,如果无法读出数据时,该 goroutine 会立即从 chanrecv 函数中返回,case 分支不会被命中,执行 default 分支代码;

8,辅助函数

details文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/15510.html

go

复制代码
// chanbuf 获取 buf 中第 i 个位置的元素
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full 函数检测 channel 缓冲区是否已满
func full(c *hchan) bool {
	// c.dataqsiz 是不可变的(在创建通道后永远不会写入),因此在通道操作期间随时读取是安全的。
	if c.dataqsiz == 0 {
		// 无缓冲,且没有消费队列
		return c.recvq.first == nil
	}
	// 有缓冲,且缓冲区大小和chan中实际元素个数相等,即满了
	return c.qcount == c.dataqsiz
}

// 判断 c 是否为空
func empty(c *hchan) bool {
	if c.dataqsiz == 0 {
		// 无缓冲 channel 并且没有发送方正在阻塞
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// 有缓冲 channel 并且缓冲区没有数据
	return atomic.Loaduint(&c.qcount) == 0
}

func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// 有未解锁的 sudogs 指向 gp 的堆栈。堆栈复制必须锁定那些 sudogs 的通道。
	// 在这里设置 activeStackChans,而不是在我们尝试 park 之前,因为我们可能在通道锁上的堆栈增长中陷入死锁。
	gp.activeStackChans = true
	// 标记栈收缩发生在现在是安全的,因为任何获取这个G的栈进行收缩的线程都保证会在这个存储之后观察到activeStackChans。
	atomic.Store8(&gp.parkingOnChan, 0)
	// 确保我们在设置activeStackChans和取消设置parkingOnChan后解锁。在我们解锁chanLock的那一刻,我们就冒着gp准备好通道操作的风险,所以gp可以在解锁可见之前(甚至对gp本身)继续运行。
	unlock((*mutex)(chanLock))
	return true
}
  • chanbuf 通过指针运算获取 chan 缓冲区中的第 i 个位置上的元素
  • full 函数判断 chan 是否已满,如果没有缓冲区,则查看是否存在消费者;如果有缓冲区, 则比较元素数量和缓冲区长度是否一致;
  • empty 判断 chan 是否为空,如果没有缓冲区,则查看是否存在生产者;如果有缓冲区,则判断元素数量是否为 0;
  • chanparkcommit 用于实现 goroutine 在 chan 上的阻塞和唤醒机制;

chan 场景总结文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/15510.html 文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/15510.html

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/golangjc/15510.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论