并发包工具类详解
1、CountDownLatch工具详解
? ? ? 这是一个同步助手,允许一个或者多个线程等待一些列的其他线程执行结束。CountDownLatch是基于同步控制器AQS(AbstractQueuedSynchronizer)实现的。
? ? 具体详见: Latch(门阀)设计模式
? ? 可以将countDown方法放在finally中执行,以规避线程执行的异常。
2、CyclicBarrier(循环屏障)工具详解
? ? ? CyclicBarrier,循环屏障,是一个同步助手工具,允许多个线程在执行完相应的操作之后,彼此等待共同到达一个障点。它非常适合某个串行化任务被拆分成若干个并行任务的子任务,当所有子任务都执行结束后,再继续接下来的工作。它可以被重复使用。CyclicBarrier是基于Lock和Condition实现的。
? ? ? 使用CyclicBarrier时,需要注意以下事项:1、当一个线程执行CyclicBarrier的await方法进入阻塞状态,这时对该线程执行中断操作会导致CyclicBarrier被broken;2、被broken的CyclicBarrier此时已经不能在直接使用了,需要使用的话,必须使用reset方法对其重置;3、如果有其他线程此时也由于执行了await方法进入阻塞状态,那么该线程会被唤醒并抛出BrokenBarrierException。
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class CycliBarrierTest {
public static void main(String[] args) {
AtomicInteger sign=new AtomicInteger(1);
CyclicBarrier barrier=new CyclicBarrier(6);
for(int i=0;i<5;i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" with sign :"+sign.getAndIncrement());
}catch(Exception e) {
e.printStackTrace();
}finally {
try {
barrier.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("after the barrier");
}
}).start();
}
System.out.println("main thread before barrier");
try {
barrier.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("main thread is after barrier");
}
}
//输出
main thread before barrier
Thread-2 with sign :2
Thread-4 with sign :4
Thread-0 with sign :5
Thread-3 with sign :3
Thread-1 with sign :1
after the barrier
main thread is after barrier
after the barrier
after the barrier
after the barrier
after the barrier
3、Exchanger工具详解
? ? Exchanger简化了两个线程之间的交互,并且提供了两个线程之间的数据交换点,Exchanger等待两个线程调用其exchange方法。调用此方法时,交换机会交换两个线程提供给对方的数据。使用Exchanger时需要注意:1、使用Exchanger的两个线程,如果其中一个线程由于某种原因意外退出,那么此时另外一个线程将永远处于阻塞状态;2、适用于生产者-消费者场景,特别是一生产者一消费者的情况,要做好线程管理,防止线程出现无限阻塞或假死的情况。如下示例中,如果这一对线程中,有任意一个线程执行失败时,均抛出超时异常并线程结束。 示例代码如下:
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ExchangerTest {
public static void main(String[] args) {
Exchanger<String> exchanger=new Exchanger<>();
Thread producer=new Thread(()->{
try {
String mes=exchanger.exchange("data from producer", 60, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+" get "+mes+" from consumer thread");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
},"producer");
producer.start();
System.out.println("producer starts.");
Thread consumer=new Thread(()->{
try {
String mes=exchanger.exchange("data from consumer", 60, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+" get "+mes+" from producer thread");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
},"consumer");
consumer.start();
System.out.println("comsumer starts.");
}
}
4、Semaphore工具详解
? ? Semaphore(信号量)是一个线程同步工具,用于在一个时刻允许多个线程对共享资源进行并行操作的场景。通常情况下,使用Semaphore的过程实际上是多个线程获取访问共享资源许可证的过程。Semaphore工具在获取锁和释放锁的过程是独立的过程,即acquire方法获取的锁和release方法释放的锁不是一一对应的。Semaphore可以控制多个线程对共享资源进行访问,但是对于共享资源的临界区以及线程安全性,Semaphore不会提供任何保证。实际使用是可以参照如下格式:
Semaphore semaphore=new Semaphore(10);
try {
semaphore.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
//add code here
try {
} finally {
semaphore.release();
}
为了更好的管理Semaphore的锁,下面提供了Semaphore增强版的内容:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreEnhence extends Semaphore{
private static final long serialVersionUID = 1L;
private final ConcurrentLinkedQueue<Thread> queue=new ConcurrentLinkedQueue<>();
public SemaphoreEnhence(int permits) {
super(permits);
}
public SemaphoreEnhence(int permits,boolean fair) {
super(permits,fair);
}
public void acquire() throws InterruptedException {
super.acquire();
this.queue.add(Thread.currentThread());
}
public void acquire(int permits) throws InterruptedException {
super.acquire(permits);
this.queue.add(Thread.currentThread());
}
public void acquireUninterruptibly() {
super.acquireUninterruptibly();
this.queue.add(Thread.currentThread());
}
public void acquireUninterruptibly(int permits) {
super.acquireUninterruptibly(permits);
this.queue.add(Thread.currentThread());
}
public boolean tryAcquire() {
boolean acquired=super.tryAcquire();
if(acquired) {
this.queue.add(Thread.currentThread());
}
return acquired;
}
public boolean tryAcquire(int permits) {
boolean acquired=super.tryAcquire(permits);
if(acquired) {
this.queue.add(Thread.currentThread());
}
return acquired;
}
public boolean tryAcquire(long timeout,TimeUnit unit) throws InterruptedException {
boolean acquired=super.tryAcquire(timeout, unit);
if(acquired) {
this.queue.add(Thread.currentThread());
}
return acquired;
}
public boolean tryAcquire(int permits,long timeout,TimeUnit unit) throws InterruptedException {
boolean acquired=super.tryAcquire(permits, timeout, unit);
if(acquired) {
this.queue.add(Thread.currentThread());
}
return acquired;
}
public void release() {
final Thread t=Thread.currentThread();
if(this.queue.contains(t)){
super.release();
this.queue.remove(t);
}else {
return;
}
}
public void release(int permits) {
final Thread t=Thread.currentThread();
if(this.queue.contains(t)) {
super.release(permits);
this.queue.remove(t);
}
}
}
5、Phaser工具详解
? ? ?Phaser是一个多线程同步工具,是一个可以被重复使用的同步屏障。在大多数情况下,Phaser可以替代CyclicBarrier和CountDownLatch的应用场景,除此之外,它还具有可动态改变的分片,以及可被多次使用的特性等。在Phaser中,可以有多个Phase,只要一个Phase的所有关联的分片都到达屏障点后,Phaser会从下一个Phase继续开始,除非Phaser已经被终止或者销毁。在Phaser中,几乎所有方法的返回值都是Phase编号。
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PhaserTest {
public static void main(String[] args) {
final Phaser phaser=new Phaser();
for(int i=0;i<10;i++) {
new Thread(()->{
int pid=phaser.register();
System.out.println("current phase no:" +pid);
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int tid=phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+" current phase no:" +tid);
},"T-"+i).start();
}
int mid=phaser.register();
System.out.println(Thread.currentThread().getName()+" current phase no:" +mid);
mid=phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+" current phase no:" +mid);
}
}
//输出
current phase no:0
current phase no:0
current phase no:0
current phase no:0
current phase no:0
current phase no:0
current phase no:0
current phase no:0
current phase no:0
main current phase no:0
current phase no:0
T-6 current phase no:1
T-1 current phase no:1
T-5 current phase no:1
T-8 current phase no:1
T-3 current phase no:1
T-4 current phase no:1
T-7 current phase no:1
T-0 current phase no:1
T-9 current phase no:1
main current phase no:1
T-2 current phase no:1
6、Lock&ReentrantLock详解
? ? Lock接口是对锁操作方法的一个基本定义,提供了synchronized关键字所具备的全部功能方法,另外可借助Lock创建的不同Condition对象进行多线程间的通信操作。ReentrantLock是Lock实现中使用最多的一种。锁的存在,无论是Lock还是synchronized,主要是解决多线程资源竞争问题,通过保障排他性,实现保障原子性。在使用锁时,需要注意:1、确保已获取的锁得到释放;2、避免锁的交叉使用引起死锁;3、多个原子性方法的组合不能确保原子性。
? ? ReentrantLock、ReentrantReadWriteLock、synchronized的性能比较:
-
单线程访问时,synchronized的性能远远高于ReentrantLock,ReentranLock高于ReentrantReadWriteLock
-
多线程访问时,ReentrantLock的性能高于synchronized,ReentrantReadWriteLock在读/写的数值很高时效率高于ReentrantLock
7、Condition详解
? ? ?Condition对象是关联在Lock上的,并且一个Lock可以创建多个Condition对象。每个Condition对象可以对应最少一种临界条件。这种精细化的同步操作提高了Lock的同步效率。
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
public class LockConditionSample {
private final static ReentrantLock lock=new ReentrantLock();
private final static Condition full_condition=lock.newCondition();
private final static Condition empty_condition=lock.newCondition();
private final static LinkedList<Long> list=new LinkedList<>();
private final static int MAX=100;
private static Long i=0L;
private static void produce() {
lock.lock();
try {
while(list.size()>=MAX) {
full_condition.await();
}
i++;
list.addLast(i);
System.out.println(Thread.currentThread().getName()+"->"+i);
empty_condition.signalAll();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
lock.unlock();
}
}
private static void consume() {
lock.lock();
try {
while(list.isEmpty()) {
empty_condition.await();
}
Long value=list.removeFirst();
System.out.println(Thread.currentThread().getName()+"->"+value);
full_condition.signalAll();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
lock.unlock();
}
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
IntStream.range(0, 10).forEach(i->new Thread(()->{
for(;;) {
produce();
sleep();
}
},"produce-"+i).start());
IntStream.range(0, 5).forEach(i->new Thread(()->{
for(;;) {
consume();
sleep();
}
},"consume-"+i).start());
}
}
8、StampedLock详解
? ? ?StampedLock几乎具备了ReentrantLock和ReentrantReadWriteLock这两种类型锁的所有功能。
? ? ? 饥饿写是指在使用读写锁的时候,读线程的数量远远大于写线程的数量,导致锁长期被读线程霸占,写线程无法获得锁从而进入饥饿状态。当构造读写锁的时候指定其为公平锁,读写线程获得锁的机会相对公平,但是当读线程远远大于写线程的时候,写线程的效率会比较底下。在使用读写锁进行数据一致性保护时,需要做好线程数量的评估,包括线程操作的任务类型。如果无法明确了解读写线程的分布情况,请使用ReentrantLock。如果应用程序中读操作远远多于写操作,为了提高数据读取的并发量,StampedLock的乐观读是一个不错的选择,同时还不会引起饥饿写的问题。
? StampedLock代替ReentrantLock的样例范式:
//replace ReentrantLock
final StampedLock slock=new StampedLock();
long stamp=slock.writeLock();
try {
//add write here
}finally {
slock.unlockWrite(stamp);
}
long stamp=slock.readLock();
try {
//add read here
}finally {
slock.unlockRead(stamp);
}
StampedLock乐观读模式,如果在if分支通过验证,则直接返回共享数据,属于无锁读取;如果不能通过if条件的验证,则获取读锁,读取共享数据:
final StampedLock slock=new StampedLock();
long stamp=slock.tryOptimisticRead();
if(slock.validate(stamp)) {
return sharedData;
}else {
stamp=slock.readLock();
try {
return sharedData;
}finally {
slock.unlockRead(stamp);
}
}
9、第三方包Guava
? ? Guava是Google提供的,提供了一些简单易用的并发工具,以Monitor和RateLimiter比较出名。
? ? Monitor工具提供了一种将临界值判断抽取成Guard的处理方式,可以方便地定义若干个Guard也就是临界值的判断,以及对临界值判断的重复使用,除此之外,还具备synchronized和Lock的完整语义。
? ? ?RateLimiter,速率限流器,经常用于流量、访问等的限制。它关注的是在单位时间里对资源操作速率,即单位时间内颁发许可证的数量。
? ? ?漏桶算法是一种常见的限流算法,漏桶算法的原理如下:1、无论漏桶进水速率如何,漏桶的出水速率永远是固定的;2、如果漏桶没有水,则在出水口不会有水流出;3、漏桶有一定的水容量;4、如果流入水量超过漏桶容量,则水将溢出(降权处理)。漏桶算法可以基于Monitor和RateLimiter进行实现。
? ? ? 令牌环桶算法与漏桶算法相反,在对某个资源或方法进行调用之前,先要获取令牌,即许可证,才能进行相关操作,否则将不被允许。令牌环桶算法原理如下:1、根据固定的速率向桶里提交数据;2、新添加的数据如果超过桶的容量,则请求将被直接拒绝;3、如果令牌不足,则请求也会被拒绝(请求可以再次尝试)。使用Monitor和RateLimiter可以实现。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!