概念
sync.Mutex 是常用的锁,可以理解为互斥锁,是最广泛的并发原语,Go中有一部分并发原语是基于sync.Mutex实现的。
源码分析
以下源码都只截取重要部分,更详细的可以去包中看源码理解
分析结构体
1
2
3
4
|
type Mutex struct {
state int32 // 多含义标识,包括阻塞等待的waiter数量,饥饿标记,唤醒标记,持有锁标记
sema uint32 // 信号量,用来阻塞/唤醒对应的goroutine
}
|
state字段可以理解为
| mutexWaiter |
mutexStarving |
mutexWoken |
mutexLocked |
| 阻塞等待的waiter数量 |
饥饿标记 |
唤醒标记 |
持有锁标记 |
分析方法
sync.Mutex 是实现了 Locker这个interface的方法,具体为lock()和unlock()方法
- lock方法(快速判断加锁 和慢处理:自旋,饥饿,阻塞,唤醒等处理)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
func (m *Mutex) Lock() {
//利用cas快速判断是否加锁了,该方法是判断mutexLocked值是否等于0,等于0即修改为1加锁状态
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 否则执行slow方法,进行加锁后的处理
m.lockSlow()
}
//slow方法
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
//锁是非饥饿状态,锁还没被释放,尝试自旋
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state //再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
// 非饥饿状态,加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving //设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken //新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//成功设置新状态
if old&(mutexLocked|mutexStarving) == 0 {
//原来锁的状态已释放,并不是饥饿状态,正常请求到锁,返回
break // locked the mutex with CAS
}
// 处理饥饿状态
// 如果以前在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//唤醒后检查锁是否处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
//如果锁以及处于饥饿状态,直接抢到锁,返回
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
//最后一个waiter或者已经不饥饿了,清除饥饿标记
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: 删除lock标记
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 勾勒出慢速路径,以允许内联快速路径
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 没有加锁
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒并设置mutexWoken
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
|
使用场景
如创建了 10 个 goroutine,同时不断地对一个变量(count)进行加 1 操作,每个 goroutine 负责执行 10 万次的加 1 操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package main
import (
"fmt"
"sync"
)
func main() {
// 互斥锁保护计数器
var mu sync.Mutex
// 计数器的值
var count = 0
// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)
// 启动10个gourontine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 累加10万次
for j := 0; j < 100000; j++ {
mu.Lock()
count++
mu.Unlock()
}
}()
}
wg.Wait()
fmt.Println(count)
}
|
注意的地方
- 复制sync.Copy
- 重入
- 死锁
- lock和unlock不是成对出现