CompletableFuture 是 Java 8 引入的一种功能更加强大的异步编程工具,它在 Future 的基础上提供了更加灵活和高效的处理方式,特别是在处理复杂的异步任务链和组合时表现出色。本文将深入探讨 CompletableFuture 的核心用法,以及其在实际应用中的性能优势。
在 Java 8 中,CompletableFuture 引入了全新的异步编程模型,旨在解决 Future 的不足。Future 仅提供简单的异步结果获取机制,而 CompletableFuture 则支持链式调用和任务组合,使得异步编程更加直观和高效。通过使用 CompletableFuture,开发者可以更轻松地实现异步任务的并行处理和回调管理,从而提升程序执行效率和响应性。
1. CompletableFuture 的基本概念与设计原理
CompletableFuture 是 Java 8 引入的 java.util.concurrent 包中的一个类,它实现了 Future 和 CompletionStage 接口。CompletionStage 接口提供了丰富的异步任务处理方法,使得 CompletableFuture 成为处理异步任务的强大工具。
CompletableFuture 的核心设计思想是非阻塞式异步编程,它允许开发者在任务执行完成时通过回调的方式获取结果,而不是通过阻塞等待。这种设计方式非常适合处理需要长时间运行的任务,如网络请求、文件读写等。
此外,CompletableFuture 还支持任务组合,这使得开发者可以轻松地将多个异步任务串联或并联执行。例如,可以在一个任务完成后,再执行另一个任务,或者同时执行多个任务并合并结果。
2. 创建异步任务
CompletableFuture 提供了多种方法来创建异步任务,其中最常用的是 supplyAsync() 和 runAsync()。
2.1. supplyAsync() 与 runAsync()
supplyAsync():用于创建有返回值的异步任务。它接受一个Supplier接口作为参数,该接口用于生成任务的返回值。runAsync():用于创建没有返回值的异步任务。它接受一个Runnable接口作为参数,该接口用于执行任务。
这两个方法都支持使用自定义的线程池来执行任务。如果不传入线程池,则会使用默认的线程池,具体取决于当前机器的 CPU 核心数。
// 使用默认线程池执行任务,无返回值
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("runAsync,执行完毕");
});
// 使用默认线程池执行任务,有返回值
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
2.2. 默认线程池的选择
CompletableFuture 在创建任务时,默认使用以下逻辑来选择线程池:
- 如果当前机器的 CPU 可用逻辑核心数大于 1,则使用
ForkJoinPool.commonPool()。 - 如果当前机器的 CPU 可用逻辑核心数等于 1,则使用
ThreadPerTaskExecutor,这是一个一对一的线程池。
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
这种策略确保了在多核 CPU 上能够更好地利用并行处理能力,而在单核 CPU 上避免资源浪费。
3. 任务异步回调
CompletableFuture 提供了多种回调方法,用于在任务执行完成后进行处理。这些方法包括 thenRun()、thenAccept()、thenApply()、whenComplete()、handle() 和 exceptionally()。
3.1. thenRun() 与 thenRunAsync()
thenRun():用于在上一个任务执行成功后,执行一个无返回值的操作。thenRunAsync():与thenRun()类似,但使用默认线程池执行回调操作。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
System.out.println("thenRun1,执行完毕");
});
CompletableFuture<Void> cf3 = cf2.thenRun(() -> {
System.out.println("thenRun2,执行完毕");
});
3.2. thenAccept() 与 thenAcceptAsync()
thenAccept():用于在上一个任务执行成功后,接收一个结果并进行处理,但不返回值。thenAcceptAsync():与thenAccept()类似,但使用默认线程池执行回调操作。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<Void> cf2 = cf1.thenAccept(result -> {
System.out.println("thenAccept1,接收结果:" + result);
});
3.3. thenApply() 与 thenApplyAsync()
thenApply():用于在上一个任务执行成功后,接收一个结果并对其进行转换,返回新的结果。thenApplyAsync():与thenApply()类似,但使用默认线程池执行转换操作。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = cf1.thenApply(result -> {
System.out.println("thenApply1,接收结果:" + result);
return result.toUpperCase();
});
3.4. whenComplete() 与 whenCompleteAsync()
whenComplete():用于在任务执行完成后(无论是成功还是失败),执行一个回调操作。whenCompleteAsync():与whenComplete()类似,但使用默认线程池执行回调操作。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = cf1.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("任务执行失败,原因:" + throwable.getMessage());
} else {
System.out.println("任务执行成功,返回结果值:" + result);
}
});
3.5. handle() 与 handleAsync()
handle():用于在任务执行完成后,无论成功还是失败,都执行一个回调操作,并返回新的结果。handleAsync():与handle()类似,但使用默认线程池执行回调操作。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = cf1.handle((result, throwable) -> {
if (throwable != null) {
System.out.println("任务执行失败,原因:" + throwable.getMessage());
return "default result";
} else {
System.out.println("任务执行成功,返回结果值:" + result);
return result.toUpperCase();
}
});
3.6. exceptionally()
exceptionally():用于在任务执行异常时,执行一个回调操作,并返回一个新的结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
if (1 == 1) {
throw new RuntimeException("执行异常");
}
return "hello world";
});
CompletableFuture<String> cf2 = cf1.exceptionally(e -> {
System.out.println("任务执行失败,原因:" + e.getMessage());
return "default result";
});
4. 多任务组合处理
CompletableFuture 不仅支持单个任务的回调,还支持多个任务的组合处理。这使得开发者可以在多个异步任务完成后,进行统一的处理。
4.1. 任务串联(thenApply)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = cf1.thenApply(result -> {
System.out.println("thenApply1,接收结果:" + result);
return result.toUpperCase();
});
4.2. 任务并联(thenAcceptBoth)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "goodbye";
});
cf1.thenAcceptBoth(cf2, (result1, result2) -> {
System.out.println("任务1结果:" + result1);
System.out.println("任务2结果:" + result2);
});
4.3. 任务并联(allOf)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "goodbye";
});
CompletableFuture<Void> cf3 = CompletableFuture.allOf(cf1, cf2);
4.4. 任务并联(thenCombine)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "goodbye";
});
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (result1, result2) -> {
System.out.println("任务1结果:" + result1);
System.out.println("任务2结果:" + result2);
return result1 + " " + result2;
});
5. 性能优化与调优建议
CompletableFuture 在设计上已经考虑到了性能优化,但在实际使用中,仍然需要注意一些调优技巧,以确保其在高并发场景下的稳定性与效率。
5.1. 线程池选择
在使用 CompletableFuture 时,建议根据实际需求选择合适的线程池。如果任务数量较多且任务之间相互独立,可以考虑使用自定义的线程池,以避免默认线程池在高并发时的资源竞争。
ExecutorService customPool = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
}, customPool);
5.2. 避免阻塞操作
CompletableFuture 的设计初衷是非阻塞式异步编程,因此在使用时应避免阻塞操作。如果任务中需要进行阻塞调用,应考虑使用 supplyAsync() 或 runAsync() 的异步版本,以确保不会阻塞主线程。
5.3. 任务链的合理设计
在设计任务链时,应尽量避免不必要的嵌套和回调。例如,可以使用 thenApply() 来处理任务结果,而不是使用多个 thenRun() 和 thenAccept()。这样能够减少线程切换的开销,提高程序的执行效率。
5.4. 异常处理
CompletableFuture 提供了 exceptionally() 方法来处理任务异常,建议在任务链中合理使用该方法,以确保程序在异常情况下仍能继续执行。
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
if (1 == 1) {
throw new RuntimeException("执行异常");
}
return "hello world";
});
CompletableFuture<String> cf2 = cf.exceptionally(e -> {
System.out.println("任务执行失败,原因:" + e.getMessage());
return "default result";
});
5.5. 避免内存泄漏
在使用 CompletableFuture 时,应确保任务完成后及时释放资源,避免内存泄漏。例如,可以在任务完成后关闭线程池,或在任务中使用 try-with-resources 语句来管理资源。
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
}, executor);
cf.get();
executor.shutdown();
6. 实际应用场景分析
CompletableFuture 在实际开发中有着广泛的应用,尤其是在处理复杂的异步任务链时表现出色。以下是一些常见的应用场景:
6.1. 网络请求处理
在处理多个网络请求时,可以使用 CompletableFuture 来并行执行请求,并在所有请求完成后进行处理。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// 模拟网络请求
return "Response from API 1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
// 模拟网络请求
return "Response from API 2";
});
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (response1, response2) -> {
return response1 + " " + response2;
});
6.2. 数据库查询处理
在处理多个数据库查询时,可以使用 CompletableFuture 来并行执行查询,并在所有查询完成后进行处理。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询
return "Data from Table 1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询
return "Data from Table 2";
});
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (data1, data2) -> {
return data1 + " " + data2;
});
6.3. 文件读写处理
在处理文件读写时,可以使用 CompletableFuture 来并行执行文件读取和写入操作,并在所有操作完成后进行处理。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// 模拟文件读取
return "Data from File 1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
// 模拟文件读取
return "Data from File 2";
});
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (data1, data2) -> {
return data1 + " " + data2;
});
7. 总结与展望
CompletableFuture 是 Java 8 引入的一种功能强大的异步编程工具,它在 Future 的基础上提供了更多的回调和任务组合能力。通过合理使用 CompletableFuture,开发者可以显著提升程序的执行效率和响应性。
未来,随着 Java 生态的不断发展,CompletableFuture 有望进一步优化,特别是在高并发和分布式场景下的表现。同时,随着多核 CPU 的普及,线程池的选择和任务调度策略也将变得更加重要。
关键字列表:
CompletableFuture, Future, 异步编程, 任务回调, 线程池, 并发处理, Java 8, 任务组合, 性能优化, 线程切换