CompletableFuture——并发编程艺术
1、CompletableFuture是什么?
CompletableFuture是Java 8中引入的一个异步编程工具类,用于进行非阻塞的异步编程。它是Future接口的扩展,提供了更灵活、更强大的功能。
CompletableFuture可以用于处理异步操作,例如网络请求、数据库查询等。与传统的线程和回调方法相比,CompletableFuture提供了更简洁和方便的编程模型。
使用CompletableFuture的主要步骤如下:
创建一个CompletableFuture对象,可以通过CompletableFuture类的静态方法来创建,如CompletableFuture.supplyAsync()和CompletableFuture.runAsync()。
调用CompletableFuture对象的方法来定义异步操作的执行逻辑。可以使用thenApply()、thenAccept()和thenRun()等方法来对结果进行处理,或者使用whenComplete()和handle()等方法来处理异常情况。
使用CompletableFuture对象的get()方法来获取异步操作的结果,或者使用join()方法来等待异步操作的完成。
CompletableFuture还提供了一些其他功能,如组合多个CompletableFuture对象、并行执行异步任务、处理超时等。通过这些功能,可以更灵活地处理复杂的异步编程需求。
总之,CompletableFuture是一个功能强大的异步编程工具类,提供了简洁、方便和灵活的编程模型,可以帮助开发者更好地处理异步操作。
2、CompletableFuture和Future、CompletionStage的关系?
CompletableFuture 是 Future 接口的一个实现类,并且也实现了 CompletionStage 接口。
Future 接口代表一个异步计算的结果,可以通过 get() 方法来获取计算结果。但是 Future 接口中的方法较为有限,只能判断计算是否完成、取消计算和获取计算结果(阻塞等待或者超时等待)等操作。
CompletableFuture 类提供了更多的操作方法,可以更方便地处理异步计算的结果。我们可以对计算结果执行一系列的操作,比如转换、组合、异常处理等。它还提供了一些便捷的静态方法,可以将同步方法转换为异步方法,并且可以指定线程池或者执行器来执行异步操作。
CompletionStage 接口是 CompletableFuture 的父接口,它定义了一些组合操作的方法,并且可以与 CompletableFuture 进行链式调用。CompletableFuture 实现了 CompletionStage 接口,所以 可以使用 CompletionStage 接口中的方法。
综上所述,可以得出一下结论:
(1)CompletableFuture 是对 Future 的增强,并且也是 CompletionStage 的实现类。
(2)CompletableFuture 提供了更多的操作方法和扩展功能,可以更灵活地处理异步计算的结果。
(3)CompletionStage 接口则提供了一些组合操作的方法,可以方便地对异步计算结果进行串行或并行处理。
3、CompletableFuture常用方法
3.1、 创建 CompletableFuture 实例
static CompletableFuture<Void> completedFuture(U value)
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
(1)CompletableFuture.completedFuture
方法用来创建一个已经完成的CompletableFuture对象。这个对象的结果是事先指定好的一个值或者异常。
这个方法有一个参数,可以是任意的类型,表示这个CompletableFuture对象的结果。返回值是一个已经完成的CompletableFuture对象。
CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
(2)runAsync
方法接收一个Runnable参数,表示执行无返回值的异步任务。它返回一个CompletableFuture<Void>对象。
CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> System.out.println("Run async"));
(3)supplyAsync
方法接收一个Supplier<T>参数,表示执行有返回值的异步任务。它返回一个CompletableFuture<T>对象。
CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> "Supply async");
runAsync和supplyAsync都是CompletableFuture类的静态方法,用于执行异步任务。
区别在于方法的参数和返回值类型不同。runAsync执行无返回值的异步任务,supplyAsync执行有返回值的异步任务,并将结果包装在CompletableFuture对象中返回。
3.2、完成时触发thenApply、thenAccept、thenRun
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
(1)thenApply(Function<? super T,? extends U> fn)
这个方法在 CompletableFuture 完成后,将使用该 CompletableFuture 的结果作为输入,执行给定的 Function。这个 Function 接受一个类型为 T 的参数并返回一个类型为 U 的结果。返回一个新的 CompletableFuture<U>,它将在 Function 执行完毕后完成。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World");
System.out.println(future.get()); //Hello World
(2)thenApplyAsync(Function<? super T,? extends U> fn)
与 thenApply 类似,但 thenApplyAsync 会异步地执行给定的 Function。这意味着它将在一个新线程中执行 Function,而不是在完成原始 CompletableFuture 的同一个线程中。
(3)thenAccept(Consumer<? super T> action)
这个方法在 CompletableFuture 完成后,将使用该 CompletableFuture 的结果作为输入,执行给定的 Consumer。Consumer 接受一个类型为 T 的参数并执行一些操作,但不会返回任何结果。返回一个新的 CompletableFuture<Void>。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> stringCompletableFuture = future.thenAccept(s -> System.out.println(s+" World"));//Hello World
(4)thenAcceptAsync(Consumer<? super T> action)
与 thenAccept 类似,但 thenAcceptAsync 会异步地执行给定的 Function。这意味着它将在一个新线程中执行 Function。
(5)thenRun(Runnable action)
这个方法在 CompletableFuture 完成后执行给定的 Runnable。这个 Runnable 不接受任何参数,也不返回任何结果。返回一个新的 CompletableFuture<Void>。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> futureRun = future.thenRun(() -> System.out.println("Action completed"));
(6)thenRunAsync(Runnable action)
与 thenRun 类似,但 thenRunAsync 会异步地执行给定的 Runnable。这意味着它将在一个新线程中执行 Runnable。
区别:
thenApply 与 thenApplyAsync 的区别在于执行 Function 的线程。thenApply 通常在完成原始 CompletableFuture 的同一个线程中执行,而 thenApplyAsync 总是在一个新线程中执行。
thenAccept 与 thenAcceptAsync,以及 thenRun 与 thenRunAsync 的区别同理。
thenApply、thenAccept 和 thenRun 是同步操作,意味着它们在当前线程上执行,除非前一个阶段是异步完成的。
thenApplyAsync、thenAcceptAsync 和 thenRunAsync 是异步操作,意味着它们会在不同的线程上执行,通常是 ForkJoinPool.commonPool() 中的线程,或者可以通过提供一个自定义的 Executor 来指定。
3.3、组合多个 CompletableFuture
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
(1)allOf(CompletableFuture<?>... cfs)
该方法接收一个 CompletableFuture 数组,并返回一个新的 CompletableFuture<Void> 对象。这个方法会等待所有的 CompletableFuture 对象都执行完成,然后返回一个新的 CompletableFuture,它会在所有的 CompletableFuture 对象都执行完成后完成。如果任意一个 CompletableFuture 对象抛出异常,那么它会将异常传递给返回的 CompletableFuture 对象。
例如,假设有两个 CompletableFuture 对象 cf1 和 cf2,我们可以使用 allOf 方法等待它们都执行完成:
CompletableFuture<Void> allFutures = CompletableFuture.allOf(cf1, cf2);
(2)anyOf(CompletableFuture<?>... cfs)
该方法与 allOf 方法类似,但是它会等待任意一个 CompletableFuture 对象执行完成,然后返回一个新的 CompletableFuture<Object> 对象。如果其中任意一个 CompletableFuture 对象完成时有返回值,那么它将使用第一个完成的 CompletableFuture 的返回值作为返回值传递给返回的 CompletableFuture 对象。
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(cf1, cf2);
总结:
这两个方法都是非阻塞的,即它们会立即返回一个 CompletableFuture 对象,不会等待 CompletableFuture 对象的执行完成。可以使用 thenApply、thenAccept 或者 thenRun 等方法来注册回调函数,以处理 CompletableFuture 的结果。
3.4、异常处理
<U> CompletableFuture<U> exceptionally(Function<Throwable, ? extends U> fn)
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
(1)CompletableFuture<U> exceptionally(Function<Throwable, ? extends U> fn)
该方法可以用来处理异常情况。它接受一个Function参数,该参数是一个异常处理函数,用于处理出现的异常。如果CompletableFuture中发生了异常,那么这个异常会被传递给exceptionally方法,并由该方法的参数函数进行处理。处理完成后,exceptionally方法会返回一个新的CompletableFuture对象,该对象的结果是由异常处理函数的返回值决定。如果原始的CompletableFuture没有发生异常(即正常完成),那么exceptionally方法会返回一个与原始CompletableFuture相同的CompletableFuture对象。
(2)CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
该方法也可以用于处理异常情况。它接受一个BiFunction参数,该参数是一个处理函数,用于处理正常结果或异常。如果CompletableFuture正常完成,那么handle方法会将结果和异常值作为参数传递给处理函数。处理函数的返回值将被用作新的CompletableFuture的结果。如果CompletableFuture发生异常,那么handle方法会将异常和null作为参数传递给处理函数。处理函数的返回值也将被用作新的CompletableFuture的结果。
public class CompletableFutureDemo {
public static void main(String[] args) {
// 创建CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟抛出异常
throw new RuntimeException("Exception occurred");
});
// 使用exceptionally处理异常
CompletableFuture<String> exceptionHandledFuture = future.exceptionally(ex -> {
return "Exception handled: " + ex.getMessage();
});
// 使用handle处理结果或异常
CompletableFuture<String> resultFuture = future.handle((res, ex) -> {
if (ex != null) {
return "Exception handled: " + ex.getMessage();
} else {
return res;
}
});
// 打印结果
System.out.println(exceptionHandledFuture.join());
System.out.println(resultFuture.join());
}
}
总结起来,exceptionally方法和handle方法都是用于处理CompletableFuture发生的异常情况。它们提供了一种方式来处理异常并返回一个新的CompletableFuture对象,使得我们能够在异步操作完成后进行进一步的处理。
3.5、其他方法
<U> CompletableFuture<U> newIncompleteFuture()
boolean complete(T value)
boolean completeExceptionally(Throwable ex)
T getNow(T valueIfAbsent)
T join()
boolean isDone()
boolean isCancelled()
boolean cancel(boolean mayInterruptIfRunning)
(1)<U> CompletableFuture<U> newIncompleteFuture()
这个方法返回一个新的 CompletableFuture,它与当前实例具有相同的特征(例如,异步执行的特性),但是没有完成的值或异常。通常用于创建自定义的 CompletableFuture 实现或扩展。
CompletableFuture<String> future = new CompletableFuture<>();
CompletableFuture<String> newFuture = future.newIncompleteFuture();
(2)boolean complete(T value)
这个方法尝试将给定的值设置为 CompletableFuture 的结果。如果 CompletableFuture 已经完成,则返回 false;否则,它会被设置为给定的值,并返回 true。
CompletableFuture<String> future = new CompletableFuture<>();
boolean completed = future.complete("Completed value");
System.out.println("Future completed: " + completed); // 输出 true
(3)boolean completeExceptionally(Throwable ex)
这个方法尝试将给定的异常设置为 CompletableFuture 的异常结果。如果 CompletableFuture 已经完成,则返回 false;否则,它会被设置为给定的异常,并返回 true。
CompletableFuture<String> future = new CompletableFuture<>();
boolean completedExceptionally = future.completeExceptionally(new Exception("Error"));
System.out.println("Future completed exceptionally: " + completedExceptionally); // 输出 true
(4)T getNow(T valueIfAbsent)
这个方法返回 CompletableFuture 的当前值,如果没有完成,则返回给定的默认值 valueIfAbsent。
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
String value = future.getNow("Default");
System.out.println(value); // 输出 Hello
CompletableFuture<String> futureNotCompleted = new CompletableFuture<>();
String defaultValue = futureNotCompleted.getNow("Default");
System.out.println(defaultValue); // 输出 Default
(5)T join()
这个方法与 getNow 类似,但它会阻塞当前线程直到 CompletableFuture 完成,然后返回结果。如果 CompletableFuture 已经完成,它将立即返回结果。
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
String value = future.join();
System.out.println(value); // 输出 Hello
(6)boolean isDone()
这个方法返回 CompletableFuture 是否已经完成。完成可能是正常完成,也可能是由于异常。
CompletableFuture<String> future = new CompletableFuture<>();
System.out.println("Future is done: " + future.isDone()); // 输出 false
future.complete("Completed");
System.out.println("Future is done: " + future.isDone()); // 输出 true
(7)boolean isCancelled()
这个方法返回 CompletableFuture 是否已经被取消。
CompletableFuture<String> future = new CompletableFuture<>();
System.out.println("Future is cancelled: " + future.isCancelled()); // 输出 false
boolean cancelled = future.cancel(true);
System.out.println("Future is cancelled: " + future.isCancelled()); // 输出 true
(8)boolean cancel(boolean mayInterruptIfRunning)
这个方法尝试取消 CompletableFuture 的执行。如果 CompletableFuture 已经完成,则返回 false。如果 mayInterruptIfRunning 为 true,并且 CompletableFuture 正在运行,那么它将被中断。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result";
});
boolean cancelled = future.cancel(true);
System.out.println("Future cancelled: " + cancelled); // 输出 true
4、使用CompletableFuture 实现做肉丝面小案例
现在为了好理解并发编程的思想,这里我准备了一个贴合实际的小案例,就是做一碗肉丝面,我们先梳理一下做面的步骤过程,大致是一下几个步骤
1、备菜(切肉丝、切葱姜蒜、调味料、洗青菜、面条) (5分钟)
2、炒菜(下入肉丝、葱姜蒜翻炒、调味料调味) (2分钟)
3、煮开水 (10分钟)
4、下面条(煮3分钟)
5、下青菜再煮一会(1分钟)
6、完成
如果按照同步执行的话,就是代码依次从上往下执行,总共需要21分钟的时间,那么我们通过并发思想来节省一下时间。我们思考一下,备菜和煮开水是不是可以一起执行,因为我可以两个一起执行,这样是不是就可以节省时间了。具体实现参考下面代码
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) {
// 创建自定义线程池
int corePoolSize = 2; // 核心线程数
int maximumPoolSize = 4; // 最大线程数
long keepAliveTime = 60; // 非核心线程空闲存活时间
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); // 阻塞队列
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 拒绝策略
ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 步骤1:备菜
long startTime = System.currentTimeMillis();
CompletableFuture<Void> prepareIngredients = CompletableFuture.runAsync(() -> {
System.out.println("开始备菜...");
// 模拟切肉丝、切葱姜蒜、准备调味料、洗青菜、面条
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("备菜完成。");
},customThreadPool);
// 步骤2:炒菜 使用thenRunAsync,等待步骤1执行结束再执行炒菜步骤
CompletableFuture<Void> stirFry = prepareIngredients.thenRunAsync(() -> {
System.out.println("开始炒菜...");
// 模拟下入肉丝、葱姜蒜翻炒、调味料调味
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("炒菜完成。");
},customThreadPool);
// 步骤3:煮开水 可以直接使用runAsync,与步骤1备菜并行执行
CompletableFuture<Void> boilWater = CompletableFuture.runAsync(() -> {
System.out.println("开始煮开水...");
// 模拟煮开水的过程
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("水开了。");
},customThreadPool);
// 步骤4:下面条 使用allOf 等待步骤2、步骤3执行结束再执行
CompletableFuture<Void> cookNoodles = CompletableFuture.allOf(stirFry, boilWater).thenRunAsync(() -> {
System.out.println("下面条...");
// 模拟煮面条的过程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("面条煮熟了。");
},customThreadPool);
// 步骤5:下青菜再煮 等待步骤4结束执行
CompletableFuture<Void> cookVegetables = cookNoodles.thenRunAsync(() -> {
System.out.println("下青菜...");
// 模拟下青菜的过程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("青菜煮熟了。");
System.out.println("完成烹饪,可以享用了!");
},customThreadPool);
// 等待所有步骤完成
try {
cookVegetables.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}finally {
// 关闭线程池
customThreadPool.shutdown();
}
long endTime = System.currentTimeMillis();
System.out.println("共消耗了:" + (endTime - startTime) / 1000 + "秒");
}
}
可以看到,通过异步执行,14分钟(贴合实际,换算为分钟)就完成了肉丝面的制作,节省了7分钟的时间,效率提升了33%。
感谢您的阅读,制作不易,喜欢这篇文章的小伙伴们点赞关注支持一下吧。