Go 高并发下的锁

零 Golang教程评论79字数 14804阅读49分20秒阅读模式

Go 高并发下的锁

锁的基础

go 的锁是建立在两个基础之上:atomic 和信号锁 sema

atomic

atomic 是原子操作,比如对一个 int32 类型的变量加 1,就可以使用 atomic.AddInt32 来实现,这是一个并发安全的操作文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

atomic.AddInt32 的实现是通过汇编代码实现的文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

定义在 runtime/internal/atomic/atomic_amd64.s 文件中文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

s

复制代码
TEXT ·Xaddint32(SB), NOSPLIT, $0-20
  JMP	·Xadd(SB)

在 Xaddint32 中调用 Xadd 函数文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

在 Xadd 函数中,使用 LOCK 指令来保证原子性,这个 LOCK 是 CPU 级别的内存锁文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

s

复制代码
TEXT ·Xadd(SB), NOSPLIT, $0-20
  MOVQ	ptr+0(FP), BX
  MOVL	delta+8(FP), AX
  MOVL	AX, CX
  // 这个 LOCK 是一个硬件锁
  LOCK
  XADDL	AX, 0(BX)
  ADDL	CX, AX
  MOVL	AX, ret+16(FP)
  RET

所以 atomic 操作是一种硬件层面加锁的机制,保证操作一个变量的时候,其他的协程或者线程无法访问,它的缺点是只能用于变量的简单操作文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

sema

sema 也叫做信号量锁,它的核心是一个 uint32,含义是可并发的数量文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

每一个 sema 锁都对应一个 semaRoot 结构体,意思是每一个 sema 对应的 uint32 实际上是 semaRoot 结构体文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

semaRoot 结构体定义在 runtime/sema.go 文件中文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

go

复制代码
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {
  lock  mutex
  treap *sudog        // root of balanced tree of unique waiters.
  nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}
  • treap:是 sudog 的指针
    • sudog 是一个平衡二叉树的根节点,用于协程排队
    • nwait:等待的协程数量

go-1.png文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16482.html

sema 操作:

  • uint32 > 0
    • 获取锁:uint32 减 1,获取成功

      go

      复制代码
      // Called from runtime.
      func semacquire(addr *uint32) {
        semacquire1(addr, false, 0, 0, waitReasonSemacquire)
      }
      func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
        // Easy case.
        if cansemacquire(addr) {
          return
        }
      }
      func cansemacquire(addr *uint32) bool {
        for {
          v := atomic.Load(addr)
          if v == 0 {
            return false
          }
          if atomic.Cas(addr, v, v-1) {
            return true
          }
        }
      }
      
    • 释放锁:uint32 加 1,释放成功

      go

      复制代码
      func semrelease(addr *uint32) {
        semrelease1(addr, false, 0)
      }
      func semrelease1(addr *uint32, handoff bool, skipframes int) {
        root := semtable.rootFor(addr)
        atomic.Xadd(addr, 1)
        // Easy case: no waiters?
        // This check must happen after the xadd, to avoid a missed wakeup
        // (see loop in semacquire).
        if root.nwait.Load() == 0 {
          return
        }
      }
      
  • uint32 == 0
    • 获取锁:协程休眠,进入堆树等待

      go

      复制代码
      // Called from runtime.
      func semacquire(addr *uint32) {
        semacquire1(addr, false, 0, 0, waitReasonSemacquire)
      }
      func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
        for {
          goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
        }
      }
      
    • 释放锁:从堆树种取出一个协程,唤醒

      go

      复制代码
      func semrelease(addr *uint32) {
        semrelease1(addr, false, 0)
      }
      func semrelease1(addr *uint32, handoff bool, skipframes int) {
        // 是否一个协程
        s, t0 := root.dequeue(addr)
        if s != nil {
          root.nwait.Add(-1)
        }
      }
      

互斥锁解决了什么问题

sync.Mutex 是 go 语言中的互斥锁,是 go 中用于并发最常见的方案,互斥锁的意义是只能一个协程操作,不能多个协程同时操作

mu.Lock() 和 mu.Unlock() 是互斥锁的两个方法,mu.Lock() 是获取锁,mu.Unlock() 是释放锁

如果使用 int32 类型的变量来实现互斥锁:

go

复制代码
atomic.CompareSwapInt32(&mu, 0, 1)  // 获取锁
atomic.CompareSwapInt32(&mu, 1, 0)  // 释放锁

在竞争激烈的情况下,int32 类型的变量实现的互斥锁会效率会比较低下

go

复制代码
type Mutex struct {
  state int32
  sema  uint32
}

sync.Mutex 中的 sema 就是上面说的 semaRoot 结构体,state 是 4 个字节(32 位)的变量

state 位数含义:

  • 最后一位是 1,表示被锁了
  • 倒数第二位,表示是否被唤醒
  • 倒数第三位,表示饥饿
  • 剩余位数表示等待的协程数量

go-2.png

互斥锁是如何工作的

多个协程竞争一把锁,总会有一把锁竞争成功,它就可以完成自己的业务了,其他协程会做自旋(自旋的意思是,看下最后一位能不能锁上,就是有没有变成 0),当它自旋多次失败后,就会进入休眠状态,然后就会进入 semaRoot 中的平衡二叉树,等待以后有时机再来锁这把锁

go-3.png

sync.Mutex 结构体有 Lock 和 Unlock 两个方法,定义在 sync/mutex.go 文件中

go

复制代码
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
  // Fast path: grab unlocked mutex.
  // 从 0 改成 1,加锁成功直接返回
  if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
    }
    return
  }
  // Slow path (outlined so that the fast path can be inlined)
  // 如果加锁失败,调用 slowLock 方法
  m.lockSlow()
}

locakSlow 方法的核心是 for 循环,不断的尝试获取锁

mutexLocked 是否被锁,mutexStarving 是否出于饥饿模式,runtime_canSpin(iter) 判断是否可以继续自旋(如果自旋次数太多,就放弃自旋,进入休眠模式)

go

复制代码
func (m *Mutex) lockSlow() {
  var waitStartTime int64
  starving := false
  awoke := false
  iter := 0
  old := m.state
  for {
    // Don't spin in starvation mode, ownership is handed off to waiters
    // so we won't be able to acquire the mutex anyway.
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // Active spinning makes sense.
      // Try to set mutexWoken flag to inform Unlock
      // to not wake other blocked goroutines.
      // 判断是否刚刚醒来
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
        awoke = true
      }
      // 自旋语句,执行一些空语句
      runtime_doSpin()
      iter++
      old = m.state
      continue
    }
    new := old
    // Don't try to acquire starving mutex, new arriving goroutines must queue.
    // 如果没有饥饿尝试加锁
    if old&mutexStarving == 0 {
      new |= mutexLocked
    }
    // 如果被锁或者饥饿,等待数量加 1
    if old&(mutexLocked|mutexStarving) != 0 {
      new += 1 << mutexWaiterShift
    }
    // The current goroutine switches mutex to starvation mode.
    // But if the mutex is currently unlocked, don't do the switch.
    // Unlock expects that starving mutex has waiters, which will not
    // be true in this case.
    if starving && old&mutexLocked != 0 {
      new |= mutexStarving
    }
    if awoke {
      // The goroutine has been woken from sleep,
      // so we need to reset the flag in either case.
      if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
      }
      new &^= mutexWoken
    }
    // 获取锁成功
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
      // 确实锁上了,退出
      if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
      }
      // If we were already waiting before, queue at the front of the queue.
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
      }
      // 休眠操作,下面的代码不会运行
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 休眠唤醒后,判断是不是饥饿模式
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      // 饥饿模式下辈唤醒,直接加锁
      if old&mutexStarving != 0 {
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
          throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
          // Exit starvation mode.
          // Critical to do it here and consider wait time.
          // Starvation mode is so inefficient, that two goroutines
          // can go lock-step infinitely once they switch mutex
          // to starvation mode.
          delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
      }
      awoke = true
      iter = 0
    } else {
      old = m.state
    }
  }

  if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
  }
}

如果一个协程运行完了,会释放掉锁,这个时候会唤醒一个协程,这个协程会去 semaRoot 中的平衡二叉树中取出一个协程,唤醒它,让它去竞争锁,如果竞争成功,就可以执行这个协程的业务,如果竞争失败,就会继续进入 semaRoot 中的平衡二叉树中等待

go-4.png

go

复制代码
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
  if race.Enabled {
    _ = m.state
    race.Release(unsafe.Pointer(m))
  }

  // Fast path: drop lock bit.
  // 释放锁
  new := atomic.AddInt32(&m.state, -mutexLocked)
  if new != 0 {
    // Outlined slow path to allow inlining the fast path.
    // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
    m.unlockSlow(new)
  }
}

unlockSlow 方法判断是否有多余的协程在等待(WaiterShiftStarvingWoken 不全为 0),它的核心还是一个 for 循环

go

复制代码
func (m *Mutex) unlockSlow(new int32) {
  if (new+mutexLocked)&mutexLocked == 0 {
    fatal("sync: unlock of unlocked mutex")
  }
  if new&mutexStarving == 0 {
    old := new
    for {
      // If there are no waiters or a goroutine has already
      // been woken or grabbed the lock, no need to wake anyone.
      // In starvation mode ownership is directly handed off from unlocking
      // goroutine to the next waiter. We are not part of this chain,
      // since we did not observe mutexStarving when we unlocked the mutex above.
      // So get off the way.
      // 判断是否有协程在等待
      if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
        return
      }
      // Grab the right to wake someone.
      new = (old - 1<<mutexWaiterShift) | mutexWoken
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 从 seam 树中释放一个协程处理工作
        runtime_Semrelease(&m.sema, false, 1)
        return
      }
      old = m.state
    }
  } else {
    // Starving mode: handoff mutex ownership to the next waiter, and yield
    // our time slice so that the next waiter can start to run immediately.
    // Note: mutexLocked is not set, the waiter will set it after wakeup.
    // But mutex is still considered locked if mutexStarving is set,
    // so new coming goroutines won't acquire it.
    runtime_Semrelease(&m.sema, true, 1)
  }
}
  • Lock 函数核心是休眠一个协程 runtime_SemacquireMutex
  • unLock 函数核心是释放一个协程 runtime_Semrelease,休眠的协程会执行 runtime_SemacquireMutex 下面的代码

锁饥饿

互斥锁在大量竞争时会出现锁饥饿,当协程等待锁超过 1ms,就会进入饥饿模式

一旦进入饥饿模式,其他协程就不自旋,直接进入 sema 中的平衡二叉树中等待,饥饿模式中的协程被唤醒直接获取锁,不会和其他协程竞争

sema 队列清空时,会退出饥饿模式

饥饿模式的好处是:

  1. 在高竞争的环境中,减少了自旋的次数,减少了 CPU 的消耗,新进来的协程直接进入休眠模式
  2. 保证公平,刚刚从 sema 队列中唤醒的协程已经等待的了很长时间

使用互斥锁时要注意:

  1. 尽量减少锁的使用时间
  2. 善用 defer 确保锁的释放

读写锁

读写锁是互斥锁的一种扩展

mu.RLock() 读锁,可以并发读,mu.Lock() 写锁,只能一个协程写

多个协程同时只读,只读时,让其他协程不能修改,这些协程可以并发读,提高效率,修改的协程进不来,获取不到锁

读写锁的原理:

  • 每个锁分为读锁和写锁,写锁互斥
  • 没有加写锁时,多个协程都可以加读锁
  • 加了写锁时,无法加读锁,读协程排队等待
  • 加了读锁,写锁排队等待

sync.RWMutex 结构体定义在 sync/rwmutex.go 文件中

go

复制代码
type RWMutex struct {
  w           Mutex        // held if there are pending writers
  writerSem   uint32       // semaphore for writers to wait for completing readers
  readerSem   uint32       // semaphore for readers to wait for completing writers
  readerCount atomic.Int32 // number of pending readers
  readerWait  atomic.Int32 // number of departing readers
}
  • w:互斥锁,用于写锁,是 pending,表示有等待的写协程
  • writerSem:写协程等待队列
  • readerSem:读协程等待队列
  • readerCount:等待中的读协程数量
    • 正值:正在读的协程
    • 负值:加了写锁
  • readerWait:写锁 w 生效前要等待多少读协程释放

加写锁,没有读协程

加写锁时,如果没有读协程,加写锁就比较好加,直接把 w 的锁加上就行

不过要注意,此时还没有完全加上,此时 readerCount 为 0,也就是说没有读协程,这时需要减去 rwmutexMaxReaders,这个值是 1 << 30,表示最大读协程数量

这样子写锁就加成功了

go-5.png

加写锁,有读协程

没有读协程在排队,但是有 3 个协程在读取数据(加了读锁)

这时候如果写协程进入,需要先获取写锁 w,然后看下readerWait 是否为 0,如果不为 0,就需要等待读协程释放,这时候将 readerCount 减去 rwmutexMaxReaders,表示有写协程在等待

readerCount = 3 变为 readerCount = 3 - rwmutexMaxReaders,表示有写协程在等待,后面的读协程不要在加锁了

然后将 readerWait 设置为 3,表示需要等待 3 个读协程释放后,才能加写锁加上,此时写协程进入 writerSem 队列中等待

go-6.png

加写锁的步骤:

  1. 先加 mutex 写锁,若已经被加写锁会阻塞等待
  2. 将 readerCounter 设置为负值,阻塞读锁的获取
  3. 计算需要等待多少个读协程释放
  4. 如果需要等待读协程释放,这写协程进入 writerSem 队列中等待

RWMutex 的 RLock 方法是加写锁,在 sync/rwmutex.go 文件中定义

go

复制代码
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
  // First, resolve competition with other writers.
  // 获取锁,有了加写锁的资格
  rw.w.Lock()
  // Announce to readers there is a pending writer.
  // readerCount 减去 rwmutexMaxReaders,然后在看下 r 是否为 0
  r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
  // Wait for active readers.
  // r = 0 表示加写锁成功
  // r != 0 表示有读协程在读,需要等待,写协程进入休眠
  if r != 0 && rw.readerWait.Add(r) != 0 {
    runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
  }
}

解写锁

  1. 将 readerCount 变为正值,允许读锁的获取
  2. 释放在 readerSem 中等待的读协程
  3. 解锁 mutex

go-7.png

RWMutex 的 Unlock 方法是解写锁,在 sync/rwmutex.go 文件中定义

go

复制代码
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
  // Announce to readers there is no active writer.
  // 将 readerCount 变为正值,允许读协程获取锁
  r := rw.readerCount.Add(rwmutexMaxReaders)
  // Unblock blocked readers, if any.
  // r = 0 表示没有读协程在读,直接解锁
  // r > 0 表示有读协程在等待,将读协程释放处理
  for i := 0; i < int(r); i++ {
    runtime_Semrelease(&rw.readerSem, false, 0)
  }
  // Allow other writers to proceed.
  rw.w.Unlock()
}

加读锁,readerCount > 0

加读锁时,如果 readerCount 大于 0,比较简单,直接将 readerCount+1 就好了

go-8.png

加读锁,readerCount < 0

readerCount 小于 0,表示现在加了写锁

读协程进来后,将 readerCount+1,读协程进入 readerSem 队列中等待

go-9.png

加读锁的步骤:

  1. 将 readerCount+1
  2. 如果 readerCount 是正数,加锁成功
  3. 如果 readerCount 是负数,说明现在被加了写锁,读协程进入 readerSem 队列中等待

RWMutex 的 RLock 方法是加读锁,在 sync/rwmutex.go 文件中定义

go

复制代码
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
  // readerCount + 1
  // readerCount > 0 表示加读锁成功
  // readerCount < 0 表示现在有写锁在工作,读协程进入 readerSem 队列中等待
  if rw.readerCount.Add(1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
  }
}

解读锁,readerCount > 0

解读锁时,如果 readerCount 大于 0,直接将 readerCount-1 就好了

go-10.png

解读锁,readerCount < 0

readerCount 小于 0 表示有写协程在等待

读协程释放后,将 readerCount-1,如果 readerCount 是 0,表示没有读协程了,写协程可以加锁了

go-11.png

解读锁步骤:

  1. 给 readerCount-1
  2. 如果 readerCount 是正数,解锁成功
  3. 如果 readerCount 是负数,说明有写锁在排队
    • 如果自己是 readerWait 最后一个,才唤醒写协程

RWMutex 的 RUnlock 方法是解读锁,在 sync/rwmutex.go 文件中定义

go

复制代码
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
  // readerCount - 1,如果减完后大于 0,表示没有读协程了,解锁成功
  // 如果减完后小于 0,表示有写协程在等待,需要进一步处理
  if r := rw.readerCount.Add(-1); r < 0 {
    // Outlined slow-path to allow the fast-path to be inlined
    rw.rUnlockSlow(r)
  }
}
func (rw *RWMutex) rUnlockSlow(r int32) {
  // A writer is pending.
  // 等待读协程的数量,如果是最后一个读协程,才唤醒写协程
  if rw.readerWait.Add(-1) == 0 {
    // The last reader unblocks the writer.
    runtime_Semrelease(&rw.writerSem, false, 1)
  }
}

总结

  1. Mutex 用来写协程之间互斥等待
  2. 读协程使用 readerSem 等待锁的释放
  3. 写协程使用 writerSem 等待锁的释放
  4. readerCount 记录读协程个数
  5. readerWait 记录写协程之前的读协程个数

如何通过 WaitGroup 互相等待

WaitGroup 是 sync 包中的一个结构体,用来等待一组协程的结束,定义在 sync/waitgroup.go 文件中

go

复制代码
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
  noCopy noCopy

  state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
  sema  uint32
}
  • noCopy:用来防止 WaitGroup 被复制,noCopy 结构体定义在 sync/nocopy.go 文件中
  • state:高 32 位是计数器(前面工作的协程有多少个),低 32 位是后面等待的协程数量
  • sema:等待协程的队列

go-12.png

wg.Wait() 方法是等待协程结束,Wait() 方法被调用时,要看下 counter 数量,如果 counter 数量为 0,表示前面的协程已经结束,直接返回,否则后面的协程进入 sema 队列中等待

go

复制代码
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
  for {
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    // Counter 是 0, 不需要等待
    if v == 0 {
      // Counter is 0, no need to wait.
      return
    }
    // Increment waiters count.
    // 等待的协程数量加 1,然后进入休眠
    if wg.state.CompareAndSwap(state, state+1) {
      runtime_Semacquire(&wg.sema)
      if wg.state.Load() != 0 {
        panic("sync: WaitGroup is reused before previous Wait has returned")
      }
      return
    }
  }
}

wg.Done() 方法是结束一个协程,Done() 方法被调用时 counter-1,表示有一个协程结束了

go

复制代码
func (wg *WaitGroup) Done() {
  wg.Add(-1)
}

wg.Add() 方法是给 Counter 加 n,表示有几个协程要等待

go

复制代码
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
  // 加 n,表示要等待的协程数量
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	wg.state.Store(0)
  // 释放后面等待的协程
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

排查锁异常

  1. 检测锁拷贝问题

    shell

    复制代码
    go vet main.go
    
  2. 竞争检测,发现隐含的数据竞争问题

    shell

    复制代码
    go build -race main.go
    ./main
    
  3. 死锁检测,使用第三方工具 go-deadlock,这个包继承了 sync.Mutex,可以检测死锁

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/golangjc/16482.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论