WaitGroup并发控制原理及底层源码实现

2023-12-14 17:29:36

WaitGroup并发控制原理及底层源码实现

在这里插入图片描述

1.1实现原理

在这里插入图片描述

1.2底层源码

在这里插入图片描述

type WaitGroup struct {
   noCopy noCopy

   // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
   // 64-bit atomic operations require 64-bit alignment, but 32-bit
   // compilers only guarantee that 64-bit fields are 32-bit aligned.
   // For this reason on 32 bit architectures we need to check in state()
   // if state1 is aligned or not, and dynamically "swap" the field order if
   // needed.
   state1 uint64
   state2 uint32
}
func (wg *WaitGroup) Add(delta int) {
   statep, semap := wg.state()
   if race.Enabled {
      _ = *statep // trigger nil deref early
      if delta < 0 {
         // Synchronize decrements with Wait.
         race.ReleaseMerge(unsafe.Pointer(wg))
      }
      race.Disable()
      defer race.Enable()
   }
   //把delta左移32位累加到state,即累加到counter中
   state := atomic.AddUint64(statep, uint64(delta)<<32) 
   v := int32(state >> 32)//右移32位,获取counter
   w := uint32(state) //获取waiter
   if race.Enabled && delta > 0 && v == int32(delta) {
      // The first increment must be synchronized with Wait.
      // Need to model this as a read, because there can be
      // several concurrent wg.counter transitions from 0.
      race.Read(unsafe.Pointer(semap))
   }
    //经过累加后counter值变为负,panic
   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }
   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   //思考counter累加后为正,或者counter为零为何退出
   //counter为正,说明不用去释放信号量,直接退出
   //waiter为零,说明没有等待者,也不需要释放信号量,直接退出
   if v > 0 || w == 0 {
      return
   }
   // This goroutine has set counter to 0 when waiters > 0.
   // Now there can't be concurrent mutations of state:
   // - Adds must not happen concurrently with Wait,
   // - Wait does not increment waiters if it sees counter == 0.
   // Still do a cheap sanity check to detect WaitGroup misuse.
   if *statep != state {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // Reset waiters count to 0.
   *statep = 0
   for ; w != 0; w-- {
       //释放信号量,执行一次释放一个,唤醒一个等待者
      runtime_Semrelease(semap, false, 0)
   }
}
func (wg *WaitGroup) Done() {
    //counter减1
	wg.Add(-1)
}

func (wg *WaitGroup) Wait() {
    //获取statep和semaphone指针地址
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)//获取state
		v := int32(state >> 32)//获取counter值值
		w := uint32(state)//获取wainter值
		//如果counter值为0,说明所有goroutine都退面了,不需要等待,直接返回
        if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop 下次重试
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
            //累加成功后,等待信号量唤醒自已
			runtime_Semacquire(semap)
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}

CAS算法确保,同时多个goroutine执行累加waiter不会出现问题

package main

import (
   "fmt"
   "sync/atomic"
)

func main() {
   testCompareAndSwap1()
   testCompareAndSwap2()
}
func testCompareAndSwap1() {
   //为uint64赋值
   var (
      i uint64 = 1
   )
   //调用CompareAndSwapInt64 method
   Swap := atomic.CompareAndSwapUint64(&i, 1, 2)
   //如果发生交换为false,显示false
   fmt.Println(Swap)
   fmt.Println("The new value os i is:", i)
}

func testCompareAndSwap2() {
   var (
      i uint64
   )
   //交换操作,这里的值变为2
   var oldvalue = atomic.SwapUint64(&i, 2)
   //打印新旧值
   fmt.Println("Swapped_value:", i, ",old_value", oldvalue)
   //调用CompareAndSwapInt64 method
   Swap := atomic.CompareAndSwapUint64(&i, 1, 3)
   fmt.Println(Swap)
   fmt.Println("The new value os i is:", i)
}

文章来源:https://blog.csdn.net/lisus2007/article/details/134999780
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。