java并发编程十 原子累加器和Unsafe

2023-12-24 06:02:00

原子累加器

累加器性能比较

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
 T adder = adderSupplier.get();
 long start = System.nanoTime();
 List<Thread> ts = new ArrayList<>();
 // 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
 ts.add(new Thread(() -> {
 for (int j = 0; j < 500000; j++) {
 action.accept(adder);
            }
        }));
    }
 ts.forEach(t -> t.start());
 ts.forEach(t -> {
 try {
 t.join();
        } 
catch (InterruptedException e) {
 e.printStackTrace();
        }
    });
 long end = System.nanoTime();
 System.out.println(adder + " cost:" + (end - start)/1000_000);
 }

比较 AtomicLong 与 LongAdder

 for (int i = 0; i < 5; i++) {
 demo(() -> new LongAdder(), adder -> adder.increment());
 }
 for (int i = 0; i < 5; i++) {
 demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
 }

输出

1000000 cost:43 
1000000 cost:9 
1000000 cost:7 
1000000 cost:7 
1000000 cost:7 
1000000 cost:31 
1000000 cost:27 
1000000 cost:28 
1000000 cost:24 
1000000 cost:22 

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性
能。

  • 源码之 LongAdder
    LongAdder 是并发大师 @author Doug Lea 的作品,设计的非常精巧
    LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
 // 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
 // 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

cas 锁

// 不要用于实践!!!
public class LockCas {
 private AtomicInteger state = new AtomicInteger(0);
 public void lock() {
 while (true) {
 if (state.compareAndSet(0, 1)) {
 break;
            }
        }
    }
     public void unlock() {
 log.debug("unlock...");
 state.set(0);
    }
}

测试

LockCas lock = new LockCas();
 new Thread(() -> {
 log.debug("begin...");
 lock.lock();
 try {
 log.debug("lock...");
 sleep(1);
    } 
finally {
 lock.unlock();
    }
 }).start();
 new Thread(() -> {
 log.debug("begin...");
 lock.lock();
 try {
 log.debug("lock...");
    } 
finally {
 lock.unlock();
    }
 }).start();

输出

18:27:07.198 c.Test42 [Thread-0] - begin... 
18:27:07.202 c.Test42 [Thread-0] - lock... 
18:27:07.198 c.Test42 [Thread-1] - begin... 
18:27:08.204 c.Test42 [Thread-0] - unlock... 
18:27:08.204 c.Test42 [Thread-1] - lock... 
18:27:08.204 c.Test42 [Thread-1] - unlock... 

原理之伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended 
static final class Cell {
 volatile long value;
 Cell(long x) { value = x; }
 // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
 return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
 // 省略不重要代码
}

得从缓存说起
缓存与内存的速度比较
在这里插入图片描述

从 cpu 到大约需要的时钟周期
寄存器1 cycle (4GHz 的 CPU 约为0.25ns)
L13~4 cycle
L210~20 cycle
L340~45 cycle
内存120~240 cycle

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
在这里插入图片描述

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]
    论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
    @sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
    在这里插入图片描述

累加主要调用下面的方法

public void add(long x) {
 // as 为累加单元数组
// b 为基础值
// x 为累加值
Cell[] as; long b, v; int m; Cell a;
 // 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
 if ((as = cells) != null || !casBase(b = base, b + x)) {
 // uncontended 表示 cell 没有竞争
boolean uncontended = true;
 if (
 // as 还没有创建
as == null || (m = as.length - 1) < 0 ||
 // 当前线程对应的 cell 还没有
            (a 
= as[getProbe() & m]) == null ||
 // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
 !(uncontended = a.cas(v = a.value, v + x))
        ) {
         // 进入 cell 数组创建、cell 创建的流程
         longAccumulate(x, null, uncontended);
        }
    }
 }

add 流程图
在这里插入图片描述

final void longAccumulate(long x, LongBinaryOperator fn,
 boolean wasUncontended) {
 int h;
 // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
 if ((h = getProbe()) == 0) {
 // 初始化 probe
 ThreadLocalRandom.current();
 // h 对应新的 probe 值, 用来对应 cell
 h = getProbe();
 wasUncontended = true;
    }
 // collide 为 true 表示需要扩容
boolean collide = false;                
for (;;) {
 Cell[] as; Cell a; int n; long v;
 // 已经有了 cells
 if ((as = cells) != null && (n = as.length) > 0) {
 // 还没有 cell
 if ((a = as[(n - 1) & h]) == null) {
 // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
 // 成功则 break, 否则继续 continue 循环
            }
 // 有竞争, 改变线程对应的 cell 来重试 cas
 else if (!wasUncontended)
 wasUncontended = true;
 // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
 break;
 // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
 else if (n >= NCPU || cells != as)
 collide = false;
 // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
else if (!collide)
 collide = true;
 // 加锁
else if (cellsBusy == 0 && casCellsBusy()) {
 // 加锁成功, 扩容
continue;
            }
 // 改变线程对应的 cell
 h = advanceProbe(h);
        }
 // 还没有 cells, 尝试给 cellsBusy 加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
 // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
 // 成功则 break;
        }
         // 上两种情况失败, 尝试给 base 累加
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
 break;
   }
 }

longAccumulate 流程图
在这里插入图片描述

在这里插入图片描述

每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
在这里插入图片描述

获取最终结果通过 sum 方法

public long sum() {
 Cell[] as = cells; Cell a;
 long sum = base;
 if (as != null) {
 for (int i = 0; i < as.length; ++i) {
 if ((a = as[i]) != null)
 sum += a.value;
        }
    }
 return sum;
 }

Unsafe

概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

public class UnsafeAccessor {
 static Unsafe unsafe;
 static {
 try {            
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
 theUnsafe.setAccessible(true);
 unsafe = (Unsafe) theUnsafe.get(null);
        } 
catch (NoSuchFieldException | IllegalAccessException e) {
 throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
 return unsafe;
    }
 }

Unsafe CAS 操作

@Data
 class Student {
 volatile int id;
 volatile String name;
 }
 Unsafe unsafe = UnsafeAccessor.getUnsafe();
 Field id = Student.class.getDeclaredField("id");
 Field name = Student.class.getDeclaredField("name");
 // 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
 long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
  Student student = new Student();
 // 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20);  // 返回 true
 UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
 System.out.println(student);

输出

Student(id=20, name=张三) 

使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现

class AtomicData {
 private volatile int data;
 static final Unsafe unsafe;
 static final long DATA_OFFSET;
 static {
 unsafe = UnsafeAccessor.getUnsafe();
 try {
 // data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
        } 
catch (NoSuchFieldException e) {
 throw new Error(e);
        }
    }
 public AtomicData(int data) {
 this.data = data;
    }
    public void decrease(int amount) {
 int oldValue;
 while(true) {
 // 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
oldValue = data;
 // cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
 if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
 return;
            }
        }
    }
 public int getData() {
 return data;
    }
 }

Account 实现

Account.demo(new Account() {
 AtomicData atomicData = new AtomicData(10000);
 @Override
 public Integer getBalance() {
 return atomicData.getData();
    }
 @Override
 public void withdraw(Integer amount) {
 atomicData.decrease(amount);
    }
 });

文章来源:https://blog.csdn.net/studycodeday/article/details/135107549
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。