对比
Future:我们的目的都是获取异步任务的结果,但是对于Future来说,只能通过get方法或者死循环判断isDone来获取。异常情况就更是难办。
CompletableFuture:只要我们设置好回调函数即可实现:
只要任务完成,即执行我们设置的函数(不用再去考虑什么时候任务完成)
如果发生异常,同样会执行我们处理异常的函数,甚至连默认返回值都有(异常情况处理更加省力)
如果有复杂任务,比如依赖问题,组合问题等,同样可以写好处理函数来处理(能应付复杂任务的处理)
Future JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
以前我们获取一个异步任务的结果可能是这样写的
Future 接口的局限性 Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:
将多个异步计算的结果合并成一个
等待Future集合中的所有任务都完成
Future完成事件(即,任务完成以后触发执行动作)
…
CompletionStage
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发1 public class CompletableFuture <T> implements Future <T>, CompletionStage<T>
CompletableFuture
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future和CompletionStage接口
对于CompletableFuture有四个执行异步任务的方法:
1 2 3 4 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) public static CompletableFuture<Void> runAsync (Runnable runnable) public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor)
1 2 1. 如果我们指定线程池,则会使用我么指定的线程池;如果没有指定线程池,默认使用ForkJoinPool.commonPool()作为线程池。 2. supply开头的带有返回值,run开头的无返回值。
执行异步任务(supplyAsync / runAsync) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return new Random ().nextInt(100 ); }, executor); System.out.println(future.get()); executor.shutdown(); } }
以上仅仅返回个随机数,如果我们要利用计算结果进一步处理呢?
结果转换(thenApply / thenApplyAsync) 1 2 3 4 5 6 // 同步转换 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) // 异步转换,使用默认线程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) // 异步转换,使用指定线程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> { return new Random ().nextInt(100 ); }, executor) .thenApply(n -> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } int res = new Random ().nextInt(100 ); System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s" , n, res)); return n + res; }); System.out.println("我等了你2秒" ); System.out.println(future.get()); executor.shutdown(); } }
输出: 如果把thenApply换成thenApplyAsync,则会输出: 处理完任务以及结果,该去消费了
消费而不影响最终结果(thenAccept / thenRun / thenAcceptBoth) 1 2 3 4 5 6 7 8 9 10 11 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action, Executor executor) public CompletableFuture<Void> thenRun (Runnable action) public CompletableFuture<Void> thenRunAsync (Runnable action) public CompletableFuture<Void> thenRunAsync (Runnable action, Executor executor) public <U> CompletableFuture<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
1 2 3 4 5 6 7 8 9 这三种的区别是: thenAccept:能够拿到并利用执行结果 thenRun:不能够拿到并利用执行结果,只是单纯的执行其它任务 thenAcceptBoth:能传入另一个stage,然后把另一个stage的结果和当前stage的结果作为参数去消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> { return new Random ().nextInt(100 ); }, executor) .thenApplyAsync(n -> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } int res = new Random ().nextInt(100 ); System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s" , n, res)); return n + res; }); CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> { System.out.println("单纯消费任务执行结果:" + n); }); future.thenRunAsync(() -> { System.out.println("我只能执行其它工作,我得不到任务执行结果" ); }, executor); future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> { return "I'm Other Result" ; }), (current, other) -> { System.out.println(String.format("Current:%s,Other:%s" , current, other)); }); System.out.println("我等了你2秒" ); System.out.println(future.get()); executor.shutdown(); } }
结果:
组合任务(thenCombine / thenCompose) 1 2 3 4 5 6 7 public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
1 2 3 4 5 这两种区别:主要是返回类型不一样。 thenCombine:至少两个方法参数,一个为其它stage,一个为用户自定义的处理函数,函数返回值为结果类型。 thenCompose:至少一个方法参数即处理函数,函数返回值为stage类型。
先看thenCombine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<Integer> otherFuture = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("任务A:" + result); return result; }, executor); CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("任务B:" + result); return result; }, executor) .thenCombineAsync(otherFuture, (current, other) -> { int result = other + current; System.out.println("组合两个任务的结果:" + result); return result; }); System.out.println(future.get()); executor.shutdown(); } }
执行结果:再来看thenCompose
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("任务A:" + result); return result; }, executor) .thenComposeAsync((current) -> { return CompletableFuture.supplyAsync(() -> { int b = new Random ().nextInt(100 ); System.out.println("任务B:" + b); int result = b + current; System.out.println("组合两个任务的结果:" + result); return result; }, executor); }); System.out.println(future.get()); executor.shutdown(); } }
输出:
快者优先(applyToEither / acceptEither) 1 有个场景,如果我们有多条渠道去完成同一种任务,那么我们肯定选择最快的那个。
1 2 3 4 5 6 7 public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) public CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
1 这两种区别:仅仅是一个有返回值,一个没有(Void)
先看applyToEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<String> otherFuture = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("执行者A:" + result); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "执行者A【" + result + "】" ; }, executor); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("执行者B:" + result); return "执行者B【" + result + "】" ; }, executor) .applyToEither(otherFuture, (faster) -> { System.out.println("谁最快:" + faster); return faster; }); System.out.println(future.get()); executor.shutdown(); } }
输出:再看acceptEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<String> otherFuture = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("执行者A:" + result); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "执行者A【" + result + "】" ; }, executor); CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { int result = new Random ().nextInt(100 ); System.out.println("执行者B:" + result); return "执行者B【" + result + "】" ; }, executor) .acceptEither(otherFuture, (faster) -> { System.out.println("谁最快:" + faster); }); System.out.println(future.get()); executor.shutdown(); } }
输出:
异常处理(exceptionally / whenComplete / handle) 1 2 3 4 5 6 7 8 9 public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn) ;public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action, Executor executor) ;public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletionStage<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletionStage<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn,Executor executor) ;
exceptionally
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true ){ throw new RuntimeException ("Error!!!" ); } return "Hello" ; }, executor) .exceptionally(e -> { System.out.println("处理异常:" + e.getMessage()); return "处理完毕!" ; }); System.out.println(future.get()); executor.shutdown(); } }
输出:whenComplete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true ){ throw new RuntimeException ("Error!!!" ); } return "Hello" ; }, executor) .whenComplete((result,ex) -> { try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("上一步结果:" + result); System.out.println("处理异常:" + ex.getMessage()); }); System.out.println(future.get()); executor.shutdown(); } }
输出结果: 可以看见,用whenComplete对异常情况不是特别友好。
handle
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main { public static void main (String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor (3 , 6 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 )); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true ){ throw new RuntimeException ("Error!!!" ); } return "Hello" ; }, executor) .handle((result,ex) -> { System.out.println("上一步结果:" + result); System.out.println("处理异常:" + ex.getMessage()); return "Value When Exception Occurs" ; }); System.out.println(future.get()); executor.shutdown(); } }
输出:
综上,如果单纯要处理异常,那就用exceptionally;如果还想处理结果(没有异常的情况),那就用handle,比whenComplete友好一些,handle不仅能处理异常还能返回一个异常情况的默认值。
参考文章