CompletableFuture基本用法

对比

  • Future:我们的目的都是获取异步任务的结果,但是对于Future来说,只能通过get方法或者死循环判断isDone来获取。异常情况就更是难办。
  • CompletableFuture:只要我们设置好回调函数即可实现:
  1. 只要任务完成,即执行我们设置的函数(不用再去考虑什么时候任务完成)
  2. 如果发生异常,同样会执行我们处理异常的函数,甚至连默认返回值都有(异常情况处理更加省力)
  3. 如果有复杂任务,比如依赖问题,组合问题等,同样可以写好处理函数来处理(能应付复杂任务的处理)

Future

JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

以前我们获取一个异步任务的结果可能是这样写的
img.png

Future 接口的局限性

Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将多个异步计算的结果合并成一个
  • 等待Future集合中的所有任务都完成
  • Future完成事件(即,任务完成以后触发执行动作)

CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
    1
    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

    CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口
    img_1.png

对于CompletableFuture有四个执行异步任务的方法:

1
2
3
4
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
1
2
1. 如果我们指定线程池,则会使用我么指定的线程池;如果没有指定线程池,默认使用ForkJoinPool.commonPool()作为线程池。
2. supply开头的带有返回值,run开头的无返回值。

执行异步任务(supplyAsync / runAsync)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(100);
}, executor);

System.out.println(future.get());
executor.shutdown();
}

}

以上仅仅返回个随机数,如果我们要利用计算结果进一步处理呢?

结果转换(thenApply / thenApplyAsync)

1
2
3
4
5
6
// 同步转换
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 异步转换,使用默认线程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 异步转换,使用指定线程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<Integer> future = CompletableFuture
// 执行异步任务
.supplyAsync(() -> {
return new Random().nextInt(100);
}, executor)
// 对上一步的结果进行处理
.thenApply(n -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int res = new Random().nextInt(100);
System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
return n + res;
});

System.out.println("我等了你2秒");
System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_2.png
如果把thenApply换成thenApplyAsync,则会输出:
img_3.png
处理完任务以及结果,该去消费了

消费而不影响最终结果(thenAccept / thenRun / thenAcceptBoth)

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
1
2
3
4
5
6
7
8
9
这三种的区别是:

thenAccept:能够拿到并利用执行结果

thenRun:不能够拿到并利用执行结果,只是单纯的执行其它任务

thenAcceptBoth:能传入另一个stage,然后把另一个stage的结果和当前stage的结果作为参数去消费。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<Integer> future = CompletableFuture
// 执行异步任务
.supplyAsync(() -> {
return new Random().nextInt(100);
}, executor)
// 对上一步的结果进行处理
.thenApplyAsync(n -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int res = new Random().nextInt(100);
System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
return n + res;
});
// 单纯的消费执行结果,注意这个方法是不会返回计算结果的——CompletableFuture<Void>
CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> {
System.out.println("单纯消费任务执行结果:" + n);
});
// 这个无法消费执行结果,没有传入的入口,只是在当前任务执行完毕后执行其它不相干的任务
future.thenRunAsync(() -> {
System.out.println("我只能执行其它工作,我得不到任务执行结果");
}, executor);

// 这个方法会接受其它CompletableFuture返回值和当前返回值
future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
return "I'm Other Result";
}), (current, other) -> {
System.out.println(String.format("Current:%s,Other:%s", current, other));
});

System.out.println("我等了你2秒");
System.out.println(future.get());

executor.shutdown();
}

}

结果:
img_4.png

组合任务(thenCombine / thenCompose)

1
2
3
4
5
6
7
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
1
2
3
4
5
这两种区别:主要是返回类型不一样。

thenCombine:至少两个方法参数,一个为其它stage,一个为用户自定义的处理函数,函数返回值为结果类型。

thenCompose:至少一个方法参数即处理函数,函数返回值为stage类型。

先看thenCombine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<Integer> otherFuture = CompletableFuture
// 执行异步任务
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("任务A:" + result);
return result;
}, executor);

CompletableFuture<Integer> future = CompletableFuture
// 执行异步任务
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("任务B:" + result);
return result;
}, executor)
.thenCombineAsync(otherFuture, (current, other) -> {
int result = other + current;
System.out.println("组合两个任务的结果:" + result);
return result;
});

System.out.println(future.get());

executor.shutdown();
}

}

执行结果:
img_5.png
再来看thenCompose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<Integer> future = CompletableFuture
// 执行异步任务
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("任务A:" + result);
return result;
}, executor)
.thenComposeAsync((current) -> {
return CompletableFuture.supplyAsync(() -> {
int b = new Random().nextInt(100);
System.out.println("任务B:" + b);
int result = b + current;
System.out.println("组合两个任务的结果:" + result);
return result;
}, executor);
});

System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_6.png

快者优先(applyToEither / acceptEither)

1
有个场景,如果我们有多条渠道去完成同一种任务,那么我们肯定选择最快的那个。
1
2
3
4
5
6
7
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
1
这两种区别:仅仅是一个有返回值,一个没有(Void)

先看applyToEither

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<String> otherFuture = CompletableFuture
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("执行者A:" + result);
try {
// 故意A慢了一些
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "执行者A【" + result + "】";
}, executor);

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("执行者B:" + result);
return "执行者B【" + result + "】";
}, executor)
.applyToEither(otherFuture, (faster) -> {
System.out.println("谁最快:" + faster);
return faster;
});

System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_7.png
再看acceptEither

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<String> otherFuture = CompletableFuture
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("执行者A:" + result);
try {
// 故意A慢了一些
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "执行者A【" + result + "】";
}, executor);

CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> {
int result = new Random().nextInt(100);
System.out.println("执行者B:" + result);
return "执行者B【" + result + "】";
}, executor)
.acceptEither(otherFuture, (faster) -> {
System.out.println("谁最快:" + faster);
});

System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_8.png

异常处理(exceptionally / whenComplete / handle)

1
2
3
4
5
6
7
8
9
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

exceptionally

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true){
throw new RuntimeException("Error!!!");
}
return "Hello";
}, executor)
// 处理上一步发生的异常
.exceptionally(e -> {
System.out.println("处理异常:" + e.getMessage());
return "处理完毕!";
});

System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_9.png
whenComplete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true){
throw new RuntimeException("Error!!!");
}
return "Hello";
}, executor)
// 处理上一步发生的异常
.whenComplete((result,ex) -> {
// 这里等待为了上一步的异常输出完毕
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("上一步结果:" + result);
System.out.println("处理异常:" + ex.getMessage());
});

System.out.println(future.get());

executor.shutdown();
}

}

输出结果:
img_10.png
可以看见,用whenComplete对异常情况不是特别友好。

handle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws Exception {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true){
throw new RuntimeException("Error!!!");
}
return "Hello";
}, executor)
// 处理上一步发生的异常
.handle((result,ex) -> {
System.out.println("上一步结果:" + result);
System.out.println("处理异常:" + ex.getMessage());
return "Value When Exception Occurs";
});

System.out.println(future.get());

executor.shutdown();
}

}

输出:
img_11.png

综上,如果单纯要处理异常,那就用exceptionally;如果还想处理结果(没有异常的情况),那就用handle,比whenComplete友好一些,handle不仅能处理异常还能返回一个异常情况的默认值。

参考文章

评论