使用 CompletableFuture 分批处理任务
2024-01-09 13:31:16
一、无返回值任务函数
// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<Void>> futures = new ArrayList<>(batches.size());
// 数据处理
for (int i = 0; i < batches.size(); i++) {
logger.info("批次 " + i + " 开始处理...");
String logId = LogIdThreadLocal.getLogId(); // 传递主线程的 logId
List<StatisticsDTO> batchData = batches.get(i);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
LogIdThreadLocal.setLogId(logId);
processBatch(batchData);
} finally {
LogIdThreadLocal.clean();
}
});
futures.add(future);
}
// 等待所有的异步任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join();
二、带返回值任务函数
// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<List<StatisticsDTO>>> futures = new ArrayList<>(batches.size());
// 数据处理
for (int i = 0; i < batches.size(); i++) {
logger.info("批次 " + i + " 开始处理...");
String logId = LogIdThreadLocal.getLogId(); // 传递主线程的 logId
List<StatisticsDTO> batchData = batches.get(i);
CompletableFuture<List<DoctorAvatarAnalysisDTO>> future = CompletableFuture.supplyAsync(() -> {
try {
LogIdThreadLocal.setLogId(logId);
return processBatch(batchData);
} finally {
LogIdThreadLocal.clean();
}
});
futures.add(future);
}
// 等待所有 CF 完成并合并结果
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List<StatisticsDTO> result = allOf.thenApply(
v -> futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())
).join();
文章来源:https://blog.csdn.net/qq_43686863/article/details/135424225
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!