锁的基础
go
的锁是建立在两个基础之上:atomic
和信号锁 sema
atomic
atomic
是原子操作,比如对一个 int32
类型的变量加 1
,就可以使用 atomic.AddInt32
来实现,这是一个并发安全的操作文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
atomic.AddInt32
的实现是通过汇编代码实现的文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
定义在 runtime/internal/atomic/atomic_amd64.s
文件中文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
s
TEXT ·Xaddint32(SB), NOSPLIT, $0-20
JMP ·Xadd(SB)
在 Xaddint32
中调用 Xadd
函数文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
在 Xadd
函数中,使用 LOCK
指令来保证原子性,这个 LOCK
是 CPU
级别的内存锁文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
s
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
// 这个 LOCK 是一个硬件锁
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET
所以 atomic
操作是一种硬件层面加锁的机制,保证操作一个变量的时候,其他的协程或者线程无法访问,它的缺点是只能用于变量的简单操作文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
sema
sema
也叫做信号量锁,它的核心是一个 uint32
,含义是可并发的数量文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
每一个 sema
锁都对应一个 semaRoot
结构体,意思是每一个 sema
对应的 uint32
实际上是 semaRoot
结构体文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
semaRoot
结构体定义在 runtime/sema.go
文件中文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
go
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}
treap
:是sudog
的指针sudog
是一个平衡二叉树的根节点,用于协程排队nwait
:等待的协程数量
文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html
sema
操作:
uint32 > 0
- 获取锁:
uint32
减1
,获取成功go
复制代码// Called from runtime. func semacquire(addr *uint32) { semacquire1(addr, false, 0, 0, waitReasonSemacquire) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) { // Easy case. if cansemacquire(addr) { return } } func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
- 释放锁:
uint32
加1
,释放成功go
复制代码func semrelease(addr *uint32) { semrelease1(addr, false, 0) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semtable.rootFor(addr) atomic.Xadd(addr, 1) // Easy case: no waiters? // This check must happen after the xadd, to avoid a missed wakeup // (see loop in semacquire). if root.nwait.Load() == 0 { return } }
- 获取锁:
uint32 == 0
- 获取锁:协程休眠,进入堆树等待
go
复制代码// Called from runtime. func semacquire(addr *uint32) { semacquire1(addr, false, 0, 0, waitReasonSemacquire) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) { for { goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) } }
- 释放锁:从堆树种取出一个协程,唤醒
go
复制代码func semrelease(addr *uint32) { semrelease1(addr, false, 0) } func semrelease1(addr *uint32, handoff bool, skipframes int) { // 是否一个协程 s, t0 := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } }
- 获取锁:协程休眠,进入堆树等待
互斥锁解决了什么问题
sync.Mutex
是 go
语言中的互斥锁,是 go
中用于并发最常见的方案,互斥锁的意义是只能一个协程操作,不能多个协程同时操作
mu.Lock()
和 mu.Unlock()
是互斥锁的两个方法,mu.Lock()
是获取锁,mu.Unlock()
是释放锁
如果使用 int32
类型的变量来实现互斥锁:
go
atomic.CompareSwapInt32(&mu, 0, 1) // 获取锁
atomic.CompareSwapInt32(&mu, 1, 0) // 释放锁
在竞争激烈的情况下,int32
类型的变量实现的互斥锁会效率会比较低下
go
type Mutex struct {
state int32
sema uint32
}
sync.Mutex
中的 sema
就是上面说的 semaRoot
结构体,state
是 4
个字节(32
位)的变量
state
位数含义:
- 最后一位是
1
,表示被锁了 - 倒数第二位,表示是否被唤醒
- 倒数第三位,表示饥饿
- 剩余位数表示等待的协程数量
互斥锁是如何工作的
多个协程竞争一把锁,总会有一把锁竞争成功,它就可以完成自己的业务了,其他协程会做自旋(自旋的意思是,看下最后一位能不能锁上,就是有没有变成 0
),当它自旋多次失败后,就会进入休眠状态,然后就会进入 semaRoot
中的平衡二叉树,等待以后有时机再来锁这把锁
sync.Mutex
结构体有 Lock
和 Unlock
两个方法,定义在 sync/mutex.go
文件中
go
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 从 0 改成 1,加锁成功直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 如果加锁失败,调用 slowLock 方法
m.lockSlow()
}
locakSlow
方法的核心是 for
循环,不断的尝试获取锁
mutexLocked
是否被锁,mutexStarving
是否出于饥饿模式,runtime_canSpin(iter)
判断是否可以继续自旋(如果自旋次数太多,就放弃自旋,进入休眠模式)
go
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 判断是否刚刚醒来
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋语句,执行一些空语句
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果没有饥饿尝试加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果被锁或者饥饿,等待数量加 1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 获取锁成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 确实锁上了,退出
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 休眠操作,下面的代码不会运行
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 休眠唤醒后,判断是不是饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 饥饿模式下辈唤醒,直接加锁
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
如果一个协程运行完了,会释放掉锁,这个时候会唤醒一个协程,这个协程会去 semaRoot
中的平衡二叉树中取出一个协程,唤醒它,让它去竞争锁,如果竞争成功,就可以执行这个协程的业务,如果竞争失败,就会继续进入 semaRoot
中的平衡二叉树中等待
go
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 释放锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
unlockSlow
方法判断是否有多余的协程在等待(WaiterShift
、Starving
、Woken
不全为 0
),它的核心还是一个 for
循环
go
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 判断是否有协程在等待
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 从 seam 树中释放一个协程处理工作
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
Lock
函数核心是休眠一个协程runtime_SemacquireMutex
unLock
函数核心是释放一个协程runtime_Semrelease
,休眠的协程会执行runtime_SemacquireMutex
下面的代码
锁饥饿
互斥锁在大量竞争时会出现锁饥饿,当协程等待锁超过 1ms
,就会进入饥饿模式
一旦进入饥饿模式,其他协程就不自旋,直接进入 sema
中的平衡二叉树中等待,饥饿模式中的协程被唤醒直接获取锁,不会和其他协程竞争
sema
队列清空时,会退出饥饿模式
饥饿模式的好处是:
- 在高竞争的环境中,减少了自旋的次数,减少了
CPU
的消耗,新进来的协程直接进入休眠模式 - 保证公平,刚刚从
sema
队列中唤醒的协程已经等待的了很长时间
使用互斥锁时要注意:
- 尽量减少锁的使用时间
- 善用
defer
确保锁的释放
读写锁
读写锁是互斥锁的一种扩展
mu.RLock()
读锁,可以并发读,mu.Lock()
写锁,只能一个协程写
多个协程同时只读,只读时,让其他协程不能修改,这些协程可以并发读,提高效率,修改的协程进不来,获取不到锁
读写锁的原理:
- 每个锁分为读锁和写锁,写锁互斥
- 没有加写锁时,多个协程都可以加读锁
- 加了写锁时,无法加读锁,读协程排队等待
- 加了读锁,写锁排队等待
sync.RWMutex
结构体定义在 sync/rwmutex.go
文件中
go
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
w
:互斥锁,用于写锁,是pending
,表示有等待的写协程writerSem
:写协程等待队列readerSem
:读协程等待队列readerCount
:等待中的读协程数量- 正值:正在读的协程
- 负值:加了写锁
readerWait
:写锁w
生效前要等待多少读协程释放
加写锁,没有读协程
加写锁时,如果没有读协程,加写锁就比较好加,直接把 w
的锁加上就行
不过要注意,此时还没有完全加上,此时 readerCount
为 0
,也就是说没有读协程,这时需要减去 rwmutexMaxReaders
,这个值是 1 << 30
,表示最大读协程数量
这样子写锁就加成功了
加写锁,有读协程
没有读协程在排队,但是有 3
个协程在读取数据(加了读锁)
这时候如果写协程进入,需要先获取写锁 w
,然后看下readerWait
是否为 0
,如果不为 0
,就需要等待读协程释放,这时候将 readerCount
减去 rwmutexMaxReaders
,表示有写协程在等待
readerCount = 3
变为 readerCount = 3 - rwmutexMaxReaders
,表示有写协程在等待,后面的读协程不要在加锁了
然后将 readerWait
设置为 3
,表示需要等待 3
个读协程释放后,才能加写锁加上,此时写协程进入 writerSem
队列中等待
加写锁的步骤:
- 先加
mutex
写锁,若已经被加写锁会阻塞等待 - 将
readerCounter
设置为负值,阻塞读锁的获取 - 计算需要等待多少个读协程释放
- 如果需要等待读协程释放,这写协程进入
writerSem
队列中等待
RWMutex
的 RLock
方法是加写锁,在 sync/rwmutex.go
文件中定义
go
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 获取锁,有了加写锁的资格
rw.w.Lock()
// Announce to readers there is a pending writer.
// readerCount 减去 rwmutexMaxReaders,然后在看下 r 是否为 0
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
// r = 0 表示加写锁成功
// r != 0 表示有读协程在读,需要等待,写协程进入休眠
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
解写锁
- 将
readerCount
变为正值,允许读锁的获取 - 释放在
readerSem
中等待的读协程 - 解锁
mutex
RWMutex
的 Unlock
方法是解写锁,在 sync/rwmutex.go
文件中定义
go
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 将 readerCount 变为正值,允许读协程获取锁
r := rw.readerCount.Add(rwmutexMaxReaders)
// Unblock blocked readers, if any.
// r = 0 表示没有读协程在读,直接解锁
// r > 0 表示有读协程在等待,将读协程释放处理
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
加读锁,readerCount > 0
加读锁时,如果 readerCount
大于 0
,比较简单,直接将 readerCount+1
就好了
加读锁,readerCount < 0
readerCount
小于 0
,表示现在加了写锁
读协程进来后,将 readerCount+1
,读协程进入 readerSem
队列中等待
加读锁的步骤:
- 将
readerCount+1
- 如果
readerCount
是正数,加锁成功 - 如果
readerCount
是负数,说明现在被加了写锁,读协程进入readerSem
队列中等待
RWMutex
的 RLock
方法是加读锁,在 sync/rwmutex.go
文件中定义
go
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
// readerCount + 1
// readerCount > 0 表示加读锁成功
// readerCount < 0 表示现在有写锁在工作,读协程进入 readerSem 队列中等待
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
解读锁,readerCount > 0
解读锁时,如果 readerCount
大于 0
,直接将 readerCount-1
就好了
解读锁,readerCount < 0
readerCount
小于 0
表示有写协程在等待
读协程释放后,将 readerCount-1
,如果 readerCount
是 0
,表示没有读协程了,写协程可以加锁了
解读锁步骤:
- 给
readerCount-1
- 如果
readerCount
是正数,解锁成功 - 如果
readerCount
是负数,说明有写锁在排队- 如果自己是
readerWait
最后一个,才唤醒写协程
- 如果自己是
RWMutex
的 RUnlock
方法是解读锁,在 sync/rwmutex.go
文件中定义
go
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
// readerCount - 1,如果减完后大于 0,表示没有读协程了,解锁成功
// 如果减完后小于 0,表示有写协程在等待,需要进一步处理
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// A writer is pending.
// 等待读协程的数量,如果是最后一个读协程,才唤醒写协程
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
总结
Mutex
用来写协程之间互斥等待- 读协程使用
readerSem
等待锁的释放 - 写协程使用
writerSem
等待锁的释放 readerCount
记录读协程个数readerWait
记录写协程之前的读协程个数
如何通过 WaitGroup 互相等待
WaitGroup
是 sync
包中的一个结构体,用来等待一组协程的结束,定义在 sync/waitgroup.go
文件中
go
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}
noCopy
:用来防止WaitGroup
被复制,noCopy
结构体定义在sync/nocopy.go
文件中state
:高32
位是计数器(前面工作的协程有多少个),低32
位是后面等待的协程数量sema
:等待协程的队列
wg.Wait()
方法是等待协程结束,Wait()
方法被调用时,要看下 counter
数量,如果 counter
数量为 0
,表示前面的协程已经结束,直接返回,否则后面的协程进入 sema
队列中等待
go
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
// Counter 是 0, 不需要等待
if v == 0 {
// Counter is 0, no need to wait.
return
}
// Increment waiters count.
// 等待的协程数量加 1,然后进入休眠
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
wg.Done()
方法是结束一个协程,Done()
方法被调用时 counter-1
,表示有一个协程结束了
go
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
wg.Add()
方法是给 Counter
加 n
,表示有几个协程要等待
go
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
// 加 n,表示要等待的协程数量
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
// 释放后面等待的协程
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
排查锁异常
- 检测锁拷贝问题
shell
复制代码go vet main.go
- 竞争检测,发现隐含的数据竞争问题
shell
复制代码go build -race main.go ./main
- 死锁检测,使用第三方工具
go-deadlock
,这个包继承了sync.Mutex
,可以检测死锁
评论