WaitGroup并发控制原理及底层源码实现
2023-12-14 17:29:36
WaitGroup并发控制原理及底层源码实现
1.1实现原理
1.2底层源码
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
//把delta左移32位累加到state,即累加到counter中
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)//右移32位,获取counter
w := uint32(state) //获取waiter
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
//经过累加后counter值变为负,panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//思考counter累加后为正,或者counter为零为何退出
//counter为正,说明不用去释放信号量,直接退出
//waiter为零,说明没有等待者,也不需要释放信号量,直接退出
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
//释放信号量,执行一次释放一个,唤醒一个等待者
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Done() {
//counter减1
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
//获取statep和semaphone指针地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)//获取state
v := int32(state >> 32)//获取counter值值
w := uint32(state)//获取wainter值
//如果counter值为0,说明所有goroutine都退面了,不需要等待,直接返回
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop 下次重试
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
//累加成功后,等待信号量唤醒自已
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
CAS算法确保,同时多个goroutine执行累加waiter不会出现问题
package main
import (
"fmt"
"sync/atomic"
)
func main() {
testCompareAndSwap1()
testCompareAndSwap2()
}
func testCompareAndSwap1() {
//为uint64赋值
var (
i uint64 = 1
)
//调用CompareAndSwapInt64 method
Swap := atomic.CompareAndSwapUint64(&i, 1, 2)
//如果发生交换为false,显示false
fmt.Println(Swap)
fmt.Println("The new value os i is:", i)
}
func testCompareAndSwap2() {
var (
i uint64
)
//交换操作,这里的值变为2
var oldvalue = atomic.SwapUint64(&i, 2)
//打印新旧值
fmt.Println("Swapped_value:", i, ",old_value", oldvalue)
//调用CompareAndSwapInt64 method
Swap := atomic.CompareAndSwapUint64(&i, 1, 3)
fmt.Println(Swap)
fmt.Println("The new value os i is:", i)
}
文章来源:https://blog.csdn.net/lisus2007/article/details/134999780
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!