《深入探索Java并发编程》--从锁到并发工具的深入解析
文章目录
JUC是什么?
JUC就是java.util.concurrent
下面的类包,专门用于多线程的开发。
关于锁
传统锁synchronized
如下有三个线程A、B、C线程去争夺ticket资源,sale()
方法前面用synchronized
修饰以后,这个方法的调用对象 ticket
就被上锁了,每个线程去用这个对象调用sale()
方法的时候都会独立运行。如果你不加这个synchronized
就会出现A、B、C线程争夺资源的情况。
在这里,我们说 锁的本质就是:队列。 线程排队去获得CPU执行线程,执行完毕下一个线程上。
package org.example;
/**
* @author linghu
* @date 2023/12/15 15:02
*/
public class Demo01 {
public static void main(String[] args) {
final Ticket ticket=new Ticket();
//启动线程A
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"C").start();
}
}
class Ticket{
private int number=30;
//卖票的方式
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票剩余"+
number+"张票");
}
}
}
加了synchronized
运行如下图:
Lock锁
lock三部曲:
- 创建锁
final Ticket ticket=new Ticket();
- 加锁
lock.lock();
- 解锁
lock.unlock();
package org.example;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author linghu
* @date 2023/12/15 15:02
*/
public class Demo01 {
public static void main(String[] args) {
final Ticket ticket=new Ticket();
//启动线程A
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i=0;i<40;i++){
ticket.sale();
}
},"C").start();
}
}
//lock三部曲
class Ticket{
private int number=30;
//1、创建锁
Lock lock=new ReentrantLock();
//卖票的方式
public synchronized void sale(){
//2、开启锁
lock.lock();
try {
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票剩余"+
number+"张票");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//3、关闭锁
lock.unlock();
}
}
}
上面的代码中,被lock三部曲锁上的代码,只能由一个线程执行。
synchronized和Lock的区别
生产者和消费者问题
synchronized版生产者和消费者问题
package org.example;
/**
* @author linghu
* @date 2023/12/16 16:45
* A num+1
* B num-1
* 顺序:判断->业务->通知
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
//A:num+1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
//B:num-1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
class Data{
private int number=0;
//+1
public synchronized void increment() throws InterruptedException {
if (number!=0){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
}
注:如果有四个线程,就会出现假唤醒问题。
我们添加如下四个线程A、B、C、D:
package org.example;
/**
* @author linghu
* @date 2023/12/16 16:45
* A num+1
* B num-1
* 顺序:判断->业务->通知
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
//A:num+1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
//B:num-1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data{
private int number=0;
//+1
public synchronized void increment() throws InterruptedException {
if (number!=0){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
}
那么什么是虚假唤醒呢?
多线程环境下,有多个线程执行了wait()方法,需要其他线程执行notify()或者notifyAll()方法去唤醒它们,假如多个线程都被唤醒了,但是只有其中一部分是有用的唤醒操作,其余的唤醒都是无用功;对于不应该被唤醒的线程而言,便是虚假唤醒。
比如:仓库有货了才能出库,突然仓库入库了一个货品;这时所有的线程(货车)都被唤醒,来执行出库操作;实际上只有一个线程(货车)能执行出库操作,其他线程都是虚假唤醒。
为了防止虚假唤醒,我们在这里采用将if换为while:
package org.example;
/**
* @author linghu
* @date 2023/12/16 16:45
* A num+1
* B num-1
* 顺序:判断->业务->通知
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
//A:num+1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
//B:num-1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data{
private int number=0;
//+1
public synchronized void increment() throws InterruptedException {
while (number!=0){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
while(number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他线程,我-1完毕了
this.notify();
}
}
JUC版的生产者消费者问题
package org.example;
/**
* @author linghu
* @date 2023/12/16 16:45
* A num+1
* B num-1
* 顺序:判断->业务->通知
*/
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
}
}
class Data2{
private int number=0;
//+1
public void increment() throws InterruptedException {
while (number!=0){
//等待
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
}
//-1
public void decrement() throws InterruptedException {
while (number==0){
//等待
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
}
}
我们用传统三剑客写的生产者消费者,现在把它删掉,准备用上面的代码进行改进,改成JUC版本的生产者消费者。
JUC的代码如下:
package org.example;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author linghu
* @date 2023/12/16 16:45
* A num+1
* B num-1
* 顺序:判断->业务->通知
*/
public class B {
public static void main(String[] args) {
Data2 data2 = new Data2();
//A:num+1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
//B:num-1
new Thread(()->{
for (int i=0;i<10;i++){
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data2{
private int number=0;
Lock lock=new ReentrantLock();
Condition condition= lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();//上锁操作
try {
while (number!=0){
//等待
condition.await();
}
//业务代码...
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();//通知其他线程,我+1完毕
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁操作
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number==0){
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
执行结果如下:
我们现在希望按照:A->B->C->D
的顺序去进行执行,精准通知唤醒我们的线程!
如下代码中,我们实现了A->B->C->D
的顺序,主要是靠 Condition
监控器,我们设置了三个监控器:
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
唤醒方式如下:
public void weakA()
{
lock.lock();
try {
while(num != 1)
{
conditionA.await();
}
//业务代码...
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void weakB()
{
lock.lock();
try {
while(num != 2)
{
conditionB.await();
}
//业务代码...
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void weakC()
{
lock.lock();
try {
while(num != 3)
{
conditionC.await();
}
//业务代码...
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
如下为完整代码:
package org.example;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author linghu
* @date 2023/12/18 14:44
*/
class Aweaken
{
ReentrantLock lock = new ReentrantLock();
int num = 1;
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
public void weakA()
{
lock.lock();
try {
while(num != 1)
{
conditionA.await();
}
num = 2;
System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程B");
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void weakB()
{
lock.lock();
try {
while(num != 2)
{
conditionB.await();
}
num = 3;
System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程C");
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void weakC()
{
lock.lock();
try {
while(num != 3)
{
conditionC.await();
}
num = 1;
System.out.println("现在是线程 "+Thread.currentThread().getName()+", 下一个应该是线程A");
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class C {
public static void main(String[] args) {
Aweaken b = new Aweaken();
new Thread(()->{
for (int i = 0; i < 10; i++) {
b.weakA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
b.weakB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
b.weakC();
}
},"C").start();
}
}
执行效果:
8锁现象
在Java并发编程中,锁是一种关键的同步机制,用于控制多个线程对共享资源的访问。然而,在某些情况下,使用锁可能会引发性能问题,其中一个典型例子就是JUC中的8锁现象,也被称为锁粗化。
那何为8锁呢?
8锁现象是一种并发编程中的现象,主要涉及到Java中的synchronized
关键字。这个术语来源于对锁的八个问题的探讨,每个问题都围绕着锁的不同行为和效果展开。
- 标准情况下,两个线程先打印“发短信”还是“打电话”?
- “打电话”方法暂停4秒钟,两个线程先打印“发短信”还是“打电话”?
- 两个普通的锁方法,new一个对象调用,调用过程中间睡1秒,执行结果是什么?
- 两个普通的锁方法,new两个对象调用,调用过程中间睡1秒,执行结果是什么?
- 两个普通的锁方法,一个对象调用,一个类调用,调用过程中间睡1秒,执行结果是什么?
- 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字,调用过程中间睡1秒,执行结果是什么?
- 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字和方法上加了synchronized关键字,调用过程中间睡1秒,执行结果是什么?
- 两个普通的锁方法,一个对象调用,一个类调用,但是类上加了synchronized关键字和方法上加了synchronized关键字,但是方法上加了final关键字,调用过程中间睡1秒,执行结果是什么?
我们将上面的8个问题进行抽象:
案例一(先打印“发短信”)
- 标准情况下,两个线程先打印 发短信还是 打电话? 1/发短信 2/打电话。
- sendSms延迟4秒,两个线程先打印 发短信还是 打电话? 1/发短信 2/打电话。
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
//锁的存在
new Thread(()->{
phone.sendSms();
},"A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone{
// synchronized 锁的对象是方法的调用者!、
// 两个方法用的是同一个锁,谁先拿到谁执行!
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
运行效果:
💧案例一总结:synchronized 锁的对象是方法的调用者,两个方法用的是同一个锁,谁先拿到谁执行。
案例二(先打印“打电话”)
- 增加了一个普通方法后!先执行发短信还是Hello? 普通方法。
- 两个对象,两个同步方法, 发短信还是 打电话? // 打电话
package org.example;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
// 两个对象,两个调用者,两把锁!
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
//锁的存在
new Thread(()->{
phone1.sendSms();
},"A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone2{
// synchronized 锁的对象是方法的调用者!
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
// 这里没有锁!不是同步方法,不受锁的影响
public void hello(){
System.out.println("hello");
}
}
运行效果:
💧案例二总结:synchronized 锁的对象是方法的调用者,hello() 没有锁!不是同步方法,所以不受锁的影响。
案例三(先打印“发短信”)
增加两个静态的同步方法,只有一个对象,先打印 发短信?打电话?
两个对象!增加两个静态的同步方法, 先打印 发短信?打电话?
package org.example;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone3 phone1 = new Phone3();
Phone3 phone2 = new Phone3();
//锁的存在
new Thread(()->{
phone1.sendSms();
},"A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone2.call();
},"B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone3{
// synchronized 锁的对象是方法的调用者!
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
运行效果:
💧案例三总结:两个对象的Class类模板只有一个。static 静态方法在类一加载就有了!锁的是Class。
案例四(先打印“打电话”)
1个静态的同步方法,1个普通的同步方法 ,一个对象,先打印 发短信?打电话?
1个静态的同步方法,1个普通的同步方法 ,两个对象,先打印 发短信?打电话?
package org.example;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
//锁的存在
new Thread(()->{
phone1.sendSms();
},"A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone2.call();
},"B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone4{
// 静态的同步方法 锁的是 Class 类模板
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通的同步方法 锁的调用者
public synchronized void call(){
System.out.println("打电话");
}
}
运行效果:
💧案例四总结:静态的同步方法 锁的是 Class 类模板,普通的同步方法 锁的调用者。
new this 具体的一个手机
static Class 唯一的一个模板
集合不安全
所谓集合不安全,就是指:在高并发的情况下,创建线程使用集合会报异常!
我们看一下常用集合线程安全问题:
List
不安全ArrayList
不安全Set
不安全Map
不安全
List集合不安全
我们通过如下代码,循环30次创建线程,在线程中用到List集合,我还在业务代码那里 try-catch
一下,捕捉出现的异常。
/**
* @author linghu
* @date 2023/12/19 11:38
*/
public class ListTest {
public static void main(String[] args) {
List<Object> arrayList = new ArrayList<>();
for (int i=1;i<=30;i++){
try {
new Thread(()->{
arrayList.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(arrayList);
},String.valueOf(i)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
代码运行结果:
**结论:**List集合和ArrayList集合在多线程中使用不安全
List集合不安全的解决方案
我们可以采用 COW
的思想解决这个不安全问题。所谓 COW
是指 CopyOnWriteArrayList
:写入时复制!是计算机程序领域的一种优化策略。
解决方案:
new Vector<>()
new CopyOnWriteArrayList<>()
代码如下:
/**
* @author linghu
* @date 2023/12/19 11:38
*/
public class ListTest {
public static void main(String[] args) {
/**
* 解决方案
* 1、List<String> list = new Vector<>();
* 2、List<String> list = new CopyOnWriteArrayList<>();
*/
// List<Object> arrayList = new ArrayList<>();
List<String> list = new CopyOnWriteArrayList<>();
for (int i=1;i<=30;i++){
try {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行结果:
结论:
CopyOnWriteArrayList
:写入时复制!能解决集合的线程安全问题!
总结
Vector
底层使用的是synchronized
来实现的,所以效率特别低下。CopyOnWriteArrayList
使用的是Lock
锁,效率会更加高效。
Set集合不安全
Set和List同理可得:多线程情况下,普通的Set集合是线程不安全的。
如下代码,我在循环30次中执行线程,创建set集合:
package org.example;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author linghu
* @date 2023/12/19 14:27
*/
public class SetTest {
public static void main(String[] args) {
/**
* 解决方案
*/
Set<String> set = new HashSet<>();
for (int i=1;i<=30;i++){
try {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行效果:
List集合不安全的解决方案
解决方案如下:
CopyOnWriteArraySet<>()
HashSet<>()
我们还是用了类似的原理:写入时复制!
代码如下:
package org.example;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author linghu
* @date 2023/12/19 14:27
*/
public class SetTest {
public static void main(String[] args) {
/**
* 解决方案
* 1、Set<String> set = new CopyOnWriteArraySet<>();
*/
// Set<String> set = new HashSet<>();
Set<String> set = new CopyOnWriteArraySet<>();
for (int i=1;i<=30;i++){
try {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
结论:
CopyOnWriteArraySet
:写入时复制!能解决集合的线程安全问题!
总结
HashSet的底层是HashMap
CopyOnWriteArraySet
:写入时复制!能解决集合的线程安全问题!
Callable
Callable的出现主要是为了弥补继承Thread或实现Runnable接口的线程执行完成后,无法获得线程执行后的结果。
Callable其实上手是非常简单容易得!
我们看通过Callable创建线程的方式如下:
class MyThread implements Callable<String>{
//这里的泛型返回类型和call方法的返回类型是一样的
@Override
public String call() throws Exception {
return "hi,call";
}
}
上面,线程 MyThread
通过继承接口Callable<String>
,这里的String其实是泛型。如下的call
方法的类型和这个泛型相同。
如下代码,我们通过接口 Callable<String>
创建了A、B线程执行 call
方法。
package org.example;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @author linghu
* @date 2023/12/19 16:42
*/
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//?怎么启动Callable
MyThread myThread = new MyThread();
//适配类
FutureTask futureTask = new FutureTask(myThread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();//结果被缓存,效率高,结果只打印一次
//获取Callable的返回结果
String o = (String)futureTask.get();
System.out.println(o);
}
}
class MyThread implements Callable<String>{
//这里的泛型返回类型和call方法的返回类型是一样的
@Override
public String call() throws Exception {
return "hi,call";
}
}
执行结果:
高并发常用-辅助类
在高并发场景中,常用辅助类如下:
CountDownLatch
CyclickBarrier
Semaphore
CountDownLatch
其实CountDownLatch
的本质就是一个减法计数器。
比如我们去游乐园坐激流勇进,有的时候游乐园里人不是那么多,这时,管理员会让你稍等一下,等人坐满了再开船,这样的话可以在一定程度上节约游乐园的成本。座位有多少,就需要等多少人,这就是 CountDownLatch 的核心思想,等到一个设定的数值达到之后,才能出发。
上面这幅图就很好理解了。可以看到,最开始 CountDownLatch
设置的初始值为 3,然后 T0 线程上来就调用 await
方法,它的作用是让这个线程开始等待,等待后面的 T1、T2、T3,它们每一次调用 countDown 方法,3 这个数值就会减 1,也就是从 3 减到 2,从 2 减到 1,从 1 减到 0,一旦减到 0 之后,这个 T0 就相当于达到了自己触发继续运行的条件,于是它就恢复运行了。
我们可以限制6个线程,等待他们都执行完毕,代码如下:
package org.example;
import java.util.concurrent.CountDownLatch;
/**
* @author linghu
* @date 2023/12/20 9:44
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6个线程
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i=1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go out");
countDownLatch.countDown();//数量-1,本质是阻塞
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归0,然后往下执行~
System.out.println("Close Door");
}
}
总结
- countDown()是减一操作
- await是等待计数器归0操作
CyclickBarrier
其实CyclickBarrier
的本质就是一个加法计数器。
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
package org.example;
import java.util.concurrent.CyclicBarrier;
/**
* @author linghu
* @date 2023/12/20 10:32
*/
public class CyclickBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙~");
});
for (int i=1;i<=7;i++){
int temp=i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"搜集"+temp+"颗龙珠");
try {
cyclicBarrier.await();//等待计数器到7,然后往下执行~
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
如上代码运行结果如下:
总结
cyclicBarrier.await()
是 Java 中的一个方法,用于让当前线程等待其他线程到达屏障(CyclicBarrier)时再继续执行。当所有线程都调用了await()
方法后,屏障才会打开,允许所有线程继续执行。这个方法通常用在多线程编程中,以确保所有线程都达到某个同步点后再继续执行。
Semaphore
在操作系统中我们学过信号量,学过PV操作。其实这里的 Semaphore
信号量就是用来做线程限流操作的。
我们看如下代码:
package org.example;
import sun.awt.windows.ThemeReader;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author linghu
* @date 2023/12/20 10:58
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量:停车位!用来限流
Semaphore semaphore = new Semaphore(3);
for (int i=1;i<=6;i++){
new Thread(()->{
try {
semaphore.acquire();//得到
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);//让车多停一会
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//释放
}
},String.valueOf(i)).start();
}
}
}
在上面代码中,我们对6个线程-车进行限流,只有3个车位,6个车去抢车位!
总结
semaphore.acquire()
;//得到,等待释放为止semaphore.release()
;//释放信号量以后会唤醒等待中的线程
关于PV操作,其实可以看我这个视频课程:
读写锁
所谓的读写锁(Readers-Writer Lock),顾名思义就是将一个锁拆分为读锁和写锁两个锁。其中读锁允许多个线程同时获得,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。
为什么要读写锁?
Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。然而在有些业务场景中,我们大多在读取数据,很少写入数据,这种情况下,如果仍使用独占锁,效率将及其低下。针对这种情况,Java提供了读写锁——ReentrantReadWriteLock。
主要解决:对共享资源有读和写的操作,且写操作没有读操作那么频繁的场景。
案例
如下分别读写线程6次执行:
package org.example;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author linghu
* @date 2023/12/20 11:50
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache2 myCache2 = new MyCache2();
int num=6;
for (int i=1;i<=6;i++){
int finall=i;
//6个线程开始写
new Thread(()->{
myCache2.write(String.valueOf(finall),String.valueOf(finall));
},String.valueOf(i)).start();
//6个线程开始读
new Thread(()->{
myCache2.read(String.valueOf(finall));
},String.valueOf(i)).start();
}
}
}
class MyCache2{
private volatile Map<String, String> map=new HashMap<>();
private ReadWriteLock lock=new ReentrantReadWriteLock();
public void write(String key,String value){
lock.writeLock().lock();//写锁
try {
System.out.println(Thread.currentThread().getName()+"线程开始写入");
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"线程写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();//释放写锁
}
}
public void read(String key){
lock.readLock().lock();//读锁
try {
System.out.println(Thread.currentThread().getName()+"线程开始读取");
map.get(key);
System.out.println(Thread.currentThread().getName()+"线程读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();//释放读锁
}
}
}
运行效果:
总结
如果我们不用锁,多线程的读写会造成数据不可靠的问题。我们可以采用独占锁:Synchronized 和 ReentrantLock保证数据可靠。但是使用更细粒度的锁读写锁(Readers-Writer Lock)效率更高!
BlockQueue
BlockQueue
是 Collection
的一个子类,我们把它叫做:阻塞队列!整个队列的家族是下面这幅图这个样子的:
通过上图我们知道了我们重点要学的是BlockQueue
!
BlockingQueue
主要提供了四类方法,如下表所示:
同步队列
同步队列没有容量,也可以视为容量为1的队列。
- 进去一个元素必须等待取出来以后才能往里面放元素
- put了一个元素,就必须从里面先take出来,否则不能再put进去值!
代码
如下通过一个代码案例进行说明:
package org.example;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
/**
* @author linghu
* @date 2023/12/21 15:16
*/
public class SynchronousQueue {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue=
new java.util.concurrent.SynchronousQueue<>();
//往queue中添加元素
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 01");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 02");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 03");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//取出元素
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
总结:上述代码中,三个线程负责往队列里put元素,但是只能等到对列的元素被take的时候才能往里面put!
执行结果如下:
上图得知总结:01,02在put以后只有take以后才能put03!说明了队列的同步性!
线程池
线程池用到了池化技术,其实在编程中,很多地方都有池化技术的思想,比如数据库连接池,HttpClient 连接池等。池化技术的出现主要是为了解决:资源的利用问题!提高效率,对资源进行复用!
这里我们用了线程池,目的就是 复用线程!。线程复用才可以控制最大并发数,管理线程!
线程池学习原则总结为:
- 三大方式
- 七大参数
- 四种拒绝策略
线程池的三大方法
Executors.newSingleThreadExecutor()
单个线程的创建Executors.newFixedThreadPool(5)
创建一个固定大小的线程池Executors.newCachedThreadPool()
可伸缩的线程池创建
package org.example;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author linghu
* @date 2023/12/21 16:35
* Executors工具类,3大方法
*/
public class Demo {
public static void main(String[] args) {
//单个线程池,线程
ExecutorService threadPool = Executors.newSingleThreadExecutor();
//固定的线程池大小
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
//可伸缩的线程池大小
// ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i=0;i<10;i++){
//使用线程池创建线程,以前用new Thread()来创建
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"OK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
运行结果:
七大参数
这里说到七大参数是指我们在自定义线程池的时候用到的七大参数!
七大参数如下:
- 核心线程数(corePoolSize):2
- 最大线程数(maximumPoolSize):5
- 空闲线程存活时间(keepAliveTime):3秒
- 时间单位(unit):TimeUnit.SECONDS
- 任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
- 线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
- 拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
接下来我们通过一个银行等待业务的例子讲清楚上面的七大参数。
如上图所示,空闲线程存活时间(keepAliveTime)表示在候客区4、5窗口没人的情况下,3秒以后,这个窗口就会被释放掉,不能让它一直空闲下去,这样非常浪费资源!
接下来就是手动创建一个线程池,代码如下:
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//核心线程数(corePoolSize):2
5,//最大线程数(maximumPoolSize):5
3,//空闲线程存活时间(keepAliveTime):3秒
TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
);
在上述代码的拒绝策略中,我们需要知道拒绝策略有四种:
new ThreadPoolExecutor.AbortPolicy()
;//银行满了,还有人进来,不处理这个人,抛出异常new ThreadPoolExecutor.CallerRunsPolicy()
;//哪里来的去哪里new ThreadPoolExecutor.DiscardPolicy()
;//队列满了,丢掉任务,不会抛出异常new ThreadPoolExecutor.DiscardOldestPolicy()
;//队列满了,尝试去和最早的竞争,也不会抛出异常
package org.example;
import java.util.concurrent.*;
/**
* @author linghu
* @date 2023/12/22 10:36
* 拒绝策略:
* new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常
* new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里
* new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常
* new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常
*/
public class Demo02 {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i=1;i<=8;i++){
//使用自己定义的线程池创建了线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" OK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程池
threadPool.shutdown();
}
}
}
我们在如上代码中,创建了线程池,通过线程池创建线程,通过循环创建了8个任务,每个任务打印当前线程的名称和"OK"。这些任务被提交到自定义的线程池中执行。最后,在finally块中关闭线程池。
从上述运行结果来看,线程池中的核心线程就是1和3。最大线程数为5,空闲线程存活时间为3秒,任务队列容量为3,使用默认的线程工厂创建线程,拒绝策略为AbortPolicy
如何设置线程池的大小?
设置线程池的小主要是做调优的工作!
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//核心线程数(corePoolSize):2
5,//最大线程数(maximumPoolSize):5
3,//空闲线程存活时间(keepAliveTime):3秒
TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
);
上述代码中,我们需要知道最大线程数(maximumPoolSize)应该怎么设置?
设置原则为:
- CPU密集型:电脑的核数是几核,这里就写多少
- I/O密集型:判断程序中消耗IO线程的数量,然后乘以1倍到2倍即可!
CPU密集型
电脑核数的查看方式有两种:
- 任务管理器
Runtime.getRuntime().availableProcessors()
任务管理器
任务管理器CPU的查看如下:
Runtime.getRuntime().availableProcessors()
其实这个方法的调用如下:
public class Main {
public static void main(String[] args) {
int coreCount = Runtime.getRuntime().availableProcessors();
System.out.println("电脑的核数为: " + coreCount);
}
}
I/O密集型
这里我们需要知道我们的IO任务是多少,然后大于这个任务1倍~2倍即可。
四大函数接口
顾明思议:函数接口就是只有一个方法的接口,如下面这个接口:
总结:这种函数式接口用的非常多,特别是在很多框架中。
四大函数接口是Java中用于处理不同类型数据的方法,它们分别是:
Consumer
:接收一个输入参数并对其执行某种操作,但不返回任何结果。Function
:接收一个输入参数并返回一个结果。Predicate
:接收一个输入参数并返回一个布尔值,表示该参数是否满足某个条件。Supplier
:不接收任何参数,但返回一个结果。
Function函数型接口
Function函数型接口就是有一个输入参数,有一个输出参数。
import java.util.function.Function;
/**
* @author linghu
* @date ${DATE} ${TIME}
*/
public class Main {
public static void main(String[] args) {
//输出输入的值
// Function function=new Function<String,String>(){
// @Override
// public String apply(String str) {
// return str;
// }
// };
//只要是函数型接口,就可以用lambda表达式简化
Function function1=(str)->{
return str;
};
System.out.println(function1.apply("123"));
}
}
predicate断定型接口
predicate断定型接口就是有一个输入参数,返回值只能是布尔值。
import java.util.function.Predicate;
/**
* @author linghu
* @date 2023/12/22 17:01
*/
public class Demo02 {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<>() {
// @Override
// public boolean test(String s) {
// return s.isEmpty();
// }
// };
Predicate<String> predicate2 =(str)->{
return str.isEmpty();
};
System.out.println(predicate2.test("addd"));
}
}
Suppier供给型接口
Suppier供给型接口是没有参数,只有返回值
import java.util.function.Supplier;
/**
* @author linghu
* @date 2023/12/22 17:11
*/
public class Demo03 {
public static void main(String[] args) {
Supplier supplier = new Supplier<>() {
@Override
public Integer get() {
return 1024;
}
};
Supplier supplier2 =()->{
return 1024;
};
System.out.println(supplier2.get());
}
}
Consumer消费性接口
Consumer消费性接口只有输入,没有返回值。
import java.util.function.Consumer;
/**
* @author linghu
* @date 2023/12/22 17:15
*/
public class Demo04 {
public static void main(String[] args) {
Consumer<String> consumer = new Consumer<>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Consumer<String> consumer2 =(str)->{
System.out.println(str);
};
consumer2.accept("hi");
}
}
Stream流式计算
Stream 就好像一个高级的迭代器,但只能遍历一次,就好像一江春水向东流;在流的过程中,对流中的元素执行一些操作,比如“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等。
流的操作可以分为两种类型:
1)中间操作,可以有多个,每次返回一个新的流,可进行链式操作。
2)终端操作,只能有一个,每次执行完,这个流也就用光光了,无法执行下一个操作,因此只能放在最后。
@NoArgsConstructor
public class User {
private int id;
private String name;
private int age;
public User(int i, String a, int i1) {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
package org.example;
import java.util.Arrays;
import java.util.List;
/**
* 题目要求:
* 1、ID必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户名
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(5, "e", 25);
//集合是存储
List<User> list= Arrays.asList(u1,u2,u3,u4,u5);
//计算交给Stream流
list.stream()//下面相当于写条件
.filter(u->{
return u.getId()%2==0;
})
.filter(u->{
return u.getAge()>23;
})
.map(u->{
return u.getName().toUpperCase();
})
.sorted((o1,o2)->{
return o2.compareTo(o1);
})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin
? 在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。
? ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
package org.example;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* @author linghu
* @date 2023/12/25 11:18
*/
public class ForkJoinTest {
private static final long SUM=20_0000_0000;
public static void main(String[] args) throws ExecutionException, InterruptedException {
test01();
test02();
test03();
}
//使用普通方法
public static void test01(){
long start=System.currentTimeMillis();
long sum=0L;
for (int i=1;i<SUM;i++){
sum+=i;
}
long end=System.currentTimeMillis();
System.out.println(sum);
System.out.println("时间:"+(end-start));
System.out.println("============================");
}
//使用ForkJoin方法
public static void test02() throws ExecutionException, InterruptedException {
long start=System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long aLong = submit.get();
System.out.println(aLong);
Long end=System.currentTimeMillis();
System.out.println("时间:"+(end-start));
System.out.println("==========================");
}
//使用Stream流计算
public static void test03(){
long start=System.currentTimeMillis();
long sum= LongStream.range(0L,2000000000L).parallel().reduce(0,Long::sum);
System.out.println(sum);
long end=System.currentTimeMillis();
System.out.println("时间:"+(end-start));
System.out.println("==========================");
}
}
package org.example;
import java.util.Locale;
import java.util.concurrent.RecursiveTask;
/**
* @author linghu
* @date 2023/12/25 10:59
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private long star;
private long end;
//临界值
private long temp=100000L;
public ForkJoinDemo(long star, long end) {
this.star = star;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if ((end-star)<temp){
Long sum=0L;
for (Long i=star;i<end;i++){
sum+=i;
}
return sum;
}else {
//使用ForkJoin分而治之计算
//1、计算平均值
long middle=(star+end)/2;
ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
//拆分任务,把线程压入线程队列
forkJoinDemo1.fork();
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle,end);
forkJoinDemo2.fork();
Long taskSum=forkJoinDemo1.join()+forkJoinDemo2.join();
return taskSum;
}
}
}
ForkJoin特点:工作窃取
工作窃取
实现原理是:双端队列!
异步回调
Java中异步回调执行和前端的 Ajax
其实是一样的。但是在Java中的话用的是 CompletableFuture
。
回调情况分为:
- 没有返回值的
runAsync
异步回调 - 有返回值的异步回调
supplyAsync
没有返回值的runAsync异步回调
对于异步回调,其实就是三个过程:
- 异步执行
- 成功回调
- 失败回调
代码如下:
package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author linghu
* @date 2023/12/25 16:11
* desc: 异步调用:CompleteableFuture
* //异步执行
* //成功回调
* //失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//发起一个请求void
//没有返回值runAsync异步回调
CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync=>void");
});
System.out.println("1111111111111");
completableFuture.get();//获取执行结果
}
}
总结:上面会先发起一个请求,请求休眠,同时打印语句,打印完毕以后回调get这个执行结果
有返回值的异步回调supplyAsync
whenComplete((t, u)
有两个参数,一个是t,一个是u:
- T是正常返回的结果
- U是抛出异常的错误信息
如果发生了异常,get可以获取 exceptionally((e)
返回的错误信息。
package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author linghu
* @date 2023/12/25 16:11
* desc: 异步调用:CompleteableFuture
* //异步执行
* //成功回调
* //失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
int i=10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t);
System.out.println("u=>" + u);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;//获取错误的返回结果
}).get());
}
}
如上返回的结果:
JMM
JMM是一个概念,不是真实存在的,用来描述Java内存模型。
JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。
关于JMM的一些同步约定
通过上图总结:
- 线程解锁前,必须把共享变量立刻撤回主存;
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
- 线程中分为 工作内存和主内存
现在如果有两个线程都在对主存进行操作:
通过上图发现,当我们的线程B修改了主存值,并且读取到了工作内存,这个时候线程A不知道,怎么办呢?
于是引入了 volatile
。
volatile
定义
volatile
是Java提供的一种轻量级的同步机制。- Java 语言包含两种内在的同步机制:同步块(或方法)和
volatile
变量。相比于synchronized
(synchronized
通常称为重量级锁),volatile
更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile
变量的同步性较差(有时它更简单并且开销更低),而且其使用也更容易出错。
1)、可见性
当多个线程访问同一个变量时,一个线程修改了这个变量的值,另外一个线程是可以看到修改的值。
package org.example;
import java.util.concurrent.TimeUnit;
/**
* @author linghu
* @date 2023/12/26 16:12
*/
public class JMMDemo01 {
//如果不加volatile程序会死循环
//加了volatile是可以保证可见性的
// private static Integer number=0;
private volatile static Integer number=0;
public static void main(String[] args) {
//main线程
//子线程1
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程2
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1;
System.out.println(number);
}
}
注:synchronized和lock都能保证可见性。
2)、不保证原子性
线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。
package org.example;
/**
* @author linghu
* @date 2023/12/26 16:44
*/
public class VDemo02 {
private static volatile int number=0;
public synchronized static void add(){
number++;
}
public static void main(String[] args) {
for (int i=1;i<=20;i++){
new Thread(()->{
for (int j=1;j<=1000;j++){
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
总结:模拟多线程环境下对共享变量的操作,并展示线程之间的竞争和协作。
使用Thread.yield()
方法让出CPU资源给其他线程
在这个地方,如果不加lock和synchronized,怎么保证原子性?
理论上,num的值应该为:2万。
package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author linghu
* @date 2023/12/26 16:44
*/
public class VDemo02 {
// private static volatile AtomicInteger number=new AtomicInteger();
private static volatile int number=0;
public static void add(){
number++;
// number.incrementAndGet();//底层是CAS保证的原子性,加1操作
}
public static void main(String[] args) {
for (int i=1;i<=20;i++){
new Thread(()->{
for (int j=1;j<=1000;j++){
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
解决方案
可以用原子类解决问题,比锁的效率高很多!
package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author linghu
* @date 2023/12/26 16:44
*/
public class VDemo02 {
private static volatile AtomicInteger number=new AtomicInteger();
// private static volatile int number=0;
public static void add(){
// number++;
number.incrementAndGet();//底层是CAS保证的原子性,加1操作
}
public static void main(String[] args) {
for (int i=1;i<=20;i++){
new Thread(()->{
for (int j=1;j<=1000;j++){
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
3)、禁止指令重排
计算机并不是按照我们自己写的那样去执行的。
指令重排一般分为以下三种
编译器优化
重新安排语句的执行顺序指令并行重排
利用指令级并行技术将多个指令并行执行,如果指令之前没有数据依赖,处理器可以改变对应机器指令的执行顺序内存系统重排
由于处理使用缓存和读写缓冲区,所以它们是乱序的
package org.example;
import org.openjdk.jol.info.ClassLayout;
/**
* @author linghu
* @date 2023/12/27 14:25
*/
public class CountObjectSize {
int a=10;
int b=20;
double c=30.0;
public static void main(String[] args) {
CountObjectSize object = new CountObjectSize();
System.out.println(ClassLayout.parseInstance(object).toPrintable());
}
}
引入的pom:
<!--查看对象头工具-->
<!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.16</version>
</dependency>
总结:通过上面的代码,我们希望代码执行的顺序是:a-b-c;但实际的编译情况是,执行顺序是:a-c-b。
方案
如何禁止指令重排呢?
可以用 Volatile
关键字实现!Volatile
中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
内存屏障:CPU指令
总结
- volatile可以保证可见性
- 不能保证原子性
- 由于内存屏障,可以保证避免指令重排的现象产生
单例模式
单例模式的文章我之前写过,文章如下:
1)、饿汉式
如下饿汉式单例代码:
package org.example;
/**
* @author linghu
* @date 2023/12/28 9:41
* 饿汉式单例
*/
public class Hungry {
private byte[] data1=new byte[1024*1024];
private byte[] data2=new byte[1024*1024];
private byte[] data3=new byte[1024*1024];
private byte[] data4=new byte[1024*1024];
//私有化构造器
public Hungry() {
}
private final static Hungry HUNGRY=new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
2)、DCL懒汉式
package org.example;
/**
* @author linghu
* @date 2023/12/27 17:23
* 懒汉式
*/
public class LazyMan {
//私有构造器
public LazyMan() {
System.out.println(Thread.currentThread().getName()+"OK");
}
private static LazyMan lazyMan;
public static LazyMan getInstance(){
if (lazyMan==null){
lazyMan=new LazyMan();
}
return lazyMan;
}
//多线程合并会有隐患!
public static void main(String[] args) {
for (int i=0;i<10;i++){
new Thread(()->{
lazyMan.getInstance();
}).start();
}
}
}
package org.example;
/**
* @author linghu
* @date 2023/12/27 17:23
* 懒汉式
*/
public class LazyMan {
//私有构造器
public LazyMan() {
System.out.println(Thread.currentThread().getName()+"OK");
}
private static LazyMan lazyMan;
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
if (lazyMan==null){
synchronized (LazyMan.class){
if (lazyMan==null){
lazyMan=new LazyMan();
}
}
}
return lazyMan;
}
//多线程合并会有隐患!
public static void main(String[] args) {
for (int i=0;i<10;i++){
new Thread(()->{
lazyMan.getInstance();
}).start();
}
}
}
加volatile可以防止指令重排
private volatile static LazyMan lazyMan;
package org.example;
/**
* @author linghu
* @date 2023/12/27 17:23
* 懒汉式
*/
public class LazyMan {
//私有构造器
public LazyMan() {
System.out.println(Thread.currentThread().getName()+"OK");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
if (lazyMan==null){
synchronized (LazyMan.class){
if (lazyMan==null){
lazyMan=new LazyMan();
}
}
}
return lazyMan;
}
//多线程合并会有隐患!
public static void main(String[] args) {
for (int i=0;i<10;i++){
new Thread(()->{
lazyMan.getInstance();
}).start();
}
}
}
深入理解CAS
CAS就是比较以前工作内存中的值 和 主存内存中的值,如果这个值是期望的,那么执行操作! 如果不是,就一直循环,使用的是 自旋锁。
package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author linghu
* @date 2023/12/29 9:54
*/
public class casDemo {
//CAS:compareAndSet
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//如果实际值 和 期望值相同,那么就更新
//如果实际值 和 期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//因为期望值是2020,实际值却变成了2021 所以会修改失败!
atomicInteger.getAndIncrement();//++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
总结:
- CAS 是CPU的并发原语。
这段代码演示了如何使用原子操作来安全地更新一个整数值,并展示了
compareAndSet
方法的工作原理。
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- 它会存在ABA问题【解决方案:原子引用】
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!