1. goroutine
协程,比线程更小,十几个goroutine可能体现在底层就五六个线程。Go语言内部实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(4~5KB),比线程更易用,更高效和更轻便。
goroutine调度模型:
- M: 线程
- P: 上下文
- G: Goroutine
并发concurrency:
- goroutine只是官方实现的超级“线程池”。每个实例4-5KB的栈内存占用,和大幅减少的创建和销毁开销,是制造Go号称高并发的根本原因
- 并发不是并行(Concurrency is Not Parallelism): 并发是通过切换CPU时间来实现“同时”运行;而并行则是直接利用多核实现多线程同时运行。Go可设置使用的CPU核心数,以发挥多核计算机的能力
- goroutine奉行通过通信来共享内存,而不是共享内存来通信。
goroutine的切换点:
KSE:Kernel Scheduling Entity, 内核调度实体,即可以被操作系统内核调度器调度的实体对象,它是内核的最小调度单元,也就是内核级线程
三种线程模型:
用户级线程模型:
用户线程与内核线程KSE的关系是多对一 (N:1)。多个用户线程一般从属单个进程,并且多线程的调度由用户自己的线程库完成,线程的创建、销毁及线程间的协调等操作由用户自己的线程库负责,无需借助系统调度来实现。
Python的gevent协程库就属这种实现
线程调度在用户层面完成,不需要让CPU在用户态和内核态之间切换,这种方式较为轻量级,对系统资源消耗少
缺点:做不到真正意义上的并发。如果某个用户进程上的某个线程因为一个阻塞调用(I/O)二被CPU中断(抢占式调度),那么该进程中的其它线程将被阻塞,整个进程被挂起。因为在用户线程模式下,进程内的线程绑定到CPU执行是由用户进程调度实现的,内部线程对CPU不可见,即CPU调度的是进程,而非线程
协程库优化:把阻塞的操作重新封装为完全非阻塞模式,在阻塞点上,主动让出自己,并通知或唤醒其它等待的用户线程
内核级线程模型
- 用户线程和内核线程KSE的关系是一对一 (1:1)。每个用户线程绑定一个内核线程,线程的调度完全交由内核控制
- Java/C++ 的线程库按此方式实现
- 优点:简单,直接借助系统内核的线程和调度器,可以快速实现线程切换,做到真正的并行处理
- 缺点:由于直接使用内核去创建、销毁及多线程上下文切换和调度,系统资源成本大幅上涨,对性能影响较大
两级线程模型(即混合型线程模型)
- 用户线程与内核线程KSE的关系是多对多 (N:M)
- 一个进程可与多个内核线程KSE关联,该进程内的多个线程绑定到了不同的KSE上
- 进程内的线程并不与KSE一一绑定,当某个KSE绑定的线程因阻塞操作被内核调度出CPU时,其关联的进程中的某个线程又会重新与KSE绑定
- 此种模型高度复杂,Go语言中的runtime调度器实现了这种方案
- 为什么称为两级?用户调度实现用户线程到KSE的调度,内核调度器实现KSE到CPU上的调度
G-P-M 模型:
- G:Goroutine:独立执行单元。相较于每个OS线程固定分配2M内存的模式,Goroutine的栈采取动态扩容方式,2k ~ 1G(AMD64, AMD32: 256M)。周期性回收内存,收缩栈空间
- 每个Goroutine对应一个G结构体,它存储Goroutine的运行堆栈、状态及任务函数,可重用。
- G并非执行体,每个G需要绑定到P才能被调度执行
- P:Processor: 逻辑处理器,中介
- 对G来说,P相当于CPU,G只有绑定到P才能被调用
- 对M来说,P提供相关的运行环境(Context),如内存分配状态(mcache),任务队列(G)等
- P的数量决定系统最大并行的G的数量 (CPU核数 >= P的数量),用户可通过GOMAXPROCS设置数量,但不能超过256
- M:Machine
- OS线程抽象,真正执行计算的资源,在绑定有效的P后,进入schedule循环
- schedule循环的机制大致从Global队列、P的Local队列及wait队列中获取G,切换到G的执行栈上执行G的函数,调用goexit做清理工作并回到M
- M不保留G的状态
- M的数量不定,由Go Runtime调整,目前默认不超过10K

1.1 go关键字开启新协程
1 2 3 4 5 6 7 8 9 10 11
| func say(s string) { for i := 0; i < 5; i ++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } }
func main() { go say("world") say("hello") }
|
1.2 runtime包
runtime.Gosched()
让出时间片
runtime.Goexit()
终止协程
runtime.GOMAXPROCS(N)
指定运行CPU个数
1 2 3 4 5 6 7 8 9 10 11 12
| func main() { go func() { for i := 0; i < 5; i++ { fmt.Println("go") } }()
for i := 0; i < 2; i++ { runtime.Gosched() fmt.Println("hello") } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func auto() { for i := 0; i < 10; i++ { go func(i int) { for { fmt.Printf("Hello from goroutine %d\n", i) } }(i) }
time.Sleep(time.Millisecond) }
func manual() { var a [10]int
for i := 0; i < 10; i++ { go func(i int) { for { a[i]++ runtime.Gosched() } }(i) }
time.Sleep(time.Millisecond) fmt.Println(a) }
func outOfRange() { var a [10]int
for i := 0; i < 10; i++ { go func() { for { a[i]++ runtime.Gosched() } }() }
time.Sleep(time.Millisecond) fmt.Println(a) }
|
1
| go run -race goroutine.go
|
2. 通道(channel)
通道:用来传递数据的一种数据结构。两个goroutine之间,可以使用它来进行同步和通信
不靠共享内存通信,而是通过通信来共享内存
Channel:
- goroutine的沟通桥梁,大多是阻塞同步的
- make创建,close关闭
- 是引用类型
- 可使用for range来迭代channel
- 可设置单向或双向通道
- 可设置缓存大小,在未被填满前不会发生阻塞
1 2 3 4
| ch := make(chan int)
ch <- v v := <- ch
|
2.1 分段计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func main() { a := []int{7, 9, -3, 4, 6, 8, 2, -5}
mid := len(a) / 2 ch := make(chan int)
go sum(a[:mid], ch) go sum(a[mid:], ch)
x, y := <-ch, <-ch fmt.Println(x, y, x+y) }
func sum(a []int, ch chan int) { result := 0 for _, v := range a { result += v }
ch <- result }
|
2.2 阻塞主线程
1 2 3 4 5 6 7 8 9 10
| func main() { c := make(chan bool)
go func() { fmt.Println("Go Go Go!!!") c <- true }()
<- c }
|
2.3 通道遍历和关闭
如果通道不关闭close(ch)
,遍历range ch
就不会结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func main() { ch := make(chan int, 10)
go fib(cap(ch), ch)
for { num, ok := <-ch if ok { fmt.Printf("%d ", num) } else { break } }
fmt.Println() }
func fib(n int, ch chan int) { x, y := 1, 1 for i := 0; i < n; i++ { ch <- x x, y = y, x+y }
close(ch) }
|
2.4 主程序可能不等待goruntine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func main() { ch := make(chan bool)
for i := 0; i < 10; i++ { go calc(i, ch) }
<-ch }
func calc(index int, ch chan bool) { sum := 0
for i := 1; i < 1000000001; i++ { sum += i }
fmt.Println(index, sum)
if index == 9 { ch <- true } }
|
2.4.1 方法一:使用缓存channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func main() { ch := make(chan bool, 10)
for i := 0; i < 10; i++ { go calc(i, ch) }
for i := 0; i < 10; i++ { <-ch } }
func calc(index int, ch chan bool) { sum := 0
for i := 1; i < 1000000001; i++ { sum += i }
fmt.Println(index, sum)
ch <- true }
|
2.4.2 方法二:通过同步解决(sync.WaitGroup)
wg.Add(N)
: 新增N个任务
wg.Done()
: 完成一个任务,计算器减1
wg.Wait()
: 主线程等待,直到计数器为0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func main() { wg := sync.WaitGroup{} wg.Add(10)
for i := 0; i < 10; i++ { go calc(i, &wg) }
wg.Wait() }
func calc(index int, wg *sync.WaitGroup) { sum := 0
for i := 1; i < 1000000001; i++ { sum += i }
fmt.Println(index, sum)
wg.Done() }
|
2.5 模拟打印机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func main() { ch := make(chan bool)
go task1(ch) go task2(ch)
select { case <-time.After(15 * time.Second): } }
func Printer(s string) { for _, c := range s { fmt.Printf("%c", c) time.Sleep(time.Second) } fmt.Println() }
func task1(ch chan bool) { <-ch Printer("hello") }
func task2(ch chan bool) { Printer("world") ch <- true }
|
2.6 带缓冲的通道
通道 ch := make(chan int, N)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func main() { ch := make(chan int, 3)
go func() { for i := 0; i < 10; i++ { fmt.Printf("i=%d, len(ch)=%d, cap(ch)=%d\n", i, len(ch), cap(ch)) ch <- i } }()
time.Sleep(2 * time.Second)
for i := 0; i < 10; i++ { num := <-ch fmt.Printf("num=%d\n", num) } }
|
2.7 单向通道
1 2 3
| var ch1 chan int var ch2 chan<- int var ch3 <-chan int
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func main() { ch := make(chan int)
go producer(ch)
consumer(ch) }
func producer(out chan<- int) { for i := 0; i < 10; i++ { out <- i * i }
close(out) }
func consumer(in <-chan int) { for i := 0; i < 10; i++ { fmt.Printf("%d ", <-in) } fmt.Println() }
|
3. Select
select: 管理多个channel,监听channel上的数据流动。类似switch语法,但每个case语句必须是IO操作。多个case同时满足,任选一个执行。
- 处理一个或多个channel的发送和接收
- 同时有多个channel时,随机处理
- 可用空select来阻塞main函数
- 可设置超时
default语句:
- 有default:select语句不会被阻塞,执行default后,程序的执行会从select语句中恢复,进入下一次轮询。比较消耗资源。
- 没有default:select语句将被阻塞,直到至少有一个通信可以进行下去
3.1 管理多个通道
3.1.1 示例1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func main() { c1, c2 := make(chan int), make(chan string) done := make(chan bool, 2)
go func() { for { select { case v, ok := <-c1: if !ok { done <- true break } fmt.Println("c1:", v) case v, ok := <-c2: if !ok { done <- true break } fmt.Println("c2:", v) } } }()
c1 <- 1 c2 <- "hi" c1 <- 5 c2 <- "hello"
close(c1)
for i := 0; i < 2; i++ { <-done } }
|
3.1.2 示例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func main() { ch := make(chan int) quit := make(chan bool)
go func() { for i := 0; i < 10; i++ { fmt.Printf("%d ", <-ch) } fmt.Println()
quit <- true }()
fib(ch, quit) }
func fib(ch chan<- int, quit <-chan bool) { x, y := 1, 1
for { select { case ch <- x: x, y = y, x+y case <-quit: fmt.Println("Done.") return } } }
|
3.1.3 使用select作为发送者应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func main() { c := make(chan int)
go func() { for v := range c { fmt.Println(v) } }()
for i := 0; i < 100; i++ { select { case c <- 0: case c <- 1: } } }
|
3.2 超时处理
case <-time.After(5 * time.Second)
: 其他channel阻塞时间超过5s时执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func main() { ch := make(chan int) done := make(chan bool)
go func() { for { select { case x := <-ch: fmt.Printf("%d ", x) case <-time.After(5 * time.Second): fmt.Println("\nTimeout") done <- true return } } }()
for i := 0; i < 10; i++ { ch <- i time.Sleep(time.Second) }
<-done }
|
3.3 避免造成死锁
select 在执行过程中,必须命中其中的某一分支, 否则deadlock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func main() { c1 := make(chan string, 1) c2 := make(chan string, 1)
c1 <- "ok" c2 <- "good"
select { case msg := <-c1: fmt.Printf("c1 receive %s\n", msg) case msg := <-c2: fmt.Printf("c2 receive %s\n", msg) default: fmt.Println("no data") } }
|
4. 定时器
4.1 一次性定时任务
time.NewTimer(d Duration) *Timer
<-timer.C
: 阻塞等待,返回定时器时间
timer.Stop()
:
timer.Reset(d Duration)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func main() { timer := time.NewTimer(2 * time.Second)
go func() { <-timer.C fmt.Println("Goroutine is done.") }()
timer.Stop()
timer.Reset(5 * time.Second)
select { case <-time.After(10 * time.Second): } }
|
4.2 周期性定时任务
time.NewTicker(d Duration) *Ticker
<-ticker.C
: 阻塞等待,返回定时器时间
ticker.Stop()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func main() { ticker := time.NewTicker(2 * time.Second)
count := 0 for { <-ticker.C
count++ fmt.Printf("%d ", count)
if count == 10 { ticker.Stop() break } } fmt.Println() }
|
4.3 延迟操作总结
1 2 3 4 5 6
| time.Sleep(time.Second * 2)
<- time.After(time.Second * 2)
timer := time.NewTimer(time.Second * 2) <-timer.C
|
5. 死锁经典错误案例
5.1 无缓冲信道,在接收者未准备好之前,发送操作是阻塞的
1 2 3 4 5 6 7
| func main() { c := make(chan bool)
c <- true
fmt.Println(<-c) }
|
两种解决方法:
1) 先接收,后发送
1 2 3 4 5 6 7 8 9
| func main() { c := make(chan bool)
go func() { fmt.Println(<-c) }()
c <- true }
|
2) 使用缓冲信道
1 2 3 4 5 6 7
| func main() { c := make(chan bool, 1)
c <- true
fmt.Println(<-c) }
|
5.2 缓冲信道,超过容量
1 2 3 4 5 6 7 8
| func main() { c := make(chan bool, 1)
c <- true c <- false
fmt.Println(<-c) }
|
5.3 等待从信道读取数据,但信道无数据写入
1 2 3 4 5 6 7 8 9 10 11 12
| func main() { c := make(chan bool, 1)
go func() { c <- true c <- false }()
for i := range c { fmt.Println(i) } }
|
解决办法:及时关闭无用信道
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func main() { c := make(chan bool, 1)
go func() { c <- true c <- false
close(c) }()
for i := range c { fmt.Println(i) } }
|
6. 控制 goroutine 并发数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func main() { count := 10 wg := sync.WaitGroup{} ch := make(chan int, 2)
for i := 0; i < count; i++ { wg.Add(1)
go func(i int) { defer wg.Done()
for n := range ch { fmt.Printf("go func: %d, time: %v\n", n, time.Now()) time.Sleep(time.Duration(n) * time.Second) } }(i) }
for i := 0; i < 10; i++ { ch <- 1 ch <- 2 }
close(ch) wg.Wait() }
|