Go并发原语-sync.Mutex

深入浅出sync.Mutex

概念

sync.Mutex 是常用的锁,可以理解为互斥锁,是最广泛的并发原语,Go中有一部分并发原语是基于sync.Mutex实现的。

源码分析

以下源码都只截取重要部分,更详细的可以去包中看源码理解

分析结构体
1
2
3
4
type Mutex struct {
	state int32 // 多含义标识,包括阻塞等待的waiter数量,饥饿标记,唤醒标记,持有锁标记
	sema  uint32 // 信号量,用来阻塞/唤醒对应的goroutine
}

state字段可以理解为

mutexWaiter mutexStarving mutexWoken mutexLocked
阻塞等待的waiter数量 饥饿标记 唤醒标记 持有锁标记
分析方法

sync.Mutex 是实现了 Locker这个interface的方法,具体为lock()和unlock()方法

  • lock方法(快速判断加锁 和慢处理:自旋,饥饿,阻塞,唤醒等处理)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func (m *Mutex) Lock() { 
  //利用cas快速判断是否加锁了,该方法是判断mutexLocked值是否等于0,等于0即修改为1加锁状态
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// 否则执行slow方法,进行加锁后的处理
	m.lockSlow()
}

//slow方法
func (m *Mutex) lockSlow() {
	var waitStartTime int64
  
  starving := false  // 此goroutine的饥饿标记 
  
  awoke := false  // 唤醒标记 
  
  iter := 0  // 自旋次数 
  
  old := m.state  // 当前的锁的状态
	
  for { 
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			//锁是非饥饿状态,锁还没被释放,尝试自旋
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state //再次获取锁的状态,之后会检查是否锁被释放了
			continue
		}
		new := old
		//  非饥饿状态,加锁
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		} 
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving //设置饥饿状态
		}
		if awoke { 
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken //新状态清除唤醒标记
		}
    
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
      //成功设置新状态
			if old&(mutexLocked|mutexStarving) == 0 {
        //原来锁的状态已释放,并不是饥饿状态,正常请求到锁,返回
				break // locked the mutex with CAS
			}
			// 处理饥饿状态
      // 如果以前在队列里面,加入到队列头
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
      //阻塞等待
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      //唤醒后检查锁是否处于饥饿状态
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				//如果锁以及处于饥饿状态,直接抢到锁,返回
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					//最后一个waiter或者已经不饥饿了,清除饥饿标记
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}
  • unlock方法()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: 删除lock标记
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// 勾勒出慢速路径,以允许内联快速路径
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// 没有加锁
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 唤醒并设置mutexWoken
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

使用场景

如创建了 10 个 goroutine,同时不断地对一个变量(count)进行加 1 操作,每个 goroutine 负责执行 10 万次的加 1 操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

package main


    import (
        "fmt"
        "sync"
    )


    func main() {
        // 互斥锁保护计数器
        var mu sync.Mutex
        // 计数器的值
        var count = 0
        
        // 辅助变量,用来确认所有的goroutine都完成
        var wg sync.WaitGroup
        wg.Add(10)

        // 启动10个gourontine
        for i := 0; i < 10; i++ {
            go func() {
                defer wg.Done()
                // 累加10万次
                for j := 0; j < 100000; j++ {
                    mu.Lock()
                    count++
                    mu.Unlock()
                }
            }()
        }
        wg.Wait()
        fmt.Println(count)
    }

注意的地方

  • 复制sync.Copy
  • 重入
  • 死锁
  • lock和unlock不是成对出现
Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计