go-016并发

阅读量: zyh 2020-11-25 15:10:15
Categories: > Tags:

前言

go并发涉及两个概念:

go的程序运行在goroutine,并通过channel来发送和接收数据。

因此,go不是通过共享内存通信,而是通过channel进行通信共享内存。

goroutine

go的第一个goroutine是main()。你可以在一个go程序中通过go 函数发起更多的goroutine来执行函数

💥go 关键词后面是函数,函数,函数

package main

import (
    "fmt"
    "net/http"
    "time"
)

func checkAPI(api string){
    if _, err := http.Get(api); err != nil {
        fmt.Printf("ERROR: %s is down!\n", api)
    }
    fmt.Printf("SUCCESS: %s is up and running!\n", api)
}
func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    for _, api := range apis {
        go checkAPI(api)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}
Done! It took 1.6802e-05 seconds!

在上面的例子中, go checkAPI(api) 构建更多的 goroutine,但对于当前 main 的 goroutine 来说,go checkAPI(api)已执行过。因没有机制来阻止main()执行完,这导致go checkAPI(api)无法在有限的时间内执行完毕。

🤷‍♀️当然,可以通过手动添加time来阻止main()终止。

这时,就需要一种机制来告诉 main,go checkAPI(api)还未执行完。这个机制就是 channel.

channel

当你需要将值从一个 goroutine 发送到另一个 goroutine 时,你可以使用 channel。

在上面的例子中,这个值就是你告诉 main,go checkAPI(api)还未执行完.

写法

channel作为数据通道,需要定义可以传输的数据类型。

// 创建无缓冲channel
ch := make(chan 数据类型)
// 创建有缓冲channel
ch := make(chan 数据类型, 缓冲长度)
// 形参
checkAPI(api string, ch chan 数据类型)
// 函数形参ch只允许传入
checkAPI(api string, ch chan<- 数据类型)
// 函数形参ch只允许传出
checkAPI(api string, ch <-chan 数据类型)
// 数据发送
ch <- xxx
// 数据接收
xxx <- ch
// 函数形参只允许传出
xxx
// 数据接收,如果没有函数接收,则表示丢弃
<- ch
// 关闭
close(ch)

✨当ch关闭后,有三种后续状况:

  1. 数据发送对象将触发严重错误。
  2. 数据接收对象会一次性接收所有数据。
    1. 再次执行数据接收,只会得到对应数据类型的默认值。

无缓冲channel

默认创建的channel是没有缓冲的。所以,有两个特点:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func apiCheck(api string, ch chan string){
    if _, err := http.Get(api); err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
    }
    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}
func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string)
    for _, api := range apis {
        go apiCheck(api,ch)
    }

    fmt.Printf(<-ch)
    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}
➜   go run main.go
SUCCESS: https://api.github.com is up and running!
Done! It took 0.657230012 seconds!

倒数第四行fmt.Printf(<-ch) 作为channel数据接收的代码,会阻止程序执行下一条指令。

不过一旦fmt.Printf(<-ch)拿到了任意数据(多次执行程序,你会发现输出并不一样),程序就会执行下一行,从而导致其它的 go apiCheck(api,ch) 因没有新的数据输出对象而无法发送数据。

✨我们可以利用这个机制,来判断出哪个站点响应最快。

解决的方式之一是:

fmt.Printf(<-ch)放入for循环内,让数据输入对象和数据输出对象保持1:1。

可以看出:

✨无缓冲channel在goroutine之间的通信是同步的,因为数据发送和传出都需要等待另一方才能完成,否则就会阻塞。

有缓冲channel

验证规则2,当队列满的时候:

package main

import (
    "fmt"
)

func send(ch chan string, message string) {
    ch <- message
}

func main() {
    size := 2 // 将队列改为2
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    send(ch, "three")
    send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < size; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}
➜   go run main.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.send(...)
        /home/zyh/gocode/027/main.go:8
main.main()
        /home/zyh/gocode/027/main.go:16 +0x98
exit status 2

错误提示所有goroutines都处于等待。但程序只有一个goroutines,就是main()。

因此在前两个send()执行后,队列已满,第三个send()无法执行导致main()处于等待,也就无法执行后面的数据接收对象fmt.Println(<-ch)

解决方法:

数据发送对象send()添加到新的goroutine,避免阻塞main()

package main

import (
    "fmt"
)

func send(ch chan string, message string) {
    ch <- message
}

func main() {
    size := 2
    ch := make(chan string, size)
    go send(ch, "one")
    go send(ch, "two")
    go send(ch, "three")
    go send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < 4; i++ {
       fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

网址校验例子

package main

import (
    "fmt"
    "net/http"
    "time"
)

func apiCheck(api string, ch chan string){
    if _, err := http.Get(api); err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
    }
    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}
func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string, 10)
    for _, api := range apis {
        go apiCheck(api,ch)
    }
    
    for i:=0;i<len(apis);i++{
        fmt.Printf(<-ch)
    }
    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

两者最大不同点

就是无缓冲的channel,数据发送和数据接收是强绑定的,任何时候都缺一不可,否则就会阻塞所在的goroutine。

而有缓冲的channel,在队列没有满之前,数据发送和数据接收是解耦的。满了之后和无缓冲规则一致。

select

select语句与switch类似,但它仅用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。

那么既然case条件与channel有关,则case条件必然伴随着写入和读取chan类型变量

子句执行规则:

select判断所有case条件是否与chan有关:

与chan无关:

	直接报错

与chan有关:

	判断case条件【是否可完成】chan的I/O操作:

	可完成:

		`随机`执行一个完成case条件的语句

	不可完成:

		有default子句:执行default子句

		无default子句:阻塞goruntine,直到出现一个可完成的case条件
package main

import (
    "fmt"
)

func main() {
    ch1 := make(chan int,1)
    for i := 0; i < 10; i++{
        select {
        case x := <- ch1:
            fmt.Printf("ch1取出数据%d\n",x)
        case ch1 <- i:
            fmt.Printf("ch1插入数据%d\n",i)
        }
    }
}
➜   go run main.go
ch1插入数据0
ch1取出数据0
ch1插入数据2
ch1取出数据2
ch1插入数据4
ch1取出数据4
ch1插入数据6
ch1取出数据6
ch1插入数据8
ch1取出数据8

之所以是输出偶数,原因在于:

ch1是一个队列为1的缓冲channel。

所以,当i=0时,执行插入操作,插入0;

当i=1时,因无法继续插入,只能取出,因此,取出0;

当i=2时,执行插入操作,插入2;

当i=3时,因无法继续插入,只能取出,因此,取出2;

斐波那契数列例子

package main

import (
    "fmt"
    "time"
)

var quit = make(chan bool)

func fib(c chan int) {
    x, y := 1, 1

    for {
        select {
            case c <- x:
                x, y = y, x+y
            case <-quit:
                fmt.Println("Done calculating Fibonacci!") // 4
            return
        }
    }
}

func main() {
    start := time.Now()

    command := ""
    data := make(chan int)

    go fib(data) // 1

    for { // 3
        num := <-data // 2
        fmt.Println(num)
        fmt.Scanf("%s", &command)
        if command == "quit" {
            quit <- true
            break
        }
    }

    time.Sleep(1 * time.Second)

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

通过 data 无缓冲 channel 构建一发(1号代码)一收(2号代码)的循环(3号代码)。

通过 quit 全局无缓冲 channel 构建是否退出 for(3号代码) 循环,以及退出前发送(4号代码)。