1、CompletableFuture是什么?

CompletableFuture是Java 8中引入的一个异步编程工具类,用于进行非阻塞的异步编程。它是Future接口的扩展,提供了更灵活、更强大的功能。

CompletableFuture可以用于处理异步操作,例如网络请求、数据库查询等。与传统的线程和回调方法相比,CompletableFuture提供了更简洁和方便的编程模型。

使用CompletableFuture的主要步骤如下:

  1. 创建一个CompletableFuture对象,可以通过CompletableFuture类的静态方法来创建,如CompletableFuture.supplyAsync()和CompletableFuture.runAsync()。

  2. 调用CompletableFuture对象的方法来定义异步操作的执行逻辑。可以使用thenApply()、thenAccept()和thenRun()等方法来对结果进行处理,或者使用whenComplete()和handle()等方法来处理异常情况。

  3. 使用CompletableFuture对象的get()方法来获取异步操作的结果,或者使用join()方法来等待异步操作的完成。

CompletableFuture还提供了一些其他功能,如组合多个CompletableFuture对象、并行执行异步任务、处理超时等。通过这些功能,可以更灵活地处理复杂的异步编程需求。

总之,CompletableFuture是一个功能强大的异步编程工具类,提供了简洁、方便和灵活的编程模型,可以帮助开发者更好地处理异步操作。

2、CompletableFuture和Future、CompletionStage的关系?

CompletableFuture 是 Future 接口的一个实现类,并且也实现了 CompletionStage 接口。

Future 接口代表一个异步计算的结果,可以通过 get() 方法来获取计算结果。但是 Future 接口中的方法较为有限,只能判断计算是否完成、取消计算和获取计算结果(阻塞等待或者超时等待)等操作。

CompletableFuture 类提供了更多的操作方法,可以更方便地处理异步计算的结果。我们可以对计算结果执行一系列的操作,比如转换、组合、异常处理等。它还提供了一些便捷的静态方法,可以将同步方法转换为异步方法,并且可以指定线程池或者执行器来执行异步操作。

CompletionStage 接口是 CompletableFuture 的父接口,它定义了一些组合操作的方法,并且可以与 CompletableFuture 进行链式调用。CompletableFuture 实现了 CompletionStage 接口,所以 可以使用 CompletionStage 接口中的方法。

综上所述,可以得出一下结论:

(1)CompletableFuture 是对 Future 的增强,并且也是 CompletionStage 的实现类。

(2)CompletableFuture 提供了更多的操作方法和扩展功能,可以更灵活地处理异步计算的结果。

(3)CompletionStage 接口则提供了一些组合操作的方法,可以方便地对异步计算结果进行串行或并行处理。

3、CompletableFuture常用方法

3.1、 创建 CompletableFuture 实例

static CompletableFuture<Void> completedFuture(U value)
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

(1)CompletableFuture.completedFuture 

方法用来创建一个已经完成的CompletableFuture对象。这个对象的结果是事先指定好的一个值或者异常。

这个方法有一个参数,可以是任意的类型,表示这个CompletableFuture对象的结果。返回值是一个已经完成的CompletableFuture对象。

CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);

(2)runAsync

方法接收一个Runnable参数,表示执行无返回值的异步任务。它返回一个CompletableFuture<Void>对象。

CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> System.out.println("Run async"));

(3)supplyAsync

方法接收一个Supplier<T>参数,表示执行有返回值的异步任务。它返回一个CompletableFuture<T>对象。

CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> "Supply async");

runAsync和supplyAsync都是CompletableFuture类的静态方法,用于执行异步任务。

区别在于方法的参数和返回值类型不同。runAsync执行无返回值的异步任务,supplyAsync执行有返回值的异步任务,并将结果包装在CompletableFuture对象中返回。

3.2、完成时触发thenApply、thenAccept、thenRun

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)

(1)thenApply(Function<? super T,? extends U> fn)
这个方法在 CompletableFuture 完成后,将使用该 CompletableFuture 的结果作为输入,执行给定的 Function。这个 Function 接受一个类型为 T 的参数并返回一个类型为 U 的结果。返回一个新的 CompletableFuture<U>,它将在 Function 执行完毕后完成。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World");
System.out.println(future.get());  //Hello World

(2)thenApplyAsync(Function<? super T,? extends U> fn)
与 thenApply 类似,但 thenApplyAsync 会异步地执行给定的 Function。这意味着它将在一个新线程中执行 Function,而不是在完成原始 CompletableFuture 的同一个线程中。

(3)thenAccept(Consumer<? super T> action)
这个方法在 CompletableFuture 完成后,将使用该 CompletableFuture 的结果作为输入,执行给定的 Consumer。Consumer 接受一个类型为 T 的参数并执行一些操作,但不会返回任何结果。返回一个新的 CompletableFuture<Void>。

 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<Void> stringCompletableFuture = future.thenAccept(s -> System.out.println(s+" World"));//Hello World

(4)thenAcceptAsync(Consumer<? super T> action)
与 thenAccept 类似,但 thenAcceptAsync 会异步地执行给定的 Function。这意味着它将在一个新线程中执行 Function。

(5)thenRun(Runnable action)
这个方法在 CompletableFuture 完成后执行给定的 Runnable。这个 Runnable 不接受任何参数,也不返回任何结果。返回一个新的 CompletableFuture<Void>。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> futureRun = future.thenRun(() -> System.out.println("Action completed"));

(6)thenRunAsync(Runnable action)
与 thenRun 类似,但 thenRunAsync 会异步地执行给定的 Runnable。这意味着它将在一个新线程中执行 Runnable。

区别:

  1. thenApply 与 thenApplyAsync 的区别在于执行 Function 的线程。thenApply 通常在完成原始 CompletableFuture 的同一个线程中执行,而 thenApplyAsync 总是在一个新线程中执行。

  2. thenAccept 与 thenAcceptAsync,以及 thenRun 与 thenRunAsync 的区别同理。

  3. thenApply、thenAccept 和 thenRun 是同步操作,意味着它们在当前线程上执行,除非前一个阶段是异步完成的。

  4. thenApplyAsync、thenAcceptAsync 和 thenRunAsync 是异步操作,意味着它们会在不同的线程上执行,通常是 ForkJoinPool.commonPool() 中的线程,或者可以通过提供一个自定义的 Executor 来指定。

3.3、组合多个 CompletableFuture

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

(1)allOf(CompletableFuture<?>... cfs)

该方法接收一个 CompletableFuture 数组,并返回一个新的 CompletableFuture<Void> 对象。这个方法会等待所有的 CompletableFuture 对象都执行完成,然后返回一个新的 CompletableFuture,它会在所有的 CompletableFuture 对象都执行完成后完成。如果任意一个 CompletableFuture 对象抛出异常,那么它会将异常传递给返回的 CompletableFuture 对象。

例如,假设有两个 CompletableFuture 对象 cf1 和 cf2,我们可以使用 allOf 方法等待它们都执行完成:

CompletableFuture<Void> allFutures = CompletableFuture.allOf(cf1, cf2);

(2)anyOf(CompletableFuture<?>... cfs)

该方法与 allOf 方法类似,但是它会等待任意一个 CompletableFuture 对象执行完成,然后返回一个新的 CompletableFuture<Object> 对象。如果其中任意一个 CompletableFuture 对象完成时有返回值,那么它将使用第一个完成的 CompletableFuture 的返回值作为返回值传递给返回的 CompletableFuture 对象。

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(cf1, cf2);

总结:

这两个方法都是非阻塞的,即它们会立即返回一个 CompletableFuture 对象,不会等待 CompletableFuture 对象的执行完成。可以使用 thenApply、thenAccept 或者 thenRun 等方法来注册回调函数,以处理 CompletableFuture 的结果。 

3.4、异常处理

<U> CompletableFuture<U> exceptionally(Function<Throwable, ? extends U> fn)
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

(1)CompletableFuture<U> exceptionally(Function<Throwable, ? extends U> fn)

该方法可以用来处理异常情况。它接受一个Function参数,该参数是一个异常处理函数,用于处理出现的异常。如果CompletableFuture中发生了异常,那么这个异常会被传递给exceptionally方法,并由该方法的参数函数进行处理。处理完成后,exceptionally方法会返回一个新的CompletableFuture对象,该对象的结果是由异常处理函数的返回值决定。如果原始的CompletableFuture没有发生异常(即正常完成),那么exceptionally方法会返回一个与原始CompletableFuture相同的CompletableFuture对象。

(2)CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

该方法也可以用于处理异常情况。它接受一个BiFunction参数,该参数是一个处理函数,用于处理正常结果或异常。如果CompletableFuture正常完成,那么handle方法会将结果和异常值作为参数传递给处理函数。处理函数的返回值将被用作新的CompletableFuture的结果。如果CompletableFuture发生异常,那么handle方法会将异常和null作为参数传递给处理函数。处理函数的返回值也将被用作新的CompletableFuture的结果。

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 创建CompletableFuture
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟抛出异常
            throw new RuntimeException("Exception occurred");
        });

        // 使用exceptionally处理异常
        CompletableFuture<String> exceptionHandledFuture = future.exceptionally(ex -> {
            return "Exception handled: " + ex.getMessage();
        });

        // 使用handle处理结果或异常
        CompletableFuture<String> resultFuture = future.handle((res, ex) -> {
            if (ex != null) {
                return "Exception handled: " + ex.getMessage();
            } else {
                return res;
            }
        });

        // 打印结果
        System.out.println(exceptionHandledFuture.join());
        System.out.println(resultFuture.join());
    }
}

总结起来,exceptionally方法和handle方法都是用于处理CompletableFuture发生的异常情况。它们提供了一种方式来处理异常并返回一个新的CompletableFuture对象,使得我们能够在异步操作完成后进行进一步的处理。

3.5、其他方法

<U> CompletableFuture<U> newIncompleteFuture()
boolean complete(T value)
boolean completeExceptionally(Throwable ex)
T getNow(T valueIfAbsent)
T join()
boolean isDone()
boolean isCancelled()
boolean cancel(boolean mayInterruptIfRunning)

(1)<U> CompletableFuture<U> newIncompleteFuture()

这个方法返回一个新的 CompletableFuture,它与当前实例具有相同的特征(例如,异步执行的特性),但是没有完成的值或异常。通常用于创建自定义的 CompletableFuture 实现或扩展。

CompletableFuture<String> future = new CompletableFuture<>();
CompletableFuture<String> newFuture = future.newIncompleteFuture();

(2)boolean complete(T value)

这个方法尝试将给定的值设置为 CompletableFuture 的结果。如果 CompletableFuture 已经完成,则返回 false;否则,它会被设置为给定的值,并返回 true。

CompletableFuture<String> future = new CompletableFuture<>();
boolean completed = future.complete("Completed value");
System.out.println("Future completed: " + completed); // 输出 true

(3)boolean completeExceptionally(Throwable ex)

这个方法尝试将给定的异常设置为 CompletableFuture 的异常结果。如果 CompletableFuture 已经完成,则返回 false;否则,它会被设置为给定的异常,并返回 true。

CompletableFuture<String> future = new CompletableFuture<>();
boolean completedExceptionally = future.completeExceptionally(new Exception("Error"));
System.out.println("Future completed exceptionally: " + completedExceptionally); // 输出 true

(4)T getNow(T valueIfAbsent)

这个方法返回 CompletableFuture 的当前值,如果没有完成,则返回给定的默认值 valueIfAbsent。

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
String value = future.getNow("Default");
System.out.println(value); // 输出 Hello

CompletableFuture<String> futureNotCompleted = new CompletableFuture<>();
String defaultValue = futureNotCompleted.getNow("Default");
System.out.println(defaultValue); // 输出 Default

(5)T join()

这个方法与 getNow 类似,但它会阻塞当前线程直到 CompletableFuture 完成,然后返回结果。如果 CompletableFuture 已经完成,它将立即返回结果。

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
String value = future.join();
System.out.println(value); // 输出 Hello

(6)boolean isDone()

这个方法返回 CompletableFuture 是否已经完成。完成可能是正常完成,也可能是由于异常。

CompletableFuture<String> future = new CompletableFuture<>();
System.out.println("Future is done: " + future.isDone()); // 输出 false

future.complete("Completed");
System.out.println("Future is done: " + future.isDone()); // 输出 true

(7)boolean isCancelled()

这个方法返回 CompletableFuture 是否已经被取消。

CompletableFuture<String> future = new CompletableFuture<>();
System.out.println("Future is cancelled: " + future.isCancelled()); // 输出 false

boolean cancelled = future.cancel(true);
System.out.println("Future is cancelled: " + future.isCancelled()); // 输出 true

(8)boolean cancel(boolean mayInterruptIfRunning)

这个方法尝试取消 CompletableFuture 的执行。如果 CompletableFuture 已经完成,则返回 false。如果 mayInterruptIfRunning 为 true,并且 CompletableFuture 正在运行,那么它将被中断。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "Result";
});

boolean cancelled = future.cancel(true);
System.out.println("Future cancelled: " + cancelled); // 输出 true

4、使用CompletableFuture 实现做肉丝面小案例

现在为了好理解并发编程的思想,这里我准备了一个贴合实际的小案例,就是做一碗肉丝面,我们先梳理一下做面的步骤过程,大致是一下几个步骤

1、备菜(切肉丝、切葱姜蒜、调味料、洗青菜、面条) (5分钟)


2、炒菜(下入肉丝、葱姜蒜翻炒、调味料调味) (2分钟)


3、煮开水 (10分钟)


4、下面条(煮3分钟)


5、下青菜再煮一会(1分钟)


6、完成

如果按照同步执行的话,就是代码依次从上往下执行,总共需要21分钟的时间,那么我们通过并发思想来节省一下时间。我们思考一下,备菜和煮开水是不是可以一起执行,因为我可以两个一起执行,这样是不是就可以节省时间了。具体实现参考下面代码

import java.util.concurrent.*;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 创建自定义线程池
        int corePoolSize = 2; // 核心线程数
        int maximumPoolSize = 4; // 最大线程数
        long keepAliveTime = 60; // 非核心线程空闲存活时间
        TimeUnit unit = TimeUnit.SECONDS; // 时间单位
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); // 阻塞队列
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 拒绝策略

        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler
        );
        // 步骤1:备菜
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> prepareIngredients = CompletableFuture.runAsync(() -> {
            System.out.println("开始备菜...");
            // 模拟切肉丝、切葱姜蒜、准备调味料、洗青菜、面条
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("备菜完成。");
        },customThreadPool);

        // 步骤2:炒菜  使用thenRunAsync,等待步骤1执行结束再执行炒菜步骤
        CompletableFuture<Void> stirFry = prepareIngredients.thenRunAsync(() -> {
            System.out.println("开始炒菜...");
            // 模拟下入肉丝、葱姜蒜翻炒、调味料调味
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("炒菜完成。");
        },customThreadPool);

        // 步骤3:煮开水 可以直接使用runAsync,与步骤1备菜并行执行
        CompletableFuture<Void> boilWater = CompletableFuture.runAsync(() -> {
            System.out.println("开始煮开水...");
            // 模拟煮开水的过程
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("水开了。");
        },customThreadPool);

        // 步骤4:下面条  使用allOf  等待步骤2、步骤3执行结束再执行
        CompletableFuture<Void> cookNoodles = CompletableFuture.allOf(stirFry, boilWater).thenRunAsync(() -> {
            System.out.println("下面条...");
            // 模拟煮面条的过程
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("面条煮熟了。");
        },customThreadPool);

        // 步骤5:下青菜再煮   等待步骤4结束执行
        CompletableFuture<Void> cookVegetables = cookNoodles.thenRunAsync(() -> {
            System.out.println("下青菜...");
            // 模拟下青菜的过程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("青菜煮熟了。");
            System.out.println("完成烹饪,可以享用了!");
        },customThreadPool);
        // 等待所有步骤完成
        try {
            cookVegetables.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }finally {
            // 关闭线程池
            customThreadPool.shutdown();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("共消耗了:" + (endTime - startTime) / 1000 + "秒");
    }
}

可以看到,通过异步执行,14分钟(贴合实际,换算为分钟)就完成了肉丝面的制作,节省了7分钟的时间,效率提升了33%。

感谢您的阅读,制作不易,喜欢这篇文章的小伙伴们点赞关注支持一下吧。