安妮的心动录
2023/04/10阅读:29主题:默认主题
waitgroup与互斥锁底层
WaitGroup底层
回顾一下Go的并发控制,其实大概有以下几种控制手段
-
全局变量(不优雅) -
wg 只有三个方法,方便简单 -
channel 有三种panic的情况,且操作比较复杂 -
context 官方推荐,在带层级的goroutine中有奇效,比如goroutine嵌套着goroutine;而且能够在不同的goroutine之间进行值传递,方便
sync.WaitGroup()就是解决go中并发时,多个goroutine同步的问题 用法如下
-
主goroutine通过wg.Add(i)让设置需要启动的goroutine数量 -
子goroutine之间通过wg.Done()来表示子goroutine执行完毕,Add(-1) -
主goroutine之间通过Wait()来等待所有的子goroutine执行完毕
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("here1")
}()
go func() {
defer wg.Done()
fmt.Println("here2")
}()
wg.Wait()
fmt.Println("finish")
}
//输出
// here2
// here1
// finish
因为这里用了defer 所以答案是确定的 先进后出 必先打印here2
底层
//https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
第一个字段:noCopy
Go中检测禁止复制的技术,如果有复制行为,那么就认为违规,在编译阶段禁止赋值和复制wg实例,因为wg中有一个计数器,多个goroutine会并发的修改该计数器。在使用wg的时候,建议按照官方文档的要求,使用指针传递wg实例,同时如果wg作为结构体的成员,也需要显式地定义一个nocopy字段,以避免在结构体复制时导致竞态问题
第二个字段:state1数组
这是一个长度为3的数组,包含了wg使用到的三种数据:counter、waiter、semaphore 三个元素的具体作用如下
-
counter:计数器,用来计算要执行的协程g的数量,表示当前要执行的goroutine个数,wg.Add(i)时,counter+=i,wg.Done()时,count-=1 -
waiter:计数器,表示已经调用Wait()函数的goroutine-group个数,也就是需要结束的goroutine组数,当wg.Wait()的时候,waiter+=1,并且挂起当前的goroutine -
semaphore:go runtime内部的信号量实现,会用到以下两个函数
1.runtime_Semacquire 增加一个信号量,并且挂起当前goroutine 2.runtime_Semrelease 减少一个信号量,并唤醒semaphore上其中一个正在等待的字段
Add
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L53
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() // 获取state(counter+waiter)和semaphore信号量的指针
... ...
// uint64(delta)<<32 把 delta 左移32位,因为counter在statep的高32位
// 然后把delta原子的增加到counter中
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v => counter, w => waiter
v := int32(state >> 32)//获取counter值
w := uint32(state) //获取waiter值
... ...
//counter变为负值了,panic报错
if v < 0 {
panic("sync: negative WaitGroup counter")
}
//waiter不等于0,说明已经执行了waiter,这时你又调用Add(),是不允许的
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//v->counter,counter>0,说明还有goroutine没执行完,不需要释放信号量,直接返回
//w->waiter, waiter=0,没有等待的goroutine,不需要释放信号量,直接返回
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// Add()和Wait()不能并行操作
// counter==0,也不能执行Wait()操作
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
*statep = 0 // 结束了将counter清零,下面在释放waiter数的信号量
for ; w != 0; w-- {// 循环释放waiter个数的信号量
runtime_Semrelease(semap, false, 0)// 一次释放一个信号量,唤醒一个等待者
}
}
能看出来Add主要干了两件事 1.将delta值累加到counter计数器中 2.当counter=0的时候,waiter释放相应的信号量,把等待的goroutine全部唤醒,如果<0,则panic
Done
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L97
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
直接调用wg.Add(-1),让counter的值减一
Wait
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L103
func (wg *WaitGroup) Wait() {
statep, semap := wg.state() //获取state(counter+waiter)和semaphore信号量的指针
... ...
for {// 死循环
state := atomic.LoadUint64(statep) //原子的获取state值
v := int32(state >> 32) // 获取counter值
w := uint32(state) //获取waiter值
if v == 0 {// counter=0,不需要wait直接返回
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
... ...
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {// 使用CAS累加wiater
... ...
runtime_Semacquire(semap) //增加信号量,等待信号量唤醒
// 这时 *statep 还不等于 0,那么使用过程肯定有误,直接 panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
... ...
return
}
}
}
累加waiter计数器的值,增加信号量,等待唤醒
注意事项
-
Add()操作必须早于Wait()操作 -
Done()的次数必须和Add的值相等 -
不能让counter的值<0,也就是Done不能大于Add(i) -
Add和Wait不能并行调用,必须一个在子协程一个在主携程 -
如果想重复调用wg,必须得等待Wait()执行完后才能进行下一轮调用
锁底层
go中的sync包提供了两种锁的类型,分别是互斥锁sync.Mutex
和读写锁sync.RWMutex
,这两种锁都属于悲观锁 锁的使用场景是解决多协程下数据竞态的问题,为了保证数据的安全,锁住一些共享资源。以防止并发访问这些共享数据时可能导致的数据不一致问题,获取锁的线程可以正常访问临界区,未获取到锁的线程等待锁释放之后可以尝试获取锁 注:当你想让一个结构体是并发安全的,可以加一个锁字段,比如channel就是这么做的,要注意的是,这个锁字段必须小写,不然调用方也可以进行lock和unlock操作,相当于你把钥匙和锁都交给了别人,锁就失去了应有的作用
mutex
提供了三个方法
-
Lock() 进行加锁操作,在同一个goroutine中必须在锁释放之后才能进行再次上锁,不然会panic -
Unlock() 进行解锁操作,如果这个时候未加锁会panic,mutex和goroutine不关联,也就是说对于mutex的加锁解锁操作可以发生在多个goroutine间 -
tryLock() 尝试获取锁,当锁被其他goroutine占有,或者锁处于饥饿模式,将立刻返回false,当锁可用时尝试获取锁,获取失败也返回false
实现如下
type Mutex struct {
state int32
sema uint32
}
Mutex只有两个字段
-
state 表示当前互斥锁的状态,复合型字段 -
sema 信号量变量,用来控制等待goroutine的阻塞休眠和唤醒
state的不同位标识了不同的状态,以此实现了用最小的内存来表示更多的意义
// 前三个字段标识了锁的状态 剩下的位来标识当前共有多少个goroutine在等待锁
const (
mutexLocked = 1 << iota // 表示互斥锁的锁定状态
mutexWoken // 表示从正常模式被从唤醒
mutexStarving // 当前的互斥锁进入饥饿状态
mutexWaiterShift = iota // 当前互斥锁上等待者的数量
)
mutex的最开始实现只有正常模式,在正常模式下等待的线程按照先进先出的方式获取锁,但是新创建的goroutine会与刚被唤醒的goroutine竞争,导致刚被唤起的goroutine拿不到锁,从而长期被阻塞。 因此Go在1.9
版本中引入了饥饿模式,当goroutine超过1ms没有获取锁,那么就将当前的互斥锁切换到饥饿模式,在该模式下,互斥锁会直接交给等待队列最前面的g,新的g在该状态下既不能获取锁,也不会进入自旋状态,只会在队列的末尾等待。如果一个g获取了互斥锁,并且它在队列的末尾或者等待的时间少于1ms,那么就回到正常模式
加锁
func (m *Mutex) Lock() {
// 判断当前锁的状态,如果锁是完全空闲的,即m.state为0,则对其加锁,将m.state的值赋为1
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
........
}
-
通过CAS系统调用判断当前锁的状态,如果是空闲则m.state为0,这个时候对其加锁,将m.state设为1 -
如果当前锁已被占用,通过lockSlow方法尝试自旋或者饥饿状态下的竞争,等待锁的释放
lockSlow: 初始化五个字段
-
waitStartTime 用来计算waiter的等待时间 -
starving 饥饿模式标志,如果等待时间超过1ms,则为true -
awoke 协程是否唤醒,当g在自旋的时候,相当于CPU上已经有正在等锁的协程,为了避免mutex解锁时再唤醒其他协程,自旋时要尝试把mutex设为唤醒状态 -
iter 用来记录协程的自旋次数 -
old 记录当前锁的状态
判断自旋
for {
// 判断是否允许进入自旋 两个条件,条件1是当前锁不能处于饥饿状态
// 条件2是在runtime_canSpin内实现,其逻辑是在多核CPU运行,自旋的次数小于4
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke 判断当前goroutine不是在唤醒状态
// old&mutexWoken == 0 表示没有其他正在唤醒的goroutine
// old>>mutexWaiterShift != 0 表示等待队列中有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将当前锁的低2位的Woken状态位设置为1,表示已被唤醒, 这是为了通知在解锁Unlock()中不要再唤醒其他的waiter了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 设置当前goroutine唤醒成功
awoke = true
}
// 进行自旋
runtime_doSpin()
// 自旋次数
iter++
// 记录当前锁的状态
old = m.state
continue
}
}
const active_spin_cnt = 30
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
进入自旋的原因:乐观的认为当前正在持有锁的g能在短时间内归还锁,所以需要一些条件来判断:到底能不能短时间归还 条件如下
-
自旋的次数<=4 -
cpu必须为多核 -
gomaxprocs>1,最大被同时执行的CPU数目大于1 -
当前机器上至少存在一个正在运行的P并且处理队列为空
满足条件之后进行循环,次数为30次,也就是执行30次PAUSE指令来占据CPU,进行自旋
解锁
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 这里表示解锁了一个没有上锁的锁,则直接发生panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式的释放锁逻辑
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待者则直接返回即可
// 如果锁处于加锁的状态,表示已经有goroutine获取到了锁,可以返回
// 如果锁处于唤醒状态,这表明有等待的goroutine被唤醒了,不用尝试获取其他goroutine了
// 如果锁处于饥饿模式,锁之后会直接给等待队头goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 抢占唤醒标志位,这里是想要把锁的状态设置为被唤醒,然后waiter队列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 抢占成功唤醒一个goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行抢占不成功时重新更新一下状态信息,下次for循环继续处理
old = m.state
}
} else {
// 饥饿模式释放锁逻辑,直接唤醒等待队列goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 这里表示解锁了一个没有上锁的锁,则直接发生panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式的释放锁逻辑
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待者则直接返回即可
// 如果锁处于加锁的状态,表示已经有goroutine获取到了锁,可以返回
// 如果锁处于唤醒状态,这表明有等待的goroutine被唤醒了,不用尝试获取其他goroutine了
// 如果锁处于饥饿模式,锁之后会直接给等待队头goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 抢占唤醒标志位,这里是想要把锁的状态设置为被唤醒,然后waiter队列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 抢占成功唤醒一个goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行抢占不成功时重新更新一下状态信息,下次for循环继续处理
old = m.state
}
} else {
// 饥饿模式释放锁逻辑,直接唤醒等待队列goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
解锁对于加锁来说简单很多,通过AddInt32
方法进行快速解锁,将m.state低位置为0,然后判断值,如果为0,那么就完全空闲了,结束解锁。如果不为0说明当前锁未被占用,不过有等待的g未被唤醒,需要进行一系列唤醒操作,唤醒判断锁的状态,然后进行具体的goroutine唤醒
非阻塞加锁
func (m *Mutex) TryLock() bool {
// 记录当前状态
old := m.state
// 处于加锁状态/饥饿状态直接获取锁失败
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 尝试获取锁,获取失败直接获取失败
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}
TryLock是Go 1.18
新加入的方法,不被鼓励使用,主要是两个判断逻辑
-
判断当前锁的状态,如果锁处于加锁状态或者饥饿状态就直接获取锁失败 -
尝试获取锁,如果失败则直接失败
作者介绍