Tomyang_

V1

2022/08/08阅读:12主题:前端之巅同款

Go源码之原子操作(atomic)

本文基于Go版本:1.17.8

go version go1.17.8 darwin/amd64 

什么是原子性与原子性操作?

原子(atomic)本意是不能被进一步分割的最小粒子,而原子操作(atomic operation)不可中断的一个或者一系列的操作,其实就是让多个协程对一块内存串行化操作,不会因为并发操作把内存写操作为不符合预期。

先看一个例子:假设有一个银行系统,用户A想要从自己的账户中转账2000元到用户B的账户中,直到转账成功完成一个事务,主要做这两件事:

  • 从A的账户中减少2000元,如果A的账户原有4000元,现在账户余额减少到2000元。
  • 給B的账户中添加2000元,如果B的账户原有2000元,那么账户余额增加到4000元。

假设在转账操作的时候,系统发生故障,导致給B账户添加金额失败了,那么就要进行回滚操作,回滚到事务之前的状态,程序把这种要么一起成功的操作或一起失败的操作叫做原子操作,而原子性就是要么完整的被执行,要么完全不执行

如何保证原子性

锁机制:

  • 一种保护机制,在多线程的情况下,保证操作数据的正确性/一致性

总线:

  • CPU总线是所有CPU芯片组链接的主干道,负责CPU与外界所有部件的通信,包括高速缓存、内存、北桥,其控制总线向各个部件发送控制信号,通过地址总线发送地址信号指定其要访问的部件,通过数据总线双向传输

总线锁:

  • 总线锁会把CPU和内存之间的通信锁住了,在锁定期间其它CPU处理器就不能操作其它内存地址的数据,所以总线程锁定的开销比较大。
  • 总线锁的实现是采用CPU提供的LOCK#信号,当一个CPU在总线上输出此信号时,其它CPU的请求将被阻塞,那么该CPU则独占共享内存。

缓存锁:

  • 采用缓存锁定将原子操作放在CPU缓存中机进行(L1,L2,L3 高速缓存)
  • 缓存锁定指当发生共享内存的锁定,CPU不会在总线上声言LOCK#信号,而是修改内存地址,并通过缓存一致性机制保证原子性

多核CPU架构图 悲观锁与乐观锁 悲观锁

  • 悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
  • 悲观锁适合写操作的场景,先加锁可保证写操作数据的正确。
  • 重量级锁是悲观锁的一种,拥有锁的线程以外的线程全部阻塞。
  • 悲观锁都是在显式的锁定之后再操作资源同步。
package main

import "sync"

var (
 mu   sync.Mutex
 data = make(map[string]string0)
)

func main() {
 Lock()
}

func Lock() {
 mu.Lock()
 data["test"] = "test"
 mu.Unlock()
}

乐观锁

  • 乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其它线程更新,则根据不同的实现方式执行不同的操作。
  • 自旋锁,轻量级锁偏向锁属于乐观锁
  • 轻量级锁通过CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。

CAS全称Compare And Swap比较与交换),是一种无锁算法:

  • CAS操作是抱着可观的态度进行的,它总是认为自己可以完成操作。
  • 当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。
  • 失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
  • 基于这样的原理,CAS操作即使没有锁,也可以发现其它线程对当前线程的干扰,并进行恰当的处理
func CompareAndSwap(int *addr,int oldValue,int newValue) bool{
    if *addr == nil{
        return false
    }
    if *addr == oldValue {
        *addr = newValue
        return true
    }
    return false
}

CAS优缺点:

  • 缺点
    • CPU开销比较大,在高并发量的情况下,如果线程反复尝试更新,却又一直更新不成功,循环往复,会给CPU带来很大压力。
    • 只能保证一个共享变量的原子操作,无法确保整个代码块的原子性。
    • ABA问题:假设有一个变量A,经修改后变为B,然后又修改为A,实际已经修改过,但CAS操作就会误认为它从来没有被修改过,造成了不合理的值修改操作。
      • 解决方案是:是使用版本号,在变量前面追加版本号,每次变量更新的时候把版本号+1,那么从A-B-A改为1A-2B-3A

-优点

  • 在一般情况下,性能优于锁的使用
  • 与锁相比,它是非阻塞性的,它对死锁问题天生免疫,并且线程间的相互影响也远比基于锁的方式要小。
  • 更重要的是使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销。

go语言中如何进行原子操作

标准库sync/atomic包将底层硬件提供的原子操作封装成了Go的函数,主要分为5个系列的函数

  • func SwapXXXX(addr *int32, new int32) (old int32)
// 将 new 的值保存到 addr 中,把 addr的值保存到old 中 返回 old
func SwapInt32(addr *int32new int32)(old int32){
  old = *addr
  *addr = new
  return 
}
  • func CompareAndSwapXXXX(addr *int32, old, new int32)(swapped bool)
// 比较addr和old的值,如果相同则将new赋值给addr 并返回真否则返回假
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool){
  if *addr == old {
    *addr = new
    return ture
  }
  return false
}
  • func AddXXXX(addr *int32, delta int32) (new int32)
//将val的值添加到addr并返回新值
func AddInt32(addr *int32, delta int32) (new int32){
  *addr+= delta
  return *addr
}

atomic.Value结构

type Value struct {
 v interface{}
}
type ifaceWords struct {
 typ  unsafe.Pointer
 data unsafe.Pointer
}

Value的写入操作

func (v *Value) Store(val interface{}) {
 if val == nil {
  panic("sync/atomic: store of nil value into Value")
 }
 vp := (*ifaceWords)(unsafe.Pointer(v))
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 for {
  typ := LoadPointer(&vp.typ)
  if typ == nil {
      //开启Pin
   runtime_procPin()
      //调用CAS方法来判断当前地址是否有被抢占,因为是第一个写入数据,之前是没有数据的,
      //所以通过这样一个中间值来做判断,如果失败就会解除抢占锁,解除禁止调度器,继续循环等待.
   if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
        //解除Pin
    runtime_procUnpin()
    continue
   }
   // 完成第一次的存储
   StorePointer(&vp.data, vlp.data)
   StorePointer(&vp.typ, vlp.typ)
      //解除Pin
   runtime_procUnpin()
   return
  }
    //等待,自旋
  if uintptr(typ) == ^uintptr(0) {
   continue
  }
  // 如果第一次写入完成,会检查上一次写入的类型与这次写入的类型是否一致,不一致则会抛出panic
  if typ != vlp.typ {
   panic("sync/atomic: store of inconsistently typed value into Value")
  }
  StorePointer(&vp.data, vlp.data)
  return
 }
}
  • 首先判断写入的参数不能为nil,否则触发panic
  • 通过使用unsafe.PointeroldValuenewValue转换成ifaceWords类型。方便我们获取它的原始类型typdata
  • 为了保证原子性,使用for换来处理,当已经有Store正在进行写入时,会进行等待。
  • 如果没写入过数据,那么获取不到原始类型,就会开始第一次写入操作,先调用runtime_procUnpin()方法禁止调度器对当前goroutine 的抢占。
  • 调用CAS来判断当前地址是否有被抢占,如果失败就会解除抢占锁,解除禁止调度器,继续循环等待。
  • 第一次写入没完成,还会通过uintptr(typ) == ^uintptr(0)来进行判断,因为第一次放入的中间类型,它依然会继续等待第一次完成。
  • 如果第一次写入完成,会检查上一次写入的类型与这次写入的类型是否一致,不一致则会抛出panic

Value的读操作

func (v *Value) Load() (val interface{}) {
 vp := (*ifaceWords)(unsafe.Pointer(v))
 typ := LoadPointer(&vp.typ)
 if typ == nil || uintptr(typ) == ^uintptr(0) {
  return nil
 }
 data := LoadPointer(&vp.data)
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 vlp.typ = typ
 vlp.data = data
 return
}
  • 使用unsafe.PointeroldValue转换成ifaceWords类型,获取它的类型,typ == nil || uintptr(typ) == ^uintptr(0) 没有数据或者第一次写入还没有完成。
  • 通过检查后,调用LoadPointer()方法可以获取它的值,然后构造一个新interfacetypdata返回。

CAS的ABA问题思考

package main

import (
 "fmt"
 "sync"
 "sync/atomic"
 "time"
)

func main() {
 var share int64 = 1
 wg := sync.WaitGroup{}
 wg.Add(3)
 go func() {
  defer wg.Done()
  swapped := atomic.CompareAndSwapInt64(&share, 12)
  fmt.Printf("goroutine %d === %t \n"1, swapped)
 }()

 go func() {
  defer wg.Done()
  time.Sleep(5 * time.Millisecond)
  swapped := atomic.CompareAndSwapInt64(&share, 12)
  fmt.Printf("goroutine %d === %t \n"2, swapped)
 }()

 go func() {
  defer wg.Done()
  time.Sleep(1 * time.Millisecond)
  swapped := atomic.CompareAndSwapInt64(&share, 21)
  fmt.Printf("goroutine %d === %t \n"3, swapped)
 }()
 wg.Wait()
 fmt.Println("main exit")
}

分类:

后端

标签:

Golang

作者介绍

Tomyang_
V1

Golang