package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup // 1.等待组
done := make(chan struct{}) // 2.通道(仅充当标识完成与否)
wq := make(chan interface{}) // 3.通道(存储消费数据)
workerCount := 2 // 4.消费者
for i := 0; i < workerCount; i++ {
wg.Add(1) // 5.添加等待组计数,一个消费者对应一个
go doit(i,wq,done,&wg) // 6.并发执行消费函数
}
for i := 0; i < workerCount; i++ { // 9.写入0,1到消费通道wq
wq <- i
}
close(done) // 11. 关闭完成通道,使消费者可以从关闭通道里获取空值,从而推出监听for循环
wg.Wait() // 15. 等待组在计数为0之前,阻塞进程
fmt.Println("all done!")
}
func doit(workerId int, wq <-chan interface{},done <-chan struct{},wg *sync.WaitGroup) { // 消费者ID,消费通道(仅可只读),完成通道(仅可只读),等待组指针
fmt.Printf("worker [%v] is running\n",workerId) // 7. 打印消费者状态
defer wg.Done() // 14. 一个消费者完成消费函数,等待组计数减1
for { // 8.每一个消费者内部循环监听消费通道wq
select {
case m := <- wq: // 10.一旦消费通道出现数据,则消费者就从消费通道里获取数据,并打印
fmt.Printf("worker [%v] consumed %v\n",workerId,m)
case <- done: // 12. 从完成通道里获取空值,从而达成执行条件
fmt.Printf("worker [%v] is done\n",workerId)
return // 13. 退出循环监听
default:
time.Sleep(1 * time.Second) // 8-1. 在没有消费数据之前,每一秒监听一次
fmt.Println("No Event!")
}
}
}
11执行之后,12如何确保在10之后运行。