golang[63]-concurrent并发荟萃

goroutine

并发程序指的是同时做好几件事情的程序,随着硬件的发展,并发程序显得越来越重要。Web服务器会一次处理成千上万的请求。平板电脑和手机app在渲染用户动畵的同时,还会后台执行各种计算任务和网络请求。卽使是传统的批处理问题–读取数据,计算,写输出–现在也会用并发来隐藏掉I/O的操作延迟充分利用现代计算机设备的多核,尽管计算机的性能每年都在增长,但并不是线性。

Go语言中的并发程序可以用两种手段来实现。这一章会讲解goroutine和channel,其支持“顺序进程通信”(communicating sequential processes)或被简称为CSP。CSP是一个现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下被限制在单一实例中。第9章会覆盖到更为传统的并发模型:多线程共享内存,如果你在其它的主流语言中写过并发程序的话可能会更熟悉一些。第9章同时会讲一些本章不会深入的并发程序带来的重要风险和陷阱。
尽管Go对并发的支持是众多强力特性之一,但大多数情况下跟踪并发程序还是很困难,并且在线性程序中我们的直觉往往还会让我们误入歧途。如果这是你第一次接触并发,那么我推荐你稍微多花一些时间来思考这两个章节中的样例。

在Go语言中,每一个并发的执行单元叫作一个goroutine。设想这里有一个程序有两个函数,一个函数做一些计算,另一个输出一些结果,假设两个函数没有相互之间的调用关系。一个线性的程序会先调用其中的一个函数,然后再调用来一个,但如果是在有两个甚至更多个goroutine的程序中,对两个函数的调用就可以在同一时间。我们马上就会看到这样的一个程序。
如果你使用过操作系统或者其它语言提供的线程,那么你可以简单地把goroutine类比作一个线程,这样你就可以写出一些正确的程序了。goroutine和线程的本质区别会在9.8节中讲。
当一个程序启动时,其主函数卽在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行。而go语句本身会迅速地完成。
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don’t wait

在下面的例子中,main goroutine会计算第45个菲波那契数。由于计算函数使用了效率非常低的递归,所以会运行相当可观的一段时间,在这期间我们想要让用户看到一个可见的标识来表明程序依然在正常运行,所以显示一个动畵的小图标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}

func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}

动畵显示了几秒之后,fib(45)的调用成功地返回,并且打印结果:
Fibonacci(45) = 1134903170
然后主函数返回。当主函数返回时,所有的goroutine都会直接打断,程序退出。除了从主函数退出或者直接退出程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行,但是我们之后可以看到,可以通过goroutine之间的通信来让一个goroutine请求请求其它的goroutine,并让其自己结束执行。
注意这里的两个独立的单元是如何进行组合的,spinning和菲波那契的计算。每一个都是写在独立的函数中,但是每一个函数都会并发地执行。

协程

开启1000个协程,等待1秒后主协程退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"time"
)

func main() {
for i := 0; i < 1000; i++ {
go func(i int) {
for {
fmt.Printf("Hello from "+
"goroutine %d\n", i)
}
}(i)
}
time.Sleep(time.Millisecond)
}

检查协程中的竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"time"
"fmt"
)

func main() {
var a [10]int
for i := 0; i < 10; i++ {
go func(i int) {
for {
a[i]++
}
}(i)
}
time.Sleep(time.Second)
fmt.Println(a)
}

输出检查各个协程的工作:
[268823720 278385441 259742710 274100267 206918978 216150917 212971051 204840248 201749853 209193197]

1
go run -race  goroutine.go

输出为:

1
2
3
4
5
6
7
8
9
==================
WARNING: DATA RACE
Read at 0x00c00001a0a0 by main goroutine:
main.main()
/Users/jackson/Documents/u2pppw/goroutine/goroutine.go:18 +0xfb

Previous write at 0x00c00001a0a0 by goroutine 6:
main.main.func1()
/Users/jackson/Documents/u2pppw/goroutine/goroutine.go:13 +0x64

一个陷阱 错误代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"time"
"fmt"
)

func main() {
var a [10]int
for i := 0; i < 10; i++ {
go func() {
for {
a[i]++
}
}()
}
time.Sleep(time.Second)
fmt.Println(a)
}

上面的代码错误的原因是:panic: runtime error: index out of range
原因是现在的i的结果为10,超过了数组的限制。

Channels

如果说goroutine是Go语音程序的并发体的话,那么channels它们之间的通信机制。一个channels是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息。每个channel都有一个特殊的类型,也就是channels可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。
使用内置的make函数,我们可以创建一个channel:
ch := make(chan int) // ch has type ‘chan int’
和map类似,channel也一个对应make创建的底层数据结构的引用。当我吗复制一个channel或用于函数参数传递时,我吗只是拷贝了一个channel引用,因此调用者何被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。
两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相通的对象,那么比较的结果为眞。一个channel也可以和nil进行比较。
一个channel有发送和接受两个主要操作,都是通信行为。一个发送语句将一个值从一个goroutine通过channel发送到另一个执行接收操作的goroutine。发送和接收两个操作都是用<-运算符。在发送语句中,<-运算符分割channel和要发送的值。在接收语句中,<-运算符写在channel对象之前。一个不使用接收结果的接收操作也是合法的。
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
Channel还支持close操作,用于关闭channel,随后对基于该channel的任何发送操作都将导致panic异常。对一个已经被close过的channel之行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话讲产生一个零值的数据。
使用内置的close函数就可以关闭一个channel:
close(ch)
以最简单方式调用make函数创建的时一个无缓存的channel,但是我们也可以指定第二个整形参数,对应channel的容量。如果channel的容量大于零,那么该channel就是带缓存的channel。
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
我们将先讨论无缓存的channel,然后在8.4.4节讨论带缓存的channel。

最简单的协程与通道案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

func main() {
// create new channel of type int
ch := make(chan int)

// start new anonymous goroutine
go func() {
// send 42 to channel
ch <- 42
}()
// read from channel
<-ch
}

不带缓存的Channels

一个基于无缓存Channels的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。
基于无缓存Channels的发送和接收操作将导致两个goroutine做一次同步操作。因为这个原因,无缓存Channels有时候也被称为同步Channels。当通过一个无缓存Channels发送数据时,接收者收到数据发生在唤醒发送者goroutine之前(译注:happens before,这是Go语言并发内存模型的一个关键术语!)。
在讨论并发编程时,当我们说x事件在y事件之前发生(happens before),我们并不是说x事件在时间上比y时间更早;我们要表达的意思是要保证在此之前的事件都已经完成了,例如在此之前的更新某些变量的操作已经完成,你可以放心依赖这些已完成的事件了。
当我们说x事件旣不是在y事件之前发生也不是在y事件之后发生,我们就说x事件和y事件是并发的。这并不是意味着x事件和y事件就一定是同时发生的,我们只是不能确定这两个事件发生的先后顺序。在下一章中我们将看到,当两个goroutine并发访问了相同的变量时,我们有必要保证某些事件的执行顺序,以避免出现某些并发问题。
在8.3节的客户端程序,它在主goroutine中(译注:就是执行main函数的goroutine)将标准输入复制到server,因此当客户端程序关闭标准输入时,后台goroutine可能依然在工作。我们需要让主goroutine等待后台goroutine完成工作后再退出,我们使用了一个channel来同步两个goroutine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}

当用户关闭了标准输入,主goroutine中的mustCopy函数调用将返回,然后调用conn.Close()关闭读和写方向的网络连接。关闭网络链接中的写方向的链接将导致server程序收到一个文件(end-of-file)结束的信号。关闭网络链接中读方向的链接将导致后台goroutine的io.Copy函数调用返回一个“read from closed connection”(“从关闭的链接读”)类似的错误,因此我们临时移除了错误日志语句;在练习8.3将会提供一个更好的解决方案。(需要注意的是go语句调用了一个函数字面量,这Go语言中启动goroutine常用的形式。)
在后台goroutine返回之前,它先打印一个日志信息,然后向done对应的channel发送一个值。主goroutine在退出前先等待从done对应的channel接收一个值。因此,总是可以在程序退出前正确输出“done”消息。
基于channels发送消息有两个重要方面。首先每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,我们将它称为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个goroutine之间的同步,这时候我们可以用struct{}空结构体作为channels元素的类型,虽然也可以使用bool或int类型实现同样的功能,done <- 1语句也比done <- struct{}{}更短。

通道作为一等公民

作为参数和返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"time"
)

func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}

func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}

func chanDemo() {
var channels [10]chan<- int
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}

for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}

for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}

time.Sleep(time.Millisecond)
}


func main(){

chanDemo()
}

构建一个简单的计时器

In fact, you can build a simple timer with this approach - create a channel, start goroutine which writes to this channel after given duration and returns this channel to the caller of your func. The caller then blocks on reading from the channel for the exact amount of time

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import "time"

func timer(d time.Duration) <-chan int {
c := make(chan int)
go func() {
time.Sleep(d)
c <- 1
}()
return c
}

func main() {
for i := 0; i < 24; i++ {
c := timer(1 * time.Second)
<-c
}
}

Ping-pong 模式

This nice concurrency example was found in a great talk by googler Sameer Ajmani “Advanced Go Concurrency Patterns”. Of course, this pattern isn’t very advanced, but for those who only get themselves familiar with Go concurrency it may look quite fresh and interesting.

Here we have a channel as a table of the ping-pong game. The ball is an integer variable, and two goroutines-players that ‘hit’ the ball, increasing its value (hits counter).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "time"

func main() {
var Ball int
table := make(chan int)
go player(table)
go player(table)

table <- Ball
time.Sleep(1 * time.Second)
<-table
}

func player(table chan int) {
for {
ball := <-table
ball++
time.Sleep(100 * time.Millisecond)
table <- ball
}
}

Fan-In模式

扇入 多个入,抢夺一个通道出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"time"
)

func producer(ch chan int, d time.Duration) {
var i int
for {
ch <- i
i++
time.Sleep(d)
}
}

func reader(out chan int) {
for x := range out {
fmt.Println(x)
}
}

func main() {
ch := make(chan int)
out := make(chan int)
go producer(ch, 100*time.Millisecond)
go producer(ch, 250*time.Millisecond)
go reader(out)
for i := range ch {
out <- i
}
}

fan-out模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
"time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasksCh
if !ok {
return
}
d := time.Duration(task) * time.Millisecond
time.Sleep(d)
fmt.Println("processing task", task)
}
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
tasksCh := make(chan int)

for i := 0; i < workers; i++ {
go worker(tasksCh, wg)
}

for i := 0; i < tasks; i++ {
tasksCh <- i
}

close(tasksCh)
}

func main() {
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 50)
wg.Wait()
}

worker or fan-out模型

多个goroutines抢夺一个通道进行处理
The opposite pattern to fan-in is a fan-out or workers pattern. Multiple goroutines can read from a single channel, distributing an amount of work between CPU cores, hence the workers name. In Go, this pattern is easy to implement - just start a number of goroutines with channel as parameter, and just send values to that channel - distributing and multiplexing will be done by Go runtime, automagically 😃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

package main

import (
"fmt"
"sync"
"time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasksCh
if !ok {
return
}
d := time.Duration(task) * time.Millisecond
time.Sleep(d)
fmt.Println("processing task", task)
}
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
tasksCh := make(chan int)

for i := 0; i < workers; i++ {
go worker(tasksCh, wg)
}

for i := 0; i < tasks; i++ {
tasksCh <- i
}

close(tasksCh)
}

func main() {
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 50)
wg.Wait()
}

更复杂的fan-out模型

worker下面还有子worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"fmt"
"sync"
"time"
)

const (
WORKERS = 5
SUBWORKERS = 3
TASKS = 20
SUBTASKS = 10
)

func subworker(subtasks chan int) {
for {
task, ok := <-subtasks
if !ok {
return
}
time.Sleep(time.Duration(task) * time.Millisecond)
fmt.Println(task)
}
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasks
if !ok {
return
}

subtasks := make(chan int)
for i := 0; i < SUBWORKERS; i++ {
go subworker(subtasks)
}
for i := 0; i < SUBTASKS; i++ {
task1 := task * i
subtasks <- task1
}
close(subtasks)
}
}

func main() {
var wg sync.WaitGroup
wg.Add(WORKERS)
tasks := make(chan int)

for i := 0; i < WORKERS; i++ {
go worker(tasks, &wg)
}

for i := 0; i < TASKS; i++ {
tasks <- i
}

close(tasks)
wg.Wait()
}

server

Next common pattern is similar to fan-out, but with goroutines spawned for the short period of time, just to accomplish some task. It’s typically used for implementing servers - create a listener, run accept() in a loop and start goroutine for each accepted connection. It’s very expressive and allows to implement server handlers as simple as possible. Take a look at this simple example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24


package main

import "net"

func handler(c net.Conn) {
c.Write([]byte("ok"))
c.Close()
}

func main() {
l, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
for {
c, err := l.Accept()
if err != nil {
continue
}
go handler(c)
}
}

It’s not very interesting - it seems there is nothing happens in terms of concurrency. Of course, under the hood there is a ton of complexity, which is deliberately hidden from us. “Simplicity is complicated”.

But let’s go back to concurrency and add some interaction to our server. Let’s say, each handler wants to write asynchronously to the logger. Logger itself, in our example, is a separate goroutine which does the job.

改进server,写入log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"net"
"time"
)

func handler(c net.Conn, ch chan string) {
ch <- c.RemoteAddr().String()
c.Write([]byte("ok"))
c.Close()
}

func logger(ch chan string) {
for {
fmt.Println(<-ch)
}
}

func server(l net.Listener, ch chan string) {
for {
c, err := l.Accept()
if err != nil {
continue
}
go handler(c, ch)
}
}

func main() {
l, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan string)
go logger(ch)
go server(l, ch)
time.Sleep(10 * time.Second)
}

Server + Worker

Server with worker example is a bit advanced version of the logger. It not only does some work but sends the result of its work back to the pool using results channel. Not a big deal, but it extends our logger example to something more practical.
Let’s see the code and animation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
"net"
"time"
)

func handler(c net.Conn, ch chan string) {
addr := c.RemoteAddr().String()
ch <- addr
time.Sleep(100 * time.Millisecond)
c.Write([]byte("ok"))
c.Close()
}

func logger(wch chan int, results chan int) {
for {
data := <-wch
data++
results <- data
}
}

func parse(results chan int) {
for {
<-results
}
}

func pool(ch chan string, n int) {
wch := make(chan int)
results := make(chan int)
for i := 0; i < n; i++ {
go logger(wch, results)
}
go parse(results)
for {
addr := <-ch
l := len(addr)
wch <- l
}
}

func server(l net.Listener, ch chan string) {
for {
c, err := l.Accept()
if err != nil {
continue
}
go handler(c, ch)
}
}

func main() {
l, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan string)
go pool(ch, 4)
go server(l, ch)
time.Sleep(10 * time.Second)
}

串联的Channels(Pipeline)

Channels也可以用于将多个goroutine链接在一起,一个Channels的输出作为下一个Channels的输入。这种串联的Channels就是所谓的管道(pipeline)。下面的程序用两个channels将三个goroutine串联起来

第一个goroutine是一个计数器,用于生成0、1、2、……形式的整数序列,然后通过channel将该整数序列发送给第二个goroutine;第二个goroutine是一个求平方的程序,对收到的每个整数求平方,然后将平方后的结果通过第二个channel发送给第三个goroutine;第三个goroutine是一个打印程序,打印收到的每个整数。为了保持例子清晰,我们有意选择了非常简单的函数,当然三个goroutine的计算很简单,在现实中确实没有必要为如此​​简单的运算构建三个goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
naturals := make(chan int)
squares := make(chan int)

// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()

// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()

// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}

如您所料,上面的程序将生成0、1、4、9、……形式的无穷数列。像这样的串联Channels的管道(Pipelines)可以用在需要长时间运行的服务中,每个长时间运行的goroutine可能会包含一个死循环,在不同goroutine的死循环内部使用串联的Channels来通信。但是,如果我们希望通过Channels只发送有限的数列该如何处理呢?
如果发送者知道,没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现:
close(naturals)
当一个channel被关闭后,再向该channel发送数据将导致panic异常。当一个被关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立卽返回一个零值。关闭上面例子中的naturals变量对应的channel并不能终止循环,它依然会收到一个永无休止的零值序列,然后将它们发送给打印者goroutine。
没有办法直接测试一个channel是否被关闭,但是接收操作有一个变体形式:它多接收一个结果,多接收的第二个结果是一个布尔值ok,ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。使用这个特性,我们可以修改squarer函数中的循环代码,当naturals对应的channel被关闭并没有值可接收时跳出循环,并且也关闭squares对应的channel.

1
2
3
4
5
6
7
8
9
10
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()

因为上面的语法是笨拙的,而且这种处理模式很常见,因此Go语言的range循环可直接在channels上面迭代。使用range循环是上面处理模式的简洁语法,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环
在下面的改进中,我们的计数器goroutine只生成100个含数字的序列,然后关闭naturals对应的channel,这将导致计算平方数的squarer对应的goroutine可以正常终止循环并关闭squares对应的channel。(在一个更复杂的程序中,可以通过defer语句关闭对应的channel。最后,主goroutine也可以正常终止循环并退出程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
naturals := make(chan int)
squares := make(chan int)

// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()

// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()

// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}

其实你并不需要关闭每一个channel。只要当需要告诉接收者goroutine,所有的数据已经全部发送时才需要关闭channel。不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。(不要将关闭一个打开文件的操作和关闭一个channel操作混淆。对于每个打开的文件,都需要在不使用的使用调用对应的Close方法来关闭文件。)
视图重复关闭一个channel将导致panic异常,视图关闭一个nil值的channel也将导致panic异常。关闭一个channels还会触发一个广播机制,我们将在8.9节讨论。

并发求素数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// A concurrent prime sieve
package main

import "fmt"

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
for i := 2; ; i++ {
ch <- i // Send 'i' to channel 'ch'.
}
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
for {
i := <-in // Receive value from 'in'.
if i%prime != 0 {
out <- i // Send 'i' to 'out'.
}
}
}

// The prime sieve: Daisy-chain Filter processes.
func main() {
ch := make(chan int) // Create a new channel.
go Generate(ch) // Launch Generate goroutine.
for i := 0; i < 10; i++ {
prime := <-ch
fmt.Println(prime)
ch1 := make(chan int)
go Filter(ch, ch1, prime)
ch = ch1
}
}

单方向的Channel

随着程序的增长,人们习惯于将大的函数拆分为小的函数。我们前面的例子中使用了三个goroutine,然后用两个channels连链接它们,它们都是main函数的局部变量。将三个goroutine拆分为以下三个函数是自然的想法:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
其中squarer计算平方的函数在两个串联Channels的中间,因此拥有两个channels类型的参数,一个用于输入一个用于输出。每个channels都用有相同的类型,但是它们的使用方式想反:一个只用于接收,另一个只用于发送。参数的名字in和out已经明确表示了这个意图,但是并无法保证squarer函数向一个in参数对应的channels发送数据或者从一个out参数对应的channels接收数据。
这种场景是典型的。当一个channel作为一个函数参数是,它一般总是被专门用于只发送或者只接收。
为了表明这种意图并防止被滥用,Go语言的类型系统提供了单方向的channel类型,分别用于只发送或只接收的channel。类型chan<- int表示一个只发送int的channel,只能发送不能接收。相反,类型<-chan int表示一个只接收int的channel,只能接收不能发送。(箭头<-和关键字chan的相对位置表明了channel的方向。)这种限制将在编译期检测。
因为关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。
这是改进的版本,这一次参数使用了单方向channel类型:
gopl.io/ch8/pipeline3
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}

func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}

func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}

func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
调用counter(naturals)将导致将chan int类型的naturals隐式地转换为chan<- int类型只发送型的channel。调用printer(squares)也会导致相似的隐式转换,这一次是转换为<-chan int类型只接收型的channel。任何双向channel向单向channel变量的赋值操作都将导致该隐式转换。这里并没有反向转换的语法:也就是不能一个将类似chan<- int类型的单向型的channel转换为chan int类型的双向型的channel。

带缓存的Channels

带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存Channel。图8.2是ch变量对应的channel的图形表示形式。
ch = make(chan string, 3)

向缓存Channel的发送操作就是向内部缓存队列的尾部插入原因,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
我们可以在无阻塞的情况下连续向新创建的channel发送三个值:
ch <- “A”
ch <- “B”
ch <- “C”
此刻,channel的内部缓存队列将是满的,如果有第四个发送操作将发生阻塞。

如果我们接收一个值,
fmt.Println(<-ch) // “A”
那么channel的缓存队列将不是满的也不是空的(图8.4),因此对该channel执行的发送或接收操作都不会发送阻塞。通过这种方式,channel的缓存队列解耦了接收和发送的goroutine。

在某些特殊情况下,程序可能需要知道channel内部缓存的容量,可以用内置的cap函数获取:
fmt.Println(cap(ch)) // “3”
同样,对于内置的len函数,如果传入的是channel,那么将返回channel内部缓存队列中有效元素的个数。因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助。
fmt.Println(len(ch)) // “2”
在继续执行两次接收操作后channel内部的缓存队列将又成为空的,如果有第四个接收操作将发生阻塞:
fmt.Println(<-ch) // “B”
fmt.Println(<-ch) // “C”
在这个例子中,发送和接收操作都发生在同一个goroutine中,但是在眞是的程序中它们一般由不同的goroutine执行。Go语言新手有时候会将一个带缓存的channel当作同一个goroutine中的队列使用,虽然语法看似简单,但实际上这是一个错误。Channel和goroutine的调度器机制是紧密相连的,一个发送操作——或许是整个程序——可能会永远阻塞。如果你只是需要一个简单的队列,使用slice就可以了。
下面的例子展示了一个使用了带缓存channel的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置。它们分别将收到的响应发送到带缓存channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此mirroredQuery函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。)

1
2
3
4
5
6
7
8
9
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。
关于无缓存或带缓存channels之间的选择,或者是带缓存channels的容量大小的选择,都可能影响程序的正确性。无缓存channel更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存channel,这些操作是解耦的。同样,卽使我们知道将要发送到一个channel的信息的数量上限,创建一个对应容量大小带缓存channel也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓冲将导致程序死锁。
Channel的缓存也可能影响程序的性能。想象一家蛋糕店有三个厨师,一个烘焙,一个上糖衣,还有一个将每个蛋糕传递到它下一个厨师在生产线。在狭小的厨房空间环境,每个厨师在完成蛋糕后必须等待下一个厨师已经准备好接受它;这类似于在一个无缓存的channel上进行沟通。
如果在每个厨师之间有一个放置一个蛋糕的额外空间,那么每个厨师就可以将一个完成的蛋糕临时放在那里而马上进入下一个蛋糕在制作中;这类似于将channel的缓存队列的容量设置为1。只要每个厨师的平均工作效率相近,那么其中大部分的传输工作将是迅速的,个体之间细小的效率差异将在交接过程中弥补。如果厨师之间有更大的额外空间——也是就更大容量的缓存队列——将可以在不停止生产线的前提下消除更大的效率波动,例如一个厨师可以短暂地休息,然后在加快赶上进度而不影响其其他人。
另一方面,如果生产线的前期阶段一直快于后续阶段,那么它们之间的缓存在大部分时间都将是满的。相反,如果后续阶段比前期阶段更快,那么它们之间的缓存在大部分时间都将是空的。对于这类场景,额外的缓存并没有带来任何好处。
生产线的隐喻对于理解channels和goroutines的工作机制是很有帮助的。例如,如果第二阶段是需要精心制作的复杂操作,一个厨师可能无法跟上第一个厨师的进度,或者是无法满足第阶段厨师的需求。要解决这个问题,我们可以雇佣另一个厨师来帮助完成第二阶段的工作,他执行相同的任务但是独立工作。这类似于基于相同的channels创建另一个独立的goroutine。

1
2
3
4
5
6
7
8
9
func bufferedChannel() {
c := make(chan int, 3)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
time.Sleep(time.Millisecond)
}

单爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

package main

import (
"fmt"
"os"
"net/http"
"golang.org/x/net/html"
"log"
)

func breadthFirst(f func(item string) []string, worklist []string) {
seen := make(map[string]bool)
for len(worklist) > 0 {
items := worklist
worklist = nil
for _, item := range items {
if !seen[item] {
seen[item] = true
worklist = append(worklist, f(item)...)
}
}
}
}

//!-breadthFirst

//!+crawl
func crawl(url string) []string {
fmt.Println(url)
list, err := Extract(url)
if err != nil {
log.Print(err)
}
return list
}

//!-crawl

//!+main
func main() {
// Crawl the web breadth-first,
// starting from the command-line arguments.
breadthFirst(crawl, os.Args[1:])
}

//!-main

func Extract(url string) ([]string, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, fmt.Errorf("getting %s: %s", url, resp.Status)
}

doc, err := html.Parse(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("parsing %s as HTML: %v", url, err)
}

var links []string
visitNode := func(n *html.Node) {
if n.Type == html.ElementNode && n.Data == "a" {
for _, a := range n.Attr {
if a.Key != "href" {
continue
}
link, err := resp.Request.URL.Parse(a.Val)
if err != nil {
continue // ignore bad URLs
}
links = append(links, link.String())
}
}
}
forEachNode(doc, visitNode, nil)
return links, nil
}

//!-Extract

// Copied from gopl.io/ch5/outline2.
func forEachNode(n *html.Node, pre, post func(n *html.Node)) {
if pre != nil {
pre(n)
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
forEachNode(c, pre, post)
}
if post != nil {
post(n)
}
}

并发爬虫的错误案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"fmt"
"log"
"os"

"gopl.io/ch5/links"
)

//!+crawl
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}

//!-crawl

//!+main
func main() {
worklist := make(chan []string)

// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()

// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}

//!-main

/*
//!+output
$ go build gopl.io/ch8/crawl1
$ ./crawl1 http://gopl.io/
http://gopl.io/
https://golang.org/help/

https://golang.org/doc/
https://golang.org/blog/
...
2015/07/15 18:22:12 Get ...: dial tcp: lookup blog.golang.org: no such host
2015/07/15 18:22:12 Get ...: dial tcp 23.21.222.120:443: socket:
too many open files
...
//!-output
*/

最初的错误信息是一个让人莫名的DNS查找失败,卽使这个域名是完全可靠的。而随后的错误信息揭示了原因:这个程序一次性创建了太多网络连接,超过了每一个进程的打开文件数限制,旣而导致了在调用net.Dial像DNS查找失败这样的问题。
这个程序实在是太他妈并行了。无穷无尽地并行化并不是什么好事情,因为不管怎么说,你的系统总是会有一个些限制因素,比如CPU核心数会限制你的计算负载,比如你的硬盘转轴和磁头数限制了你的本地磁盘IO操作频率,比如你的网络带宽限制了你的下载速度上限,或者是你的一个web服务的服务容量上限等等。为了解决这个问题,我们可以限制并发程序所使用的资源来使之适应自己的运行环境。对于我们的例子来说,最简单的方法就是限制对links.Extract在同一时间最多不会有超过n次调用,这里的n是fd的limit-20,一般情况下。
我们可以用一个有容量限制的buffered channel来控制并发,这类似于操作系统里的计数信号量概念。从概念上讲,channel里的n个空槽代表n个可以处理内容的token(通行证),从channel里接收一个值会释放其中的一个token,并且生成一个新的空槽位。这样保证了在没有接收介入时最多有n个发送操作。(这里可能我们拿channel里填充的槽来做token更直观一些,不过还是这样吧~)。由于channel里的元素类型并不重要,我们用一个零值的struct{}来作为其元素。
让我们重写crawl函数,将对links.Extract的调用操作用获取、释放token的操作包裹起来,来确保同一时间对其只有20个调用。信号量数量和其能操作的IO资源数量应保持接近。

第二个问题是这个程序永远都不会终止,卽使它已经爬到了所有初始链接衍生出的链接。为了使这个程序能够终止,我们需要在worklist为空或者没有crawl的goroutine在运行时退出主循环。这个版本中,计算器n对worklist的发送操作数量进行了限制。每一次我们发现有元素需要被发送到worklist时,我们都会对n进行操作,在向worklist中发送初始的命令行参数之前,我们也进行过一次操作。这里的操作++是在每启动一个crawler的goroutine之前。主循环会在n减为0时终止,这时候说明没活可干了。
现在这个并发爬虫会比5.6节中的深度优先搜索版快上20倍,而且不会出什么错,并且在其完成任务时也会正确地终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"fmt"
"log"
"os"

"gopl.io/ch5/links"
)

//!+sema
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
list, err := links.Extract(url)
<-tokens // release the token

if err != nil {
log.Print(err)
}
return list
}

//!-sema

//!+
func main() {
worklist := make(chan []string)
var n int // number of pending sends to worklist

// Start with the command-line arguments.
n++
go func() { worklist <- os.Args[1:] }()

// Crawl the web concurrently.
seen := make(map[string]bool)
for ; n > 0; n-- {
list := <-worklist
for _, link := range list {
if !seen[link] {
seen[link] = true
n++
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}

下面的程序是避免过度并发的另一种思路。这个版本使用了原来的crawl函数,但没有使用计数信号量,取而代之用了20个长活的crawler goroutine,这样来保证最多20个HTTP请求在并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

func main() {
worklist := make(chan []string) // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs

// Add command-line arguments to worklist.
go func() { worklist <- os.Args[1:] }()

// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}

// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link
}
}
}
}

所有的爬虫goroutine现在都是被同一个channel-unseenLinks喂饱的了。主goroutine负责拆分它从worklist里拿到的元素,然后把没有抓过的经由unseenLinks channel发送给一个爬虫的goroutine。
seen这个map被限定在main goroutine中;也就是说这个map只能在main goroutine中进行访问。类似于其它的信息隐藏方式,这样的约束可以让我们从一定程度上保证程序的正确性。例如,内部变量不能够在函数外部被访问到;变量在没有被转义的情况下是无法在函数外部访问的;一个对象的封装字段无法被该对象的方法以外的方法访问到。在所有的情况下,信息隐藏都可以帮助我们约束我们的程序,使其不发生意料之外的情况。
crawl函数爬到的链接在一个专有的goroutine中被发送到worklist中来避免死锁。为了节省空间,这个例子的终止问题我们先不进行详细阐述了。

基于select的多路复用

下面的程序会进行火箭发射的倒计时。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件。每一个事件的值是一个时间戳,不过更有意思的是其传送方式。

1
2
3
4
5
6
7
8
9
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
j<-tick
}
launch()
}

现在我们让这个程序支持在倒计时中,用户按下return键时直接中断发射流程。首先,我们启动一个goroutine,这个goroutine会尝试从标准输入中调入一个单独的byte并且,如果成功了,会向名为abort的channel发送一个值。

1
2
3
4
5
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()

现在每一次计数循环的迭代都需要等待两个channel中的其中一个返回事件了:ticker channel当一切正常时(就像NASA jorgon的"nominal",译注:这梗估计我们是不懂了)或者异常时返回的abort事件。我们无法做到从每一个channel中接收信息,如果我们这么做的话,如果第一个channel中没有事件发过来那么程序就会立刻被阻塞,这样我们就无法收到第二个channel中发过来的事件。这时候我们需要多路复用(multiplex)这些操作了,为了能够多路复用,我们使用了select语句。
select {
case <-ch1:
// …
case x := <-ch2:
// …use x…
case ch3 <- y:
// …
default:
// …
}
上面是select语句的一般形式。和switch语句稍微有点相似,也会有几个case和最后的default选择支。每一个case代表一个通信操作(在某个channel上进行发送或者接收)并且会包含一些语句组成的一个语句块。一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够引用接收到的值。
select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
让我们回到我们的火箭发射程序。time.After函数会立卽返回一个channel,并起一个新的goroutine在经过特定的时间后向该channel发送一个独立的值。下面的select语句会会一直等待到两个事件中的一个到达,无论是abort事件或者一个10秒经过的事件。如果10秒经过了还没有abort事件进入,那么火箭就会发射。

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
// ...create abort channel...

fmt.Println("Commencing countdown. Press return to abort.")
select {
case <-time.After(10 * time.Second):
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
launch()
}

下面这个例子更微秒。ch这个channel的buffer大小是1,所以会交替的为空或为满,所以只有一个case可以进行下去,无论i是奇数或者偶数,它都会打印0 2 4 6 8。

1
2
3
4
5
6
7
8
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x) // "0" "2" "4" "6" "8"
case ch <- i:
}
}

如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加前一个例子的buffer大小会使其输出变得不确定,因为当buffer旣不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。
下面让我们的发射程序打印倒计时。这里的select语句会使每次循环迭代等待一秒来执行退出操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
// ...create abort channel...

fmt.Println("Commencing countdown. Press return to abort.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
}
launch()
}

time.Tick函数表现得好像它创建了一个在循环中调用time.Sleep的goroutine,每次被唤醒时发送一个事件。当countdown函数返回时,它会停止从tick中接收事件,但是ticker这个goroutine还依然存活,继续徒劳地尝试从channel中发送值,然而这时候已经没有其它的goroutine会从该channel中接收值了–这被称为goroutine泄露(§8.4.4)。
Tick函数挺方便,但是只有当程序整个生命周期都需要这个时间时我们使用它才比较合适。否则的话,我们应该使用下面的这种模式:
ticker := time.NewTicker(1 * time.Second)
<-ticker.C // receive from the ticker’s channel
ticker.Stop() // cause the ticker’s goroutine to terminate
有时候我们希望能够从channel中发送或者接收值,并避免因为发送或者接收导致的阻塞,尤其是当channel没有准备好写或者读时。select语句就可以实现这样的功能。select会有一个default来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑。
下面的select语句会在abort channel中有值时,从其中接收值;无值时什么都不做。这是一个非阻塞的接收操作;反复地做这样的操作叫做“轮询channel”。

1
2
3
4
5
6
7
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}

channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
这使得我们可以用nil来激活或者禁用case,来达成处理其它输入或输出事件时超时和取消的逻辑。我们会在下一节中看到一个例子。

select一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main

import (
"fmt"
"math/rand"
"time"
)

func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}

func worker(id int, c chan int) {
for n := range c {
time.Sleep(time.Second)
fmt.Printf("Worker %d received %d\n",
id, n)
}
}

func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}

func main() {
var c1, c2 = generator(), generator()
var worker = createWorker(0)

var values []int
tm := time.After(10 * time.Second) //10秒后往通道发送消息
tick := time.Tick(time.Second) //每隔一秒钟发送一次
for {
var activeWorker chan<- int
var activeValue int
if len(values) > 0 {
activeWorker = worker
activeValue = values[0]
}

select {
case n := <-c1:
values = append(values, n)
case n := <-c2:
values = append(values, n)
case activeWorker <- activeValue:
values = values[1:]

case <-time.After(800 * time.Millisecond):
fmt.Println("timeout")
case <-tick:
fmt.Println(
"queue len =", len(values))
case <-tm:
fmt.Println("bye")
return
}
}
}

select的第二个例子:

时间+先到先得,但是只能一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func ToChansTimedTimerSelect(d time.Duration, message Type, a, b chan Type) (written int) {
t := time.NewTimer(d)
for i := 0; i < 2; i++ {
select {
case a <- message:
a = nil
case b <- message:
b = nil
case <-t.C:
return i
}
}
t.Stop()
return 2
}

time.timer定时器

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import "time"

func toChanTimed(t *time.Timer, ch chan int) {
t.Reset(1 * time.Second)

defer func() {

if !t.Stop() {
<-t.C
}
}()


select {
case ch <- 42:
case <-t.C:
return
}

}

func main(){

t := time.NewTimer(time.Second)

var ch chan int = make(chan int)

toChanTimed(t,ch)
}

正确:

t.reset 与t.stop()不能并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "time"

func toChanTimed(t *time.Timer, ch chan int) {
t.Reset(1 * time.Second)



select {
case ch <- 42:
case <-t.C:
return
}

if !t.Stop() {
<-t.C
}
}

func main(){

t := time.NewTimer(time.Second)

var ch chan int = make(chan int)

toChanTimed(t,ch)
}

并发的文件遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)

func main() {
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}

// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()

// Print the results.
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

//!-main

//!+walkDir

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}

//!-walkDir

// The du1 variant uses two goroutines and
// prints the total after every file is found.

并发的文件遍历2

借助select能够打印出过程有点意思。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

// The du2 variant uses select and a time.Ticker
// to print the totals periodically if -v is set.

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
)

//!+
var verbose = flag.Bool("v", false, "show verbose progress messages")

func main() {
// ...start background goroutine...

//!-
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}

// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()

//!+
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}

//!-

func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}

并发的文件遍历3

改进在:1、main函数中,每一个目录都加了goroutines。
2、信号量
3、sync.WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

// The du3 variant traverses all directories in parallel.
// It uses a concurrency-limiting counting semaphore
// to avoid opening too many files at once.

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)

var vFlag = flag.Bool("v", false, "show verbose progress messages")

//!+
func main() {
// ...determine roots...

//!-
flag.Parse()

// Determine the initial directories.
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}

//!+
// Traverse each root of the file tree in parallel.
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1) //sync.WaitGroup 等待协程
go walkDir(root, &n, fileSizes) //协程
}
go func() {
n.Wait()
close(fileSizes)
}()
//!-

// Print the results periodically.
var tick <-chan time.Time
if *vFlag {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}

printDiskUsage(nfiles, nbytes) // final totals
//!+
// ...select loop...
}

//!-

func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
//!+walkDir
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}

//!-walkDir

//信号量
//!+sema
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token
// ...
//!-sema

entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}

并发的退出

有时候我们需要通知goroutine停止它正在干的事情,比如一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的连接。
Go语言并没有提供在一个goroutine中终止另一个goroutine的方法,由于这样会导致goroutine之间的共享变量落在未定义的状态上。在8.7节中的rocket launch程序中,我们往名字叫abort的channel里发送了一个简单的值,在countdown的goroutine中会把这个值理解为自己的退出信号。但是如果我们想要退出两个或者任意多个goroutine怎么办呢?
一种可能的手段是向abort的channel里发送和goroutine数目一样多的事件来退出它们。如果这些goroutine中已经有一些自己退出了,那么会导致我们的channel里的事件数比goroutine还多,这样导致我们的发送直接被阻塞。另一方面,如果这些goroutine又生成了其它的goroutine,我们的channel里的数目又太少了,所以有些goroutine可能会无法接收到退出消息。一般情况下我们是很难知道在某一个时刻具体有多少个goroutine在运行着的。另外,当一个goroutine从abort channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就没法看到这条信息。为了能够达到我们退出goroutine的目的,我们需要更靠谱的策略,来通过一个channel把消息广播出去,这样goroutine们能够看到这条事件消息,并且在事件完成之后,可以知道这件事已经发生过了。
回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值,操作channel之后的代码可以立卽被执行,并且会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播。
只要一些小修改,我们就可以把退出逻辑加入到前一节的du程序。首先,我们创建一个退出的channel,这个channel不会向其中发送任何值,但其所在的闭包内要写明程序需要退出。我们同时还定义了一个工具函数,cancelled,这个函数在被调用的时候会轮询退出状态。

1
2
3
4
5
6
7
8
9
10
var done = make(chan struct{})

func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}

下面我们创建一个从标准输入流中读取内容的goroutine,这是一个比较典型的连接到终端的程序。每当有输入被读到(比如用户按了回车键),这个goroutine就会把取消消息通过关闭done的channel广播出去。
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
现在我们需要使我们的goroutine来对取消进行响应。在main goroutine中,我们添加了select的第三个case语句,尝试从done channel中接收内容。如果这个case被满足的话,在select到的时候卽会返回,但在结束之前我们需要把fileSizes channel中的内容“排”空,在channel被关闭之前,舍弃掉所有值。这样可以保证对walkDir的调用不要被向fileSizes发送信息阻塞住,可以正确地完成。

1
2
3
4
5
6
7
8
9
10
11
12
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
// ...
}
}

walkDir这个goroutine一启动就会轮询取消状态,如果取消状态被设置的话会直接返回,并且不做额外的事情。这样我们将所有在取消事件之后创建的goroutine改变为无操作。

1
2
3
4
5
6
7
8
9
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
// ...
}
}

在walkDir函数的循环中我们对取消状态进行轮询可以带来明显的益处,可以避免在取消事件发生时还去创建goroutine。取消本身是有一些代价的;想要快速的响应需要对程序逻辑进行侵入式的修改。确保在取消发生之后不要有代价太大的操作可能会需要修改你代码里的很多地方,但是在一些重要的地方去检查取消事件也确实能带来很大的好处。
对这个程序的一个简单的性能分析可以揭示瓶颈在dirents函数中获取一个信号量。下面的select可以让这种操作可以被取消,并且可以将取消时的延迟从几百毫秒降低到几十毫秒。

1
2
3
4
5
6
7
8
9
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}

现在当取消发生时,所有后台的goroutine都会迅速停止并且主函数会返回。当然,当主函数返回时,一个程序会退出,而我们又无法在主函数退出的时候确认其已经释放了所有的资源(译注:因为程序都退出了,你的代码都没法执行了) 。这里有一个方便的窍门我们可以一用:取代掉直接从主函数返回,我们调用一个panic,然后runtime会把每一个goroutine的栈dump下来。如果main goroutine是唯一一个剩下的goroutine的话,他会清理掉自己的一切资源。但是如果还有其它的goroutine没有退出,他们可能没办法被正确地取消掉,也有可能被取消但是取消操作会很花时间;所以这里的一个调研还是很有必要的。我们用panic来获取到足够的信息来验证我们上面的判断,看看最终到底是什么样的情况。

通道的关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"
)

func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}



func channelClose() {
c := make(chan int)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
close(c)
time.Sleep(time.Millisecond)
}

func main() {

channelClose()
}

通道的经典死锁现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"fmt"
)

func doWork(id int,
c chan int,done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
//done 代表任务做完,需要人来接收。因为done一直等不到人来接收,就会卡住,卡住就不会接收C
done <- true
}
}

type worker struct {
in chan int
done chan bool
}

func createWorker(
id int) worker {
w := worker{
in: make(chan int),
done:make(chan bool),
}
go doWork(id, w.in,w.done)
return w
}

func chanDemo() {


var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}

//第一遍循环顺利进行
for i, worker := range workers {
worker.in <- 'a' + i
}

//第二遍循环会陷入死锁,原因见上。
for i, worker := range workers {
worker.in <- 'A' + i
}

for _,worker := range workers{
<-worker.done
<-worker.done
}
}

func main() {
chanDemo()
}

死锁的解决方法1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
"fmt"
)

func doWork(id int,
c chan int,done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
done <- true
}
}

type worker struct {
in chan int
done chan bool
}

func createWorker(
id int) worker {
w := worker{
in: make(chan int),
done:make(chan bool),
}
go doWork(id, w.in,w.done)
return w
}

func chanDemo() {


var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}

for i, worker := range workers {
worker.in <- 'a' + i
}

for _,worker := range workers{
<-worker.done
}
for i, worker := range workers {
worker.in <- 'A' + i
}

for _,worker := range workers{
<-worker.done
}
}

func main() {
chanDemo()
}

死锁的解决方法2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"fmt"
)

func doWork(id int,
c chan int,done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
go func() { done <- true}()
}
}

type worker struct {
in chan int
done chan bool
}

func createWorker(
id int) worker {
w := worker{
in: make(chan int),
done:make(chan bool),
}
go doWork(id, w.in,w.done)
return w
}

func chanDemo() {


var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}

for i, worker := range workers {
worker.in <- 'a' + i
}


for i, worker := range workers {
worker.in <- 'A' + i
}

for _,worker := range workers{
<-worker.done
<-worker.done
}
}

func main() {
chanDemo()
}

sync.WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"fmt"
"sync"
)

func doWork(id int,
w worker) {
for n := range w.in {
fmt.Printf("Worker %d received %c\n",
id, n)
w.done()
}
}

type worker struct {
in chan int
done func()
}

func createWorker(
id int, wg *sync.WaitGroup) worker {
w := worker{
in: make(chan int),
done: func() {
wg.Done()
},
}
go doWork(id, w)
return w
}

func chanDemo() {
var wg sync.WaitGroup

var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}

wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + i
}
for i, worker := range workers {
worker.in <- 'A' + i
}

wg.Wait()
}

func main() {
chanDemo()
}

动态栈

每一个OS线程都有一个固定大小的内存块(一般会是2MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。因为2MB的栈对于一个小小的goroutine来说是很大的内存浪费,比如对于我们用到的,一个只是用来WaitGroup之后关闭channel的goroutine来说。而对于go程序来说,同时创建成百上千个gorutine是非常普遍的,如果每一个goroutine都需要这么大的栈的话,那这么多的goroutine就不太可能了。除去大小的问题之外,固定大小的栈对于更复杂或者更深层次的递归函数调用来说显然是不够的。修改固定的大小可以提升空间的利用率允许创建更多的线程,并且可以允许更深的递归调用,不过这两者是没法同时兼备的。
相反,一个goroutine会以一个很小的栈开始其生命周期,一般只需要2KB。一个goroutine的栈,和操作系统线程一样,会保存其活跃或挂起的函数调用的本地变量,但是和OS线程不太一样的是一个goroutine的栈大小并不是固定的;栈的大小会根据需要动态地伸缩。而goroutine的栈的最大值有1GB,比传统的固定大小的线程栈要大得多,尽管一般情况下,大多goroutine都不需要这么大的栈。
练习9.4: 创建一个流水线程序,支持用channel连接任意数量的goroutine,在跑爆内存之前,可以创建多少流水线阶段?一个变量通过整个流水线需要用多久?(这个练习题翻译不是很确定。。)

Goroutine调度

OS线程会被操作系统内核调度。每几毫秒,一个硬件计时器会中断处理器,这会调用一个叫作scheduler的内核函数。这个函数会挂起当前执行的线程并保存内存中它的寄存器内容,检查线程列表并决定下一次哪个线程可以被运行,并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。因为操作系统线程是被内核所调度,所以从一个线程向另一个“移动”需要完整的上下文切换,也就是说,保存一个用户线程的状态到内存,恢复另一个线程的到寄存器,然后更新调度器的数据结构。这几步操作很慢,因为其局部性很差需要几次内存访问,并且会增加运行的cpu周期。
Go的运行时包含了其自己的调度器,这个调度器使用了一些技术手段,比如m:n调度,因为其会在n个操作系统线程上多工(调度)m个goroutine。Go调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的Go程序中的goroutine(译注:按程序独立)。
和操作系统的线程调度不同的是,Go调度器并不是用一个硬件定时器而是被Go语言"建筑"本身进行调度的。例如当一个goroutine调用了time.Sleep或者被channel调用或者mutex操作阻塞时,调度器会使其进入休眠并开始执行另一个goroutine直到时机到了再去唤醒第一个goroutine。因为因为这种调度方式不需要进入内核的上下文,所以重新调度一个goroutine比调度一个线程代价要低得多。

GOMAXPROCS

Go的调度器使用了一个叫做GOMAXPROCS的变量来决定会有多少个操作系统的线程同时执行G​​o的代码。其默认的值是运行机器上的CPU的核心数,所以在一个有8个核心的机器上时,调度器一次会在8个OS线程上去调度GO代码。(GOMAXPROCS是前面说的m:n调度中的n)。在休眠中的或者在通信中被阻塞的goroutine是不需要一个对应的线程来做调度的。在I/O中或系统调用中或调用非Go语言函数时,是需要一个对应的操作系统线程的,但是GOMAXPROCS并不需要将这几种情况计数在内。
你可以用GOMAXPROCS的环境变量吕显式地控制这个参数,或者也可以在运行时用runtime.GOMAXPROCS函数来修改它。我们在下面的小程序中会看到GOMAXPROCS的效果,这个程序会无限打印0和1。
for {
go fmt.Print(0)
fmt.Print(1)
}

$ GOMAXPROCS=1 go run hacker-cliché.go
111111111111111111110000000000000000000011111…

$ GOMAXPROCS=2 go run hacker-cliché.go
010101010101010101011001100101011010010100110…
在第一次执行时,最多同时只能有一个goroutine被执行。初始情况下只有main goroutine被执行,所以会打印很多1。过了一段时间后,GO调度器会将其置为休眠,并唤醒另一个goroutine,这时候就开始打印很多0了,在打印的时候,goroutine是被调度到操作系统线程上的。在第二次执行时,我们使用了两个操作系统线程,所以两个goroutine可以一起被执行,以同样的频率交替打印0和1。我们必须强调的是goroutine的调度是受很多因子影响的,而runtime也是在不断地发展演进的,所以这里的你实际得到的结果可能会因为版本的不同而与我们运行的结果有所不同。

Goroutine没有ID号

在大多数支持多线程的操作系统和程序语言中,当前的线程都有一个独特的身份(id),并且这个身份信息可以以一个普通值的形式被被很容易地获取到,典型的可以是一个integer或者指针值。这种情况下我们做一个抽象化的thread-local storage(线程本地存储,多线程编程中不希望其它线程访问的内容)就很容易,只需要以线程的id作为key的一个map就可以解决问题,每一个线程以其id就能从中获取到值,且和其它线程互不冲突。
goroutine没有可以被程序员获取到的身份(id)的概念。这一点是设计上故意而为之,由于thread-local storage总是会被滥用。比如说,一个web server是用一种支持tls的语言实现的,而非常普遍的是很多函数会去寻找HTTP请求的信息,这代表它们就是去其存储层(这个存储层有可能是tls)查找的。这就像是那些过分依赖全局变量的程序一样,会导致一种非健康的“距离外行为”,在这种行为下,一个函数的行为可能不是由其自己内部的变量所决定,而是由其所运行在的线程所决定。因此,如果线程本身的身份会改变–比如一些worker线程之类的–那么函数的行为就会变得神秘莫测。
Go鼓励更为简单的模式,这种模式下参数对函数的影响都是显式的。这样不仅使程序变得更易读,而且会让我们自由地向一些给定的函数分配子任务时不用担心其身份信息影响行为。