Eli's Blog

1. goroutine

协程,比线程更小,十几个goroutine可能体现在底层就五六个线程。Go语言内部实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(4~5KB),比线程更易用,更高效和更轻便。

goroutine调度模型:

  • M: 线程
  • P: 上下文
  • G: Goroutine

并发concurrency:

  • goroutine只是官方实现的超级“线程池”。每个实例4-5KB的栈内存占用,和大幅减少的创建和销毁开销,是制造Go号称高并发的根本原因
  • 并发不是并行(Concurrency is Not Parallelism): 并发是通过切换CPU时间来实现“同时”运行;而并行则是直接利用多核实现多线程同时运行。Go可设置使用的CPU核心数,以发挥多核计算机的能力
  • goroutine奉行通过通信来共享内存,而不是共享内存来通信。

goroutine的切换点:

  • I/O, select

  • channel

  • 等待锁

  • 函数调用 (有时)

  • runtime.Gosched()

  • 进程:资源拥有的基本单位。每个进程由私营的虚拟地址空间、代码、数据和其它各种资源组成。

  • 线程:处理器调度和分配的基本单位。线程是进程内部的一个执行单元,每个进程至少有一个主线程,它无需用户去主动创建,由系统自动创建。

  • 协程:比线程更小

    • 轻量级“线程”
    • “非抢占式”多任务处理,有协程主动交出控制权
    • 编译器/解释器/虚拟机层面的多任务
    • 多个协程,可能在一个或多个线程上运行

KSE:Kernel Scheduling Entity, 内核调度实体,即可以被操作系统内核调度器调度的实体对象,它是内核的最小调度单元,也就是内核级线程

三种线程模型:

  1. 用户级线程模型:

    • 用户线程与内核线程KSE的关系是多对一 (N:1)。多个用户线程一般从属单个进程,并且多线程的调度由用户自己的线程库完成,线程的创建、销毁及线程间的协调等操作由用户自己的线程库负责,无需借助系统调度来实现。

    • Python的gevent协程库就属这种实现

    • 线程调度在用户层面完成,不需要让CPU在用户态和内核态之间切换,这种方式较为轻量级,对系统资源消耗少

    • 缺点:做不到真正意义上的并发。如果某个用户进程上的某个线程因为一个阻塞调用(I/O)二被CPU中断(抢占式调度),那么该进程中的其它线程将被阻塞,整个进程被挂起。因为在用户线程模式下,进程内的线程绑定到CPU执行是由用户进程调度实现的,内部线程对CPU不可见,即CPU调度的是进程,而非线程

    • 协程库优化:把阻塞的操作重新封装为完全非阻塞模式,在阻塞点上,主动让出自己,并通知或唤醒其它等待的用户线程

  2. 内核级线程模型

    • 用户线程和内核线程KSE的关系是一对一 (1:1)。每个用户线程绑定一个内核线程,线程的调度完全交由内核控制
    • Java/C++ 的线程库按此方式实现
    • 优点:简单,直接借助系统内核的线程和调度器,可以快速实现线程切换,做到真正的并行处理
    • 缺点:由于直接使用内核去创建、销毁及多线程上下文切换和调度,系统资源成本大幅上涨,对性能影响较大
  3. 两级线程模型(即混合型线程模型)

    • 用户线程与内核线程KSE的关系是多对多 (N:M)
    • 一个进程可与多个内核线程KSE关联,该进程内的多个线程绑定到了不同的KSE上
    • 进程内的线程并不与KSE一一绑定,当某个KSE绑定的线程因阻塞操作被内核调度出CPU时,其关联的进程中的某个线程又会重新与KSE绑定
    • 此种模型高度复杂,Go语言中的runtime调度器实现了这种方案
    • 为什么称为两级?用户调度实现用户线程到KSE的调度,内核调度器实现KSE到CPU上的调度

G-P-M 模型:

  • G:Goroutine:独立执行单元。相较于每个OS线程固定分配2M内存的模式,Goroutine的栈采取动态扩容方式,2k ~ 1G(AMD64, AMD32: 256M)。周期性回收内存,收缩栈空间
    • 每个Goroutine对应一个G结构体,它存储Goroutine的运行堆栈、状态及任务函数,可重用。
    • G并非执行体,每个G需要绑定到P才能被调度执行
  • P:Processor: 逻辑处理器,中介
    • 对G来说,P相当于CPU,G只有绑定到P才能被调用
    • 对M来说,P提供相关的运行环境(Context),如内存分配状态(mcache),任务队列(G)等
    • P的数量决定系统最大并行的G的数量 (CPU核数 >= P的数量),用户可通过GOMAXPROCS设置数量,但不能超过256
  • M:Machine
    • OS线程抽象,真正执行计算的资源,在绑定有效的P后,进入schedule循环
    • schedule循环的机制大致从Global队列、P的Local队列及wait队列中获取G,切换到G的执行栈上执行G的函数,调用goexit做清理工作并回到M
    • M不保留G的状态
    • M的数量不定,由Go Runtime调整,目前默认不超过10K

gpm

1.1 go关键字开启新协程

1
2
3
4
5
6
7
8
9
10
11
func say(s string) {
for i := 0; i < 5; i ++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}

func main() {
go say("world")
say("hello")
}

1.2 runtime包

runtime.Gosched() 让出时间片
runtime.Goexit() 终止协程
runtime.GOMAXPROCS(N) 指定运行CPU个数

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
go func() {
for i := 0; i < 5; i++ {
fmt.Println("go")
}
}()

for i := 0; i < 2; i++ {
runtime.Gosched() // 让出时间片
fmt.Println("hello")
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 打印函数属IO操作,自动切换控制权
func auto() {
for i := 0; i < 10; i++ {
go func(i int) {
for {
fmt.Printf("Hello from goroutine %d\n", i)
}
}(i)
}

time.Sleep(time.Millisecond)
}

// 不自动切换控制权
func manual() {
var a [10]int

for i := 0; i < 10; i++ {
go func(i int) { // race condition
for {
a[i]++
runtime.Gosched() // 交出控制权
}
}(i)
}

time.Sleep(time.Millisecond)
fmt.Println(a) // 存在读写抢占
}

// out of range
func outOfRange() {
var a [10]int

for i := 0; i < 10; i++ {
go func() { // race condition
for {
a[i]++
runtime.Gosched() // 交出控制权
}
}()
}

time.Sleep(time.Millisecond)
fmt.Println(a)
}
1
go run -race goroutine.go   # manual()函数存在抢占,race选项可检查到

2. 通道(channel)

通道:用来传递数据的一种数据结构。两个goroutine之间,可以使用它来进行同步和通信

不靠共享内存通信,而是通过通信来共享内存

Channel:

  • goroutine的沟通桥梁,大多是阻塞同步的
  • make创建,close关闭
  • 是引用类型
  • 可使用for range来迭代channel
  • 可设置单向或双向通道
  • 可设置缓存大小,在未被填满前不会发生阻塞
1
2
3
4
ch := make(chan int)

ch <- v // 把v发送到通道ch
v := <- ch // 从ch接收数据

2.1 分段计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
a := []int{7, 9, -3, 4, 6, 8, 2, -5}

mid := len(a) / 2
ch := make(chan int)

go sum(a[:mid], ch)
go sum(a[mid:], ch)

x, y := <-ch, <-ch
fmt.Println(x, y, x+y)
}

func sum(a []int, ch chan int) {
result := 0
for _, v := range a {
result += v
}

ch <- result
}

2.2 阻塞主线程

1
2
3
4
5
6
7
8
9
10
func main() {
c := make(chan bool)

go func() {
fmt.Println("Go Go Go!!!")
c <- true
}()

<- c // 阻塞main函数,等待goroutine执行完成
}

2.3 通道遍历和关闭

如果通道不关闭close(ch),遍历range ch就不会结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func main() {
ch := make(chan int, 10)

go fib(cap(ch), ch)

//// 方法1:for-range 自检
//for i := range ch {
// fmt.Printf("%d ", i)
//}

// 方法2:comma ok idiom
for {
num, ok := <-ch
if ok {
fmt.Printf("%d ", num)
} else {
break
}
}

fmt.Println()
}

func fib(n int, ch chan int) {
x, y := 1, 1
for i := 0; i < n; i++ {
ch <- x
x, y = y, x+y
}

// 必须关闭,否则deadlock
close(ch)
}

2.4 主程序可能不等待goruntine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
ch := make(chan bool)

for i := 0; i < 10; i++ {
go calc(i, ch)
}

<-ch
}

func calc(index int, ch chan bool) {
sum := 0

for i := 1; i < 1000000001; i++ {
sum += i
}

fmt.Println(index, sum)

// goroutine 执行顺序不固定,此判断不正确
if index == 9 {
ch <- true
}
}

2.4.1 方法一:使用缓存channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
ch := make(chan bool, 10)

for i := 0; i < 10; i++ {
go calc(i, ch)
}

// 等待10次
for i := 0; i < 10; i++ {
<-ch
}
}

func calc(index int, ch chan bool) {
sum := 0

for i := 1; i < 1000000001; i++ {
sum += i
}

fmt.Println(index, sum)

ch <- true
}

2.4.2 方法二:通过同步解决(sync.WaitGroup)

  • wg.Add(N): 新增N个任务
  • wg.Done(): 完成一个任务,计算器减1
  • wg.Wait(): 主线程等待,直到计数器为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
wg := sync.WaitGroup{}
wg.Add(10)

for i := 0; i < 10; i++ {
go calc(i, &wg)
}

// 等待
wg.Wait()
}

func calc(index int, wg *sync.WaitGroup) {
sum := 0

for i := 1; i < 1000000001; i++ {
sum += i
}

fmt.Println(index, sum)

wg.Done()
}

2.5 模拟打印机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
ch := make(chan bool)

go task1(ch)
go task2(ch)

// 等待
select {
case <-time.After(15 * time.Second):
}
}

func Printer(s string) {
for _, c := range s {
fmt.Printf("%c", c)
time.Sleep(time.Second)
}
fmt.Println()
}

func task1(ch chan bool) {
<-ch // 阻塞等待
Printer("hello")
}

func task2(ch chan bool) {
Printer("world")
ch <- true // 完成释放
}

2.6 带缓冲的通道

通道 ch := make(chan int, N):

  • N=0 同步阻塞
  • N>0 异步的,超过N时才阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
ch := make(chan int, 3)

go func() {
for i := 0; i < 10; i++ {
fmt.Printf("i=%d, len(ch)=%d, cap(ch)=%d\n", i, len(ch), cap(ch))
ch <- i
}
}()

time.Sleep(2 * time.Second)

for i := 0; i < 10; i++ {
num := <-ch
fmt.Printf("num=%d\n", num)
}
}

2.7 单向通道

1
2
3
var ch1 chan int          // 默认双向
var ch2 chan<- int // 单向写
var ch3 <-chan int // 单向读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
ch := make(chan int)

/*
支持隐式转换
var send chan<- int = ch // write-only
var recv <-chan int = ch // read-only
*/
go producer(ch)

consumer(ch)
}

func producer(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i * i
}

close(out)
}

func consumer(in <-chan int) {
for i := 0; i < 10; i++ {
fmt.Printf("%d ", <-in)
}
fmt.Println()
}

3. Select

select: 管理多个channel,监听channel上的数据流动。类似switch语法,但每个case语句必须是IO操作。多个case同时满足,任选一个执行。

  • 处理一个或多个channel的发送和接收
  • 同时有多个channel时,随机处理
  • 可用空select来阻塞main函数
  • 可设置超时

default语句:

  • 有default:select语句不会被阻塞,执行default后,程序的执行会从select语句中恢复,进入下一次轮询。比较消耗资源。
  • 没有default:select语句将被阻塞,直到至少有一个通信可以进行下去

3.1 管理多个通道

3.1.1 示例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func main() {
c1, c2 := make(chan int), make(chan string)
done := make(chan bool, 2)

go func() {
for {
select {
case v, ok := <-c1:
if !ok {
done <- true
break
}
fmt.Println("c1:", v)
case v, ok := <-c2:
if !ok {
done <- true
break
}
fmt.Println("c2:", v)
}
}
}()

c1 <- 1
c2 <- "hi"
c1 <- 5
c2 <- "hello"

// 必须关闭至少一个
close(c1)
//close(c2)

for i := 0; i < 2; i++ {
<-done
}
}

3.1.2 示例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
ch := make(chan int)
quit := make(chan bool)

// 消费者
go func() {
for i := 0; i < 10; i++ {
fmt.Printf("%d ", <-ch)
}
fmt.Println()

quit <- true
}()

fib(ch, quit)
}

func fib(ch chan<- int, quit <-chan bool) {
x, y := 1, 1

for {
select {
case ch <- x:
x, y = y, x+y
case <-quit:
fmt.Println("Done.")
return // break只能跳出select,无法跳出for循环
}
}
}

3.1.3 使用select作为发送者应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
c := make(chan int)

go func() {
for v := range c {
fmt.Println(v)
}
}()

for i := 0; i < 100; i++ {
select {
case c <- 0:
case c <- 1:
}
}
}

3.2 超时处理

case <-time.After(5 * time.Second): 其他channel阻塞时间超过5s时执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
ch := make(chan int)
done := make(chan bool)

go func() {
for {
select {
case x := <-ch:
fmt.Printf("%d ", x)
case <-time.After(5 * time.Second):
fmt.Println("\nTimeout")
done <- true
return
}
}
}()

for i := 0; i < 10; i++ {
ch <- i
time.Sleep(time.Second)
}

<-done
}

3.3 避免造成死锁

select 在执行过程中,必须命中其中的某一分支, 否则deadlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
c1 := make(chan string, 1)
c2 := make(chan string, 1)

c1 <- "ok"
c2 <- "good"

select {
case msg := <-c1:
fmt.Printf("c1 receive %s\n", msg)
case msg := <-c2:
fmt.Printf("c2 receive %s\n", msg)
default:
fmt.Println("no data")
}
}

4. 定时器

4.1 一次性定时任务

time.NewTimer(d Duration) *Timer

  • <-timer.C: 阻塞等待,返回定时器时间
  • timer.Stop():
  • timer.Reset(d Duration):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
timer := time.NewTimer(2 * time.Second)

go func() {
<-timer.C
fmt.Println("Goroutine is done.")
}()

timer.Stop()

// 重置定时器,上面的 goroutine 将继续执行
timer.Reset(5 * time.Second)

select {
case <-time.After(10 * time.Second):
}
}

4.2 周期性定时任务

time.NewTicker(d Duration) *Ticker

  • <-ticker.C: 阻塞等待,返回定时器时间
  • ticker.Stop():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
ticker := time.NewTicker(2 * time.Second)

count := 0
for {
<-ticker.C

count++
fmt.Printf("%d ", count)

if count == 10 {
ticker.Stop()
break
}
}
fmt.Println()
}

4.3 延迟操作总结

1
2
3
4
5
6
time.Sleep(time.Second * 2)

<- time.After(time.Second * 2)

timer := time.NewTimer(time.Second * 2)
<-timer.C

5. 死锁经典错误案例

5.1 无缓冲信道,在接收者未准备好之前,发送操作是阻塞的

1
2
3
4
5
6
7
func main() {
c := make(chan bool)

c <- true // 阻塞

fmt.Println(<-c)
}

两种解决方法:

1) 先接收,后发送

1
2
3
4
5
6
7
8
9
func main() {
c := make(chan bool)

go func() {
fmt.Println(<-c)
}()

c <- true
}

2) 使用缓冲信道

1
2
3
4
5
6
7
func main() {
c := make(chan bool, 1)

c <- true

fmt.Println(<-c)
}

5.2 缓冲信道,超过容量

1
2
3
4
5
6
7
8
func main() {
c := make(chan bool, 1)

c <- true
c <- false

fmt.Println(<-c)
}

5.3 等待从信道读取数据,但信道无数据写入

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
c := make(chan bool, 1)

go func() {
c <- true
c <- false
}()

for i := range c {
fmt.Println(i)
}
}

解决办法:及时关闭无用信道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
c := make(chan bool, 1)

go func() {
c <- true
c <- false

close(c) // 关闭信道
}()

for i := range c {
fmt.Println(i)
}
}

6. 控制 goroutine 并发数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
count := 10
wg := sync.WaitGroup{}
ch := make(chan int, 2)

for i := 0; i < count; i++ {
wg.Add(1)

go func(i int) {
defer wg.Done()

for n := range ch {
fmt.Printf("go func: %d, time: %v\n", n, time.Now())
time.Sleep(time.Duration(n) * time.Second)
}
}(i)
}

for i := 0; i < 10; i++ {
ch <- 1
ch <- 2
}

close(ch)
wg.Wait()
}