《深入探索Java并发编程》--从锁到并发工具的深入解析

2023-12-29 18:10:32

JUC是什么?

JUC就是java.util.concurrent下面的类包,专门用于多线程的开发

img

关于锁

传统锁synchronized

如下有三个线程A、B、C线程去争夺ticket资源,sale()方法前面用synchronized修饰以后,这个方法的调用对象 ticket就被上锁了,每个线程去用这个对象调用sale()方法的时候都会独立运行。如果你不加这个synchronized就会出现A、B、C线程争夺资源的情况。

在这里,我们说 的本质就是:队列。 线程排队去获得CPU执行线程,执行完毕下一个线程上。

package org.example;

/**
 * @author linghu
 * @date 2023/12/15 15:02
 */
public class Demo01 {
    public static void main(String[] args) {
        final Ticket ticket=new Ticket();
        //启动线程A
        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"A").start();

        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"B").start();

        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"C").start();
    }
}
class Ticket{
    private int number=30;
    //卖票的方式
    public synchronized void sale(){
        if (number>0){
            System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票剩余"+
                    number+"张票");
        }
    }
}

加了synchronized运行如下图:

img

Lock锁

lock三部曲:

  • 创建锁
 final Ticket ticket=new Ticket();
  • 加锁
 lock.lock();
  • 解锁
lock.unlock();
package org.example;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author linghu
 * @date 2023/12/15 15:02
 */
public class Demo01 {
    public static void main(String[] args) {
        final Ticket ticket=new Ticket();
        //启动线程A
        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"A").start();

        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"B").start();

        new Thread(()->{
            for (int i=0;i<40;i++){
                ticket.sale();
            }
        },"C").start();
    }
}
//lock三部曲
class Ticket{
    private int number=30;
    //1、创建锁
    Lock lock=new ReentrantLock();
    //卖票的方式
    public synchronized void sale(){
        //2、开启锁
        lock.lock();
        try {
            if (number>0){
                System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票剩余"+
                        number+"张票");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            //3、关闭锁
            lock.unlock();
        }
    }
}

上面的代码中,被lock三部曲锁上的代码,只能由一个线程执行。

img

synchronized和Lock的区别

img

生产者和消费者问题

synchronized版生产者和消费者问题
package org.example;

/**
 * @author linghu
 * @date 2023/12/16 16:45
 * A num+1
 * B num-1
 * 顺序:判断->业务->通知
 */
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        //A:num+1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        //B:num-1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}

class Data{
    private int number=0;

    //+1
    public synchronized void increment() throws InterruptedException {
        if (number!=0){
            //等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        if (number==0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }


}

注:如果有四个线程,就会出现假唤醒问题。

我们添加如下四个线程A、B、C、D:

package org.example;

/**
 * @author linghu
 * @date 2023/12/16 16:45
 * A num+1
 * B num-1
 * 顺序:判断->业务->通知
 */
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        //A:num+1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        //B:num-1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Data{
    private int number=0;

    //+1
    public synchronized void increment() throws InterruptedException {
        if (number!=0){
            //等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        if (number==0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }


}

img

那么什么是虚假唤醒呢?

多线程环境下,有多个线程执行了wait()方法,需要其他线程执行notify()或者notifyAll()方法去唤醒它们,假如多个线程都被唤醒了,但是只有其中一部分是有用的唤醒操作,其余的唤醒都是无用功;对于不应该被唤醒的线程而言,便是虚假唤醒。
比如:仓库有货了才能出库,突然仓库入库了一个货品;这时所有的线程(货车)都被唤醒,来执行出库操作;实际上只有一个线程(货车)能执行出库操作,其他线程都是虚假唤醒。

为了防止虚假唤醒,我们在这里采用将if换为while:

package org.example;

/**
 * @author linghu
 * @date 2023/12/16 16:45
 * A num+1
 * B num-1
 * 顺序:判断->业务->通知
 */
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        //A:num+1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        //B:num-1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Data{
    private int number=0;

    //+1
    public synchronized void increment() throws InterruptedException {
        while (number!=0){
            //等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        while(number==0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notify();
    }


}

JUC版的生产者消费者问题

img

package org.example;

/**
 * @author linghu
 * @date 2023/12/16 16:45
 * A num+1
 * B num-1
 * 顺序:判断->业务->通知
 */
public class B {
    public static void main(String[] args) {
        Data2 data = new Data2();
      
    }
}

class Data2{
    private int number=0;

    //+1
    public  void increment() throws InterruptedException {
         while (number!=0){
            //等待
           
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
    }

    //-1
    public void decrement() throws InterruptedException {
         while (number==0){
            //等待
           
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
    }


}

我们用传统三剑客写的生产者消费者,现在把它删掉,准备用上面的代码进行改进,改成JUC版本的生产者消费者。

JUC的代码如下:

package org.example;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author linghu
 * @date 2023/12/16 16:45
 * A num+1
 * B num-1
 * 顺序:判断->业务->通知
 */
public class B {
    public static void main(String[] args) {
        Data2 data2 = new Data2();
        //A:num+1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        //B:num-1
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();

    }
}

class Data2{
    private int number=0;
    Lock lock=new ReentrantLock();
    Condition condition= lock.newCondition();

    //+1
    public  void increment() throws InterruptedException {
        lock.lock();//上锁操作
        try {
            while (number!=0){
                //等待
                condition.await();
            }
            //业务代码...
            number++;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            condition.signalAll();//通知其他线程,我+1完毕
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁操作
        }
    }

    //-1
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number==0){
                //等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


}

执行结果如下:

img

我们现在希望按照:A->B->C->D的顺序去进行执行,精准通知唤醒我们的线程!

如下代码中,我们实现了A->B->C->D的顺序,主要是靠 Condition监控器,我们设置了三个监控器:

    Condition conditionA = lock.newCondition();
    Condition conditionB = lock.newCondition();
    Condition conditionC = lock.newCondition();

唤醒方式如下:

 public void weakA()
    {
        lock.lock();
        try {
            while(num != 1)
            {
                conditionA.await();
            }
           //业务代码...
            conditionB.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void weakB()
    {
        lock.lock();
        try {
            while(num != 2)
            {
                conditionB.await();
            }
             //业务代码...
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void weakC()
    {
        lock.lock();
        try {
            while(num != 3)
            {
                conditionC.await();
            }
              //业务代码...
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

如下为完整代码:

package org.example;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author linghu
 * @date 2023/12/18 14:44
 */
class Aweaken
{
    ReentrantLock lock = new ReentrantLock();
    int num = 1;
    Condition conditionA = lock.newCondition();
    Condition conditionB = lock.newCondition();
    Condition conditionC = lock.newCondition();

    public void weakA()
    {
        lock.lock();
        try {
            while(num != 1)
            {
                conditionA.await();
            }
            num = 2;
            System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程B");
            conditionB.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void weakB()
    {
        lock.lock();
        try {
            while(num != 2)
            {
                conditionB.await();
            }
            num = 3;
            System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程C");
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void weakC()
    {
        lock.lock();
        try {
            while(num != 3)
            {
                conditionC.await();
            }
            num = 1;
            System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程A");
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class C {

    public static void main(String[] args) {
        Aweaken b = new Aweaken();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                b.weakA();
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                b.weakB();
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                b.weakC();
            }
        },"C").start();

    }
}

执行效果:

img

8锁现象

在Java并发编程中,锁是一种关键的同步机制,用于控制多个线程对共享资源的访问。然而,在某些情况下,使用锁可能会引发性能问题,其中一个典型例子就是JUC中的8锁现象,也被称为锁粗化。

那何为8锁呢?

8锁现象是一种并发编程中的现象,主要涉及到Java中的synchronized关键字。这个术语来源于对锁的八个问题的探讨,每个问题都围绕着锁的不同行为和效果展开。

  1. 标准情况下,两个线程先打印“发短信”还是“打电话”?
  2. “打电话”方法暂停4秒钟,两个线程先打印“发短信”还是“打电话”?
  3. 两个普通的锁方法,new一个对象调用,调用过程中间睡1秒,执行结果是什么?
  4. 两个普通的锁方法,new两个对象调用,调用过程中间睡1秒,执行结果是什么?
  5. 两个普通的锁方法,一个对象调用,一个类调用,调用过程中间睡1秒,执行结果是什么?
  6. 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字,调用过程中间睡1秒,执行结果是什么?
  7. 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字和方法上加了synchronized关键字,调用过程中间睡1秒,执行结果是什么?
  8. 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字和方法上加了synchronized关键字,但是方法上加了final关键字,调用过程中间睡1秒,执行结果是什么?

我们将上面的8个问题进行抽象:

案例一(先打印“发短信”)
  • 标准情况下,两个线程先打印 发短信还是 打电话? 1/发短信 2/打电话。
  • sendSms延迟4秒,两个线程先打印 发短信还是 打电话? 1/发短信 2/打电话。
public class Test1 {
    public static void main(String[] args) {
        Phone phone = new Phone();

        //锁的存在
        new Thread(()->{
            phone.sendSms();
        },"A").start();

        // 捕获
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone.call();
        },"B").start();
    }
}

class Phone{

    // synchronized 锁的对象是方法的调用者!、
    // 两个方法用的是同一个锁,谁先拿到谁执行!
    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call(){
        System.out.println("打电话");
    }

}

运行效果:

img

💧案例一总结:synchronized 锁的对象是方法的调用者,两个方法用的是同一个锁,谁先拿到谁执行。

案例二(先打印“打电话”)
  • 增加了一个普通方法后!先执行发短信还是Hello? 普通方法。
  • 两个对象,两个同步方法, 发短信还是 打电话? // 打电话
package org.example;

import java.util.concurrent.TimeUnit;

public class Test2  {
    public static void main(String[] args) {
        // 两个对象,两个调用者,两把锁!
        Phone2 phone1 = new Phone2();
        Phone2 phone2 = new Phone2();

        //锁的存在
        new Thread(()->{
            phone1.sendSms();
        },"A").start();

        // 捕获
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();
    }
}

class Phone2{

    // synchronized 锁的对象是方法的调用者!
    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call(){
        System.out.println("打电话");
    }

    // 这里没有锁!不是同步方法,不受锁的影响
    public void hello(){
        System.out.println("hello");
    }

}

运行效果:

img

💧案例二总结:synchronized 锁的对象是方法的调用者,hello() 没有锁!不是同步方法,所以不受锁的影响。

案例三(先打印“发短信”)

增加两个静态的同步方法,只有一个对象,先打印 发短信?打电话?

两个对象!增加两个静态的同步方法, 先打印 发短信?打电话?

package org.example;

import java.util.concurrent.TimeUnit;

public class Test3  {
    public static void main(String[] args) {
        // 两个对象的Class类模板只有一个,static,锁的是Class
        Phone3 phone1 = new Phone3();
        Phone3 phone2 = new Phone3();

        //锁的存在
        new Thread(()->{
            phone1.sendSms();
        },"A").start();

        // 捕获
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();
    }
}

// Phone3唯一的一个 Class 对象
class Phone3{

    // synchronized 锁的对象是方法的调用者!
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public static synchronized void call(){
        System.out.println("打电话");
    }


}

运行效果:

img

💧案例三总结:两个对象的Class类模板只有一个。static 静态方法在类一加载就有了!锁的是Class。

案例四(先打印“打电话”)

1个静态的同步方法,1个普通的同步方法 ,一个对象,先打印 发短信?打电话?

1个静态的同步方法,1个普通的同步方法 ,两个对象,先打印 发短信?打电话?

package org.example;

import java.util.concurrent.TimeUnit;

public class Test4  {
    public static void main(String[] args) {
        // 两个对象的Class类模板只有一个,static,锁的是Class
        Phone4 phone1 = new Phone4();
        Phone4 phone2 = new Phone4();
        //锁的存在
        new Thread(()->{
            phone1.sendSms();
        },"A").start();

        // 捕获
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();
    }
}

// Phone3唯一的一个 Class 对象
class Phone4{

    // 静态的同步方法 锁的是 Class 类模板
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    // 普通的同步方法  锁的调用者
    public synchronized void call(){
        System.out.println("打电话");
    }

}

运行效果:

img

💧案例四总结:静态的同步方法 锁的是 Class 类模板,普通的同步方法 锁的调用者。

new this 具体的一个手机

static Class 唯一的一个模板

集合不安全

所谓集合不安全,就是指:在高并发的情况下,创建线程使用集合会报异常!

我们看一下常用集合线程安全问题:

  1. List不安全
  2. ArrayList不安全
  3. Set不安全
  4. Map不安全
List集合不安全

我们通过如下代码,循环30次创建线程,在线程中用到List集合,我还在业务代码那里 try-catch一下,捕捉出现的异常。

/**
 * @author linghu
 * @date 2023/12/19 11:38
 */
public class ListTest {
    public static void main(String[] args) {
        List<Object> arrayList = new ArrayList<>();
        for (int i=1;i<=30;i++){
            try {
                new Thread(()->{
                    arrayList.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(arrayList);
                },String.valueOf(i)).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

代码运行结果:

img

**结论:**List集合和ArrayList集合在多线程中使用不安全

List集合不安全的解决方案

我们可以采用 COW的思想解决这个不安全问题。所谓 COW是指 CopyOnWriteArrayList写入时复制!是计算机程序领域的一种优化策略。

解决方案:

  • new Vector<>()
  • new CopyOnWriteArrayList<>()

代码如下:

/**
 * @author linghu
 * @date 2023/12/19 11:38
 */
public class ListTest {
    public static void main(String[] args) {
        /**
         * 解决方案
         * 1、List<String> list = new Vector<>();
         * 2、List<String> list = new CopyOnWriteArrayList<>();
         */
//        List<Object> arrayList = new ArrayList<>();
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i=1;i<=30;i++){
            try {
                new Thread(()->{
                    list.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(list);
                },String.valueOf(i)).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

img

结论: CopyOnWriteArrayList写入时复制!能解决集合的线程安全问题!

总结
  • Vector底层使用的是 synchronized来实现的,所以效率特别低下。
  • CopyOnWriteArrayList使用的是 Lock锁,效率会更加高效。
Set集合不安全

Set和List同理可得:多线程情况下,普通的Set集合是线程不安全的。

如下代码,我在循环30次中执行线程,创建set集合:

package org.example;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @author linghu
 * @date 2023/12/19 14:27
 */
public class SetTest {
    public static void main(String[] args) {
        /**
         * 解决方案

         */
        Set<String> set = new HashSet<>();

        for (int i=1;i<=30;i++){
            try {
                new Thread(()->{
                    set.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(set);
                },String.valueOf(i)).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行效果:

img

List集合不安全的解决方案

解决方案如下:

  • CopyOnWriteArraySet<>()
  • HashSet<>()

我们还是用了类似的原理:写入时复制

代码如下:

package org.example;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author linghu
 * @date 2023/12/19 14:27
 */
public class SetTest {
    public static void main(String[] args) {
        /**
         * 解决方案
         * 1、Set<String> set = new CopyOnWriteArraySet<>();
         */
//        Set<String> set = new HashSet<>();
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i=1;i<=30;i++){
            try {
                new Thread(()->{
                    set.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(set);
                },String.valueOf(i)).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

结论: CopyOnWriteArraySet写入时复制!能解决集合的线程安全问题!

总结
  • HashSet的底层是HashMap
  • CopyOnWriteArraySet写入时复制!能解决集合的线程安全问题!

Callable

Callable的出现主要是为了弥补继承Thread或实现Runnable接口的线程执行完成后,无法获得线程执行后的结果。

Callable其实上手是非常简单容易得!

我们看通过Callable创建线程的方式如下:

class MyThread implements Callable<String>{
    //这里的泛型返回类型和call方法的返回类型是一样的
    @Override
    public String call() throws Exception {
        return "hi,call";
    }
}

上面,线程 MyThread通过继承接口Callable<String>,这里的String其实是泛型。如下的call方法的类型和这个泛型相同。

如下代码,我们通过接口 Callable<String> 创建了A、B线程执行 call方法。

package org.example;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @author linghu
 * @date 2023/12/19 16:42
 */
public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //?怎么启动Callable
        MyThread myThread = new MyThread();
        //适配类
        FutureTask futureTask = new FutureTask(myThread);

        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();//结果被缓存,效率高,结果只打印一次

        //获取Callable的返回结果
        String o = (String)futureTask.get();
        System.out.println(o);
    }
}

class MyThread implements Callable<String>{
    //这里的泛型返回类型和call方法的返回类型是一样的
    @Override
    public String call() throws Exception {
        return "hi,call";
    }
}

执行结果:

img

高并发常用-辅助类

在高并发场景中,常用辅助类如下:

  • CountDownLatch
  • CyclickBarrier
  • Semaphore

CountDownLatch

其实CountDownLatch的本质就是一个减法计数器

比如我们去游乐园坐激流勇进,有的时候游乐园里人不是那么多,这时,管理员会让你稍等一下,等人坐满了再开船,这样的话可以在一定程度上节约游乐园的成本。座位有多少,就需要等多少人,这就是 CountDownLatch 的核心思想,等到一个设定的数值达到之后,才能出发。

img

上面这幅图就很好理解了。可以看到,最开始 CountDownLatch 设置的初始值为 3,然后 T0 线程上来就调用 await 方法,它的作用是让这个线程开始等待,等待后面的 T1、T2、T3,它们每一次调用 countDown 方法,3 这个数值就会减 1,也就是从 3 减到 2,从 2 减到 1,从 1 减到 0,一旦减到 0 之后,这个 T0 就相当于达到了自己触发继续运行的条件,于是它就恢复运行了。

我们可以限制6个线程,等待他们都执行完毕,代码如下:

package org.example;

import java.util.concurrent.CountDownLatch;

/**
 * @author linghu
 * @date 2023/12/20 9:44
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //总数是6个线程
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i=1;i<=6;i++){
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"Go out");
                countDownLatch.countDown();//数量-1,本质是阻塞
            },String.valueOf(i)).start();
        }
        countDownLatch.await();//等待计数器归0,然后往下执行~
        System.out.println("Close Door");
    }
}

img

总结
  • countDown()是减一操作
  • await是等待计数器归0操作

CyclickBarrier

其实CyclickBarrier 的本质就是一个加法计数器

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

package org.example;

import java.util.concurrent.CyclicBarrier;

/**
 * @author linghu
 * @date 2023/12/20 10:32
 */
public class CyclickBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐7颗龙珠召唤神龙
         */
        //召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙~");
        });
        for (int i=1;i<=7;i++){
            int temp=i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"搜集"+temp+"颗龙珠");
                try {
                    cyclicBarrier.await();//等待计数器到7,然后往下执行~
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

如上代码运行结果如下:

img

总结
  • cyclicBarrier.await() 是 Java 中的一个方法,用于让当前线程等待其他线程到达屏障(CyclicBarrier)时再继续执行。当所有线程都调用了 await() 方法后,屏障才会打开,允许所有线程继续执行。这个方法通常用在多线程编程中,以确保所有线程都达到某个同步点后再继续执行。

Semaphore

在操作系统中我们学过信号量,学过PV操作。其实这里的 Semaphore信号量就是用来做线程限流操作的。

我们看如下代码:

package org.example;

import sun.awt.windows.ThemeReader;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @author linghu
 * @date 2023/12/20 10:58
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程数量:停车位!用来限流
        Semaphore semaphore = new Semaphore(3);

        for (int i=1;i<=6;i++){
            new Thread(()->{
                try {
                    semaphore.acquire();//得到
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);//让车多停一会
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();//释放
                }

            },String.valueOf(i)).start();
        }
    }
}

在上面代码中,我们对6个线程-车进行限流,只有3个车位,6个车去抢车位!

总结
  • semaphore.acquire();//得到,等待释放为止
  • semaphore.release();//释放信号量以后会唤醒等待中的线程

关于PV操作,其实可以看我这个视频课程:

《【信号量pv操作】信号量进程同步互斥问题》

读写锁

所谓的读写锁(Readers-Writer Lock),顾名思义就是将一个锁拆分为读锁和写锁两个锁。其中读锁允许多个线程同时获得,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。

为什么要读写锁?

Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。然而在有些业务场景中,我们大多在读取数据,很少写入数据,这种情况下,如果仍使用独占锁,效率将及其低下。针对这种情况,Java提供了读写锁——ReentrantReadWriteLock。

主要解决:对共享资源有读和写的操作,且写操作没有读操作那么频繁的场景。

案例

如下分别读写线程6次执行:

package org.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author linghu
 * @date 2023/12/20 11:50
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache2 myCache2 = new MyCache2();
        int num=6;
        for (int i=1;i<=6;i++){
            int finall=i;
            //6个线程开始写
            new Thread(()->{
                myCache2.write(String.valueOf(finall),String.valueOf(finall));
            },String.valueOf(i)).start();

            //6个线程开始读
            new Thread(()->{
                myCache2.read(String.valueOf(finall));
            },String.valueOf(i)).start();
        }
    }
}
class MyCache2{
    private volatile Map<String, String> map=new HashMap<>();
    private ReadWriteLock lock=new ReentrantReadWriteLock();

    public void write(String key,String value){
        lock.writeLock().lock();//写锁
        try {
            System.out.println(Thread.currentThread().getName()+"线程开始写入");
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"线程写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();//释放写锁
        }
    }
    public void read(String key){
        lock.readLock().lock();//读锁
        try {
            System.out.println(Thread.currentThread().getName()+"线程开始读取");
            map.get(key);
            System.out.println(Thread.currentThread().getName()+"线程读取OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();//释放读锁
        }
    }
}

运行效果:

img

总结

如果我们不用锁,多线程的读写会造成数据不可靠的问题。我们可以采用独占锁:Synchronized 和 ReentrantLock保证数据可靠。但是使用更细粒度的锁读写锁(Readers-Writer Lock)效率更高!

BlockQueue

BlockQueueCollection的一个子类,我们把它叫做:阻塞队列!整个队列的家族是下面这幅图这个样子的:

img

通过上图我们知道了我们重点要学的是BlockQueue

BlockingQueue主要提供了四类方法,如下表所示:

img

同步队列

同步队列没有容量,也可以视为容量为1的队列。

  • 进去一个元素必须等待取出来以后才能往里面放元素
  • put了一个元素,就必须从里面先take出来,否则不能再put进去值!

代码

如下通过一个代码案例进行说明:

package org.example;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author linghu
 * @date 2023/12/21 15:16
 */
public class SynchronousQueue {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue=
                new java.util.concurrent.SynchronousQueue<>();
        //往queue中添加元素
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"put 01");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"put 02");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"put 03");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //取出元素
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

总结:上述代码中,三个线程负责往队列里put元素,但是只能等到对列的元素被take的时候才能往里面put!

执行结果如下:

img

上图得知总结:01,02在put以后只有take以后才能put03!说明了队列的同步性!

线程池

线程池用到了池化技术,其实在编程中,很多地方都有池化技术的思想,比如数据库连接池,HttpClient 连接池等。池化技术的出现主要是为了解决:资源的利用问题!提高效率,对资源进行复用!

这里我们用了线程池,目的就是 复用线程!。线程复用才可以控制最大并发数,管理线程!

线程池学习原则总结为:

  • 三大方式
  • 七大参数
  • 四种拒绝策略

线程池的三大方法

  • Executors.newSingleThreadExecutor()单个线程的创建
  • Executors.newFixedThreadPool(5)创建一个固定大小的线程池
  • Executors.newCachedThreadPool()可伸缩的线程池创建
package org.example;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author linghu
 * @date 2023/12/21 16:35
 * Executors工具类,3大方法
 */
public class Demo {
    public static void main(String[] args) {
        //单个线程池,线程
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        //固定的线程池大小
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //可伸缩的线程池大小
//        ExecutorService threadPool = Executors.newCachedThreadPool();

        try {
            for (int i=0;i<10;i++){
                //使用线程池创建线程,以前用new Thread()来创建
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

运行结果:

img

七大参数

这里说到七大参数是指我们在自定义线程池的时候用到的七大参数!

七大参数如下:

  • 核心线程数(corePoolSize):2
  • 最大线程数(maximumPoolSize):5
  • 空闲线程存活时间(keepAliveTime):3秒
  • 时间单位(unit):TimeUnit.SECONDS
  • 任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
  • 线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
  • 拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。

接下来我们通过一个银行等待业务的例子讲清楚上面的七大参数。

img

如上图所示,空闲线程存活时间(keepAliveTime)表示在候客区4、5窗口没人的情况下,3秒以后,这个窗口就会被释放掉,不能让它一直空闲下去,这样非常浪费资源!

接下来就是手动创建一个线程池,代码如下:

 //自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,//核心线程数(corePoolSize):2
                5,//最大线程数(maximumPoolSize):5
                3,//空闲线程存活时间(keepAliveTime):3秒
                TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
                new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
                Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
        );

在上述代码的拒绝策略中,我们需要知道拒绝策略有四种:

  • new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常
  • new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里
  • new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常
  • new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常
package org.example;

import java.util.concurrent.*;

/**
 * @author linghu
 * @date 2023/12/22 10:36
 * 拒绝策略:
 *          new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常
 *         new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里
 *         new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常
 *         new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常
 */
public class Demo02 {
    public static void main(String[] args) {
        //自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            for (int i=1;i<=8;i++){
                //使用自己定义的线程池创建了线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            threadPool.shutdown();
        }
    }
}

我们在如上代码中,创建了线程池,通过线程池创建线程,通过循环创建了8个任务,每个任务打印当前线程的名称和"OK"。这些任务被提交到自定义的线程池中执行。最后,在finally块中关闭线程池。

img

从上述运行结果来看,线程池中的核心线程就是1和3。最大线程数为5,空闲线程存活时间为3秒,任务队列容量为3,使用默认的线程工厂创建线程,拒绝策略为AbortPolicy

如何设置线程池的大小?

设置线程池的小主要是做调优的工作!

 //自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,//核心线程数(corePoolSize):2
                5,//最大线程数(maximumPoolSize):5
                3,//空闲线程存活时间(keepAliveTime):3秒
                TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
                new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
                Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
        );

上述代码中,我们需要知道最大线程数(maximumPoolSize)应该怎么设置?

设置原则为:

  • CPU密集型:电脑的核数是几核,这里就写多少
  • I/O密集型:判断程序中消耗IO线程的数量,然后乘以1倍到2倍即可!
CPU密集型

电脑核数的查看方式有两种:

  • 任务管理器
  • Runtime.getRuntime().availableProcessors()
任务管理器

任务管理器CPU的查看如下:

img

Runtime.getRuntime().availableProcessors()

其实这个方法的调用如下:

public class Main {
    public static void main(String[] args) {
        int coreCount = Runtime.getRuntime().availableProcessors();
        System.out.println("电脑的核数为: " + coreCount);
    }
}

I/O密集型

这里我们需要知道我们的IO任务是多少,然后大于这个任务1倍~2倍即可。

四大函数接口

顾明思议:函数接口就是只有一个方法的接口,如下面这个接口:

img

总结:这种函数式接口用的非常多,特别是在很多框架中。

四大函数接口是Java中用于处理不同类型数据的方法,它们分别是:

  1. Consumer:接收一个输入参数并对其执行某种操作,但不返回任何结果。
  2. Function:接收一个输入参数并返回一个结果。
  3. Predicate:接收一个输入参数并返回一个布尔值,表示该参数是否满足某个条件。
  4. Supplier:不接收任何参数,但返回一个结果。
Function函数型接口

Function函数型接口就是有一个输入参数,有一个输出参数。

import java.util.function.Function;

/**
 * @author linghu
 * @date ${DATE} ${TIME}
 */
public class Main {
    public static void main(String[] args) {
        //输出输入的值
//        Function function=new Function<String,String>(){
//            @Override
//            public String apply(String str) {
//                return str;
//            }
//        };

        //只要是函数型接口,就可以用lambda表达式简化
        Function function1=(str)->{
          return str;
        };

        System.out.println(function1.apply("123"));
    }
}
predicate断定型接口

predicate断定型接口就是有一个输入参数,返回值只能是布尔值。

import java.util.function.Predicate;

/**
 * @author linghu
 * @date 2023/12/22 17:01
 */
public class Demo02 {
    public static void main(String[] args) {
//        Predicate<String> predicate = new Predicate<>() {
//            @Override
//            public boolean test(String s) {
//                return s.isEmpty();
//            }
//        };
        Predicate<String> predicate2 =(str)->{
            return str.isEmpty();
        };
        System.out.println(predicate2.test("addd"));
    }
}

Suppier供给型接口

Suppier供给型接口是没有参数,只有返回值

import java.util.function.Supplier;

/**
 * @author linghu
 * @date 2023/12/22 17:11
 */
public class Demo03 {
    public static void main(String[] args) {
        Supplier supplier = new Supplier<>() {
            @Override
            public Integer get() {
                return 1024;
            }
        };
        Supplier supplier2 =()->{
            return 1024;
        };
        System.out.println(supplier2.get());
    }
}

Consumer消费性接口

Consumer消费性接口只有输入,没有返回值。

import java.util.function.Consumer;

/**
 * @author linghu
 * @date 2023/12/22 17:15
 */
public class Demo04 {
    public static void main(String[] args) {
        Consumer<String> consumer = new Consumer<>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };
        Consumer<String> consumer2 =(str)->{
            System.out.println(str);
        };
        consumer2.accept("hi");
    }
}

Stream流式计算

Stream 就好像一个高级的迭代器,但只能遍历一次,就好像一江春水向东流;在流的过程中,对流中的元素执行一些操作,比如“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等。

流的操作可以分为两种类型:

1)中间操作,可以有多个,每次返回一个新的流,可进行链式操作。

2)终端操作,只能有一个,每次执行完,这个流也就用光光了,无法执行下一个操作,因此只能放在最后。

@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private int age;

    public User(int i, String a, int i1) {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
package org.example;

import java.util.Arrays;
import java.util.List;

/**
 * 题目要求:
 * 1、ID必须是偶数
 * 2、年龄必须大于23岁
 * 3、用户名转为大写字母
 * 4、用户名字母倒着排序
 * 5、只输出一个用户名
 */
public class Test {
    public static void main(String[] args) {
        User u1 = new User(1, "a", 21);
        User u2 = new User(2, "b", 22);
        User u3 = new User(3, "c", 23);
        User u4 = new User(4, "d", 24);
        User u5 = new User(5, "e", 25);
        //集合是存储
        List<User> list= Arrays.asList(u1,u2,u3,u4,u5);

        //计算交给Stream流

        list.stream()//下面相当于写条件
                .filter(u->{
                    return u.getId()%2==0;
                })
                .filter(u->{
                    return u.getAge()>23;
                })
                .map(u->{
                    return u.getName().toUpperCase();
                })
                .sorted((o1,o2)->{
                    return o2.compareTo(o1);
                })
                .limit(1)
                .forEach(System.out::println);
    }
}

ForkJoin

? 在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。

? ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。

package org.example;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
 * @author linghu
 * @date 2023/12/25 11:18
 */
public class ForkJoinTest {
    private static final long SUM=20_0000_0000;
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test01();
        test02();
        test03();
    }
    //使用普通方法
    public static void test01(){
        long start=System.currentTimeMillis();
        long sum=0L;
        for (int i=1;i<SUM;i++){
            sum+=i;
        }
        long end=System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:"+(end-start));
        System.out.println("============================");
    }
    //使用ForkJoin方法
    public static void test02() throws ExecutionException, InterruptedException {
        long start=System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long aLong = submit.get();

        System.out.println(aLong);
        Long end=System.currentTimeMillis();
        System.out.println("时间:"+(end-start));
        System.out.println("==========================");
    }

    //使用Stream流计算
    public static void test03(){
        long start=System.currentTimeMillis();
        long sum= LongStream.range(0L,2000000000L).parallel().reduce(0,Long::sum);
        System.out.println(sum);
        long end=System.currentTimeMillis();
        System.out.println("时间:"+(end-start));
        System.out.println("==========================");
    }


}

package org.example;

import java.util.Locale;
import java.util.concurrent.RecursiveTask;

/**
 * @author linghu
 * @date 2023/12/25 10:59
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private long star;
    private long end;
    //临界值
    private long temp=100000L;

    public ForkJoinDemo(long star, long end) {
        this.star = star;
        this.end = end;
    }

    //计算方法
    @Override
    protected Long compute() {
        if ((end-star)<temp){
            Long sum=0L;
            for (Long i=star;i<end;i++){
                sum+=i;
            }
            return sum;
        }else {
            //使用ForkJoin分而治之计算
            //1、计算平均值
            long middle=(star+end)/2;
            ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
            //拆分任务,把线程压入线程队列
            forkJoinDemo1.fork();
            ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle,end);
            forkJoinDemo2.fork();

            Long taskSum=forkJoinDemo1.join()+forkJoinDemo2.join();
            return taskSum;
        }
    }
}

img

ForkJoin特点:工作窃取

工作窃取

实现原理是:双端队列!

异步回调

Java中异步回调执行和前端的 Ajax其实是一样的。但是在Java中的话用的是 CompletableFuture

回调情况分为:

  • 没有返回值的runAsync异步回调
  • 有返回值的异步回调supplyAsync

没有返回值的runAsync异步回调

对于异步回调,其实就是三个过程:

  • 异步执行
  • 成功回调
  • 失败回调

代码如下:

package org.example;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @author linghu
 * @date 2023/12/25 16:11
 * desc: 异步调用:CompleteableFuture
 * //异步执行
 * //成功回调
 * //失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //发起一个请求void
        //没有返回值runAsync异步回调
        CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"runAsync=>void");
        });
        System.out.println("1111111111111");
        completableFuture.get();//获取执行结果
    }
}

总结:上面会先发起一个请求,请求休眠,同时打印语句,打印完毕以后回调get这个执行结果

img

有返回值的异步回调supplyAsync

whenComplete((t, u)有两个参数,一个是t,一个是u:

  • T是正常返回的结果
  • U是抛出异常的错误信息

如果发生了异常,get可以获取 exceptionally((e)返回的错误信息。

package org.example;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @author linghu
 * @date 2023/12/25 16:11
 * desc: 异步调用:CompleteableFuture
 * //异步执行
 * //成功回调
 * //失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
             int i=10/0;
            return 1024;
        });

        System.out.println(completableFuture.whenComplete((t, u) -> {
            System.out.println("t=>" + t);
            System.out.println("u=>" + u);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233;//获取错误的返回结果
        }).get());
    }
}

如上返回的结果:

img

JMM

JMM是一个概念,不是真实存在的,用来描述Java内存模型。

JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。

img

关于JMM的一些同步约定

img

通过上图总结:

  • 线程解锁前,必须把共享变量立刻撤回主存
  • 线程加锁前,必须读取主存中的最新值到工作内存
  • 加锁和解锁是同一把锁
  • 线程中分为 工作内存主内存

现在如果有两个线程都在对主存进行操作:

img

通过上图发现,当我们的线程B修改了主存值,并且读取到了工作内存,这个时候线程A不知道,怎么办呢?

于是引入了 volatile

volatile

定义

  • volatile是Java提供的一种轻量级的同步机制
  • Java 语言包含两种内在的同步机制:同步块(或方法)和 volatile 变量。相比于synchronizedsynchronized通常称为重量级锁),volatile更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile 变量的同步性较差(有时它更简单并且开销更低),而且其使用也更容易出错。

1)、可见性

当多个线程访问同一个变量时,一个线程修改了这个变量的值,另外一个线程是可以看到修改的值。

package org.example;

import java.util.concurrent.TimeUnit;

/**
 * @author linghu
 * @date 2023/12/26 16:12
 */
public class JMMDemo01 {
    //如果不加volatile程序会死循环
    //加了volatile是可以保证可见性的
//    private static Integer number=0;
    private volatile static Integer number=0;
    public static void main(String[] args) {
        //main线程

        //子线程1
        new Thread(()->{
            while (number==0){

            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //子线程2
        new Thread(()->{
            while (number==0){

            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        number=1;
        System.out.println(number);
    }
}

注:synchronized和lock都能保证可见性。

2)、不保证原子性

线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。

package org.example;

/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
    private static volatile int number=0;
    public synchronized static void add(){
        number++;
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }

        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

总结:模拟多线程环境下对共享变量的操作,并展示线程之间的竞争和协作。

使用Thread.yield()方法让出CPU资源给其他线程

在这个地方,如果不加lock和synchronized,怎么保证原子性?

理论上,num的值应该为:2万。

package org.example;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
//    private static volatile AtomicInteger number=new AtomicInteger();
    private static volatile int number=0;
    public static void add(){
        number++;
//        number.incrementAndGet();//底层是CAS保证的原子性,加1操作
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }

        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

img

解决方案

可以用原子类解决问题,比锁的效率高很多!

package org.example;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
    private static volatile AtomicInteger number=new AtomicInteger();
//    private static volatile int number=0;
    public static void add(){
//        number++;
        number.incrementAndGet();//底层是CAS保证的原子性,加1操作
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }

        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

img

3)、禁止指令重排

计算机并不是按照我们自己写的那样去执行的。

指令重排一般分为以下三种

  • 编译器优化 重新安排语句的执行顺序
  • 指令并行重排 利用指令级并行技术将多个指令并行执行,如果指令之前没有数据依赖,处理器可以改变对应机器指令的执行顺序
  • 内存系统重排 由于处理使用缓存和读写缓冲区,所以它们是乱序的
package org.example;

import org.openjdk.jol.info.ClassLayout;

/**
 * @author linghu
 * @date 2023/12/27 14:25
 */
public class CountObjectSize {
    int a=10;
    int b=20;
    double c=30.0;
    public static void main(String[] args) {
        CountObjectSize object = new CountObjectSize();

        System.out.println(ClassLayout.parseInstance(object).toPrintable());
    }
}

引入的pom:

 <!--查看对象头工具-->
        <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.16</version>
        </dependency>

img

总结:通过上面的代码,我们希望代码执行的顺序是:a-b-c;但实际的编译情况是,执行顺序是:a-c-b。

方案

如何禁止指令重排呢?

可以用 Volatile 关键字实现!Volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。

内存屏障:CPU指令

img

总结
  • volatile可以保证可见性
  • 不能保证原子性
  • 由于内存屏障,可以保证避免指令重排的现象产生

单例模式

单例模式的文章我之前写过,文章如下:

1)、饿汉式

如下饿汉式单例代码:

package org.example;

/**
 * @author linghu
 * @date 2023/12/28 9:41
 * 饿汉式单例
 */
public class Hungry {

    private byte[] data1=new byte[1024*1024];
    private byte[] data2=new byte[1024*1024];
    private byte[] data3=new byte[1024*1024];
    private byte[] data4=new byte[1024*1024];

    //私有化构造器
    public Hungry() {
    }

    private final static Hungry HUNGRY=new Hungry();

    public static Hungry getInstance(){
        return HUNGRY;
    }
}

2)、DCL懒汉式

package org.example;

/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private static LazyMan lazyMan;
    public static LazyMan getInstance(){
        if (lazyMan==null){
            lazyMan=new LazyMan();
        }
        return lazyMan;
    }

    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }

}

package org.example;

/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private static LazyMan lazyMan;
    //双重检测锁模式的 懒汉式单例 DCL懒汉式
    public static LazyMan getInstance(){
        if (lazyMan==null){
            synchronized (LazyMan.class){
                if (lazyMan==null){
                    lazyMan=new LazyMan();
                }
            }
        }
        return lazyMan;
    }

    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }

}

加volatile可以防止指令重排

private volatile static LazyMan lazyMan;

package org.example;

/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private volatile static LazyMan lazyMan;
    //双重检测锁模式的 懒汉式单例 DCL懒汉式
    public static LazyMan getInstance(){
        if (lazyMan==null){
            synchronized (LazyMan.class){
                if (lazyMan==null){
                    lazyMan=new LazyMan();
                }
            }
        }
        return lazyMan;
    }

    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }

}

深入理解CAS

CAS就是比较以前工作内存中的值 和 主存内存中的值,如果这个值是期望的,那么执行操作! 如果不是,就一直循环,使用的是 自旋锁。

package org.example;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author linghu
 * @date 2023/12/29 9:54
 */
public class casDemo {
    //CAS:compareAndSet
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);

        //如果实际值 和 期望值相同,那么就更新
        //如果实际值 和 期望值不同,那么就不更新
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());

        //因为期望值是2020,实际值却变成了2021 所以会修改失败!
        atomicInteger.getAndIncrement();//++操作
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());

    }
}

总结:

  • CAS 是CPU的并发原语。

这段代码演示了如何使用原子操作来安全地更新一个整数值,并展示了compareAndSet方法的工作原理。

缺点:

  • 循环会耗时
  • 一次性只能保证一个共享变量的原子性
  • 它会存在ABA问题【解决方案:原子引用】

文章来源:https://blog.csdn.net/weixin_43891901/article/details/135293900
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。