Java使用多线程处理任务等待任务全部执行
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java使用多线程处理任务等待任务全部执行,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3954字,纯文字阅读大概需要6分钟。
内容图文
![Java使用多线程处理任务等待任务全部执行](/upload/InfoBanner/zyjiaocheng/732/1ff542e11f544473811412e6c6a74a85.jpg)
日常的批量处理任务中,经常需要使用多线程同时处理大量任务,一次读取一定数量的数据,然后放入线程池中等待线程处理完成,再取一定数量数据进行循环处理。
效率比较低的方式是使用同步的for循环进行处理
其次就是使用多线程处理。一般情况使用多线程都会使用线程池来管理,有些情况下,不能把大量任务一次性丢进线程池中,以为内存有限,一般线程池的阻塞队列也是有界的,超出限制可能OOM或者触发拒绝策略,因此需要分批处理,假设一次性读取5000条数据,则需要先等待线程池处理完这5000条数据再进行下一次处理。这时候我们需要确认开启的多线程中的子任务全部结束,再让主线程去执行下一次处理。
大致总结的几种处理方案代码示例如下,本人水平有限,欢迎各位大佬指点留言,谢谢!
private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(2 * CPU_COUNT);
/**1. 使用CountDownLatch*/
public void testCountDownLatch() {
//模拟查询到数据库中待处理数据
List batchList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
batchList.add(new java.lang.Object());
}
if (CollectionUtils.isEmpty(batchList)) {
return;
}
log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
final CountDownLatch countDownLatch = new CountDownLatch(batchList.size());
batchList.forEach(Object -> FORK_JOIN_POOL.execute(() -> {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
log.info("当前线程休眠完成");
countDownLatch.countDown();
} catch (Exception e) {
log.error("异常", e);
}
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
}
/**2. 使用CyclicBarrier*/
public void testCyclicBarrier() {
//模拟查询到数据库中待处理数据
List<Object> batchList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
batchList.add(new Object());
}
if (CollectionUtils.isEmpty(batchList)) {
return;
}
log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchList.size(), () -> {
log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
testCyclicBarrier();
});
batchList.forEach(imgRecord -> FORK_JOIN_POOL.execute(() -> {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
log.info("当前线程休眠完成");
cyclicBarrier.await();
} catch (Throwable e) {
log.error("异常", e);
}
}));
}
/**3. 使用CompletionService*/
public void testCompletionService() {
for (int j = 0; j < 3; j++) {
List<Object> batchList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
batchList.add(new Object());
}
if (CollectionUtils.isEmpty(batchList)) {
return;
}
log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
CompletionService completionService = new ExecutorCompletionService(FORK_JOIN_POOL);
batchList.forEach(imgRecord -> {
completionService.submit(() -> {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
log.info("当前线程休眠完成");
} catch (Throwable e) {
log.error("异常", e);
}
return null;
});
});
batchList.forEach(imgRecord -> {
try {
completionService.take().get();
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
}
}
/**4. 使用CompletableFuture*/
public void testCompletableFuture() {
for (int j = 0; j < 3; j++) {
List<Object> batchList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
batchList.add(new Object());
}
if (CollectionUtils.isEmpty(batchList)) {
return;
}
log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
ArrayList<CompletableFuture<?>> futureList = new ArrayList<>();
batchList.forEach(imgRecord -> {
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
log.info("当前线程休眠完成");
} catch (Throwable e) {
log.error("异常", e);
}
}, FORK_JOIN_POOL);
futureList.add(future);
});
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());
}
}
内容总结
以上是互联网集市为您收集整理的Java使用多线程处理任务等待任务全部执行全部内容,希望文章能够帮你解决Java使用多线程处理任务等待任务全部执行所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。