外观
线程池 + future 怎么实现
⭐ 题目日期:
小红书 - 2024/11/11
📝 题解:
在不使用 CountDownLatch 的情况下,可以通过 线程池(ExecutorService) 和 Future 机制实现多线程任务的同步与聚合。以下是具体实现步骤和代码示例:
1. 核心思路
- 提交任务:将多个任务提交到线程池,获取对应的
Future对象。 - 收集 Future:通过
Future跟踪任务状态,等待所有任务完成并获取结果。 - 结果聚合:遍历
Future列表,通过阻塞式get()方法等待每个任务完成,并汇总结果。
2. 实现步骤
(1) 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4); // 固定大小线程池(2) 定义并提交任务
List<Future<Integer>> futures = new ArrayList<>();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
int taskId = i;
Future<Integer> future = executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(1000);
return taskId * 10;
});
futures.add(future);
}(3) 等待所有任务完成并聚合结果
List<Integer> results = new ArrayList<>();
for (Future<Integer> future : futures) {
try {
results.add(future.get()); // 阻塞等待任务完成并获取结果
} catch (InterruptedException | ExecutionException e) {
// 处理异常(如取消任务、记录日志等)
e.printStackTrace();
}
}
executor.shutdown(); // 关闭线程池(4) 输出结果
System.out.println("聚合结果: " + results);
// 输出:聚合结果: [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]3. 关键点解析
(1) Future.get() 的阻塞特性
future.get()会阻塞当前线程,直到任务完成并返回结果。- 遍历
Future列表时,主线程会依次等待每个任务完成,最终实现类似CountDownLatch.await()的效果。
(2) 异常处理
- InterruptedException:线程被中断(如调用
executor.shutdownNow())。 - ExecutionException:任务执行过程中抛出异常(需通过
e.getCause()获取原始异常)。
(3) 结果聚合
- 通过遍历
Future列表收集结果,适用于需要任务返回值的场景。 - 若无需返回值,可使用
Runnable替代Callable,并通过Future<?>仅等待任务完成:
Future<?> future = executor.submit(() -> {
// 无返回值的任务逻辑
});4. 优化方案
(1) 超时控制
通过 future.get(timeout, TimeUnit) 设置超时时间,避免无限阻塞。
try {
future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 超时后取消任务
}(2) 批量等待(invokeAll)
使用 executor.invokeAll() 提交所有任务并一次性等待完成。
List<Callable<Integer>> tasks = new ArrayList<>();
// 填充任务列表...
List<Future<Integer>> futures = executor.invokeAll(tasks); // 提交并等待所有任务完成(3) 异步回调(CompletableFuture)
结合 Java 8 的 CompletableFuture 实现非阻塞式聚合。
List<CompletableFuture<Integer>> completableFutures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int taskId = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 任务逻辑
return taskId * 10;
}, executor);
completableFutures.add(future);
}
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
completableFutures.toArray(new CompletableFuture[0])
);
allFutures.join(); // 阻塞直到所有任务完成
// 收集结果
List<Integer> results = completableFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());5. 对比 CountDownLatch 的优劣

6. 适用场景
- 需要任务返回值:如并行计算后聚合结果。
- 精细化任务控制:如单独取消某个任务、设置超时等。
- 异步回调处理:结合
CompletableFuture实现非阻塞流水线。
