【并发设计模式】聊聊Thread-Per-Message与Worker-Thread模式
在并发编程中,核心就是同步、互斥、分工。
同步是多个线程之间按照一定的顺序进行执行,比如A执行完,B在执行。而互斥是多个线程之间对于共享资源的互斥。两个侧重点不一样,同步关注的是执行顺序,互斥关注的是资源的排他性访问。
分工则是从一个宏观的角度,将任务庖丁解牛,将一个大任务进行拆分,每个线程执行一个任务的子集。主要的设计模式就是Thread-Per-Message(来一个任务,新建一个线程执行)、Worker-Thread(复用线程池)、生产者 - 消费者模式。本篇我们先介绍前两个。
- Thread-Per-Message 模式需要注意线程的创建,销毁以及是否会导致OOM。
- Worker Thread 模式需要注意死锁问题,提交的任务之间不要有依赖性。
- 生产者 - 消费者模式可以直接使用线程池来实现
生活场景触发
我们来想一个实际的参观厨师、服务员的方式。如果来一个客人,再去招聘人,然后开始做饭,显然销量不高,所以就会提前雇佣好一批人,来了客人直接做饭。但是显然也有饭店火爆的情况,那么就让客人先在休息区等待,等待进行排号吃饭。因为目前饭店已经达到的上限。对比其实就是上述的三种方式。
Thread-Per-Message
这种方式比较好理解,针对于每个客户端的请求,来一个请求就新建一个Thread进行处理。
但是显而易见,这种方式新建线程、销毁线程的操作是很耗时,比较浪费资源。并且如果大量的线程处理任务耗时比较久,那么就会出现OOM,所以JUC中就提供了线程中方式,根据需要配置线程池进行处理任务。
在GO语言中有更加轻量级的协程,以及java中Loom 推荐你可以看看。
解决方案:短期可以增大JVM内存配置,调整大新生代大小,长期解决NIO或者AIO等
Project Loom is to intended to explore, incubate and deliver Java VM features and APIs built on top of them for the purpose of supporting easy-to-use, high-throughput lightweight concurrency and new programming models on the Java platform.
This OpenJDK project is sponsored by the HotSpot Group.
Code
/**
* @author qxlx
* @date 2023/12/31 10:33 PM
*/
public class ServerTest {
public static void main(String[] args) throws IOException {
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8088));
while (true) {
SocketChannel sc = ssc.accept();
new Thread(() -> {
ByteBuffer rb = ByteBuffer.allocate(1024);
try {
sc.read(rb);
TimeUnit.SECONDS.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 接收的数据:" + rb.toString());
ByteBuffer wb = (ByteBuffer) rb.flip();
sc.write(wb);
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-" + Math.random()).start();
}
}
}
Worker-Thread
Woker-Thread 其实就是我提前雇佣一批工人,等待干活,来活就干,没活就休息。可以避免频繁的创建线程。
Worker Thread 模式能避免线程频繁创建、销毁的问题,而且能够限制线程的最大数量。
Java 语言里可以直接使用线程池来实现 Worker Thread 模式,线程池是一个非常基础和优秀 的工具类,甚至有些大厂的编码规范都不允许用 new Thread() 来创建线程,必须使用线程池。
Code
public static void main(String[] args) throws IOException {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10,20,30000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10));
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8088));
while (true) {
SocketChannel sc = ssc.accept();
threadPoolExecutor.execute(() -> {
ByteBuffer rb = ByteBuffer.allocate(1024);
try {
sc.read(rb);
TimeUnit.SECONDS.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 接收的数据:" + rb.toString());
ByteBuffer wb = (ByteBuffer) rb.flip();
sc.write(wb);
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
避免死锁
在实际的开发中,使用线程池需要注意任务之间是否有依赖关系,否则有以来关系的话,可能会引起线程死锁。
如下就是2个线程,执行的时候,因为线程池线程使用完毕,本来需要4个,但是只有两个,另外两个线程任务执行不了,所以就死锁了。
package com.jia.dp;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author qxlx
* @date 2024/1/1 11:17 AM
*/
public class ThreadPoolDeadLockTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CountDownLatch l1 = new CountDownLatch(2);
System.out.println("l1-begin");
// 大任务l1 2个
for (int i = 0; i < 2; i++) {
threadPool.execute(()-> {
CountDownLatch l2 = new CountDownLatch(2);
System.out.println("l2-begin");
//小任务l2 2个
for (int j = 0; j < 2; j++) {
threadPool.execute(()->{
l2.countDown();
});
}
try {
l2.await();
l1.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("l2-end");
});
}
System.out.println("l1-end");
l1.await();
}
}
"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f8d4480a800 nid=0x3223 in Object.wait() [0x000000030646a000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x0000000715586bf8> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"main" #1 prio=5 os_prio=31 tid=0x00007f8d56010800 nid=0x1903 waiting on condition [0x0000000305949000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007156ac720> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at com.jia.dp.ThreadPoolDeadLockTest.main(ThreadPoolDeadLockTest.java:40)
可以发现是因为l1.await() 阻塞,l1阻塞的原因就是l2线程任务没有执行完毕,l2线程任务没有线程资源可以处理任务,所以就是死锁了。
解决方案
1.调整线程池的大小,更加方便的是,使用不同的线程池任务进行处理不同的任务。
总结
本篇介绍了两种分工协作的方式,一种是来一个任务new一个线程处理,另外一种就是通过线程池进行达到线程的复用。实际生产中是采用后者的方式。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!